5 Commits

5 changed files with 105 additions and 24 deletions

View File

@@ -7,7 +7,7 @@ Prerequisites:
sudo apt-get install python2.7 python-setuptools sudo apt-get install python2.7 python-setuptools
# Plus nilmdb and its dependencies # Plus nilmdb and its dependencies
nilmdb (1.8.2+) nilmdb (1.8.3+)
Install: Install:

View File

@@ -92,6 +92,12 @@ class Process(object):
except OSError: # pragma: no cover except OSError: # pragma: no cover
return None return None
def kill(pid, sig):
try:
return os.kill(pid, sig)
except OSError: # pragma: no cover
return
# Find all children # Find all children
group = getpgid(self._process.pid) group = getpgid(self._process.pid)
main = psutil.Process(self._process.pid) main = psutil.Process(self._process.pid)
@@ -100,7 +106,7 @@ class Process(object):
# Kill with SIGTERM, if they're still in this process group # Kill with SIGTERM, if they're still in this process group
for proc in allproc: for proc in allproc:
if getpgid(proc.pid) == group: if getpgid(proc.pid) == group:
os.kill(proc.pid, signal.SIGTERM) kill(proc.pid, signal.SIGTERM)
# Wait for it to die again # Wait for it to die again
if self._join(timeout): if self._join(timeout):
@@ -109,7 +115,7 @@ class Process(object):
# One more try with SIGKILL # One more try with SIGKILL
for proc in allproc: for proc in allproc:
if getpgid(proc.pid) == group: if getpgid(proc.pid) == group:
os.kill(proc.pid, signal.SIGKILL) kill(proc.pid, signal.SIGKILL)
# See if it worked # See if it worked
return self._join(timeout) return self._join(timeout)
@@ -178,9 +184,27 @@ class ProcessManager(object):
"""Track and manage a collection of Process objects""" """Track and manage a collection of Process objects"""
def __init__(self): def __init__(self):
self.processes = {} self.processes = {}
self.tmpfiles = {} self.tmpdirs = {}
self.tmpdir = tempfile.mkdtemp(prefix = "nilmrun-usercode-") atexit.register(self._atexit)
atexit.register(shutil.rmtree, self.tmpdir)
def _cleanup_tmpdir(self, pid):
if pid in self.tmpdirs:
try:
shutil.rmtree(self.tmpdirs[pid])
except OSError: # pragma: no cover
pass
del self.tmpdirs[pid]
def _atexit(self):
# Kill remaining processes, remove their dirs
for pid in self.processes.keys():
try:
self.processes[pid].terminate()
del self.processes[pid]
shutil.rmtree(self.tmpdirs[pid])
del self.tmpdirs[pid]
except Exception: # pragma: no cover
pass
def __iter__(self): def __iter__(self):
return iter(self.processes.keys()) return iter(self.processes.keys())
@@ -193,15 +217,33 @@ class ProcessManager(object):
executed. The arguments, which must be strings, will be executed. The arguments, which must be strings, will be
accessible in the code as sys.argv[1:].""" accessible in the code as sys.argv[1:]."""
# The easiest way to do this, by far, is to just write the # The easiest way to do this, by far, is to just write the
# code to a file. # code to a file. Make a directory to put it in.
(fd, path) = tempfile.mkstemp(prefix = "nilmrun-usercode-", tmpdir = tempfile.mkdtemp(prefix = "nilmrun-usercode-")
suffix = ".py", dir=self.tmpdir) try:
with os.fdopen(fd, 'w') as f: # Write the code
f.write(code) codepath = os.path.join(tmpdir, "usercode.py")
argv = [ sys.executable, "-B", "-s", "-u", path ] + args with open(codepath, "w") as f:
pid = self.run_command(argv) f.write(code)
self.tmpfiles[pid] = path # Save the args too, for debugging purposes
return pid with open(os.path.join(tmpdir, "args.txt"), "w") as f:
f.write(repr(args))
# Run the code
argv = [ sys.executable, "-B", "-s", "-u", codepath ] + args
pid = self.run_command(argv)
# Save the temp dir
self.tmpdirs[pid] = tmpdir
tmpdir = None # Don't need to remove it anymore
return pid
finally:
# Clean up tempdir if we didn't finish
if tmpdir is not None:
try:
shutil.rmtree(tmpdir)
except OSError: # pragma: no cover
pass
def run_command(self, argv): def run_command(self, argv):
"""Execute a command line program""" """Execute a command line program"""
@@ -213,12 +255,7 @@ class ProcessManager(object):
return self.processes[pid].terminate() return self.processes[pid].terminate()
def remove(self, pid): def remove(self, pid):
if pid in self.tmpfiles: self._cleanup_tmpdir(pid)
try:
os.unlink(self.tmpfiles[pid])
except OSError: # pragma: no cover
pass
del self.tmpfiles[pid]
del self.processes[pid] del self.processes[pid]
def get_info(self): def get_info(self):

