Compare commits
4 Commits
nilmrun-0.
...
nilmrun-0.
Author | SHA1 | Date | |
---|---|---|---|
6d9ee7b405 | |||
721d6c4936 | |||
973d328e1e | |||
0338d40226 |
@@ -10,8 +10,9 @@ import os
|
|||||||
import signal
|
import signal
|
||||||
import time
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
import subprocess
|
|
||||||
import psutil
|
import psutil
|
||||||
|
import imp
|
||||||
|
import traceback
|
||||||
|
|
||||||
class LogReceiver(object):
|
class LogReceiver(object):
|
||||||
"""Spawn a thread that listens to a pipe for log messages,
|
"""Spawn a thread that listens to a pipe for log messages,
|
||||||
@@ -96,8 +97,9 @@ class Process(object):
|
|||||||
# Don't need this extra fd
|
# Don't need this extra fd
|
||||||
os.close(wpipe)
|
os.close(wpipe)
|
||||||
|
|
||||||
# Ready to go -- call the function
|
# Ready to go -- call the function, exit when it's done
|
||||||
func(param)
|
func(param)
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
def terminate(self, timeout = 1.0):
|
def terminate(self, timeout = 1.0):
|
||||||
"""Terminate a process, and all of its children that are in the same
|
"""Terminate a process, and all of its children that are in the same
|
||||||
@@ -152,6 +154,49 @@ class Process(object):
|
|||||||
def exitcode(self):
|
def exitcode(self):
|
||||||
return self._process.exitcode
|
return self._process.exitcode
|
||||||
|
|
||||||
|
def _exec_user_code(codeargs): # pragma: no cover (runs in subprocess)
|
||||||
|
"""Execute 'code' as if it were placed into a file and executed"""
|
||||||
|
(code, args) = codeargs
|
||||||
|
# This is split off into a separate function because the Python3
|
||||||
|
# syntax of "exec" triggers a SyntaxError in Python2, if it's within
|
||||||
|
# a nested function.
|
||||||
|
imp.acquire_lock()
|
||||||
|
try:
|
||||||
|
module = imp.new_module("__main__")
|
||||||
|
finally:
|
||||||
|
imp.release_lock()
|
||||||
|
module.__file__ = "<user-code>"
|
||||||
|
sys.argv = [''] + args
|
||||||
|
# Wrap the compile and exec in a try/except so we can format the
|
||||||
|
# exception more nicely.
|
||||||
|
try:
|
||||||
|
codeobj = compile(code, '<user-code>', 'exec',
|
||||||
|
flags = 0, dont_inherit = 1)
|
||||||
|
exec(codeobj, module.__dict__, {})
|
||||||
|
except:
|
||||||
|
# Pull out the exception
|
||||||
|
info = sys.exc_info()
|
||||||
|
tblist = traceback.extract_tb(info[2])
|
||||||
|
|
||||||
|
# First entry is probably this code; get rid of it
|
||||||
|
if len(tblist) and tblist[0][2] == '_exec_user_code':
|
||||||
|
tblist = tblist[1:]
|
||||||
|
|
||||||
|
# Add the user's source code to every line that's missing it
|
||||||
|
lines = code.splitlines()
|
||||||
|
for (n, (name, line, func, text)) in enumerate(tblist):
|
||||||
|
if name == '<user-code>' and text is None and line <= len(lines):
|
||||||
|
tblist[n] = (name, line, func, lines[line-1].strip())
|
||||||
|
|
||||||
|
# Print it to stderr in the usual format
|
||||||
|
out = ['Traceback (most recent call last):\n']
|
||||||
|
out.extend(traceback.format_list(tblist))
|
||||||
|
out.extend(traceback.format_exception_only(info[0], info[1]))
|
||||||
|
sys.stderr.write("".join(out))
|
||||||
|
sys.stderr.flush()
|
||||||
|
sys.exit(1)
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
class ProcessManager(object):
|
class ProcessManager(object):
|
||||||
"""Track and manage a collection of Process objects"""
|
"""Track and manage a collection of Process objects"""
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
@@ -163,16 +208,32 @@ class ProcessManager(object):
|
|||||||
def __getitem__(self, key):
|
def __getitem__(self, key):
|
||||||
return self.processes[key]
|
return self.processes[key]
|
||||||
|
|
||||||
def run_python(self, name, function, parameters):
|
def run_function(self, procname, function, parameters):
|
||||||
new = Process(name, function, parameters)
|
"""Run a Python function that already exists"""
|
||||||
|
new = Process(procname, function, parameters)
|
||||||
self.processes[new.pid] = new
|
self.processes[new.pid] = new
|
||||||
return new.pid
|
return new.pid
|
||||||
|
|
||||||
def run_command(self, name, args):
|
def run_code(self, procname, code, args):
|
||||||
def spwan_user_command(args): # pragma: no cover (runs in subprocess)
|
"""Evaluate 'code' as if it were placed into a Python file and
|
||||||
p = subprocess.Popen(args, close_fds = True, cwd = "/tmp")
|
executed. The arguments will be accessible in the code as
|
||||||
sys.exit(p.wait())
|
sys.argv[1:]."""
|
||||||
return self.run_python(name, spwan_user_command, args)
|
return self.run_function(procname, _exec_user_code, (code, args))
|
||||||
|
|
||||||
|
def run_command(self, procname, argv):
|
||||||
|
"""Execute a command line program"""
|
||||||
|
def spwan_user_command(argv): # pragma: no cover (runs in subprocess)
|
||||||
|
try:
|
||||||
|
maxfd = os.sysconf("SC_OPEN_MAX")
|
||||||
|
except Exception:
|
||||||
|
maxfd = 256
|
||||||
|
os.closerange(3, maxfd)
|
||||||
|
try:
|
||||||
|
os.chdir("/tmp")
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
os.execvp(argv[0], argv)
|
||||||
|
return self.run_function(procname, spwan_user_command, argv)
|
||||||
|
|
||||||
def terminate(self, pid):
|
def terminate(self, pid):
|
||||||
return self.processes[pid].terminate()
|
return self.processes[pid].terminate()
|
||||||
|
@@ -109,40 +109,50 @@ class AppProcess(object):
|
|||||||
self.manager.remove(pid)
|
self.manager.remove(pid)
|
||||||
return status
|
return status
|
||||||
|
|
||||||
# /process/command
|
class AppRun(object):
|
||||||
|
def __init__(self, manager):
|
||||||
|
self.manager = manager
|
||||||
|
|
||||||
|
# /run/command
|
||||||
@cherrypy.expose
|
@cherrypy.expose
|
||||||
@cherrypy.tools.json_in()
|
@cherrypy.tools.json_in()
|
||||||
@cherrypy.tools.json_out()
|
@cherrypy.tools.json_out()
|
||||||
@cherrypy.tools.CORS_allow(methods = ["POST"])
|
@cherrypy.tools.CORS_allow(methods = ["POST"])
|
||||||
def command(self, args):
|
def command(self, argv):
|
||||||
"""Execute an arbitrary program on the server. 'args' is the
|
"""Execute an arbitrary program on the server. argv is a
|
||||||
argument list, with 'args[0]' being the program and 'args[1]',
|
list of the program and its arguments: 'argv[0]' is the program
|
||||||
'args[2]', etc as arguments."""
|
and 'argv[1:]' are arguments"""
|
||||||
return self.manager.run_command("command", args)
|
return self.manager.run_command("command", argv)
|
||||||
|
|
||||||
class AppFilter(object):
|
@cherrypy.expose
|
||||||
|
@cherrypy.tools.json_in()
|
||||||
|
@cherrypy.tools.json_out()
|
||||||
|
@cherrypy.tools.CORS_allow(methods = ["POST"])
|
||||||
|
def code(self, code, args):
|
||||||
|
"""Execute arbitrary Python code. 'code' is a formatted string.
|
||||||
|
It will be run as if it were written into a Python file and
|
||||||
|
executed, with the arguments in 'args' passed on the command line
|
||||||
|
(i.e., they end up in sys.argv[1:])"""
|
||||||
|
return self.manager.run_code("usercode", code, args)
|
||||||
|
|
||||||
def __init__(self, manager):
|
# /run/trainola
|
||||||
self.manager = manager
|
|
||||||
|
|
||||||
# /filter/trainola
|
|
||||||
@cherrypy.expose
|
@cherrypy.expose
|
||||||
@cherrypy.tools.json_in()
|
@cherrypy.tools.json_in()
|
||||||
@cherrypy.tools.json_out()
|
@cherrypy.tools.json_out()
|
||||||
@exception_to_httperror(KeyError, ValueError)
|
@exception_to_httperror(KeyError, ValueError)
|
||||||
@cherrypy.tools.CORS_allow(methods = ["POST"])
|
@cherrypy.tools.CORS_allow(methods = ["POST"])
|
||||||
def trainola(self, data):
|
def trainola(self, data):
|
||||||
return self.manager.run_python(
|
return self.manager.run_function(
|
||||||
"trainola", nilmrun.filters.trainola.filterfunc, data)
|
"trainola", nilmrun.filters.trainola.filterfunc, data)
|
||||||
|
|
||||||
# /filter/dummy
|
# /run/dummy
|
||||||
@cherrypy.expose
|
@cherrypy.expose
|
||||||
@cherrypy.tools.json_in()
|
@cherrypy.tools.json_in()
|
||||||
@cherrypy.tools.json_out()
|
@cherrypy.tools.json_out()
|
||||||
@exception_to_httperror(KeyError, ValueError)
|
@exception_to_httperror(KeyError, ValueError)
|
||||||
@cherrypy.tools.CORS_allow(methods = ["POST"])
|
@cherrypy.tools.CORS_allow(methods = ["POST"])
|
||||||
def dummy(self, data):
|
def dummy(self, data):
|
||||||
return self.manager.run_python(
|
return self.manager.run_function(
|
||||||
"dummy", nilmrun.filters.dummy.filterfunc, data)
|
"dummy", nilmrun.filters.dummy.filterfunc, data)
|
||||||
|
|
||||||
class Server(object):
|
class Server(object):
|
||||||
@@ -198,7 +208,7 @@ class Server(object):
|
|||||||
manager = nilmrun.processmanager.ProcessManager()
|
manager = nilmrun.processmanager.ProcessManager()
|
||||||
root = App()
|
root = App()
|
||||||
root.process = AppProcess(manager)
|
root.process = AppProcess(manager)
|
||||||
root.filter = AppFilter(manager)
|
root.run = AppRun(manager)
|
||||||
cherrypy.tree.apps = {}
|
cherrypy.tree.apps = {}
|
||||||
cherrypy.tree.mount(root, basepath, config = { "/" : app_config })
|
cherrypy.tree.mount(root, basepath, config = { "/" : app_config })
|
||||||
|
|
||||||
|
@@ -25,6 +25,7 @@ import urllib2
|
|||||||
from urllib2 import urlopen, HTTPError
|
from urllib2 import urlopen, HTTPError
|
||||||
import requests
|
import requests
|
||||||
import pprint
|
import pprint
|
||||||
|
import textwrap
|
||||||
|
|
||||||
from testutil.helpers import *
|
from testutil.helpers import *
|
||||||
|
|
||||||
@@ -46,15 +47,29 @@ def teardown_module():
|
|||||||
|
|
||||||
class TestClient(object):
|
class TestClient(object):
|
||||||
|
|
||||||
def wait_end(self, client, pid, timeout = 5):
|
def wait_kill(self, client, pid, timeout = 1):
|
||||||
|
time.sleep(timeout)
|
||||||
|
status = client.get("/process/status", { "pid": pid })
|
||||||
|
if not status["alive"]:
|
||||||
|
raise AssertionError("died before we could kill it")
|
||||||
|
status = client.post("/process/remove", { "pid": pid })
|
||||||
|
if status["alive"]:
|
||||||
|
raise AssertionError("didn't get killed")
|
||||||
|
return status
|
||||||
|
|
||||||
|
def wait_end(self, client, pid, timeout = 5, remove = True):
|
||||||
start = time.time()
|
start = time.time()
|
||||||
status = None
|
status = None
|
||||||
while (time.time() - start) < timeout:
|
while (time.time() - start) < timeout:
|
||||||
status = client.get("/process/status", { "pid": pid })
|
status = client.get("/process/status", { "pid": pid })
|
||||||
if status["alive"] == False:
|
if status["alive"] == False:
|
||||||
return status
|
break
|
||||||
raise AssertionError("process " + str(pid) + " didn't die in " +
|
else:
|
||||||
str(timeout) + " seconds: " + repr(status))
|
raise AssertionError("process " + str(pid) + " didn't die in " +
|
||||||
|
str(timeout) + " seconds: " + repr(status))
|
||||||
|
if remove:
|
||||||
|
status = client.post("/process/remove", { "pid": pid })
|
||||||
|
return status
|
||||||
|
|
||||||
def test_client_01_basic(self):
|
def test_client_01_basic(self):
|
||||||
client = HTTPClient(baseurl = testurl)
|
client = HTTPClient(baseurl = testurl)
|
||||||
@@ -83,7 +98,7 @@ class TestClient(object):
|
|||||||
client = HTTPClient(baseurl = testurl, post_json = True)
|
client = HTTPClient(baseurl = testurl, post_json = True)
|
||||||
|
|
||||||
# start dummy filter
|
# start dummy filter
|
||||||
pid = client.post("/filter/dummy", { "data": 30 })
|
pid = client.post("/run/dummy", { "data": 30 })
|
||||||
eq_(client.get("/process/list"), [pid])
|
eq_(client.get("/process/list"), [pid])
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
@@ -101,7 +116,7 @@ class TestClient(object):
|
|||||||
nin_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"])
|
nin_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"])
|
||||||
|
|
||||||
# See that it ended properly
|
# See that it ended properly
|
||||||
status = self.wait_end(client, pid)
|
status = self.wait_end(client, pid, remove = False)
|
||||||
in_("dummy 27\ndummy 28\ndummy 29\n", status["log"])
|
in_("dummy 27\ndummy 28\ndummy 29\n", status["log"])
|
||||||
eq_(status["exitcode"], 0)
|
eq_(status["exitcode"], 0)
|
||||||
|
|
||||||
@@ -117,7 +132,7 @@ class TestClient(object):
|
|||||||
client = HTTPClient(baseurl = testurl, post_json = True)
|
client = HTTPClient(baseurl = testurl, post_json = True)
|
||||||
|
|
||||||
# Trigger exception in filter
|
# Trigger exception in filter
|
||||||
pid = client.post("/filter/dummy", { "data": -1 })
|
pid = client.post("/run/dummy", { "data": -1 })
|
||||||
time.sleep(0.5)
|
time.sleep(0.5)
|
||||||
status = client.get("/process/status", { "pid": pid })
|
status = client.get("/process/status", { "pid": pid })
|
||||||
eq_(status["alive"], False)
|
eq_(status["alive"], False)
|
||||||
@@ -126,7 +141,7 @@ class TestClient(object):
|
|||||||
client.post("/process/remove", { "pid": pid })
|
client.post("/process/remove", { "pid": pid })
|
||||||
|
|
||||||
# Kill a running filter by removing it early
|
# Kill a running filter by removing it early
|
||||||
newpid = client.post("/filter/dummy", { "data": 50 })
|
newpid = client.post("/run/dummy", { "data": 50 })
|
||||||
ne_(newpid, pid)
|
ne_(newpid, pid)
|
||||||
time.sleep(0.5)
|
time.sleep(0.5)
|
||||||
start = time.time()
|
start = time.time()
|
||||||
@@ -141,7 +156,7 @@ class TestClient(object):
|
|||||||
eq_(client.get("/process/list"), [])
|
eq_(client.get("/process/list"), [])
|
||||||
|
|
||||||
# Try to remove a running filter that ignored SIGTERM
|
# Try to remove a running filter that ignored SIGTERM
|
||||||
pid = client.post("/filter/dummy", { "data": 0 })
|
pid = client.post("/run/dummy", { "data": 0 })
|
||||||
start = time.time()
|
start = time.time()
|
||||||
status = client.post("/process/remove", { "pid": pid })
|
status = client.post("/process/remove", { "pid": pid })
|
||||||
elapsed = time.time() - start
|
elapsed = time.time() - start
|
||||||
@@ -152,8 +167,8 @@ class TestClient(object):
|
|||||||
|
|
||||||
def test_client_05_trainola_simple(self):
|
def test_client_05_trainola_simple(self):
|
||||||
client = HTTPClient(baseurl = testurl, post_json = True)
|
client = HTTPClient(baseurl = testurl, post_json = True)
|
||||||
pid = client.post("/filter/trainola", { "data": {} })
|
pid = client.post("/run/trainola", { "data": {} })
|
||||||
status = self.wait_end(client, pid)
|
status = self.wait_end(client, pid, remove = False)
|
||||||
ne_(status["exitcode"], 0)
|
ne_(status["exitcode"], 0)
|
||||||
status = client.post("/process/remove", { "pid": pid })
|
status = client.post("/process/remove", { "pid": pid })
|
||||||
|
|
||||||
@@ -191,7 +206,7 @@ class TestClient(object):
|
|||||||
}
|
}
|
||||||
|
|
||||||
# start trainola
|
# start trainola
|
||||||
pid = client.post("/filter/trainola", { "data": data })
|
pid = client.post("/run/trainola", { "data": data })
|
||||||
|
|
||||||
# wait for it to finish
|
# wait for it to finish
|
||||||
for i in range(60):
|
for i in range(60):
|
||||||
@@ -213,25 +228,16 @@ class TestClient(object):
|
|||||||
if i < 3:
|
if i < 3:
|
||||||
raise AssertionError("too fast?")
|
raise AssertionError("too fast?")
|
||||||
|
|
||||||
def test_client_07_process_command(self):
|
def test_client_07_run_command(self):
|
||||||
client = HTTPClient(baseurl = testurl, post_json = True)
|
client = HTTPClient(baseurl = testurl, post_json = True)
|
||||||
eq_(client.get("/process/list"), [])
|
eq_(client.get("/process/list"), [])
|
||||||
|
|
||||||
def do(args, kill):
|
def do(argv, kill):
|
||||||
pid = client.post("/process/command", { "args": args } )
|
pid = client.post("/run/command", { "argv": argv } )
|
||||||
eq_(client.get("/process/list"), [pid])
|
eq_(client.get("/process/list"), [pid])
|
||||||
if kill:
|
if kill:
|
||||||
time.sleep(1)
|
return self.wait_kill(client, pid)
|
||||||
status = client.get("/process/status", { "pid": pid })
|
return self.wait_end(client, pid)
|
||||||
if not status["alive"]:
|
|
||||||
raise AssertionError("died before we could kill it")
|
|
||||||
status = client.post("/process/remove", { "pid": pid })
|
|
||||||
if status["alive"]:
|
|
||||||
raise AssertionError("didn't get killed")
|
|
||||||
else:
|
|
||||||
self.wait_end(client, pid)
|
|
||||||
status = client.post("/process/remove", { "pid": pid })
|
|
||||||
return status
|
|
||||||
|
|
||||||
# Simple command
|
# Simple command
|
||||||
status = do(["pwd"], False)
|
status = do(["pwd"], False)
|
||||||
@@ -250,3 +256,68 @@ class TestClient(object):
|
|||||||
# Kill a slow command
|
# Kill a slow command
|
||||||
status = do(["sleep", "60"], True)
|
status = do(["sleep", "60"], True)
|
||||||
ne_(status["exitcode"], 0)
|
ne_(status["exitcode"], 0)
|
||||||
|
|
||||||
|
def test_client_08_run_code(self):
|
||||||
|
client = HTTPClient(baseurl = testurl, post_json = True)
|
||||||
|
eq_(client.get("/process/list"), [])
|
||||||
|
|
||||||
|
def do(code, args, kill):
|
||||||
|
pid = client.post("/run/code", { "code": code, "args": args } )
|
||||||
|
eq_(client.get("/process/list"), [pid])
|
||||||
|
if kill:
|
||||||
|
return self.wait_kill(client, pid)
|
||||||
|
return self.wait_end(client, pid)
|
||||||
|
|
||||||
|
# basic code snippet
|
||||||
|
code=textwrap.dedent("""
|
||||||
|
print 'hello'
|
||||||
|
def foo(arg):
|
||||||
|
print 'world'
|
||||||
|
""")
|
||||||
|
status = do(code, [], False)
|
||||||
|
eq_("hello\n", status["log"])
|
||||||
|
eq_(status["exitcode"], 0)
|
||||||
|
|
||||||
|
# compile error
|
||||||
|
code=textwrap.dedent("""
|
||||||
|
def foo(arg:
|
||||||
|
print 'hello'
|
||||||
|
""")
|
||||||
|
status = do(code, [], False)
|
||||||
|
in_("SyntaxError", status["log"])
|
||||||
|
eq_(status["exitcode"], 1)
|
||||||
|
|
||||||
|
# traceback in user code should be formatted nicely
|
||||||
|
code=textwrap.dedent("""
|
||||||
|
def foo(arg):
|
||||||
|
raise Exception(arg)
|
||||||
|
foo(123)
|
||||||
|
""")
|
||||||
|
status = do(code, [], False)
|
||||||
|
eq_('Traceback (most recent call last):\n' +
|
||||||
|
' File "<user-code>", line 4, in <module>\n' +
|
||||||
|
' foo(123)\n' +
|
||||||
|
' File "<user-code>", line 3, in foo\n' +
|
||||||
|
' raise Exception(arg)\n' +
|
||||||
|
'Exception: 123\n', status["log"])
|
||||||
|
eq_(status["exitcode"], 1)
|
||||||
|
|
||||||
|
# argument handling (strings come in as unicode)
|
||||||
|
code=textwrap.dedent("""
|
||||||
|
import sys
|
||||||
|
print sys.argv[1].encode('ascii'), sys.argv[2]
|
||||||
|
""")
|
||||||
|
status = do(code, ["hello", 123], False)
|
||||||
|
eq_(status["log"], "hello 123\n")
|
||||||
|
eq_(status["exitcode"], 0)
|
||||||
|
|
||||||
|
# try killing a long-running process
|
||||||
|
code=textwrap.dedent("""
|
||||||
|
import time
|
||||||
|
print 'hello'
|
||||||
|
time.sleep(60)
|
||||||
|
print 'world'
|
||||||
|
""")
|
||||||
|
status = do(code, [], True)
|
||||||
|
eq_(status["log"], "hello\n")
|
||||||
|
ne_(status["exitcode"], 0)
|
||||||
|
Reference in New Issue
Block a user