7 Commits

Author SHA1 Message Date
f5225f88f9 Add max CPU percentage 2013-07-17 18:48:55 -04:00
32e59310ef Fix for dead processes 2013-07-17 18:19:52 -04:00
5a33ef48cc Add /process/info request 2013-07-17 18:12:44 -04:00
18a5cd6334 Improve boolean parameter parsing 2013-07-15 14:39:28 -04:00
7ec4d60d38 Fix WSGI docs 2013-07-11 16:36:18 -04:00
b2bdf784ac Make test URLs relative 2013-07-11 11:46:02 -04:00
e0709f0d17 Remove multiprocessing due to mod_wsgi incompatibility; use subprocess
Multiprocessing and Apache's mod_wsgi don't play nicely.  Switch to
manually managing processes via subprocess.Popen etc instead.  When
running arbitrary code, we write it to an external file, and running
functions directly is no longer supported.
2013-07-11 11:39:22 -04:00
6 changed files with 319 additions and 240 deletions

View File

@@ -9,7 +9,8 @@ Prerequisites:
# Base dependencies # Base dependencies
sudo apt-get install python-numpy python-scipy sudo apt-get install python-numpy python-scipy
nilmdb (1.8.0+) # Plus nilmdb and its dependencies
nilmdb (1.8.2+)
Install: Install:

View File

@@ -21,9 +21,9 @@ arbitrary commands.
SSLEngine On SSLEngine On
WSGIScriptAlias /nilmrun /home/nilm/nilmrun.wsgi WSGIScriptAlias /nilmrun /home/nilm/nilmrun.wsgi
WSGIProcessGroup nilmrun-procgroup
WSGIDaemonProcess nilmrun-procgroup threads=32 user=nilm group=nilm WSGIDaemonProcess nilmrun-procgroup threads=32 user=nilm group=nilm
<Location /nilmrun> <Location /nilmrun>
WSGIProcessGroup nilmrun-procgroup
WSGIApplicationGroup nilmrun-appgroup WSGIApplicationGroup nilmrun-appgroup
SSLRequireSSL SSLRequireSSL

View File