View File

@@ -22,6 +22,7 @@ from nilmdb.server.serverutil import (
cherrypy_stop, cherrypy_stop,
bool_param, bool_param,
) )
from nilmdb.utils import serializer_proxy
import nilmrun import nilmrun
import nilmrun.testfilter import nilmrun.testfilter
@@ -60,12 +61,16 @@ class AppProcess(object):
self.manager = manager self.manager = manager
def process_status(self, pid): def process_status(self, pid):
# We need to convert the log (which is bytes) to Unicode
# characters, in order to send it via JSON. Treat it as UTF-8
# but replace invalid characters with markers.
log = self.manager[pid].log.decode('utf-8', errors='replace')
return { return {
"pid": pid, "pid": pid,
"alive": self.manager[pid].alive, "alive": self.manager[pid].alive,
"exitcode": self.manager[pid].exitcode, "exitcode": self.manager[pid].exitcode,
"start_time": self.manager[pid].start_time, "start_time": self.manager[pid].start_time,
"log": self.manager[pid].log, "log": log
} }
# /process/status # /process/status
@@ -200,8 +205,12 @@ class Server(object):
# error messages. # error messages.
cherrypy._cperror._ie_friendly_error_sizes = {} cherrypy._cperror._ie_friendly_error_sizes = {}
# The manager maintains internal state and isn't necessarily
# thread-safe, so wrap it in the serializer.
manager = serializer_proxy(nilmrun.processmanager.ProcessManager)()
# Build up the application and mount it # Build up the application and mount it
manager = nilmrun.processmanager.ProcessManager() self._manager = manager
root = App() root = App()
root.process = AppProcess(manager) root.process = AppProcess(manager)
root.run = AppRun(manager) root.run = AppRun(manager)

View File

@@ -61,7 +61,7 @@ setup(name='nilmrun',
long_description = "NILM Database Filter Runner", long_description = "NILM Database Filter Runner",
license = "Proprietary", license = "Proprietary",
author_email = 'jim@jtan.com', author_email = 'jim@jtan.com',
install_requires = [ 'nilmdb >= 1.8.2', install_requires = [ 'nilmdb >= 1.8.3',
'psutil >= 0.3.0', 'psutil >= 0.3.0',
'cherrypy >= 3.2', 'cherrypy >= 3.2',
'simplejson', 'simplejson',

View File

@@ -372,3 +372,38 @@ class TestClient(object):
# kill all processes # kill all processes
for pid in client.get("process/list"): for pid in client.get("process/list"):
client.post("process/remove", { "pid": pid }) client.post("process/remove", { "pid": pid })
def test_client_10_unicode(self):
client = HTTPClient(baseurl = testurl, post_json = True)
eq_(client.get("process/list"), [])
def verify(cmd, result):
pid = client.post("run/command", { "argv": [ "sh", "-c", cmd ] })
eq_(client.get("process/list"), [pid])
status = self.wait_end(client, pid)
eq_(result, status["log"])
# Unicode should work
verify("echo -n hello", "hello")
verify(u"echo -n ☠", u"")
verify("echo -ne \\\\xe2\\\\x98\\\\xa0", u"")
# Programs that spit out invalid UTF-8 should get replacement
# markers
verify("echo -ne \\\\xae", u"\ufffd")
def test_client_11_atexit(self):
# Leave a directory and running process behind, for the atexit
# handler to clean up. Here we trigger the atexit manually,
# since it's hard to trigger it as part of the test suite.
client = HTTPClient(baseurl = testurl, post_json = True)
code = textwrap.dedent("""
import time
time.sleep(10)
""")
client.post("run/code", { "code": code, "args": [ "hello"] })
# Trigger atexit function
test_server._manager._atexit()
# Ensure no processes exit
eq_(client.get("process/list"), [])