diff --git a/.coverage b/.coverage deleted file mode 100644 index b12e882..0000000 Binary files a/.coverage and /dev/null differ diff --git a/.coveragerc b/.coveragerc index 6de8124..a56afa8 100644 --- a/.coveragerc +++ b/.coveragerc @@ -7,4 +7,4 @@ exclude_lines = pragma: no cover if 0: -omit = scripts,nilmrun/_version.py +omit = scripts,nilmrun/_version.py,nilmrun/filters/* diff --git a/nilmrun/dummyfilter.py b/nilmrun/dummyfilter.py new file mode 100644 index 0000000..40b450d --- /dev/null +++ b/nilmrun/dummyfilter.py @@ -0,0 +1,21 @@ +#!/usr/bin/python + +from nilmdb.utils.printf import * +import time +import signal +import sys + +# This is just for testing the process management. +def dummy(n): + if n < 0: # raise an exception + raise Exception("test exception") + if n == 0: # ignore SIGTERM and count to 40 + n = 40 + signal.signal(signal.SIGTERM, signal.SIG_IGN) + for x in range(n): + s = sprintf("dummy %d\n", x) + if x & 1: + sys.stdout.write(s) + else: + sys.stderr.write(s) + time.sleep(0.1) diff --git a/nilmrun/processmanager.py b/nilmrun/processmanager.py index 5d3c2af..bd12b59 100644 --- a/nilmrun/processmanager.py +++ b/nilmrun/processmanager.py @@ -9,7 +9,6 @@ import sys import os import time import uuid -import traceback class LogReceiver(object): """Spawn a thread that listens to a pipe for log messages, @@ -26,20 +25,16 @@ class LogReceiver(object): data = self.pipe.recv_bytes() self.log.write(data) except EOFError: - print "thread pipe died" self.pipe.close() - print "thread closed" - time.sleep(1) return def getvalue(self): return self.log.getvalue() def clear(self): - self.log.truncate() - self.log.seek(0) + self.log = cStringIO.StringIO() -class LogSender(object): +class LogSender(object): # pragma: no cover (runs in a different process) """File-like object that writes output to a pipe""" def __init__(self, pipe): self.pipe = pipe @@ -53,7 +48,7 @@ class LogSender(object): if self.pipe: self.pipe.send_bytes(data) - def flush(self, data): + def flush(self): pass def isatty(self): @@ -67,48 +62,32 @@ class Process(object): self.name = name pipes = multiprocessing.Pipe(duplex = False) self._log = LogReceiver(pipes[0]) - self._process = multiprocessing.Process(target = self._tramp, - args = (function, pipes), - name = name) + self._process = multiprocessing.Process( + target = self._tramp, name = name, + args = (function, pipes, parameters)) self._process.daemon = True self._process.start() pipes[1].close() self.start_time = time.time() self.pid = str(uuid.uuid1(self._process.pid or 0)) - def _tramp(self, function, pipes): + def _tramp(self, function, pipes, parameters): # pragma: no cover # Remap stdio in the child before calling function - print "pid:", os.getpid() - print "pipes:", pipes - print "pipes[0].fileno():", pipes[0].fileno() - print "pipes[1].fileno():", pipes[1].fileno() pipes[0].close() logfile = LogSender(pipes[1]) - - saved_stdin = sys.stdin - saved_stdout = sys.stdout - saved_stderr = sys.stderr sys.stdin = open(os.devnull, 'r') sys.stdout = logfile sys.stderr = logfile + function(parameters) - exitcode = 0 - try: - function(self.parameters) - raise Exception("oh no") - except: - traceback.print_exc() - exitcode = 1 - sys.stdin = saved_stdin - sys.stdout = saved_stdout - sys.stderr = saved_stderr - logfile.close() - sys.exit(exitcode) - - def terminate(self, timeout = 3): + def terminate(self, timeout = 1): self._process.join(timeout) if self.alive: self._process.terminate() + self._process.join(timeout) + if self.alive: + return False + return True def clear_log(self): self._log.clear() @@ -122,10 +101,8 @@ class Process(object): return self._process.is_alive() @property - def error(self): - if self._process.exitcode: - return True - return False + def exitcode(self): + return self._process.exitcode class ProcessManager(object): def __init__(self): @@ -141,3 +118,9 @@ class ProcessManager(object): new = Process(name, function, parameters) self.processes[new.pid] = new return new.pid + + def terminate(self, pid): + return self.processes[pid].terminate() + + def remove(self, pid): + del self.processes[pid] diff --git a/nilmrun/server.py b/nilmrun/server.py old mode 100755 new mode 100644 index 493e4e9..bcec06c --- a/nilmrun/server.py +++ b/nilmrun/server.py @@ -9,6 +9,7 @@ import decorator import psutil import traceback import argparse +import time import nilmdb from nilmdb.utils.printf import * @@ -25,6 +26,7 @@ from nilmdb.server.serverutil import ( ) import nilmrun import nilmrun.trainola +import nilmrun.dummyfilter # Add CORS_allow tool cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow) @@ -64,7 +66,7 @@ class AppProcess(object): return { "pid": pid, "alive": self.manager[pid].alive, - "error": self.manager[pid].error, + "exitcode": self.manager[pid].exitcode, "name": self.manager[pid].name, "start_time": self.manager[pid].start_time, "parameters": self.manager[pid].parameters, @@ -75,6 +77,8 @@ class AppProcess(object): @cherrypy.expose @cherrypy.tools.json_out() def status(self, pid, clear = False): + """Return status about a process. If clear = True, also clear + the log.""" if pid not in self.manager: raise cherrypy.HTTPError("404 Not Found", "No such PID") status = self.process_status(pid) @@ -86,21 +90,24 @@ class AppProcess(object): @cherrypy.expose @cherrypy.tools.json_out() def list(self): + """Return a list of processes in the manager.""" return list(self.manager) - # /process/kill + # /process/remove @cherrypy.expose @cherrypy.tools.json_in() @cherrypy.tools.json_out() @cherrypy.tools.CORS_allow(methods = ["POST"]) - def kill(self, pid): + def remove(self, pid): + """Remove a process from the manager, killing it if necessary""" if pid not in self.manager: raise cherrypy.HTTPError("404 Not Found", "No such PID") if not self.manager.terminate(pid): raise cherrypy.HTTPError("503 Service Unavailable", "Failed to stop process") status = self.process_status(pid) - manager.remove(pid) + self.manager.remove(pid) + return status class AppFilter(object): @@ -116,6 +123,14 @@ class AppFilter(object): def trainola(self, data): return self.manager.run("trainola", nilmrun.trainola.trainola, data) + # /filter/dummy + @cherrypy.expose + @cherrypy.tools.json_in() + @cherrypy.tools.json_out() + @exception_to_httperror(KeyError, ValueError) + @cherrypy.tools.CORS_allow(methods = ["POST"]) + def dummy(self, count): + return self.manager.run("dummy", nilmrun.dummyfilter.dummy, int(count)) class Server(object): def __init__(self, host = '127.0.0.1', port = 8080, diff --git a/nilmrun/trainola.py b/nilmrun/trainola.py old mode 100755 new mode 100644 diff --git a/setup.cfg b/setup.cfg index 4bc75cb..c8fbcb4 100644 --- a/setup.cfg +++ b/setup.cfg @@ -7,7 +7,7 @@ test = nosetests nocapture=1 # Comment this out to see CherryPy logs on failure: nologcapture=1 -#with-coverage=1 +with-coverage=1 cover-inclusive=1 cover-package=nilmrun cover-erase=1 diff --git a/tests/test_client.py b/tests/test_client.py index 8613e2e..a7d5982 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -2,7 +2,7 @@ import nilmrun.server -from nilmdb.client.httpclient import HTTPClient, ClientError +from nilmdb.client.httpclient import HTTPClient, ClientError, ServerError from nilmdb.utils.printf import * @@ -33,8 +33,6 @@ testurl = "http://localhost:32181/" def setup_module(): global test_server - print dir(nilmrun) - # Start web app on a custom port test_server = nilmrun.server.Server(host = "127.0.0.1", port = 32181, @@ -68,10 +66,100 @@ class TestClient(object): client.get("/process/status", { "pid": 12345 }) in_("No such PID", str(e.exception)) with assert_raises(ClientError): - client.get("/process/kill", { "pid": 12345 }) + client.get("/process/remove", { "pid": 12345 }) in_("No such PID", str(e.exception)) - def test_client_03_trainola(self): + def test_client_03_process_basic(self): + client = HTTPClient(baseurl = testurl, post_json = True) + + # start dummy filter + pid = client.post("/filter/dummy", { "count": 30 }) + eq_(client.get("/process/list"), [pid]) + time.sleep(1) + + # Verify that status looks OK + status = client.get("/process/status", { "pid": pid, "clear": True }) + for x in [ "pid", "alive", "exitcode", "name", + "start_time", "parameters", "log" ]: + in_(x, status) + in_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"]) + eq_(status["alive"], True) + eq_(status["exitcode"], None) + + # Check that the log got cleared + status = client.get("/process/status", { "pid": pid }) + nin_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"]) + + # See that it ended properly + start = time.time() + while status["alive"] == True and (time.time() - start) < 5: + status = client.get("/process/status", { "pid": pid }) + in_("dummy 27\ndummy 28\ndummy 29\n", status["log"]) + eq_(status["alive"], False) + eq_(status["exitcode"], 0) + + # Remove it + killstatus = client.post("/process/remove", { "pid": pid }) + eq_(status, killstatus) + eq_(client.get("/process/list"), []) + with assert_raises(ClientError) as e: + client.post("/process/remove", { "pid": pid }) + in_("No such PID", str(e.exception)) + + def test_client_04_process_terminate(self): + client = HTTPClient(baseurl = testurl, post_json = True) + + # Trigger exception in filter + pid = client.post("/filter/dummy", { "count": -1 }) + time.sleep(0.5) + status = client.get("/process/status", { "pid": pid }) + eq_(status["alive"], False) + eq_(status["exitcode"], 1) + in_("Exception: test exception", status["log"]) + client.post("/process/remove", { "pid": pid }) + + # Kill a running filter by removing it early + newpid = client.post("/filter/dummy", { "count": 30 }) + ne_(newpid, pid) + time.sleep(0.5) + status = client.post("/process/remove", { "pid": newpid }) + eq_(status["alive"], False) + ne_(status["exitcode"], 0) + + # No more + eq_(client.get("/process/list"), []) + + # Try to remove a running filter that ignored SIGTERM + # (can't be killed) + pid = client.post("/filter/dummy", { "count": 0 }) + with assert_raises(ServerError) as e: + status = client.post("/process/remove", { "pid": pid }) + in_("503 Service Unavailable", str(e.exception)) + in_("Failed to stop process", str(e.exception)) + + # Wait for it to die, then remove it + start = time.time() + while (time.time() - start) < 5: + status = client.get("/process/status", { "pid": pid }) + if status["alive"] == False: + break + eq_(status["alive"], False) + in_("dummy 39\n", status["log"]) + eq_(status["exitcode"], 0) + + def test_client_05_trainola_simple(self): + client = HTTPClient(baseurl = testurl, post_json = True) + pid = client.post("/filter/trainola", { "data": {} }) + start = time.time() + while (time.time() - start) < 5: + status = client.get("/process/status", { "pid": pid }) + if status["alive"] == False: + break + eq_(status["alive"], False) + ne_(status["exitcode"], 0) + + @unittest.skip("needs a running nilmdb") + def test_client_06_trainola(self): client = HTTPClient(baseurl = testurl, post_json = True) data = { "url": "http://bucket.mit.edu/nilmdb", @@ -102,15 +190,26 @@ class TestClient(object): } ] } - client.post("/filter/trainola", { "data": data }) - pid = client.get("/process/list")[0] - eq_(client.get("/process/list"), [pid]) - while True: + # start trainola + pid = client.post("/filter/trainola", { "data": data }) + + # wait for it to finish + for i in range(60): time.sleep(1) - status = client.get("/process/status", { "pid": pid }) - pprint.pprint(status) - print status["alive"] + if i == 2: + status = client.get("/process/status", { "pid": pid, + "clear": True }) + in_("Loading stream data", status['log']) + elif i == 3: + status = client.get("/process/status", { "pid": pid }) + nin_("Loading stream data", status['log']) + else: + status = client.get("/process/status", { "pid": pid }) if status["alive"] == False: break - pprint.pprint(status) + else: + client.post("/process/remove", {"pid": pid }) + raise AssertionError("took too long") + if i < 3: + raise AssertionError("too fast?") diff --git a/tests/testutil/helpers.py b/tests/testutil/helpers.py index 43a57f3..b736bd6 100644 --- a/tests/testutil/helpers.py +++ b/tests/testutil/helpers.py @@ -18,6 +18,10 @@ def in_(a, b): if a not in b: raise AssertionError("%s not in %s" % (myrepr(a), myrepr(b))) +def nin_(a, b): + if a in b: + raise AssertionError("unexpected %s in %s" % (myrepr(a), myrepr(b))) + def in2_(a1, a2, b): if a1 not in b and a2 not in b: raise AssertionError("(%s or %s) not in %s" % (myrepr(a1), myrepr(a2),