15 Commits

9 changed files with 312 additions and 51 deletions

View File

@@ -6,11 +6,8 @@ Prerequisites:
# Runtime and build environments
sudo apt-get install python2.7 python-setuptools
# Base dependencies
sudo apt-get install python-numpy python-scipy
# Plus nilmdb and its dependencies
nilmdb (1.8.2+)
nilmdb (1.9.5+)
Install:

View File

@@ -43,9 +43,8 @@ class LogReceiver(object):
class Process(object):
"""Spawn and manage a subprocess, and capture its output."""
def __init__(self, name, argv, tempfile = None):
def __init__(self, argv, tempfile = None):
self.start_time = None
self.name = name
# Use a pipe for communicating log data
(rpipe, wpipe) = os.pipe()
@@ -93,6 +92,12 @@ class Process(object):
except OSError: # pragma: no cover
return None
def kill(pid, sig):
try:
return os.kill(pid, sig)
except OSError: # pragma: no cover
return
# Find all children
group = getpgid(self._process.pid)
main = psutil.Process(self._process.pid)
@@ -101,7 +106,7 @@ class Process(object):
# Kill with SIGTERM, if they're still in this process group
for proc in allproc:
if getpgid(proc.pid) == group:
os.kill(proc.pid, signal.SIGTERM)
kill(proc.pid, signal.SIGTERM)
# Wait for it to die again
if self._join(timeout):
@@ -110,7 +115,7 @@ class Process(object):
# One more try with SIGKILL
for proc in allproc:
if getpgid(proc.pid) == group:
os.kill(proc.pid, signal.SIGKILL)
kill(proc.pid, signal.SIGKILL)
# See if it worked
return self._join(timeout)
@@ -179,9 +184,27 @@ class ProcessManager(object):
"""Track and manage a collection of Process objects"""
def __init__(self):
self.processes = {}
self.tmpfiles = {}
self.tmpdir = tempfile.mkdtemp(prefix = "nilmrun-usercode-")
atexit.register(shutil.rmtree, self.tmpdir)
self.tmpdirs = {}
atexit.register(self._atexit)
def _cleanup_tmpdir(self, pid):
if pid in self.tmpdirs:
try:
shutil.rmtree(self.tmpdirs[pid])
except OSError: # pragma: no cover
pass
del self.tmpdirs[pid]
def _atexit(self):
# Kill remaining processes, remove their dirs
for pid in self.processes.keys():
try:
self.processes[pid].terminate()
del self.processes[pid]
shutil.rmtree(self.tmpdirs[pid])
del self.tmpdirs[pid]
except Exception: # pragma: no cover
pass
def __iter__(self):
return iter(self.processes.keys())
@@ -189,24 +212,42 @@ class ProcessManager(object):
def __getitem__(self, key):
return self.processes[key]
def run_code(self, procname, code, args):
def run_code(self, code, args):
"""Evaluate 'code' as if it were placed into a Python file and
executed. The arguments, which must be strings, will be
accessible in the code as sys.argv[1:]."""
# The easiest way to do this, by far, is to just write the
# code to a file.
(fd, path) = tempfile.mkstemp(prefix = "nilmrun-usercode-",
suffix = ".py", dir=self.tmpdir)
with os.fdopen(fd, 'w') as f:
# code to a file. Make a directory to put it in.
tmpdir = tempfile.mkdtemp(prefix = "nilmrun-usercode-")
try:
# Write the code
codepath = os.path.join(tmpdir, "usercode.py")
with open(codepath, "w") as f:
f.write(code)
argv = [ sys.executable, "-B", "-s", "-u", path ] + args
pid = self.run_command(procname, argv)
self.tmpfiles[pid] = path
return pid
# Save the args too, for debugging purposes
with open(os.path.join(tmpdir, "args.txt"), "w") as f:
f.write(repr(args))
def run_command(self, procname, argv):
# Run the code
argv = [ sys.executable, "-B", "-s", "-u", codepath ] + args
pid = self.run_command(argv)
# Save the temp dir
self.tmpdirs[pid] = tmpdir
tmpdir = None # Don't need to remove it anymore
return pid
finally:
# Clean up tempdir if we didn't finish
if tmpdir is not None:
try:
shutil.rmtree(tmpdir)
except OSError: # pragma: no cover
pass
def run_command(self, argv):
"""Execute a command line program"""
new = Process(procname, argv)
new = Process(argv)
self.processes[new.pid] = new
return new.pid
@@ -214,12 +255,7 @@ class ProcessManager(object):
return self.processes[pid].terminate()
def remove(self, pid):
if pid in self.tmpfiles:
try:
os.unlink(self.tmpfiles[pid])
except OSError: # pragma: no cover
pass
del self.tmpfiles[pid]
self._cleanup_tmpdir(pid)
del self.processes[pid]
def get_info(self):
@@ -239,6 +275,7 @@ class ProcessManager(object):
# Retrieve info for system
info["system"]["cpu_percent"] = sum(psutil.cpu_percent(0, percpu=True))
info["system"]["cpu_max"] = 100.0 * psutil.NUM_CPUS
info["system"]["procs"] = len(psutil.get_pid_list())
# psutil > 0.6.0's psutil.virtual_memory() would be better here,
# but this should give the same info.

