From f73de35ee655224d96629ca0c485a3bb904dda12 Mon Sep 17 00:00:00 2001 From: Jim Paris Date: Fri, 5 Jul 2013 19:35:09 -0400 Subject: [PATCH] Work in progress on the process manager. Very finicky. The multiprocessing environment is really finicky. I'm seeing deadlocks in the process at exit, that are probably related to tracebacks being printed and still getting redirected to the logfile or something. --- nilmrun/__init__.py | 2 +- nilmrun/processmanager.py | 143 ++++++++++++++++++++++++++++++++++++++ nilmrun/server.py | 26 +++---- nilmrun/threadmanager.py | 46 ------------ setup.cfg | 2 +- tests/test_client.py | 25 +++++-- 6 files changed, 178 insertions(+), 66 deletions(-) create mode 100644 nilmrun/processmanager.py delete mode 100644 nilmrun/threadmanager.py diff --git a/nilmrun/__init__.py b/nilmrun/__init__.py index fe5308a..0e72cb5 100644 --- a/nilmrun/__init__.py +++ b/nilmrun/__init__.py @@ -1,4 +1,4 @@ -import nilmrun.threadmanager +import nilmrun.processmanager from ._version import get_versions __version__ = get_versions()['version'] diff --git a/nilmrun/processmanager.py b/nilmrun/processmanager.py new file mode 100644 index 0000000..5d3c2af --- /dev/null +++ b/nilmrun/processmanager.py @@ -0,0 +1,143 @@ +#!/usr/bin/python + +from nilmdb.utils.printf import * + +import threading +import multiprocessing +import cStringIO +import sys +import os +import time +import uuid +import traceback + +class LogReceiver(object): + """Spawn a thread that listens to a pipe for log messages, + and stores them locally.""" + def __init__(self, pipe): + self.pipe = pipe + self.log = cStringIO.StringIO() + self.thread = threading.Thread(target = self.run) + self.thread.start() + + def run(self): + try: + while True: + data = self.pipe.recv_bytes() + self.log.write(data) + except EOFError: + print "thread pipe died" + self.pipe.close() + print "thread closed" + time.sleep(1) + return + + def getvalue(self): + return self.log.getvalue() + + def clear(self): + self.log.truncate() + self.log.seek(0) + +class LogSender(object): + """File-like object that writes output to a pipe""" + def __init__(self, pipe): + self.pipe = pipe + + def close(self): + if self.pipe: + self.pipe.close() + self.pipe = None + + def write(self, data): + if self.pipe: + self.pipe.send_bytes(data) + + def flush(self, data): + pass + + def isatty(self): + return False + +class Process(object): + """Spawn and manage a running process""" + def __init__(self, name, function, parameters): + self.parameters = parameters + self.start_time = None + self.name = name + pipes = multiprocessing.Pipe(duplex = False) + self._log = LogReceiver(pipes[0]) + self._process = multiprocessing.Process(target = self._tramp, + args = (function, pipes), + name = name) + self._process.daemon = True + self._process.start() + pipes[1].close() + self.start_time = time.time() + self.pid = str(uuid.uuid1(self._process.pid or 0)) + + def _tramp(self, function, pipes): + # Remap stdio in the child before calling function + print "pid:", os.getpid() + print "pipes:", pipes + print "pipes[0].fileno():", pipes[0].fileno() + print "pipes[1].fileno():", pipes[1].fileno() + pipes[0].close() + logfile = LogSender(pipes[1]) + + saved_stdin = sys.stdin + saved_stdout = sys.stdout + saved_stderr = sys.stderr + sys.stdin = open(os.devnull, 'r') + sys.stdout = logfile + sys.stderr = logfile + + exitcode = 0 + try: + function(self.parameters) + raise Exception("oh no") + except: + traceback.print_exc() + exitcode = 1 + sys.stdin = saved_stdin + sys.stdout = saved_stdout + sys.stderr = saved_stderr + logfile.close() + sys.exit(exitcode) + + def terminate(self, timeout = 3): + self._process.join(timeout) + if self.alive: + self._process.terminate() + + def clear_log(self): + self._log.clear() + + @property + def log(self): + return self._log.getvalue() + + @property + def alive(self): + return self._process.is_alive() + + @property + def error(self): + if self._process.exitcode: + return True + return False + +class ProcessManager(object): + def __init__(self): + self.processes = {} + + def __iter__(self): + return iter(self.processes.keys()) + + def __getitem__(self, key): + return self.processes[key] + + def run(self, name, function, parameters): + new = Process(name, function, parameters) + self.processes[new.pid] = new + return new.pid diff --git a/nilmrun/server.py b/nilmrun/server.py index 2f115cf..493e4e9 100755 --- a/nilmrun/server.py +++ b/nilmrun/server.py @@ -55,50 +55,51 @@ class App(object): def version(self): return nilmrun.__version__ -class AppThread(object): +class AppProcess(object): def __init__(self, manager): self.manager = manager - def thread_status(self, pid): + def process_status(self, pid): return { "pid": pid, "alive": self.manager[pid].alive, + "error": self.manager[pid].error, "name": self.manager[pid].name, "start_time": self.manager[pid].start_time, "parameters": self.manager[pid].parameters, "log": self.manager[pid].log, } - # /thread/status + # /process/status @cherrypy.expose @cherrypy.tools.json_out() def status(self, pid, clear = False): if pid not in self.manager: - raise cherrypy.NotFound() - status = thread_status(pid) + raise cherrypy.HTTPError("404 Not Found", "No such PID") + status = self.process_status(pid) if clear: self.manager[pid].clear_log() return status - # /thread/list + # /process/list @cherrypy.expose @cherrypy.tools.json_out() def list(self): return list(self.manager) - # /thread/kill + # /process/kill @cherrypy.expose @cherrypy.tools.json_in() @cherrypy.tools.json_out() @cherrypy.tools.CORS_allow(methods = ["POST"]) def kill(self, pid): if pid not in self.manager: - raise CherryPy.NotFound() + raise cherrypy.HTTPError("404 Not Found", "No such PID") if not self.manager.terminate(pid): raise cherrypy.HTTPError("503 Service Unavailable", - "Failed to stop thread") - status = thread_status(pid) + "Failed to stop process") + status = self.process_status(pid) manager.remove(pid) class AppFilter(object): @@ -115,6 +116,7 @@ class AppFilter(object): def trainola(self, data): return self.manager.run("trainola", nilmrun.trainola.trainola, data) + class Server(object): def __init__(self, host = '127.0.0.1', port = 8080, embedded = True, # hide diagnostics and output, etc @@ -165,9 +167,9 @@ class Server(object): cherrypy._cperror._ie_friendly_error_sizes = {} # Build up the application and mount it - manager = nilmrun.threadmanager.ThreadManager() + manager = nilmrun.processmanager.ProcessManager() root = App() - root.thread = AppThread(manager) + root.process = AppProcess(manager) root.filter = AppFilter(manager) cherrypy.tree.apps = {} cherrypy.tree.mount(root, basepath, config = { "/" : app_config }) diff --git a/nilmrun/threadmanager.py b/nilmrun/threadmanager.py deleted file mode 100644 index 1cc7f1d..0000000 --- a/nilmrun/threadmanager.py +++ /dev/null @@ -1,46 +0,0 @@ -#!/usr/bin/python - -from nilmdb.utils.printf import * - -import threading - -class Thread(object): - def __init__(self, name, function, parameters): - self.parameters = parameters - self.start_time = None - self.log = '' - self._terminate = threading.Event() - self._thread = threading.Thread(target = function, - name = name, - args = ( parameters, - self._terminate )) - self._thread.daemon = True - self._thread.start() - - def terminate(self, timeout = 60): - self._terminate.set() - self._thread.join(timeout = 60) - - @property - def pid(self): - return self._thread.ident - - @property - def name(self): - return self._thread.name - - @property - def alive(self): - return self._thread.is_alive() - -class ThreadManager(object): - def __init__(self): - self.threads = {} - - def __iter__(self): - return iter(self.threads.keys()) - - def run(self, name, function, parameters): - new = Thread(name, function, parameters) - self.threads[new.pid] = new - return new.pid diff --git a/setup.cfg b/setup.cfg index c8fbcb4..4bc75cb 100644 --- a/setup.cfg +++ b/setup.cfg @@ -7,7 +7,7 @@ test = nosetests nocapture=1 # Comment this out to see CherryPy logs on failure: nologcapture=1 -with-coverage=1 +#with-coverage=1 cover-inclusive=1 cover-package=nilmrun cover-erase=1 diff --git a/tests/test_client.py b/tests/test_client.py index b8180db..8613e2e 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -24,6 +24,7 @@ import re import urllib2 from urllib2 import urlopen, HTTPError import requests +import pprint from testutil.helpers import * @@ -61,15 +62,17 @@ class TestClient(object): def test_client_02_manager(self): client = HTTPClient(baseurl = testurl) - eq_(client.get("/thread/list"), []) + eq_(client.get("/process/list"), []) + with assert_raises(ClientError) as e: + client.get("/process/status", { "pid": 12345 }) + in_("No such PID", str(e.exception)) with assert_raises(ClientError): - client.get("/thread/status", { "pid": 12345 }) - with assert_raises(ClientError): - client.get("/thread/kill", { "pid": 12345 }) + client.get("/process/kill", { "pid": 12345 }) + in_("No such PID", str(e.exception)) def test_client_03_trainola(self): - client = HTTPClient(baseurl = testurl) + client = HTTPClient(baseurl = testurl, post_json = True) data = { "url": "http://bucket.mit.edu/nilmdb", "stream": "/sharon/prep-a", @@ -100,4 +103,14 @@ class TestClient(object): ] } client.post("/filter/trainola", { "data": data }) - + pid = client.get("/process/list")[0] + eq_(client.get("/process/list"), [pid]) + + while True: + time.sleep(1) + status = client.get("/process/status", { "pid": pid }) + pprint.pprint(status) + print status["alive"] + if status["alive"] == False: + break + pprint.pprint(status)