diff --git a/nilmrun/__init__.py b/nilmrun/__init__.py index fe5308a..0e72cb5 100644 --- a/nilmrun/__init__.py +++ b/nilmrun/__init__.py @@ -1,4 +1,4 @@ -import nilmrun.threadmanager +import nilmrun.processmanager from ._version import get_versions __version__ = get_versions()['version'] diff --git a/nilmrun/processmanager.py b/nilmrun/processmanager.py new file mode 100644 index 0000000..5d3c2af --- /dev/null +++ b/nilmrun/processmanager.py @@ -0,0 +1,143 @@ +#!/usr/bin/python + +from nilmdb.utils.printf import * + +import threading +import multiprocessing +import cStringIO +import sys +import os +import time +import uuid +import traceback + +class LogReceiver(object): + """Spawn a thread that listens to a pipe for log messages, + and stores them locally.""" + def __init__(self, pipe): + self.pipe = pipe + self.log = cStringIO.StringIO() + self.thread = threading.Thread(target = self.run) + self.thread.start() + + def run(self): + try: + while True: + data = self.pipe.recv_bytes() + self.log.write(data) + except EOFError: + print "thread pipe died" + self.pipe.close() + print "thread closed" + time.sleep(1) + return + + def getvalue(self): + return self.log.getvalue() + + def clear(self): + self.log.truncate() + self.log.seek(0) + +class LogSender(object): + """File-like object that writes output to a pipe""" + def __init__(self, pipe): + self.pipe = pipe + + def close(self): + if self.pipe: + self.pipe.close() + self.pipe = None + + def write(self, data): + if self.pipe: + self.pipe.send_bytes(data) + + def flush(self, data): + pass + + def isatty(self): + return False + +class Process(object): + """Spawn and manage a running process""" + def __init__(self, name, function, parameters): + self.parameters = parameters + self.start_time = None + self.name = name + pipes = multiprocessing.Pipe(duplex = False) + self._log = LogReceiver(pipes[0]) + self._process = multiprocessing.Process(target = self._tramp, + args = (function, pipes), + name = name) + self._process.daemon = True + self._process.start() + pipes[1].close() + self.start_time = time.time() + self.pid = str(uuid.uuid1(self._process.pid or 0)) + + def _tramp(self, function, pipes): + # Remap stdio in the child before calling function + print "pid:", os.getpid() + print "pipes:", pipes + print "pipes[0].fileno():", pipes[0].fileno() + print "pipes[1].fileno():", pipes[1].fileno() + pipes[0].close() + logfile = LogSender(pipes[1]) + + saved_stdin = sys.stdin + saved_stdout = sys.stdout + saved_stderr = sys.stderr + sys.stdin = open(os.devnull, 'r') + sys.stdout = logfile + sys.stderr = logfile + + exitcode = 0 + try: + function(self.parameters) + raise Exception("oh no") + except: + traceback.print_exc() + exitcode = 1 + sys.stdin = saved_stdin + sys.stdout = saved_stdout + sys.stderr = saved_stderr + logfile.close() + sys.exit(exitcode) + + def terminate(self, timeout = 3): + self._process.join(timeout) + if self.alive: + self._process.terminate() + + def clear_log(self): + self._log.clear() + + @property + def log(self): + return self._log.getvalue() + + @property + def alive(self): + return self._process.is_alive() + + @property + def error(self): + if self._process.exitcode: + return True + return False + +class ProcessManager(object): + def __init__(self): + self.processes = {} + + def __iter__(self): + return iter(self.processes.keys()) + + def __getitem__(self, key): + return self.processes[key] + + def run(self, name, function, parameters): + new = Process(name, function, parameters) + self.processes[new.pid] = new + return new.pid diff --git a/nilmrun/server.py b/nilmrun/server.py index 2f115cf..493e4e9 100755 --- a/nilmrun/server.py +++ b/nilmrun/server.py @@ -55,50 +55,51 @@ class App(object): def version(self): return nilmrun.__version__ -class AppThread(object): +class AppProcess(object): def __init__(self, manager): self.manager = manager - def thread_status(self, pid): + def process_status(self, pid): return { "pid": pid, "alive": self.manager[pid].alive, + "error": self.manager[pid].error, "name": self.manager[pid].name, "start_time": self.manager[pid].start_time, "parameters": self.manager[pid].parameters, "log": self.manager[pid].log, } - # /thread/status + # /process/status @cherrypy.expose @cherrypy.tools.json_out() def status(self, pid, clear = False): if pid not in self.manager: - raise cherrypy.NotFound() - status = thread_status(pid) + raise cherrypy.HTTPError("404 Not Found", "No such PID") + status = self.process_status(pid) if clear: self.manager[pid].clear_log() return status - # /thread/list + # /process/list @cherrypy.expose @cherrypy.tools.json_out() def list(self): return list(self.manager) - # /thread/kill + # /process/kill @cherrypy.expose @cherrypy.tools.json_in() @cherrypy.tools.json_out() @cherrypy.tools.CORS_allow(methods = ["POST"]) def kill(self, pid): if pid not in self.manager: - raise CherryPy.NotFound() + raise cherrypy.HTTPError("404 Not Found", "No such PID") if not self.manager.terminate(pid): raise cherrypy.HTTPError("503 Service Unavailable", - "Failed to stop thread") - status = thread_status(pid) + "Failed to stop process") + status = self.process_status(pid) manager.remove(pid) class AppFilter(object): @@ -115,6 +116,7 @@ class AppFilter(object): def trainola(self, data): return self.manager.run("trainola", nilmrun.trainola.trainola, data) + class Server(object): def __init__(self, host = '127.0.0.1', port = 8080, embedded = True, # hide diagnostics and output, etc @@ -165,9 +167,9 @@ class Server(object): cherrypy._cperror._ie_friendly_error_sizes = {} # Build up the application and mount it - manager = nilmrun.threadmanager.ThreadManager() + manager = nilmrun.processmanager.ProcessManager() root = App() - root.thread = AppThread(manager) + root.process = AppProcess(manager) root.filter = AppFilter(manager) cherrypy.tree.apps = {} cherrypy.tree.mount(root, basepath, config = { "/" : app_config }) diff --git a/nilmrun/threadmanager.py b/nilmrun/threadmanager.py deleted file mode 100644 index 1cc7f1d..0000000 --- a/nilmrun/threadmanager.py +++ /dev/null @@ -1,46 +0,0 @@ -#!/usr/bin/python - -from nilmdb.utils.printf import * - -import threading - -class Thread(object): - def __init__(self, name, function, parameters): - self.parameters = parameters - self.start_time = None - self.log = '' - self._terminate = threading.Event() - self._thread = threading.Thread(target = function, - name = name, - args = ( parameters, - self._terminate )) - self._thread.daemon = True - self._thread.start() - - def terminate(self, timeout = 60): - self._terminate.set() - self._thread.join(timeout = 60) - - @property - def pid(self): - return self._thread.ident - - @property - def name(self): - return self._thread.name - - @property - def alive(self): - return self._thread.is_alive() - -class ThreadManager(object): - def __init__(self): - self.threads = {} - - def __iter__(self): - return iter(self.threads.keys()) - - def run(self, name, function, parameters): - new = Thread(name, function, parameters) - self.threads[new.pid] = new - return new.pid diff --git a/setup.cfg b/setup.cfg index c8fbcb4..4bc75cb 100644 --- a/setup.cfg +++ b/setup.cfg @@ -7,7 +7,7 @@ test = nosetests nocapture=1 # Comment this out to see CherryPy logs on failure: nologcapture=1 -with-coverage=1 +#with-coverage=1 cover-inclusive=1 cover-package=nilmrun cover-erase=1 diff --git a/tests/test_client.py b/tests/test_client.py index b8180db..8613e2e 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -24,6 +24,7 @@ import re import urllib2 from urllib2 import urlopen, HTTPError import requests +import pprint from testutil.helpers import * @@ -61,15 +62,17 @@ class TestClient(object): def test_client_02_manager(self): client = HTTPClient(baseurl = testurl) - eq_(client.get("/thread/list"), []) + eq_(client.get("/process/list"), []) + with assert_raises(ClientError) as e: + client.get("/process/status", { "pid": 12345 }) + in_("No such PID", str(e.exception)) with assert_raises(ClientError): - client.get("/thread/status", { "pid": 12345 }) - with assert_raises(ClientError): - client.get("/thread/kill", { "pid": 12345 }) + client.get("/process/kill", { "pid": 12345 }) + in_("No such PID", str(e.exception)) def test_client_03_trainola(self): - client = HTTPClient(baseurl = testurl) + client = HTTPClient(baseurl = testurl, post_json = True) data = { "url": "http://bucket.mit.edu/nilmdb", "stream": "/sharon/prep-a", @@ -100,4 +103,14 @@ class TestClient(object): ] } client.post("/filter/trainola", { "data": data }) - + pid = client.get("/process/list")[0] + eq_(client.get("/process/list"), [pid]) + + while True: + time.sleep(1) + status = client.get("/process/status", { "pid": pid }) + pprint.pprint(status) + print status["alive"] + if status["alive"] == False: + break + pprint.pprint(status)