View File

@@ -5,10 +5,7 @@ import sys
import os
import socket
import simplejson as json
import decorator
import psutil
import traceback
import argparse
import time
import nilmdb
@@ -25,6 +22,7 @@ from nilmdb.server.serverutil import (
cherrypy_stop,
bool_param,
)
from nilmdb.utils import serializer_proxy
import nilmrun
import nilmrun.testfilter
@@ -63,13 +61,16 @@ class AppProcess(object):
self.manager = manager
def process_status(self, pid):
# We need to convert the log (which is bytes) to Unicode
# characters, in order to send it via JSON. Treat it as UTF-8
# but replace invalid characters with markers.
log = self.manager[pid].log.decode('utf-8', errors='replace')
return {
"pid": pid,
"alive": self.manager[pid].alive,
"exitcode": self.manager[pid].exitcode,
"name": self.manager[pid].name,
"start_time": self.manager[pid].start_time,
"log": self.manager[pid].log,
"log": log
}
# /process/status
@@ -134,7 +135,7 @@ class AppRun(object):
if not isinstance(argv, list):
raise cherrypy.HTTPError("400 Bad Request",
"argv must be a list of strings")
return self.manager.run_command("command", argv)
return self.manager.run_command(argv)
# /run/code
@cherrypy.expose
@@ -142,16 +143,18 @@ class AppRun(object):
@cherrypy.tools.json_out()
@exception_to_httperror(nilmrun.processmanager.ProcessError)
@cherrypy.tools.CORS_allow(methods = ["POST"])
def code(self, code, args):
def code(self, code, args = None):
"""Execute arbitrary Python code. 'code' is a formatted string.
It will be run as if it were written into a Python file and
executed. 'args' is a list of strings, and they are passed
on the command line as additional arguments (i.e., they end up
in sys.argv[1:])"""
if args is None:
args = []
if not isinstance(args, list):
raise cherrypy.HTTPError("400 Bad Request",
"args must be a list of strings")
return self.manager.run_code("usercode", code, args)
return self.manager.run_code(code, args)
class Server(object):
def __init__(self, host = '127.0.0.1', port = 8080,
@@ -202,8 +205,12 @@ class Server(object):
# error messages.
cherrypy._cperror._ie_friendly_error_sizes = {}
# The manager maintains internal state and isn't necessarily
# thread-safe, so wrap it in the serializer.
manager = serializer_proxy(nilmrun.processmanager.ProcessManager)()
# Build up the application and mount it
manager = nilmrun.processmanager.ProcessManager()
self._manager = manager
root = App()
root.process = AppProcess(manager)
root.run = AppRun(manager)

50
scripts/kill.py Executable file
View File

@@ -0,0 +1,50 @@
#!/usr/bin/python
from nilmdb.client.httpclient import HTTPClient, ClientError, ServerError
from nilmdb.utils.printf import *
import nilmrun
import argparse
import os
import sys
def main():
"""Kill/remove a process from the NilmRun server"""
def_url = os.environ.get("NILMRUN_URL", "http://localhost/nilmrun/")
parser = argparse.ArgumentParser(
description = 'Kill/remove a process from the NilmRun server',
formatter_class = argparse.ArgumentDefaultsHelpFormatter,
version = nilmrun.__version__)
group = parser.add_argument_group("Standard options")
group.add_argument('-u', '--url',
help = 'NilmRun server URL', default = def_url)
group.add_argument('-n', '--noverify', action="store_true",
help = 'Disable SSL certificate verification')
group = parser.add_argument_group("Program")
group.add_argument('-q', '--quiet', action="store_true",
help = "Don't print out the final log contents")
group.add_argument('pid', nargs='+', help="PIDs to kill")
args = parser.parse_args()
client = HTTPClient(baseurl = args.url, verify_ssl = not args.noverify)
# Kill or remove process
all_failed = True
for pid in args.pid:
try:
s = client.post("process/remove", { "pid": pid })
if not args.quiet:
sys.stdout.write(s['log'])
all_failed = False
except ClientError as e:
if "404" in e.status:
fprintf(sys.stderr, "no such pid: %s\n", pid)
else:
raise
# Return error if we failed to remove any of them
if all_failed:
raise SystemExit(1)
if __name__ == "__main__":
main()

View File

@@ -10,9 +10,7 @@ def main():
parser = argparse.ArgumentParser(
description = 'Run the NilmRun server',
formatter_class = argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("-V", "--version", action="version",
formatter_class = argparse.ArgumentDefaultsHelpFormatter,
version = nilmrun.__version__)
group = parser.add_argument_group("Standard options")

66
scripts/ps.py Executable file
View File

@@ -0,0 +1,66 @@
#!/usr/bin/python
from nilmdb.client.httpclient import HTTPClient, ClientError, ServerError
from nilmdb.utils.printf import *
from nilmdb.utils import datetime_tz
import nilmrun
import argparse
import os
def main():
"""List NilmRun processes"""
def_url = os.environ.get("NILMRUN_URL", "http://localhost/nilmrun/")
parser = argparse.ArgumentParser(
description = 'List NilmRun processes',
formatter_class = argparse.ArgumentDefaultsHelpFormatter,
version = nilmrun.__version__)
group = parser.add_argument_group("Standard options")
group.add_argument('-u', '--url',
help = 'NilmRun server URL', default = def_url)
group.add_argument('-n', '--noverify', action="store_true",
help = 'Disable SSL certificate verification')
args = parser.parse_args()
client = HTTPClient(baseurl = args.url, verify_ssl = not args.noverify)
# Print overall system info
info = client.get("process/info")
total = info['total']
system = info['system']
printf(" procs: %d nilm, %d other\n", info['total']['procs'],
info['system']['procs'] - info['total']['procs'])
printf(" cpu: %d%% nilm, %d%% other, %d%% max\n",
round(info['total']['cpu_percent']),
round(info['system']['cpu_percent'] - info['total']['cpu_percent']),
round(info['system']['cpu_max']))
printf(" mem: %d MiB used, %d MiB total, %d%%\n",
round(info['system']['mem_used'] / 1048576.0),
round(info['system']['mem_total'] / 1048576.0),
round(info['system']['mem_used'] * 100.0
/ info['system']['mem_total']))
# Print process detail for each managed process
fmt = "%-36s %-6s %-15s %-4s %-3s %-5s\n"
printf(fmt, "PID", "STATE", "SINCE", "PROC", "CPU", "LOG")
if len(info['pids']) == 0:
printf("No running processes\n")
raise SystemExit(0)
for pid in sorted(info['pids'].keys()):
pidinfo = client.get("process/status", { "pid": pid })
if pidinfo['alive']:
status = "alive"
else:
if pidinfo['exitcode']:
status = "error"
else:
status = "done"
dt = datetime_tz.datetime_tz.fromtimestamp(pidinfo['start_time'])
since = dt.strftime("%m/%d-%H:%M:%S")
printf(fmt, pid, status, since, info['pids'][pid]['procs'],
str(int(round(info['pids'][pid]['cpu_percent']))),
len(pidinfo['log']))
if __name__ == "__main__":
main()

59
scripts/run.py Executable file
View File

@@ -0,0 +1,59 @@
#!/usr/bin/python
from nilmdb.client.httpclient import HTTPClient, ClientError, ServerError
from nilmdb.utils.printf import *
import nilmrun
import argparse
import os
import time
import sys
def main():
"""Run a command on the NilmRun server"""
def_url = os.environ.get("NILMRUN_URL", "http://localhost/nilmrun/")
parser = argparse.ArgumentParser(
description = 'Run a command on the NilmRun server',
formatter_class = argparse.ArgumentDefaultsHelpFormatter,
version = nilmrun.__version__)
group = parser.add_argument_group("Standard options")
group.add_argument('-u', '--url',
help = 'NilmRun server URL', default = def_url)
group.add_argument('-n', '--noverify', action="store_true",
help = 'Disable SSL certificate verification')
group = parser.add_argument_group("Program")
group.add_argument('-d', '--detach', action="store_true",
help = 'Run process and return immediately without '
'printing its output')
group.add_argument('cmd', help="Command to run")
group.add_argument('arg', nargs=argparse.REMAINDER,
help="Arguments for command")
args = parser.parse_args()
client = HTTPClient(baseurl = args.url, verify_ssl = not args.noverify)
# Run command
pid = client.post("run/command", { "argv": [ args.cmd ] + args.arg })
# If we're detaching, just print the PID
if args.detach:
print pid
raise SystemExit(0)
# Otherwise, watch the log output, and kill the process when it's done
# or when this script terminates.
try:
while True:
s = client.get("process/status", { "pid": pid, "clear": 1 })
sys.stdout.write(s['log'])
sys.stdout.flush()
if not s['alive']:
break
time.sleep(1)
finally:
s = client.post("process/remove", { "pid": pid })
raise SystemExit(s['exitcode'])
if __name__ == "__main__":
main()

View File

@@ -61,14 +61,10 @@ setup(name='nilmrun',
long_description = "NILM Database Filter Runner",
license = "Proprietary",
author_email = 'jim@jtan.com',
install_requires = [ 'nilmdb >= 1.8.2',
'nilmtools >= 1.2.2',
install_requires = [ 'nilmdb >= 1.9.5',
'psutil >= 0.3.0',
'cherrypy >= 3.2',
'decorator',
'simplejson',
'numpy',
'scipy',
],
packages = [ 'nilmrun',
'nilmrun.scripts',
@@ -79,7 +75,9 @@ setup(name='nilmrun',
entry_points = {
'console_scripts': [
'nilmrun-server = nilmrun.scripts.nilmrun_server:main',
'nilm-trainola = nilmrun.trainola:main',
'nilmrun-ps = nilmrun.scripts.ps:main',
'nilmrun-run = nilmrun.scripts.run:main',
'nilmrun-kill = nilmrun.scripts.kill:main',
],
},
zip_safe = False,

View File

@@ -65,6 +65,7 @@ class TestClient(object):
status = client.get("process/status", { "pid": pid })
if status["alive"] == False:
break
time.sleep(0.1)
else:
raise AssertionError("process " + str(pid) + " didn't die in " +
str(timeout) + " seconds: " + repr(status))
@@ -145,7 +146,7 @@ class TestClient(object):
# Verify that status looks OK
status = client.get("process/status", { "pid": pid, "clear": True })
for x in [ "pid", "alive", "exitcode", "name", "start_time", "log" ]:
for x in [ "pid", "alive", "exitcode", "start_time", "log" ]:
in_(x, status)
in_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"])
eq_(status["alive"], True)
@@ -248,6 +249,7 @@ class TestClient(object):
sys.stdout.flush()
if status["alive"] == False:
break
time.sleep(0.1)
status = client.post("process/remove", { "pid": pid })
os._exit(int(status["exitcode"]))
@@ -256,7 +258,10 @@ class TestClient(object):
eq_(client.get("process/list"), [])
def do(code, args, kill):
if args is not None:
pid = client.post("run/code", { "code": code, "args": args } )
else:
pid = client.post("run/code", { "code": code } )
eq_(client.get("process/list"), [pid])
if kill:
return self.wait_kill(client, pid)
@@ -321,6 +326,15 @@ class TestClient(object):
eq_(status["log"], "hello\n")
ne_(status["exitcode"], 0)
# default arguments are empty
code = textwrap.dedent("""
import sys
print 'args:', len(sys.argv[1:])
""")
status = do(code, None, False)
eq_(status["log"], "args: 0\n")
eq_(status["exitcode"], 0)
def test_client_08_bad_types(self):
client = HTTPClient(baseurl = testurl, post_json = True)
@@ -332,7 +346,7 @@ class TestClient(object):
client.post("run/command", { "argv": "asdf" })
in_("must be a list", str(e.exception))
def test_client_00_info(self):
def test_client_09_info(self):
client = HTTPClient(baseurl = testurl, post_json = True)
# start some processes
@@ -358,3 +372,38 @@ class TestClient(object):
# kill all processes
for pid in client.get("process/list"):
client.post("process/remove", { "pid": pid })
def test_client_10_unicode(self):
client = HTTPClient(baseurl = testurl, post_json = True)
eq_(client.get("process/list"), [])
def verify(cmd, result):
pid = client.post("run/command", { "argv": [ "sh", "-c", cmd ] })
eq_(client.get("process/list"), [pid])
status = self.wait_end(client, pid)
eq_(result, status["log"])
# Unicode should work
verify("echo -n hello", "hello")
verify(u"echo -n ☠", u"")
verify("echo -ne \\\\xe2\\\\x98\\\\xa0", u"")
# Programs that spit out invalid UTF-8 should get replacement
# markers
verify("echo -ne \\\\xae", u"\ufffd")
def test_client_11_atexit(self):
# Leave a directory and running process behind, for the atexit
# handler to clean up. Here we trigger the atexit manually,
# since it's hard to trigger it as part of the test suite.
client = HTTPClient(baseurl = testurl, post_json = True)
code = textwrap.dedent("""
import time
time.sleep(10)
""")
client.post("run/code", { "code": code, "args": [ "hello"] })
# Trigger atexit function
test_server._manager._atexit()
# Ensure no processes exit
eq_(client.get("process/list"), [])