@@ -3,7 +3,7 @@
from nilmdb.utils.printf import * from nilmdb.utils.printf import *
import threading import threading
import multiprocessing import subprocess
import cStringIO import cStringIO
import sys import sys
import os import os
@@ -11,8 +11,12 @@ import signal
import time import time
import uuid import uuid
import psutil import psutil
import imp import tempfile
import traceback import atexit
import shutil
class ProcessError(Exception):
pass
class LogReceiver(object): class LogReceiver(object):
"""Spawn a thread that listens to a pipe for log messages, """Spawn a thread that listens to a pipe for log messages,
@@ -38,8 +42,8 @@ class LogReceiver(object):
self.log = cStringIO.StringIO() self.log = cStringIO.StringIO()
class Process(object): class Process(object):
"""Spawn and manage a process that calls a Python function""" """Spawn and manage a subprocess, and capture its output."""
def __init__(self, name, function, parameters): def __init__(self, name, argv, tempfile = None):
self.start_time = None self.start_time = None
self.name = name self.name = name
@@ -47,65 +51,40 @@ class Process(object):
(rpipe, wpipe) = os.pipe() (rpipe, wpipe) = os.pipe()
self._log = LogReceiver(rpipe) self._log = LogReceiver(rpipe)
# Start the function in a new process # Stdin is null
self._process = multiprocessing.Process( nullfd = os.open(os.devnull, os.O_RDONLY)
target = self._trampoline, name = name,
args = (function, rpipe, wpipe, parameters))
self._process.daemon = True
self._process.start()
# Close the writer end of the pipe, get process info # Spawn the new process
try:
self._process = subprocess.Popen(args = argv, stdin = nullfd,
stdout = wpipe, stderr = wpipe,
close_fds = True, cwd = "/tmp")
except (OSError, TypeError) as e:
raise ProcessError(str(e))
finally:
# Close the FDs we don't need
os.close(wpipe) os.close(wpipe)
os.close(nullfd)
# Get process info
self.start_time = time.time() self.start_time = time.time()
self.pid = str(uuid.uuid1(self._process.pid or 0)) self.pid = str(uuid.uuid1(self._process.pid or 0))
def _trampoline(self, func, rpipe, wpipe, param): # pragma: no cover def _join(self, timeout = 1.0):
# No coverage report for this, because it's executed in a subprocess start = time.time()
"""Trampoline function to set up stdio and call the real function.""" while True:
# Close the reader end of the pipe if self._process.poll() is not None:
os.close(rpipe) return True
if (time.time() - start) >= timeout:
# Like os.close() but ignores errors return False
def tryclose(fd): time.sleep(0.1)
try:
os.close(fd)
except OSError:
pass
# Remap stdio to go to the pipe. We do this at the OS level,
# replacing FDs, so that future spawned processes do the right thing.
# stdin
sys.stdin.close()
tryclose(0)
fd = os.open(os.devnull, os.O_RDONLY) # 0
sys.stdin = os.fdopen(fd, 'r', 0)
# stdout
sys.stdout.close()
tryclose(1)
fd = os.dup(wpipe) # 1
sys.stdout = os.fdopen(fd, 'w', 0)
# stdout
sys.stderr.close()
tryclose(2)
fd = os.dup(wpipe) # 2
sys.stderr = os.fdopen(fd, 'w', 0)
# Don't need this extra fd
os.close(wpipe)
# Ready to go -- call the function, exit when it's done
func(param)
sys.exit(0)
def terminate(self, timeout = 1.0): def terminate(self, timeout = 1.0):
"""Terminate a process, and all of its children that are in the same """Terminate a process, and all of its children that are in the same
process group.""" process group."""
try:
# First give it some time to die on its own # First give it some time to die on its own
self._process.join(timeout) if self._join(timeout):
if not self.alive:
return True return True
def getpgid(pid): def getpgid(pid):
@@ -125,8 +104,7 @@ class Process(object):
os.kill(proc.pid, signal.SIGTERM) os.kill(proc.pid, signal.SIGTERM)
# Wait for it to die again # Wait for it to die again
self._process.join(timeout) if self._join(timeout):
if not self.alive:
return True return True
# One more try with SIGKILL # One more try with SIGKILL
@@ -135,8 +113,9 @@ class Process(object):
os.kill(proc.pid, signal.SIGKILL) os.kill(proc.pid, signal.SIGKILL)
# See if it worked # See if it worked
self._process.join(timeout) return self._join(timeout)
return not self.alive except psutil.Error: # pragma: no cover (race condition)
return True
def clear_log(self): def clear_log(self):
self._log.clear() self._log.clear()
@@ -147,64 +126,62 @@ class Process(object):
@property @property
def alive(self): def alive(self):
return self._process.is_alive() return self._process.poll() is None
@property @property
def exitcode(self): def exitcode(self):
return self._process.exitcode return self._process.returncode
def _exec_user_code(codeargs): # pragma: no cover (runs in subprocess) def get_info_prepare(self):
"""Execute 'code' as if it were placed into a file and executed""" """Prepare the process list and measurement for .get_info.
(code, args) = codeargs Call .get_info() about a second later."""
# This is split off into a separate function because the Python3
# syntax of "exec" triggers a SyntaxError in Python2, if it's within
# a nested function.
imp.acquire_lock()
try: try:
module = imp.new_module("__main__") main = psutil.Process(self._process.pid)
finally: self._process_list = [ main ] + main.get_children(recursive = True)
imp.release_lock() for proc in self._process_list:
module.__file__ = "<user-code>" proc.get_cpu_percent(0)
sys.argv = [''] + args except psutil.Error: # pragma: no cover (race condition)
# Wrap the compile and exec in a try/except so we can format the self._process_list = [ ]
# exception more nicely.
@staticmethod
def get_empty_info():
return { "cpu_percent": 0,
"cpu_user": 0,
"cpu_sys": 0,
"mem_phys": 0,
"mem_virt": 0,
"io_read": 0,
"io_write": 0,
"procs": 0 }
def get_info(self):
"""Return a dictionary with info about the process CPU and memory
usage. Call .get_info_prepare() about a second before this."""
d = self.get_empty_info()
for proc in self._process_list:
try: try:
codeobj = compile(code, '<user-code>', 'exec', d["cpu_percent"] += proc.get_cpu_percent(0)
flags = 0, dont_inherit = 1) cpuinfo = proc.get_cpu_times()
exec(codeobj, module.__dict__, {}) d["cpu_user"] += cpuinfo.user
except Exception: d["cpu_sys"] += cpuinfo.system
try: meminfo = proc.get_memory_info()
# Pull out the exception d["mem_phys"] += meminfo.rss
info = sys.exc_info() d["mem_virt"] += meminfo.vms
tblist = traceback.extract_tb(info[2]) ioinfo = proc.get_io_counters()
d["io_read"] += ioinfo.read_bytes
# First entry is probably this code; get rid of it d["io_write"] += ioinfo.write_bytes
if len(tblist) and tblist[0][2] == '_exec_user_code': d["procs"] += 1
tblist = tblist[1:] except psutil.Error:
pass
# Add the user's source code to every line that's missing it return d
lines = code.splitlines()
maxline = len(lines)
for (n, (name, line, func, text)) in enumerate(tblist):
if name == '<user-code>' and text is None and line <= maxline:
tblist[n] = (name, line, func, lines[line-1].strip())
# Format it in the usual manner
out = ['Traceback (most recent call last):\n']
out.extend(traceback.format_list(tblist))
out.extend(traceback.format_exception_only(info[0], info[1]))
finally:
# Need to explicitly delete traceback object to avoid ref cycle
del info
sys.stderr.write("".join(out))
sys.stderr.flush()
sys.exit(1)
sys.exit(0)
class ProcessManager(object): class ProcessManager(object):
"""Track and manage a collection of Process objects""" """Track and manage a collection of Process objects"""
def __init__(self): def __init__(self):
self.processes = {} self.processes = {}
self.tmpfiles = {}
self.tmpdir = tempfile.mkdtemp(prefix = "nilmrun-usercode-")
atexit.register(shutil.rmtree, self.tmpdir)
def __iter__(self): def __iter__(self):
return iter(self.processes.keys()) return iter(self.processes.keys())
@@ -212,35 +189,69 @@ class ProcessManager(object):
def __getitem__(self, key): def __getitem__(self, key):
return self.processes[key] return self.processes[key]
def run_function(self, procname, function, parameters):
"""Run a Python function that already exists"""
new = Process(procname, function, parameters)
self.processes[new.pid] = new
return new.pid
def run_code(self, procname, code, args): def run_code(self, procname, code, args):
"""Evaluate 'code' as if it were placed into a Python file and """Evaluate 'code' as if it were placed into a Python file and
executed. The arguments will be accessible in the code as executed. The arguments, which must be strings, will be
sys.argv[1:].""" accessible in the code as sys.argv[1:]."""
return self.run_function(procname, _exec_user_code, (code, args)) # The easiest way to do this, by far, is to just write the
# code to a file.
(fd, path) = tempfile.mkstemp(prefix = "nilmrun-usercode-",
suffix = ".py", dir=self.tmpdir)
with os.fdopen(fd, 'w') as f:
f.write(code)
argv = [ sys.executable, "-B", "-s", "-u", path ] + args
pid = self.run_command(procname, argv)
self.tmpfiles[pid] = path
return pid
def run_command(self, procname, argv): def run_command(self, procname, argv):
"""Execute a command line program""" """Execute a command line program"""
def spwan_user_command(argv): # pragma: no cover (runs in subprocess) new = Process(procname, argv)
try: self.processes[new.pid] = new
maxfd = os.sysconf("SC_OPEN_MAX") return new.pid
except Exception:
maxfd = 256
os.closerange(3, maxfd)
try:
os.chdir("/tmp")
except OSError:
pass
os.execvp(argv[0], argv)
return self.run_function(procname, spwan_user_command, argv)
def terminate(self, pid): def terminate(self, pid):
return self.processes[pid].terminate() return self.processes[pid].terminate()
def remove(self, pid): def remove(self, pid):
if pid in self.tmpfiles:
try:
os.unlink(self.tmpfiles[pid])
except OSError: # pragma: no cover
pass
del self.tmpfiles[pid]
del self.processes[pid] del self.processes[pid]
def get_info(self):
"""Get info about all running PIDs"""
info = { "total" : Process.get_empty_info(),
"pids" : {},
"system" : {}
}
# Trigger CPU usage collection
for pid in self:
self[pid].get_info_prepare()
psutil.cpu_percent(0, percpu = True)
# Give it some time
time.sleep(1)
# Retrieve info for system
info["system"]["cpu_percent"] = sum(psutil.cpu_percent(0, percpu=True))
info["system"]["cpu_max"] = 100.0 * psutil.NUM_CPUS
info["system"]["procs"] = len(psutil.get_pid_list())
# psutil > 0.6.0's psutil.virtual_memory() would be better here,
# but this should give the same info.
meminfo = psutil.phymem_usage()
info["system"]["mem_total"] = meminfo.total
info["system"]["mem_used"] = int(meminfo.total * meminfo.percent / 100)
# Retrieve info for each PID
for pid in self:
info["pids"][pid] = self[pid].get_info()
# Update totals
for key in info["total"]:
info["total"][key] += info["pids"][pid][key]
return info

