@@ -7,4 +7,4 @@ | |||
exclude_lines = | |||
pragma: no cover | |||
if 0: | |||
omit = scripts,nilmrun/_version.py | |||
omit = scripts,nilmrun/_version.py,nilmrun/filters/* |
@@ -0,0 +1,21 @@ | |||
#!/usr/bin/python | |||
from nilmdb.utils.printf import * | |||
import time | |||
import signal | |||
import sys | |||
# This is just for testing the process management. | |||
def dummy(n): | |||
if n < 0: # raise an exception | |||
raise Exception("test exception") | |||
if n == 0: # ignore SIGTERM and count to 40 | |||
n = 40 | |||
signal.signal(signal.SIGTERM, signal.SIG_IGN) | |||
for x in range(n): | |||
s = sprintf("dummy %d\n", x) | |||
if x & 1: | |||
sys.stdout.write(s) | |||
else: | |||
sys.stderr.write(s) | |||
time.sleep(0.1) |
@@ -9,7 +9,6 @@ import sys | |||
import os | |||
import time | |||
import uuid | |||
import traceback | |||
class LogReceiver(object): | |||
"""Spawn a thread that listens to a pipe for log messages, | |||
@@ -26,20 +25,16 @@ class LogReceiver(object): | |||
data = self.pipe.recv_bytes() | |||
self.log.write(data) | |||
except EOFError: | |||
print "thread pipe died" | |||
self.pipe.close() | |||
print "thread closed" | |||
time.sleep(1) | |||
return | |||
def getvalue(self): | |||
return self.log.getvalue() | |||
def clear(self): | |||
self.log.truncate() | |||
self.log.seek(0) | |||
self.log = cStringIO.StringIO() | |||
class LogSender(object): | |||
class LogSender(object): # pragma: no cover (runs in a different process) | |||
"""File-like object that writes output to a pipe""" | |||
def __init__(self, pipe): | |||
self.pipe = pipe | |||
@@ -53,7 +48,7 @@ class LogSender(object): | |||
if self.pipe: | |||
self.pipe.send_bytes(data) | |||
def flush(self, data): | |||
def flush(self): | |||
pass | |||
def isatty(self): | |||
@@ -67,48 +62,32 @@ class Process(object): | |||
self.name = name | |||
pipes = multiprocessing.Pipe(duplex = False) | |||
self._log = LogReceiver(pipes[0]) | |||
self._process = multiprocessing.Process(target = self._tramp, | |||
args = (function, pipes), | |||
name = name) | |||
self._process = multiprocessing.Process( | |||
target = self._tramp, name = name, | |||
args = (function, pipes, parameters)) | |||
self._process.daemon = True | |||
self._process.start() | |||
pipes[1].close() | |||
self.start_time = time.time() | |||
self.pid = str(uuid.uuid1(self._process.pid or 0)) | |||
def _tramp(self, function, pipes): | |||
def _tramp(self, function, pipes, parameters): # pragma: no cover | |||
# Remap stdio in the child before calling function | |||
print "pid:", os.getpid() | |||
print "pipes:", pipes | |||
print "pipes[0].fileno():", pipes[0].fileno() | |||
print "pipes[1].fileno():", pipes[1].fileno() | |||
pipes[0].close() | |||
logfile = LogSender(pipes[1]) | |||
saved_stdin = sys.stdin | |||
saved_stdout = sys.stdout | |||
saved_stderr = sys.stderr | |||
sys.stdin = open(os.devnull, 'r') | |||
sys.stdout = logfile | |||
sys.stderr = logfile | |||
function(parameters) | |||
exitcode = 0 | |||
try: | |||
function(self.parameters) | |||
raise Exception("oh no") | |||
except: | |||
traceback.print_exc() | |||
exitcode = 1 | |||
sys.stdin = saved_stdin | |||
sys.stdout = saved_stdout | |||
sys.stderr = saved_stderr | |||
logfile.close() | |||
sys.exit(exitcode) | |||
def terminate(self, timeout = 3): | |||
def terminate(self, timeout = 1): | |||
self._process.join(timeout) | |||
if self.alive: | |||
self._process.terminate() | |||
self._process.join(timeout) | |||
if self.alive: | |||
return False | |||
return True | |||
def clear_log(self): | |||
self._log.clear() | |||
@@ -122,10 +101,8 @@ class Process(object): | |||
return self._process.is_alive() | |||
@property | |||
def error(self): | |||
if self._process.exitcode: | |||
return True | |||
return False | |||
def exitcode(self): | |||
return self._process.exitcode | |||
class ProcessManager(object): | |||
def __init__(self): | |||
@@ -141,3 +118,9 @@ class ProcessManager(object): | |||
new = Process(name, function, parameters) | |||
self.processes[new.pid] = new | |||
return new.pid | |||
def terminate(self, pid): | |||
return self.processes[pid].terminate() | |||
def remove(self, pid): | |||
del self.processes[pid] |
@@ -9,6 +9,7 @@ import decorator | |||
import psutil | |||
import traceback | |||
import argparse | |||
import time | |||
import nilmdb | |||
from nilmdb.utils.printf import * | |||
@@ -25,6 +26,7 @@ from nilmdb.server.serverutil import ( | |||
) | |||
import nilmrun | |||
import nilmrun.trainola | |||
import nilmrun.dummyfilter | |||
# Add CORS_allow tool | |||
cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow) | |||
@@ -64,7 +66,7 @@ class AppProcess(object): | |||
return { | |||
"pid": pid, | |||
"alive": self.manager[pid].alive, | |||
"error": self.manager[pid].error, | |||
"exitcode": self.manager[pid].exitcode, | |||
"name": self.manager[pid].name, | |||
"start_time": self.manager[pid].start_time, | |||
"parameters": self.manager[pid].parameters, | |||
@@ -75,6 +77,8 @@ class AppProcess(object): | |||
@cherrypy.expose | |||
@cherrypy.tools.json_out() | |||
def status(self, pid, clear = False): | |||
"""Return status about a process. If clear = True, also clear | |||
the log.""" | |||
if pid not in self.manager: | |||
raise cherrypy.HTTPError("404 Not Found", "No such PID") | |||
status = self.process_status(pid) | |||
@@ -86,21 +90,24 @@ class AppProcess(object): | |||
@cherrypy.expose | |||
@cherrypy.tools.json_out() | |||
def list(self): | |||
"""Return a list of processes in the manager.""" | |||
return list(self.manager) | |||
# /process/kill | |||
# /process/remove | |||
@cherrypy.expose | |||
@cherrypy.tools.json_in() | |||
@cherrypy.tools.json_out() | |||
@cherrypy.tools.CORS_allow(methods = ["POST"]) | |||
def kill(self, pid): | |||
def remove(self, pid): | |||
"""Remove a process from the manager, killing it if necessary""" | |||
if pid not in self.manager: | |||
raise cherrypy.HTTPError("404 Not Found", "No such PID") | |||
if not self.manager.terminate(pid): | |||
raise cherrypy.HTTPError("503 Service Unavailable", | |||
"Failed to stop process") | |||
status = self.process_status(pid) | |||
manager.remove(pid) | |||
self.manager.remove(pid) | |||
return status | |||
class AppFilter(object): | |||
@@ -116,6 +123,14 @@ class AppFilter(object): | |||
def trainola(self, data): | |||
return self.manager.run("trainola", nilmrun.trainola.trainola, data) | |||
# /filter/dummy | |||
@cherrypy.expose | |||
@cherrypy.tools.json_in() | |||
@cherrypy.tools.json_out() | |||
@exception_to_httperror(KeyError, ValueError) | |||
@cherrypy.tools.CORS_allow(methods = ["POST"]) | |||
def dummy(self, count): | |||
return self.manager.run("dummy", nilmrun.dummyfilter.dummy, int(count)) | |||
class Server(object): | |||
def __init__(self, host = '127.0.0.1', port = 8080, | |||
@@ -7,7 +7,7 @@ test = nosetests | |||
nocapture=1 | |||
# Comment this out to see CherryPy logs on failure: | |||
nologcapture=1 | |||
#with-coverage=1 | |||
with-coverage=1 | |||
cover-inclusive=1 | |||
cover-package=nilmrun | |||
cover-erase=1 | |||
@@ -2,7 +2,7 @@ | |||
import nilmrun.server | |||
from nilmdb.client.httpclient import HTTPClient, ClientError | |||
from nilmdb.client.httpclient import HTTPClient, ClientError, ServerError | |||
from nilmdb.utils.printf import * | |||
@@ -33,8 +33,6 @@ testurl = "http://localhost:32181/" | |||
def setup_module(): | |||
global test_server | |||
print dir(nilmrun) | |||
# Start web app on a custom port | |||
test_server = nilmrun.server.Server(host = "127.0.0.1", | |||
port = 32181, | |||
@@ -68,10 +66,100 @@ class TestClient(object): | |||
client.get("/process/status", { "pid": 12345 }) | |||
in_("No such PID", str(e.exception)) | |||
with assert_raises(ClientError): | |||
client.get("/process/kill", { "pid": 12345 }) | |||
client.get("/process/remove", { "pid": 12345 }) | |||
in_("No such PID", str(e.exception)) | |||
def test_client_03_trainola(self): | |||
def test_client_03_process_basic(self): | |||
client = HTTPClient(baseurl = testurl, post_json = True) | |||
# start dummy filter | |||
pid = client.post("/filter/dummy", { "count": 30 }) | |||
eq_(client.get("/process/list"), [pid]) | |||
time.sleep(1) | |||
# Verify that status looks OK | |||
status = client.get("/process/status", { "pid": pid, "clear": True }) | |||
for x in [ "pid", "alive", "exitcode", "name", | |||
"start_time", "parameters", "log" ]: | |||
in_(x, status) | |||
in_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"]) | |||
eq_(status["alive"], True) | |||
eq_(status["exitcode"], None) | |||
# Check that the log got cleared | |||
status = client.get("/process/status", { "pid": pid }) | |||
nin_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"]) | |||
# See that it ended properly | |||
start = time.time() | |||
while status["alive"] == True and (time.time() - start) < 5: | |||
status = client.get("/process/status", { "pid": pid }) | |||
in_("dummy 27\ndummy 28\ndummy 29\n", status["log"]) | |||
eq_(status["alive"], False) | |||
eq_(status["exitcode"], 0) | |||
# Remove it | |||
killstatus = client.post("/process/remove", { "pid": pid }) | |||
eq_(status, killstatus) | |||
eq_(client.get("/process/list"), []) | |||
with assert_raises(ClientError) as e: | |||
client.post("/process/remove", { "pid": pid }) | |||
in_("No such PID", str(e.exception)) | |||
def test_client_04_process_terminate(self): | |||
client = HTTPClient(baseurl = testurl, post_json = True) | |||
# Trigger exception in filter | |||
pid = client.post("/filter/dummy", { "count": -1 }) | |||
time.sleep(0.5) | |||
status = client.get("/process/status", { "pid": pid }) | |||
eq_(status["alive"], False) | |||
eq_(status["exitcode"], 1) | |||
in_("Exception: test exception", status["log"]) | |||
client.post("/process/remove", { "pid": pid }) | |||
# Kill a running filter by removing it early | |||
newpid = client.post("/filter/dummy", { "count": 30 }) | |||
ne_(newpid, pid) | |||
time.sleep(0.5) | |||
status = client.post("/process/remove", { "pid": newpid }) | |||
eq_(status["alive"], False) | |||
ne_(status["exitcode"], 0) | |||
# No more | |||
eq_(client.get("/process/list"), []) | |||
# Try to remove a running filter that ignored SIGTERM | |||
# (can't be killed) | |||
pid = client.post("/filter/dummy", { "count": 0 }) | |||
with assert_raises(ServerError) as e: | |||
status = client.post("/process/remove", { "pid": pid }) | |||
in_("503 Service Unavailable", str(e.exception)) | |||
in_("Failed to stop process", str(e.exception)) | |||
# Wait for it to die, then remove it | |||
start = time.time() | |||
while (time.time() - start) < 5: | |||
status = client.get("/process/status", { "pid": pid }) | |||
if status["alive"] == False: | |||
break | |||
eq_(status["alive"], False) | |||
in_("dummy 39\n", status["log"]) | |||
eq_(status["exitcode"], 0) | |||
def test_client_05_trainola_simple(self): | |||
client = HTTPClient(baseurl = testurl, post_json = True) | |||
pid = client.post("/filter/trainola", { "data": {} }) | |||
start = time.time() | |||
while (time.time() - start) < 5: | |||
status = client.get("/process/status", { "pid": pid }) | |||
if status["alive"] == False: | |||
break | |||
eq_(status["alive"], False) | |||
ne_(status["exitcode"], 0) | |||
@unittest.skip("needs a running nilmdb") | |||
def test_client_06_trainola(self): | |||
client = HTTPClient(baseurl = testurl, post_json = True) | |||
data = { "url": "http://bucket.mit.edu/nilmdb", | |||
@@ -102,15 +190,26 @@ class TestClient(object): | |||
} | |||
] | |||
} | |||
client.post("/filter/trainola", { "data": data }) | |||
pid = client.get("/process/list")[0] | |||
eq_(client.get("/process/list"), [pid]) | |||
while True: | |||
# start trainola | |||
pid = client.post("/filter/trainola", { "data": data }) | |||
# wait for it to finish | |||
for i in range(60): | |||
time.sleep(1) | |||
status = client.get("/process/status", { "pid": pid }) | |||
pprint.pprint(status) | |||
print status["alive"] | |||
if i == 2: | |||
status = client.get("/process/status", { "pid": pid, | |||
"clear": True }) | |||
in_("Loading stream data", status['log']) | |||
elif i == 3: | |||
status = client.get("/process/status", { "pid": pid }) | |||
nin_("Loading stream data", status['log']) | |||
else: | |||
status = client.get("/process/status", { "pid": pid }) | |||
if status["alive"] == False: | |||
break | |||
pprint.pprint(status) | |||
else: | |||
client.post("/process/remove", {"pid": pid }) | |||
raise AssertionError("took too long") | |||
if i < 3: | |||
raise AssertionError("too fast?") |
@@ -18,6 +18,10 @@ def in_(a, b): | |||
if a not in b: | |||
raise AssertionError("%s not in %s" % (myrepr(a), myrepr(b))) | |||
def nin_(a, b): | |||
if a in b: | |||
raise AssertionError("unexpected %s in %s" % (myrepr(a), myrepr(b))) | |||
def in2_(a1, a2, b): | |||
if a1 not in b and a2 not in b: | |||
raise AssertionError("(%s or %s) not in %s" % (myrepr(a1), myrepr(a2), | |||