4 Commits

3 changed files with 192 additions and 50 deletions

View File

@@ -10,8 +10,9 @@ import os
import signal import signal
import time import time
import uuid import uuid
import subprocess
import psutil import psutil
import imp
import traceback
class LogReceiver(object): class LogReceiver(object):
"""Spawn a thread that listens to a pipe for log messages, """Spawn a thread that listens to a pipe for log messages,
@@ -96,8 +97,9 @@ class Process(object):
# Don't need this extra fd # Don't need this extra fd
os.close(wpipe) os.close(wpipe)
# Ready to go -- call the function # Ready to go -- call the function, exit when it's done
func(param) func(param)
sys.exit(0)
def terminate(self, timeout = 1.0): def terminate(self, timeout = 1.0):
"""Terminate a process, and all of its children that are in the same """Terminate a process, and all of its children that are in the same
@@ -152,6 +154,49 @@ class Process(object):
def exitcode(self): def exitcode(self):
return self._process.exitcode return self._process.exitcode
def _exec_user_code(codeargs): # pragma: no cover (runs in subprocess)
"""Execute 'code' as if it were placed into a file and executed"""
(code, args) = codeargs
# This is split off into a separate function because the Python3
# syntax of "exec" triggers a SyntaxError in Python2, if it's within
# a nested function.
imp.acquire_lock()
try:
module = imp.new_module("__main__")
finally:
imp.release_lock()
module.__file__ = "<user-code>"
sys.argv = [''] + args
# Wrap the compile and exec in a try/except so we can format the
# exception more nicely.
try:
codeobj = compile(code, '<user-code>', 'exec',
flags = 0, dont_inherit = 1)
exec(codeobj, module.__dict__, {})
except:
# Pull out the exception
info = sys.exc_info()
tblist = traceback.extract_tb(info[2])
# First entry is probably this code; get rid of it
if len(tblist) and tblist[0][2] == '_exec_user_code':
tblist = tblist[1:]
# Add the user's source code to every line that's missing it
lines = code.splitlines()
for (n, (name, line, func, text)) in enumerate(tblist):
if name == '<user-code>' and text is None and line <= len(lines):
tblist[n] = (name, line, func, lines[line-1].strip())
# Print it to stderr in the usual format
out = ['Traceback (most recent call last):\n']
out.extend(traceback.format_list(tblist))
out.extend(traceback.format_exception_only(info[0], info[1]))
sys.stderr.write("".join(out))
sys.stderr.flush()
sys.exit(1)
sys.exit(0)
class ProcessManager(object): class ProcessManager(object):
"""Track and manage a collection of Process objects""" """Track and manage a collection of Process objects"""
def __init__(self): def __init__(self):
@@ -163,16 +208,32 @@ class ProcessManager(object):
def __getitem__(self, key): def __getitem__(self, key):
return self.processes[key] return self.processes[key]
def run_python(self, name, function, parameters): def run_function(self, procname, function, parameters):
new = Process(name, function, parameters) """Run a Python function that already exists"""
new = Process(procname, function, parameters)
self.processes[new.pid] = new self.processes[new.pid] = new
return new.pid return new.pid
def run_command(self, name, args): def run_code(self, procname, code, args):
def spwan_user_command(args): # pragma: no cover (runs in subprocess) """Evaluate 'code' as if it were placed into a Python file and
p = subprocess.Popen(args, close_fds = True, cwd = "/tmp") executed. The arguments will be accessible in the code as
sys.exit(p.wait()) sys.argv[1:]."""
return self.run_python(name, spwan_user_command, args) return self.run_function(procname, _exec_user_code, (code, args))
def run_command(self, procname, argv):
"""Execute a command line program"""
def spwan_user_command(argv): # pragma: no cover (runs in subprocess)
try:
maxfd = os.sysconf("SC_OPEN_MAX")
except Exception:
maxfd = 256
os.closerange(3, maxfd)
try:
os.chdir("/tmp")
except OSError:
pass
os.execvp(argv[0], argv)
return self.run_function(procname, spwan_user_command, argv)
def terminate(self, pid): def terminate(self, pid):
return self.processes[pid].terminate() return self.processes[pid].terminate()

View File

@@ -109,40 +109,50 @@ class AppProcess(object):
self.manager.remove(pid) self.manager.remove(pid)
return status return status
# /process/command class AppRun(object):
def __init__(self, manager):
self.manager = manager
# /run/command
@cherrypy.expose @cherrypy.expose
@cherrypy.tools.json_in() @cherrypy.tools.json_in()
@cherrypy.tools.json_out() @cherrypy.tools.json_out()
@cherrypy.tools.CORS_allow(methods = ["POST"]) @cherrypy.tools.CORS_allow(methods = ["POST"])
def command(self, args): def command(self, argv):
"""Execute an arbitrary program on the server. 'args' is the """Execute an arbitrary program on the server. argv is a
argument list, with 'args[0]' being the program and 'args[1]', list of the program and its arguments: 'argv[0]' is the program
'args[2]', etc as arguments.""" and 'argv[1:]' are arguments"""
return self.manager.run_command("command", args) return self.manager.run_command("command", argv)
class AppFilter(object): @cherrypy.expose
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
@cherrypy.tools.CORS_allow(methods = ["POST"])
def code(self, code, args):
"""Execute arbitrary Python code. 'code' is a formatted string.
It will be run as if it were written into a Python file and
executed, with the arguments in 'args' passed on the command line
(i.e., they end up in sys.argv[1:])"""
return self.manager.run_code("usercode", code, args)
def __init__(self, manager): # /run/trainola
self.manager = manager
# /filter/trainola
@cherrypy.expose @cherrypy.expose
@cherrypy.tools.json_in() @cherrypy.tools.json_in()
@cherrypy.tools.json_out() @cherrypy.tools.json_out()
@exception_to_httperror(KeyError, ValueError) @exception_to_httperror(KeyError, ValueError)
@cherrypy.tools.CORS_allow(methods = ["POST"]) @cherrypy.tools.CORS_allow(methods = ["POST"])
def trainola(self, data): def trainola(self, data):
return self.manager.run_python( return self.manager.run_function(
"trainola", nilmrun.filters.trainola.filterfunc, data) "trainola", nilmrun.filters.trainola.filterfunc, data)
# /filter/dummy # /run/dummy
@cherrypy.expose @cherrypy.expose
@cherrypy.tools.json_in() @cherrypy.tools.json_in()
@cherrypy.tools.json_out() @cherrypy.tools.json_out()
@exception_to_httperror(KeyError, ValueError) @exception_to_httperror(KeyError, ValueError)
@cherrypy.tools.CORS_allow(methods = ["POST"]) @cherrypy.tools.CORS_allow(methods = ["POST"])
def dummy(self, data): def dummy(self, data):
return self.manager.run_python( return self.manager.run_function(
"dummy", nilmrun.filters.dummy.filterfunc, data) "dummy", nilmrun.filters.dummy.filterfunc, data)
class Server(object): class Server(object):
@@ -198,7 +208,7 @@ class Server(object):
manager = nilmrun.processmanager.ProcessManager() manager = nilmrun.processmanager.ProcessManager()
root = App() root = App()
root.process = AppProcess(manager) root.process = AppProcess(manager)
root.filter = AppFilter(manager) root.run = AppRun(manager)
cherrypy.tree.apps = {} cherrypy.tree.apps = {}
cherrypy.tree.mount(root, basepath, config = { "/" : app_config }) cherrypy.tree.mount(root, basepath, config = { "/" : app_config })

View File

@@ -25,6 +25,7 @@ import urllib2
from urllib2 import urlopen, HTTPError from urllib2 import urlopen, HTTPError
import requests import requests
import pprint import pprint
import textwrap
from testutil.helpers import * from testutil.helpers import *
@@ -46,15 +47,29 @@ def teardown_module():
class TestClient(object): class TestClient(object):
def wait_end(self, client, pid, timeout = 5): def wait_kill(self, client, pid, timeout = 1):
time.sleep(timeout)
status = client.get("/process/status", { "pid": pid })
if not status["alive"]:
raise AssertionError("died before we could kill it")
status = client.post("/process/remove", { "pid": pid })
if status["alive"]:
raise AssertionError("didn't get killed")
return status
def wait_end(self, client, pid, timeout = 5, remove = True):
start = time.time() start = time.time()
status = None status = None
while (time.time() - start) < timeout: while (time.time() - start) < timeout:
status = client.get("/process/status", { "pid": pid }) status = client.get("/process/status", { "pid": pid })
if status["alive"] == False: if status["alive"] == False:
return status break
raise AssertionError("process " + str(pid) + " didn't die in " + else:
str(timeout) + " seconds: " + repr(status)) raise AssertionError("process " + str(pid) + " didn't die in " +
str(timeout) + " seconds: " + repr(status))
if remove:
status = client.post("/process/remove", { "pid": pid })
return status
def test_client_01_basic(self): def test_client_01_basic(self):
client = HTTPClient(baseurl = testurl) client = HTTPClient(baseurl = testurl)
@@ -83,7 +98,7 @@ class TestClient(object):
client = HTTPClient(baseurl = testurl, post_json = True) client = HTTPClient(baseurl = testurl, post_json = True)
# start dummy filter # start dummy filter
pid = client.post("/filter/dummy", { "data": 30 }) pid = client.post("/run/dummy", { "data": 30 })
eq_(client.get("/process/list"), [pid]) eq_(client.get("/process/list"), [pid])
time.sleep(1) time.sleep(1)
@@ -101,7 +116,7 @@ class TestClient(object):
nin_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"]) nin_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"])
# See that it ended properly # See that it ended properly
status = self.wait_end(client, pid) status = self.wait_end(client, pid, remove = False)
in_("dummy 27\ndummy 28\ndummy 29\n", status["log"]) in_("dummy 27\ndummy 28\ndummy 29\n", status["log"])
eq_(status["exitcode"], 0) eq_(status["exitcode"], 0)
@@ -117,7 +132,7 @@ class TestClient(object):
client = HTTPClient(baseurl = testurl, post_json = True) client = HTTPClient(baseurl = testurl, post_json = True)
# Trigger exception in filter # Trigger exception in filter
pid = client.post("/filter/dummy", { "data": -1 }) pid = client.post("/run/dummy", { "data": -1 })
time.sleep(0.5) time.sleep(0.5)
status = client.get("/process/status", { "pid": pid }) status = client.get("/process/status", { "pid": pid })
eq_(status["alive"], False) eq_(status["alive"], False)
@@ -126,7 +141,7 @@ class TestClient(object):
client.post("/process/remove", { "pid": pid }) client.post("/process/remove", { "pid": pid })
# Kill a running filter by removing it early # Kill a running filter by removing it early
newpid = client.post("/filter/dummy", { "data": 50 }) newpid = client.post("/run/dummy", { "data": 50 })
ne_(newpid, pid) ne_(newpid, pid)
time.sleep(0.5) time.sleep(0.5)
start = time.time() start = time.time()
@@ -141,7 +156,7 @@ class TestClient(object):
eq_(client.get("/process/list"), []) eq_(client.get("/process/list"), [])
# Try to remove a running filter that ignored SIGTERM # Try to remove a running filter that ignored SIGTERM
pid = client.post("/filter/dummy", { "data": 0 }) pid = client.post("/run/dummy", { "data": 0 })
start = time.time() start = time.time()
status = client.post("/process/remove", { "pid": pid }) status = client.post("/process/remove", { "pid": pid })
elapsed = time.time() - start elapsed = time.time() - start
@@ -152,8 +167,8 @@ class TestClient(object):
def test_client_05_trainola_simple(self): def test_client_05_trainola_simple(self):
client = HTTPClient(baseurl = testurl, post_json = True) client = HTTPClient(baseurl = testurl, post_json = True)
pid = client.post("/filter/trainola", { "data": {} }) pid = client.post("/run/trainola", { "data": {} })
status = self.wait_end(client, pid) status = self.wait_end(client, pid, remove = False)
ne_(status["exitcode"], 0) ne_(status["exitcode"], 0)
status = client.post("/process/remove", { "pid": pid }) status = client.post("/process/remove", { "pid": pid })
@@ -191,7 +206,7 @@ class TestClient(object):
} }
# start trainola # start trainola
pid = client.post("/filter/trainola", { "data": data }) pid = client.post("/run/trainola", { "data": data })
# wait for it to finish # wait for it to finish
for i in range(60): for i in range(60):
@@ -213,25 +228,16 @@ class TestClient(object):
if i < 3: if i < 3:
raise AssertionError("too fast?") raise AssertionError("too fast?")
def test_client_07_process_command(self): def test_client_07_run_command(self):
client = HTTPClient(baseurl = testurl, post_json = True) client = HTTPClient(baseurl = testurl, post_json = True)
eq_(client.get("/process/list"), []) eq_(client.get("/process/list"), [])
def do(args, kill): def do(argv, kill):
pid = client.post("/process/command", { "args": args } ) pid = client.post("/run/command", { "argv": argv } )
eq_(client.get("/process/list"), [pid]) eq_(client.get("/process/list"), [pid])
if kill: if kill:
time.sleep(1) return self.wait_kill(client, pid)
status = client.get("/process/status", { "pid": pid }) return self.wait_end(client, pid)
if not status["alive"]:
raise AssertionError("died before we could kill it")
status = client.post("/process/remove", { "pid": pid })
if status["alive"]:
raise AssertionError("didn't get killed")
else:
self.wait_end(client, pid)
status = client.post("/process/remove", { "pid": pid })
return status
# Simple command # Simple command
status = do(["pwd"], False) status = do(["pwd"], False)
@@ -250,3 +256,68 @@ class TestClient(object):
# Kill a slow command # Kill a slow command
status = do(["sleep", "60"], True) status = do(["sleep", "60"], True)
ne_(status["exitcode"], 0) ne_(status["exitcode"], 0)
def test_client_08_run_code(self):
client = HTTPClient(baseurl = testurl, post_json = True)
eq_(client.get("/process/list"), [])
def do(code, args, kill):
pid = client.post("/run/code", { "code": code, "args": args } )
eq_(client.get("/process/list"), [pid])
if kill:
return self.wait_kill(client, pid)
return self.wait_end(client, pid)
# basic code snippet
code=textwrap.dedent("""
print 'hello'
def foo(arg):
print 'world'
""")
status = do(code, [], False)
eq_("hello\n", status["log"])
eq_(status["exitcode"], 0)
# compile error
code=textwrap.dedent("""
def foo(arg:
print 'hello'
""")
status = do(code, [], False)
in_("SyntaxError", status["log"])
eq_(status["exitcode"], 1)
# traceback in user code should be formatted nicely
code=textwrap.dedent("""
def foo(arg):
raise Exception(arg)
foo(123)
""")
status = do(code, [], False)
eq_('Traceback (most recent call last):\n' +
' File "<user-code>", line 4, in <module>\n' +
' foo(123)\n' +
' File "<user-code>", line 3, in foo\n' +
' raise Exception(arg)\n' +
'Exception: 123\n', status["log"])
eq_(status["exitcode"], 1)
# argument handling (strings come in as unicode)
code=textwrap.dedent("""
import sys
print sys.argv[1].encode('ascii'), sys.argv[2]
""")
status = do(code, ["hello", 123], False)
eq_(status["log"], "hello 123\n")
eq_(status["exitcode"], 0)
# try killing a long-running process
code=textwrap.dedent("""
import time
print 'hello'
time.sleep(60)
print 'world'
""")
status = do(code, [], True)
eq_(status["log"], "hello\n")
ne_(status["exitcode"], 0)