View File

@@ -23,6 +23,7 @@ from nilmdb.server.serverutil import (
json_error_page, json_error_page,
cherrypy_start, cherrypy_start,
cherrypy_stop, cherrypy_stop,
bool_param,
) )
import nilmrun import nilmrun
import nilmrun.testfilter import nilmrun.testfilter
@@ -77,6 +78,7 @@ class AppProcess(object):
def status(self, pid, clear = False): def status(self, pid, clear = False):
"""Return status about a process. If clear = True, also clear """Return status about a process. If clear = True, also clear
the log.""" the log."""
clear = bool_param(clear)
if pid not in self.manager: if pid not in self.manager:
raise cherrypy.HTTPError("404 Not Found", "No such PID") raise cherrypy.HTTPError("404 Not Found", "No such PID")
status = self.process_status(pid) status = self.process_status(pid)
@@ -91,6 +93,14 @@ class AppProcess(object):
"""Return a list of processes in the manager.""" """Return a list of processes in the manager."""
return list(self.manager) return list(self.manager)
# /process/info
@cherrypy.expose
@cherrypy.tools.json_out()
def info(self):
"""Return detailed CPU and memory info about the system and
all processes"""
return self.manager.get_info()
# /process/remove # /process/remove
@cherrypy.expose @cherrypy.expose
@cherrypy.tools.json_in() @cherrypy.tools.json_in()
@@ -115,35 +125,34 @@ class AppRun(object):
@cherrypy.expose @cherrypy.expose
@cherrypy.tools.json_in() @cherrypy.tools.json_in()
@cherrypy.tools.json_out() @cherrypy.tools.json_out()
@exception_to_httperror(nilmrun.processmanager.ProcessError)
@cherrypy.tools.CORS_allow(methods = ["POST"]) @cherrypy.tools.CORS_allow(methods = ["POST"])
def command(self, argv): def command(self, argv):
"""Execute an arbitrary program on the server. argv is a """Execute an arbitrary program on the server. argv is a
list of the program and its arguments: 'argv[0]' is the program list of the program and its arguments: 'argv[0]' is the program
and 'argv[1:]' are arguments""" and 'argv[1:]' are arguments"""
if not isinstance(argv, list):
raise cherrypy.HTTPError("400 Bad Request",
"argv must be a list of strings")
return self.manager.run_command("command", argv) return self.manager.run_command("command", argv)
# /run/code # /run/code
@cherrypy.expose @cherrypy.expose
@cherrypy.tools.json_in() @cherrypy.tools.json_in()
@cherrypy.tools.json_out() @cherrypy.tools.json_out()
@exception_to_httperror(nilmrun.processmanager.ProcessError)
@cherrypy.tools.CORS_allow(methods = ["POST"]) @cherrypy.tools.CORS_allow(methods = ["POST"])
def code(self, code, args): def code(self, code, args):
"""Execute arbitrary Python code. 'code' is a formatted string. """Execute arbitrary Python code. 'code' is a formatted string.
It will be run as if it were written into a Python file and It will be run as if it were written into a Python file and
executed, with the arguments in 'args' passed on the command line executed. 'args' is a list of strings, and they are passed
(i.e., they end up in sys.argv[1:])""" on the command line as additional arguments (i.e., they end up
in sys.argv[1:])"""
if not isinstance(args, list):
raise cherrypy.HTTPError("400 Bad Request",
"args must be a list of strings")
return self.manager.run_code("usercode", code, args) return self.manager.run_code("usercode", code, args)
# /run/testfilter
@cherrypy.expose
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
@exception_to_httperror(KeyError, ValueError)
@cherrypy.tools.CORS_allow(methods = ["POST"])
def testfilter(self, data):
return self.manager.run_function(
"dummy", nilmrun.testfilter.test, data)
class Server(object): class Server(object):
def __init__(self, host = '127.0.0.1', port = 8080, def __init__(self, host = '127.0.0.1', port = 8080,
embedded = True, # hide diagnostics and output, etc embedded = True, # hide diagnostics and output, etc

View File

@@ -61,8 +61,12 @@ setup(name='nilmrun',
long_description = "NILM Database Filter Runner", long_description = "NILM Database Filter Runner",
license = "Proprietary", license = "Proprietary",
author_email = 'jim@jtan.com', author_email = 'jim@jtan.com',
install_requires = [ 'nilmdb >= 1.8.0', install_requires = [ 'nilmdb >= 1.8.2',
'nilmtools >= 1.2.2', 'nilmtools >= 1.2.2',
'psutil >= 0.3.0',
'cherrypy >= 3.2',
'decorator',
'simplejson',
'numpy', 'numpy',
'scipy', 'scipy',
], ],

View File

@@ -30,6 +30,7 @@ import textwrap
from testutil.helpers import * from testutil.helpers import *
testurl = "http://localhost:32181/" testurl = "http://localhost:32181/"
#testurl = "http://bucket.mit.edu/nilmrun/"
def setup_module(): def setup_module():
global test_server global test_server
@@ -49,10 +50,10 @@ class TestClient(object):
def wait_kill(self, client, pid, timeout = 1): def wait_kill(self, client, pid, timeout = 1):
time.sleep(timeout) time.sleep(timeout)
status = client.get("/process/status", { "pid": pid }) status = client.get("process/status", { "pid": pid })
if not status["alive"]: if not status["alive"]:
raise AssertionError("died before we could kill it") raise AssertionError("died before we could kill it")
status = client.post("/process/remove", { "pid": pid }) status = client.post("process/remove", { "pid": pid })
if status["alive"]: if status["alive"]:
raise AssertionError("didn't get killed") raise AssertionError("didn't get killed")
return status return status
@@ -61,49 +62,89 @@ class TestClient(object):
start = time.time() start = time.time()
status = None status = None
while (time.time() - start) < timeout: while (time.time() - start) < timeout:
status = client.get("/process/status", { "pid": pid }) status = client.get("process/status", { "pid": pid })
if status["alive"] == False: if status["alive"] == False:
break break
else: else:
raise AssertionError("process " + str(pid) + " didn't die in " + raise AssertionError("process " + str(pid) + " didn't die in " +
str(timeout) + " seconds: " + repr(status)) str(timeout) + " seconds: " + repr(status))
if remove: if remove:
status = client.post("/process/remove", { "pid": pid }) status = client.post("process/remove", { "pid": pid })
return status return status
def test_client_01_basic(self): def test_client_01_basic(self):
client = HTTPClient(baseurl = testurl) client = HTTPClient(baseurl = testurl)
version = client.get("/version") version = client.get("version")
eq_(distutils.version.LooseVersion(version), eq_(distutils.version.LooseVersion(version),
distutils.version.LooseVersion(nilmrun.__version__)) distutils.version.LooseVersion(nilmrun.__version__))
in_("This is NilmRun", client.get("/")) in_("This is NilmRun", client.get(""))
with assert_raises(ClientError): with assert_raises(ClientError):
client.get("/favicon.ico") client.get("favicon.ico")
def test_client_02_manager(self): def test_client_02_manager(self):
client = HTTPClient(baseurl = testurl) client = HTTPClient(baseurl = testurl)
eq_(client.get("/process/list"), []) eq_(client.get("process/list"), [])
with assert_raises(ClientError) as e: with assert_raises(ClientError) as e:
client.get("/process/status", { "pid": 12345 }) client.get("process/status", { "pid": 12345 })
in_("No such PID", str(e.exception)) in_("No such PID", str(e.exception))
with assert_raises(ClientError): with assert_raises(ClientError):
client.get("/process/remove", { "pid": 12345 }) client.get("process/remove", { "pid": 12345 })
in_("No such PID", str(e.exception)) in_("No such PID", str(e.exception))
def test_client_03_process_basic(self): def test_client_03_run_command(self):
client = HTTPClient(baseurl = testurl, post_json = True)
eq_(client.get("process/list"), [])
def do(argv, kill):
pid = client.post("run/command", { "argv": argv } )
eq_(client.get("process/list"), [pid])
if kill:
return self.wait_kill(client, pid)
return self.wait_end(client, pid)
# Simple command
status = do(["pwd"], False)
eq_(status["exitcode"], 0)
eq_("/tmp\n", status["log"])
# Command with args
status = do(["expr", "1", "+", "2"], False)
eq_(status["exitcode"], 0)
eq_("3\n", status["log"])
# Missing command
with assert_raises(ClientError) as e:
do(["/no-such-command-blah-blah"], False)
in_("No such file or directory", str(e.exception))
# Kill a slow command
status = do(["sleep", "60"], True)
ne_(status["exitcode"], 0)
def _run_testfilter(self, client, args):
code = textwrap.dedent("""
import nilmrun.testfilter
import simplejson as json
import sys
nilmrun.testfilter.test(json.loads(sys.argv[1]))
""")
jsonargs = json.dumps(args)
return client.post("run/code", { "code": code, "args": [ jsonargs ] })
def test_client_04_process_basic(self):
client = HTTPClient(baseurl = testurl, post_json = True) client = HTTPClient(baseurl = testurl, post_json = True)
# start dummy filter # start dummy filter
pid = client.post("/run/testfilter", { "data": 30 }) pid = self._run_testfilter(client, 30)
eq_(client.get("/process/list"), [pid]) eq_(client.get("process/list"), [pid])
time.sleep(1) time.sleep(1)
# Verify that status looks OK # Verify that status looks OK
status = client.get("/process/status", { "pid": pid, "clear": True }) status = client.get("process/status", { "pid": pid, "clear": True })
for x in [ "pid", "alive", "exitcode", "name", "start_time", "log" ]: for x in [ "pid", "alive", "exitcode", "name", "start_time", "log" ]:
in_(x, status) in_(x, status)
in_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"]) in_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"])
@@ -111,7 +152,7 @@ class TestClient(object):
eq_(status["exitcode"], None) eq_(status["exitcode"], None)
# Check that the log got cleared # Check that the log got cleared
status = client.get("/process/status", { "pid": pid }) status = client.get("process/status", { "pid": pid })
nin_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"]) nin_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"])
# See that it ended properly # See that it ended properly
@@ -120,31 +161,31 @@ class TestClient(object):
eq_(status["exitcode"], 0) eq_(status["exitcode"], 0)
# Remove it # Remove it
killstatus = client.post("/process/remove", { "pid": pid }) killstatus = client.post("process/remove", { "pid": pid })
eq_(status, killstatus) eq_(status, killstatus)
eq_(client.get("/process/list"), []) eq_(client.get("process/list"), [])
with assert_raises(ClientError) as e: with assert_raises(ClientError) as e:
client.post("/process/remove", { "pid": pid }) client.post("process/remove", { "pid": pid })
in_("No such PID", str(e.exception)) in_("No such PID", str(e.exception))
def test_client_04_process_terminate(self): def test_client_05_process_terminate(self):
client = HTTPClient(baseurl = testurl, post_json = True) client = HTTPClient(baseurl = testurl, post_json = True)
# Trigger exception in filter # Trigger exception in filter
pid = client.post("/run/testfilter", { "data": -1 }) pid = self._run_testfilter(client, -1)
time.sleep(0.5) time.sleep(0.5)
status = client.get("/process/status", { "pid": pid }) status = client.get("process/status", { "pid": pid })
eq_(status["alive"], False) eq_(status["alive"], False)
eq_(status["exitcode"], 1) eq_(status["exitcode"], 1)
in_("Exception: test exception", status["log"]) in_("Exception: test exception", status["log"])
client.post("/process/remove", { "pid": pid }) client.post("process/remove", { "pid": pid })
# Kill a running filter by removing it early # Kill a running filter by removing it early
newpid = client.post("/run/testfilter", { "data": 50 }) newpid = self._run_testfilter(client, 50)
ne_(newpid, pid) ne_(newpid, pid)
time.sleep(0.5) time.sleep(0.5)
start = time.time() start = time.time()
status = client.post("/process/remove", { "pid": newpid }) status = client.post("process/remove", { "pid": newpid })
elapsed = time.time() - start elapsed = time.time() - start
# Should have died in slightly over 1 second # Should have died in slightly over 1 second
assert(0.5 < elapsed < 2) assert(0.5 < elapsed < 2)
@@ -152,12 +193,12 @@ class TestClient(object):
ne_(status["exitcode"], 0) ne_(status["exitcode"], 0)
# No more # No more
eq_(client.get("/process/list"), []) eq_(client.get("process/list"), [])
# Try to remove a running filter that ignored SIGTERM # Try to remove a running filter that ignored SIGTERM
pid = client.post("/run/testfilter", { "data": 0 }) pid = self._run_testfilter(client, 0)
start = time.time() start = time.time()
status = client.post("/process/remove", { "pid": pid }) status = client.post("process/remove", { "pid": pid })
elapsed = time.time() - start elapsed = time.time() - start
# Should have died in slightly over 2 seconds # Should have died in slightly over 2 seconds
assert(1.5 < elapsed < 3) assert(1.5 < elapsed < 3)
@@ -165,7 +206,7 @@ class TestClient(object):
ne_(status["exitcode"], 0) ne_(status["exitcode"], 0)
@unittest.skip("needs a running nilmdb; trainola moved to nilmtools") @unittest.skip("needs a running nilmdb; trainola moved to nilmtools")
def test_client_05_trainola(self): def test_client_06_trainola(self):
client = HTTPClient(baseurl = testurl, post_json = True) client = HTTPClient(baseurl = testurl, post_json = True)
data = { "url": "http://bucket.mit.edu/nilmdb", data = { "url": "http://bucket.mit.edu/nilmdb",
"dest_stream": "/sharon/prep-a-matches", "dest_stream": "/sharon/prep-a-matches",
@@ -198,60 +239,31 @@ class TestClient(object):
} }
] ]
} }
pid = client.post("/run/code", { "code": "import nilmtools.trainola\n" + pid = client.post("run/code", { "code": "import nilmtools.trainola\n" +
"nilmtools.trainola.main()", "nilmtools.trainola.main()",
"args": [ data ] }) "args": [ data ] })
while True: while True:
status = client.get("/process/status", { "pid": pid, "clear": 1 }) status = client.get("process/status", { "pid": pid, "clear": 1 })
sys.stdout.write(status["log"]) sys.stdout.write(status["log"])
sys.stdout.flush() sys.stdout.flush()
if status["alive"] == False: if status["alive"] == False:
break break
status = client.post("/process/remove", { "pid": pid }) status = client.post("process/remove", { "pid": pid })
os._exit(int(status["exitcode"])) os._exit(int(status["exitcode"]))
def test_client_06_run_command(self):
client = HTTPClient(baseurl = testurl, post_json = True)
eq_(client.get("/process/list"), [])
def do(argv, kill):
pid = client.post("/run/command", { "argv": argv } )
eq_(client.get("/process/list"), [pid])
if kill:
return self.wait_kill(client, pid)
return self.wait_end(client, pid)
# Simple command
status = do(["pwd"], False)
eq_(status["exitcode"], 0)
eq_("/tmp\n", status["log"])
# Command with args
status = do(["expr", "1", "+", "2"], False)
eq_(status["exitcode"], 0)
eq_("3\n", status["log"])
# Missing command
status = do(["/no-such-command-blah-blah"], False)
ne_(status["exitcode"], 0)
# Kill a slow command
status = do(["sleep", "60"], True)
ne_(status["exitcode"], 0)
def test_client_07_run_code(self): def test_client_07_run_code(self):
client = HTTPClient(baseurl = testurl, post_json = True) client = HTTPClient(baseurl = testurl, post_json = True)
eq_(client.get("/process/list"), []) eq_(client.get("process/list"), [])
def do(code, args, kill): def do(code, args, kill):
pid = client.post("/run/code", { "code": code, "args": args } ) pid = client.post("run/code", { "code": code, "args": args } )
eq_(client.get("/process/list"), [pid]) eq_(client.get("process/list"), [pid])
if kill: if kill:
return self.wait_kill(client, pid) return self.wait_kill(client, pid)
return self.wait_end(client, pid) return self.wait_end(client, pid)
# basic code snippet # basic code snippet
code=textwrap.dedent(""" code = textwrap.dedent("""
print 'hello' print 'hello'
def foo(arg): def foo(arg):
print 'world' print 'world'
@@ -261,7 +273,7 @@ class TestClient(object):
eq_(status["exitcode"], 0) eq_(status["exitcode"], 0)
# compile error # compile error
code=textwrap.dedent(""" code = textwrap.dedent("""
def foo(arg: def foo(arg:
print 'hello' print 'hello'
""") """)
@@ -270,32 +282,36 @@ class TestClient(object):
eq_(status["exitcode"], 1) eq_(status["exitcode"], 1)
# traceback in user code should be formatted nicely # traceback in user code should be formatted nicely
code=textwrap.dedent(""" code = textwrap.dedent("""
def foo(arg): def foo(arg):
raise Exception(arg) raise Exception(arg)
foo(123) foo(123)
""") """)
status = do(code, [], False) status = do(code, [], False)
cleaned_log = re.sub('File "[^"]*",', 'File "",', status["log"])
eq_('Traceback (most recent call last):\n' + eq_('Traceback (most recent call last):\n' +
' File "<user-code>", line 4, in <module>\n' + ' File "", line 4, in <module>\n' +
' foo(123)\n' + ' foo(123)\n' +
' File "<user-code>", line 3, in foo\n' + ' File "", line 3, in foo\n' +
' raise Exception(arg)\n' + ' raise Exception(arg)\n' +
'Exception: 123\n', status["log"]) 'Exception: 123\n', cleaned_log)
eq_(status["exitcode"], 1) eq_(status["exitcode"], 1)
# argument handling (strings come in as unicode) # argument handling (strings come in as unicode)
code=textwrap.dedent(""" code = textwrap.dedent("""
import sys import sys
print sys.argv[1].encode('ascii'), sys.argv[2] print sys.argv[1].encode('ascii'), sys.argv[2]
sys.exit(0) # also test raising SystemExit sys.exit(0) # also test raising SystemExit
""") """)
status = do(code, ["hello", 123], False) with assert_raises(ClientError) as e:
do(code, ["hello", 123], False)
in_("400 Bad Request", str(e.exception))
status = do(code, ["hello", "123"], False)
eq_(status["log"], "hello 123\n") eq_(status["log"], "hello 123\n")
eq_(status["exitcode"], 0) eq_(status["exitcode"], 0)
# try killing a long-running process # try killing a long-running process
code=textwrap.dedent(""" code = textwrap.dedent("""
import time import time
print 'hello' print 'hello'
time.sleep(60) time.sleep(60)
@@ -304,3 +320,41 @@ class TestClient(object):
status = do(code, [], True) status = do(code, [], True)
eq_(status["log"], "hello\n") eq_(status["log"], "hello\n")
ne_(status["exitcode"], 0) ne_(status["exitcode"], 0)
def test_client_08_bad_types(self):
client = HTTPClient(baseurl = testurl, post_json = True)
with assert_raises(ClientError) as e:
client.post("run/code", { "code": "asdf", "args": "qwer" })
in_("must be a list", str(e.exception))
with assert_raises(ClientError) as e:
client.post("run/command", { "argv": "asdf" })
in_("must be a list", str(e.exception))
def test_client_00_info(self):
client = HTTPClient(baseurl = testurl, post_json = True)
# start some processes
a = client.post("run/command", { "argv": ["sleep","60"] } )
b = client.post("run/command", { "argv": ["sh","-c","sleep 2;true"] } )
c = client.post("run/command", { "argv": ["sh","-c","burnP5;true"] } )
d = client.post("run/command", { "argv": ["burnP5" ] } )
info = client.get("process/info")
eq_(info["pids"][a]["procs"], 1)
eq_(info["pids"][b]["procs"], 2)
eq_(info["pids"][c]["procs"], 2)
eq_(info["pids"][d]["procs"], 1)
eq_(info["total"]["procs"], 6)
lt_(info["pids"][a]["cpu_percent"], 50)
lt_(20, info["pids"][c]["cpu_percent"])
lt_(80, info["system"]["cpu_percent"])
time.sleep(2)
info = client.get("process/info")
eq_(info["pids"][b]["procs"], 0)
# kill all processes
for pid in client.get("process/list"):
client.post("process/remove", { "pid": pid })