28 Commits

Author SHA1 Message Date
477c27a4e6 Clean up temp dirs and processes at exit 2013-07-30 20:15:01 -04:00
bed26e099b Put each code in its own dir; save args too 2013-07-30 20:14:48 -04:00
9224566f9b More robust process killing 2013-07-30 20:13:30 -04:00
a8ecad9329 Use NilmDB serializer for ProcessManager 2013-07-24 14:55:59 -04:00
5b878378f3 Translate UTF-8 in command output more robustly 2013-07-22 13:03:09 -04:00
5cd38f1ba9 Don't spin so fast in tests while waiting 2013-07-21 19:49:44 -04:00
d7551bde0b Make 'args' optional to /run/code 2013-07-21 19:49:30 -04:00
40fd377a38 Remove 'name' from spawned processes 2013-07-21 19:49:15 -04:00
6e7f3ac704 Remove nilm-trainola script 2013-07-18 12:28:32 -04:00
29adb47a33 Fix test order 2013-07-18 11:01:27 -04:00
7c605a469a Cleanup dependencies 2013-07-18 11:00:53 -04:00
f5225f88f9 Add max CPU percentage 2013-07-17 18:48:55 -04:00
32e59310ef Fix for dead processes 2013-07-17 18:19:52 -04:00
5a33ef48cc Add /process/info request 2013-07-17 18:12:44 -04:00
18a5cd6334 Improve boolean parameter parsing 2013-07-15 14:39:28 -04:00
7ec4d60d38 Fix WSGI docs 2013-07-11 16:36:18 -04:00
b2bdf784ac Make test URLs relative 2013-07-11 11:46:02 -04:00
e0709f0d17 Remove multiprocessing due to mod_wsgi incompatibility; use subprocess
Multiprocessing and Apache's mod_wsgi don't play nicely.  Switch to
manually managing processes via subprocess.Popen etc instead.  When
running arbitrary code, we write it to an external file, and running
functions directly is no longer supported.
2013-07-11 11:39:22 -04:00
18d3cff772 Update WSGI docs 2013-07-10 14:16:35 -04:00
a7b9656916 Remove parameters from status output 2013-07-10 11:35:17 -04:00
2e9ec63675 Don't catch SystemExit from a subprocess 2013-07-09 13:15:27 -04:00
6d295b840a Delete trainola; it will live in nilmtools repo 2013-07-08 11:57:45 -04:00
74a05d05d6 Clear out traceback object to avoid reference cycles 2013-07-08 11:44:19 -04:00
35b20c90a5 Rename and reorganize stuff 2013-07-08 11:33:27 -04:00
6d9ee7b405 Ability to run user provided code now 2013-07-07 20:18:52 -04:00
721d6c4936 Add run_code_string function, untested 2013-07-07 15:36:34 -04:00
973d328e1e Rename some URLs (/process/command -> /run/command, etc) 2013-07-07 14:12:48 -04:00
0338d40226 One fewer process when spawning commands 2013-07-07 14:09:47 -04:00
11 changed files with 677 additions and 657 deletions

View File

@@ -6,10 +6,8 @@ Prerequisites:
# Runtime and build environments # Runtime and build environments
sudo apt-get install python2.7 python-setuptools sudo apt-get install python2.7 python-setuptools
# Base dependencies # Plus nilmdb and its dependencies
sudo apt-get install python-numpy python-scipy nilmdb (1.8.3+)
nilmdb (1.8.0+)
Install: Install:

View File

@@ -21,13 +21,13 @@ arbitrary commands.
SSLEngine On SSLEngine On
WSGIScriptAlias /nilmrun /home/nilm/nilmrun.wsgi WSGIScriptAlias /nilmrun /home/nilm/nilmrun.wsgi
WSGIApplicationGroup nilmrun-appgroup
WSGIProcessGroup nilmrun-procgroup
WSGIDaemonProcess nilmrun-procgroup threads=32 user=nilm group=nilm WSGIDaemonProcess nilmrun-procgroup threads=32 user=nilm group=nilm
# Access control example:
<Location /nilmrun> <Location /nilmrun>
WSGIProcessGroup nilmrun-procgroup
WSGIApplicationGroup nilmrun-appgroup
SSLRequireSSL SSLRequireSSL
# Access control example:
Order deny,allow Order deny,allow
Deny from all Deny from all
Allow from 1.2.3.4 Allow from 1.2.3.4

View File

@@ -1 +0,0 @@
# Filters

View File

@@ -1,260 +0,0 @@
#!/usr/bin/python
from nilmdb.utils.printf import *
import nilmdb.client
import nilmtools.filter
from nilmdb.utils.time import (timestamp_to_human,
timestamp_to_seconds,
seconds_to_timestamp)
from nilmdb.utils.interval import Interval
import numpy as np
import scipy
import scipy.signal
from numpy.core.umath_tests import inner1d
import nilmrun
from collections import OrderedDict
class DataError(ValueError):
    """Raised when stream or exemplar data fails validation."""
class Data(object):
    """Wrapper around one NilmDB stream interval.

    Holds the client connection, stream metadata, a validated
    name -> column-index mapping, and (after fetch()) the mean-removed
    data array plus per-column scale factors.
    """
    def __init__(self, name, url, stream, start, end, columns):
        """Initialize, get stream info, check columns

        name:    human-readable label (None for the main stream)
        url:     NilmDB server URL
        stream:  stream path on the server
        start:   interval start (NilmDB timestamp)
        end:     interval end (NilmDB timestamp)
        columns: list of { 'name': ..., 'index': ... } dicts

        Raises DataError for duplicated, out-of-range, or missing columns.
        """
        self.name = name
        self.url = url
        self.stream = stream
        self.start = start
        self.end = end

        # Get stream info
        self.client = nilmdb.client.numpyclient.NumpyClient(url)
        self.info = nilmtools.filter.get_stream_info(self.client, stream)

        # Build up name => index mapping for the columns
        self.columns = OrderedDict()
        for c in columns:
            if (c['name'] in self.columns.keys() or
                c['index'] in self.columns.values()):
                raise DataError("duplicated columns")
            if (c['index'] < 0 or c['index'] >= self.info.layout_count):
                raise DataError("bad column number")
            self.columns[c['name']] = c['index']
        if not len(self.columns):
            raise DataError("no columns")

        # Count points
        self.count = self.client.stream_count(self.stream, self.start, self.end)

    def __str__(self):
        # e.g. "Boiler Pump ON      : /sharon/prep-a['P1', 'Q1'], 123 rows"
        return sprintf("%-20s: %s%s, %s rows",
                       self.name, self.stream, str(self.columns.keys()),
                       self.count)

    def fetch(self, min_rows = 10, max_rows = 100000):
        """Fetch all the data into self.data.  This is intended for
        exemplars, and can only handle a relatively small number of
        rows.

        Raises DataError if the interval holds no rows, fewer than
        min_rows, or more than max_rows.
        """
        # Verify count
        if self.count == 0:
            raise DataError("No data in this exemplar!")
        if self.count < min_rows:
            raise DataError("Too few data points: " + str(self.count))
        if self.count > max_rows:
            raise DataError("Too many data points: " + str(self.count))

        # Extract the data.  maxrows = self.count so the generator
        # yields everything in one chunk; list(...)[0] relies on that.
        datagen = self.client.stream_extract_numpy(self.stream,
                                                   self.start, self.end,
                                                   self.info.layout,
                                                   maxrows = self.count)
        self.data = list(datagen)[0]

        # Discard timestamp (column 0 of the extracted array)
        self.data = self.data[:,1:]

        # Subtract the mean from each column
        self.data = self.data - self.data.mean(axis=0)

        # Get scale factors for each column by computing dot product
        # of each column with itself.
        self.scale = inner1d(self.data.T, self.data.T)

        # Ensure a minimum (nonzero) scale and convert to list
        self.scale = np.maximum(self.scale, [1e-9]).tolist()
def process(main, function, args = None, rows = 200000):
    """Process through the data; similar to nilmtools.Filter.process_numpy.

    Streams rows from main.client in chunks of roughly 'rows' rows and
    repeatedly calls function(array, args).  The function must return
    how many rows it consumed (>= 0); unconsumed rows are carried over
    and prepended to the next chunk.

    main:     Data instance describing the input stream
    function: callback invoked with (numpy array, args) per chunk
    args:     extra argument passed to the callback (default [])
    rows:     target number of rows per extracted chunk

    Raises Exception if the callback returns a negative value.
    """
    if args is None:
        args = []
    extractor = main.client.stream_extract_numpy
    old_array = np.array([])
    for new_array in extractor(main.stream, main.start, main.end,
                               layout = main.info.layout, maxrows = rows):
        # If we still had old data left, combine it
        if old_array.shape[0] != 0:
            array = np.vstack((old_array, new_array))
        else:
            array = new_array
        # Process it
        processed = function(array, args)
        # Save the unprocessed parts
        if processed >= 0:
            old_array = array[processed:]
        else:
            raise Exception(sprintf("%s return value %s must be >= 0",
                                    str(function), str(processed)))
        # Warn if there's too much data remaining
        if old_array.shape[0] > 3 * rows:
            printf("warning: %d unprocessed rows in buffer\n",
                   old_array.shape[0])
    # Handle leftover data.  Bug fix: pass only the remaining
    # unprocessed rows (old_array), not the entire last chunk (array),
    # which would have reprocessed already-consumed rows.
    if old_array.shape[0] != 0:
        processed = function(old_array, args)
def peak_detect(data, delta):
    """Simple min/max peak detection algorithm, taken from my code
    in the disagg.m from the 10-8-5 paper.

    Walks the sequence, alternating between hunting for a local
    maximum and a local minimum.  A candidate extreme is confirmed
    once the signal moves away from it by more than 'delta'.

    Returns (mins, maxs): two lists of (index, value) tuples.
    """
    minima, maxima = [], []
    best_min = (None, np.inf)
    best_max = (None, -np.inf)
    seeking_max = False
    for idx, val in enumerate(data):
        # Track the running extremes unconditionally.
        if val > best_max[1]:
            best_max = (idx, val)
        if val < best_min[1]:
            best_min = (idx, val)
        if seeking_max:
            # Confirm the max once we've dropped far enough below it.
            if val < best_max[1] - delta:
                maxima.append(best_max)
                best_min = (idx, val)
                seeking_max = False
        elif val > best_min[1] + delta:
            # Confirm the min once we've climbed far enough above it.
            minima.append(best_min)
            best_max = (idx, val)
            seeking_max = True
    return (minima, maxima)
def match(data, args):
    """Perform cross-correlation match.

    data: numpy array, column 0 is the timestamp, remaining columns
          are the stream data.
    args: tuple (columns, exemplars) where 'columns' maps column name
          to data column index and 'exemplars' is a list of fetched
          Data objects.

    Prints matches to stdout and returns the number of rows consumed
    (so it can be used as the callback for process()).
    """
    ( columns, exemplars ) = args
    nrows = data.shape[0]

    # We want at least 10% more points than the widest exemplar.
    widest = max([ x.count for x in exemplars ])
    if (widest * 1.1) > nrows:
        return 0

    # This is how many points we'll consider valid in the
    # cross-correlation.
    valid = nrows + 1 - widest
    matches = []

    # Try matching against each of the exemplars
    for e_num, e in enumerate(exemplars):
        corrs = []

        # Compute cross-correlation for each column
        for c in e.columns:
            # data column 0 is the timestamp, hence the "+ 1"
            a = data[:,columns[c] + 1]
            b = e.data[:,e.columns[c]]
            corr = scipy.signal.fftconvolve(a, np.flipud(b), 'valid')[0:valid]

            # Scale by the norm of the exemplar
            # NOTE(review): e.scale is built per exemplar column, but it
            # is indexed here with the *main* data's column index
            # (columns[c]) -- confirm the two orderings always agree.
            corr = corr / e.scale[columns[c]]
            corrs.append(corr)

        # Find the peaks using the column with the largest amplitude
        biggest = e.scale.index(max(e.scale))
        peaks_minmax = peak_detect(corrs[biggest], 0.1)
        peaks = [ p[0] for p in peaks_minmax[1] ]

        # Now look at every peak
        for p in peaks:
            # Correlation for each column must be close enough to 1.
            for (corr, scale) in zip(corrs, e.scale):
                # The accepted distance from 1 is based on the relative
                # amplitude of the column.  Use a linear mapping:
                # scale 1.0 -> distance 0.1
                # scale 0.0 -> distance 1.0
                distance = 1 - 0.9 * (scale / e.scale[biggest])
                if abs(corr[p] - 1) > distance:
                    # No match
                    break
            else:
                # Successful match (for/else: no column broke out)
                matches.append((p, e_num))

    # Print matches
    for (point, e_num) in sorted(matches):
        # Ignore matches that showed up at the very tail of the window,
        # and shorten the window accordingly.  This is an attempt to avoid
        # problems at chunk boundaries.
        if point > (valid - 50):
            valid -= 50
            break
        print "matched", data[point,0], "exemplar", exemplars[e_num].name

    #from matplotlib import pyplot as p
    #p.plot(data[:,1:3])
    #p.show()

    return max(valid, 0)
def trainola(conf):
    """Run the trainola cross-correlation matcher.

    conf is a dict with keys 'url', 'stream', 'start', 'end',
    'columns', and 'exemplars' (a list of similar dicts, one per
    exemplar).  Returns "done" on success; raises DataError on
    validation problems.
    """
    # Load main stream data
    print "Loading stream data"
    main = Data(None, conf['url'], conf['stream'],
                conf['start'], conf['end'], conf['columns'])

    # Pull in the exemplar data
    exemplars = []
    for n, e in enumerate(conf['exemplars']):
        print sprintf("Loading exemplar %d: %s", n, e['name'])
        ex = Data(e['name'], e['url'], e['stream'],
                  e['start'], e['end'], e['columns'])
        ex.fetch()
        exemplars.append(ex)

    # Verify that the exemplar columns are all represented in the main data
    for n, ex in enumerate(exemplars):
        for col in ex.columns:
            if col not in main.columns:
                raise DataError(sprintf("Exemplar %d column %s is not "
                                        "available in main data", n, col))

    # Process the main data
    process(main, match, (main.columns, exemplars))
    return "done"

# Entry point name expected by the nilmrun filter framework.
filterfunc = trainola
def main(argv = None):
    """Command-line entry point: read a JSON config from FILE (or
    stdin), run trainola, and print the JSON-encoded result."""
    import simplejson as json
    import argparse
    import sys

    parser = argparse.ArgumentParser(
        formatter_class = argparse.RawDescriptionHelpFormatter,
        # NOTE(review): the 'version=' constructor argument was
        # deprecated in Python 2.7's argparse and removed in 3.x --
        # confirm the target interpreter.
        version = nilmrun.__version__,
        description = """Run Trainola using parameters passed in as
        JSON-formatted data.""")
    parser.add_argument("file", metavar="FILE", nargs="?",
                        type=argparse.FileType('r'), default=sys.stdin)
    args = parser.parse_args(argv)

    conf = json.loads(args.file.read())
    result = trainola(conf)
    print json.dumps(result, sort_keys = True, indent = 2 * ' ')

if __name__ == "__main__":
    main()

View File

@@ -3,15 +3,20 @@
from nilmdb.utils.printf import * from nilmdb.utils.printf import *
import threading import threading
import multiprocessing import subprocess
import cStringIO import cStringIO
import sys import sys
import os import os
import signal import signal
import time import time
import uuid import uuid
import subprocess
import psutil import psutil
import tempfile
import atexit
import shutil
class ProcessError(Exception):
pass
class LogReceiver(object): class LogReceiver(object):
"""Spawn a thread that listens to a pipe for log messages, """Spawn a thread that listens to a pipe for log messages,
@@ -37,106 +42,86 @@ class LogReceiver(object):
self.log = cStringIO.StringIO() self.log = cStringIO.StringIO()
class Process(object): class Process(object):
"""Spawn and manage a process that calls a Python function""" """Spawn and manage a subprocess, and capture its output."""
def __init__(self, name, function, parameters): def __init__(self, argv, tempfile = None):
self.parameters = parameters
self.start_time = None self.start_time = None
self.name = name
# Use a pipe for communicating log data # Use a pipe for communicating log data
(rpipe, wpipe) = os.pipe() (rpipe, wpipe) = os.pipe()
self._log = LogReceiver(rpipe) self._log = LogReceiver(rpipe)
# Start the function in a new process # Stdin is null
self._process = multiprocessing.Process( nullfd = os.open(os.devnull, os.O_RDONLY)
target = self._trampoline, name = name,
args = (function, rpipe, wpipe, parameters))
self._process.daemon = True
self._process.start()
# Close the writer end of the pipe, get process info # Spawn the new process
os.close(wpipe) try:
self._process = subprocess.Popen(args = argv, stdin = nullfd,
stdout = wpipe, stderr = wpipe,
close_fds = True, cwd = "/tmp")
except (OSError, TypeError) as e:
raise ProcessError(str(e))
finally:
# Close the FDs we don't need
os.close(wpipe)
os.close(nullfd)
# Get process info
self.start_time = time.time() self.start_time = time.time()
self.pid = str(uuid.uuid1(self._process.pid or 0)) self.pid = str(uuid.uuid1(self._process.pid or 0))
def _trampoline(self, func, rpipe, wpipe, param): # pragma: no cover def _join(self, timeout = 1.0):
# No coverage report for this, because it's executed in a subprocess start = time.time()
"""Trampoline function to set up stdio and call the real function.""" while True:
# Close the reader end of the pipe if self._process.poll() is not None:
os.close(rpipe) return True
if (time.time() - start) >= timeout:
# Like os.close() but ignores errors return False
def tryclose(fd): time.sleep(0.1)
try:
os.close(fd)
except OSError:
pass
# Remap stdio to go to the pipe. We do this at the OS level,
# replacing FDs, so that future spawned processes do the right thing.
# stdin
sys.stdin.close()
tryclose(0)
fd = os.open(os.devnull, os.O_RDONLY) # 0
sys.stdin = os.fdopen(fd, 'r', 0)
# stdout
sys.stdout.close()
tryclose(1)
fd = os.dup(wpipe) # 1
sys.stdout = os.fdopen(fd, 'w', 0)
# stdout
sys.stderr.close()
tryclose(2)
fd = os.dup(wpipe) # 2
sys.stderr = os.fdopen(fd, 'w', 0)
# Don't need this extra fd
os.close(wpipe)
# Ready to go -- call the function
func(param)
def terminate(self, timeout = 1.0): def terminate(self, timeout = 1.0):
"""Terminate a process, and all of its children that are in the same """Terminate a process, and all of its children that are in the same
process group.""" process group."""
# First give it some time to die on its own try:
self._process.join(timeout) # First give it some time to die on its own
if not self.alive: if self._join(timeout):
return True
def getpgid(pid):
try:
return os.getpgid(pid)
except OSError: # pragma: no cover
return None
def kill(pid, sig):
try:
return os.kill(pid, sig)
except OSError: # pragma: no cover
return
# Find all children
group = getpgid(self._process.pid)
main = psutil.Process(self._process.pid)
allproc = [ main ] + main.get_children(recursive = True)
# Kill with SIGTERM, if they're still in this process group
for proc in allproc:
if getpgid(proc.pid) == group:
kill(proc.pid, signal.SIGTERM)
# Wait for it to die again
if self._join(timeout):
return True
# One more try with SIGKILL
for proc in allproc:
if getpgid(proc.pid) == group:
kill(proc.pid, signal.SIGKILL)
# See if it worked
return self._join(timeout)
except psutil.Error: # pragma: no cover (race condition)
return True return True
def getpgid(pid):
try:
return os.getpgid(pid)
except OSError: # pragma: no cover
return None
# Find all children
group = getpgid(self._process.pid)
main = psutil.Process(self._process.pid)
allproc = [ main ] + main.get_children(recursive = True)
# Kill with SIGTERM, if they're still in this process group
for proc in allproc:
if getpgid(proc.pid) == group:
os.kill(proc.pid, signal.SIGTERM)
# Wait for it to die again
self._process.join(timeout)
if not self.alive:
return True
# One more try with SIGKILL
for proc in allproc:
if getpgid(proc.pid) == group:
os.kill(proc.pid, signal.SIGKILL)
# See if it worked
self._process.join(timeout)
return not self.alive
def clear_log(self): def clear_log(self):
self._log.clear() self._log.clear()
@@ -146,16 +131,80 @@ class Process(object):
@property @property
def alive(self): def alive(self):
return self._process.is_alive() return self._process.poll() is None
@property @property
def exitcode(self): def exitcode(self):
return self._process.exitcode return self._process.returncode
def get_info_prepare(self):
"""Prepare the process list and measurement for .get_info.
Call .get_info() about a second later."""
try:
main = psutil.Process(self._process.pid)
self._process_list = [ main ] + main.get_children(recursive = True)
for proc in self._process_list:
proc.get_cpu_percent(0)
except psutil.Error: # pragma: no cover (race condition)
self._process_list = [ ]
@staticmethod
def get_empty_info():
return { "cpu_percent": 0,
"cpu_user": 0,
"cpu_sys": 0,
"mem_phys": 0,
"mem_virt": 0,
"io_read": 0,
"io_write": 0,
"procs": 0 }
def get_info(self):
"""Return a dictionary with info about the process CPU and memory
usage. Call .get_info_prepare() about a second before this."""
d = self.get_empty_info()
for proc in self._process_list:
try:
d["cpu_percent"] += proc.get_cpu_percent(0)
cpuinfo = proc.get_cpu_times()
d["cpu_user"] += cpuinfo.user
d["cpu_sys"] += cpuinfo.system
meminfo = proc.get_memory_info()
d["mem_phys"] += meminfo.rss
d["mem_virt"] += meminfo.vms
ioinfo = proc.get_io_counters()
d["io_read"] += ioinfo.read_bytes
d["io_write"] += ioinfo.write_bytes
d["procs"] += 1
except psutil.Error:
pass
return d
class ProcessManager(object): class ProcessManager(object):
"""Track and manage a collection of Process objects""" """Track and manage a collection of Process objects"""
def __init__(self): def __init__(self):
self.processes = {} self.processes = {}
self.tmpdirs = {}
atexit.register(self._atexit)
def _cleanup_tmpdir(self, pid):
if pid in self.tmpdirs:
try:
shutil.rmtree(self.tmpdirs[pid])
except OSError: # pragma: no cover
pass
del self.tmpdirs[pid]
def _atexit(self):
# Kill remaining processes, remove their dirs
for pid in self.processes.keys():
try:
self.processes[pid].terminate()
del self.processes[pid]
shutil.rmtree(self.tmpdirs[pid])
del self.tmpdirs[pid]
except Exception: # pragma: no cover
pass
def __iter__(self): def __iter__(self):
return iter(self.processes.keys()) return iter(self.processes.keys())
@@ -163,19 +212,82 @@ class ProcessManager(object):
def __getitem__(self, key): def __getitem__(self, key):
return self.processes[key] return self.processes[key]
def run_python(self, name, function, parameters): def run_code(self, code, args):
new = Process(name, function, parameters) """Evaluate 'code' as if it were placed into a Python file and
executed. The arguments, which must be strings, will be
accessible in the code as sys.argv[1:]."""
# The easiest way to do this, by far, is to just write the
# code to a file. Make a directory to put it in.
tmpdir = tempfile.mkdtemp(prefix = "nilmrun-usercode-")
try:
# Write the code
codepath = os.path.join(tmpdir, "usercode.py")
with open(codepath, "w") as f:
f.write(code)
# Save the args too, for debugging purposes
with open(os.path.join(tmpdir, "args.txt"), "w") as f:
f.write(repr(args))
# Run the code
argv = [ sys.executable, "-B", "-s", "-u", codepath ] + args
pid = self.run_command(argv)
# Save the temp dir
self.tmpdirs[pid] = tmpdir
tmpdir = None # Don't need to remove it anymore
return pid
finally:
# Clean up tempdir if we didn't finish
if tmpdir is not None:
try:
shutil.rmtree(tmpdir)
except OSError: # pragma: no cover
pass
def run_command(self, argv):
"""Execute a command line program"""
new = Process(argv)
self.processes[new.pid] = new self.processes[new.pid] = new
return new.pid return new.pid
def run_command(self, name, args):
def spwan_user_command(args): # pragma: no cover (runs in subprocess)
p = subprocess.Popen(args, close_fds = True, cwd = "/tmp")
sys.exit(p.wait())
return self.run_python(name, spwan_user_command, args)
def terminate(self, pid): def terminate(self, pid):
return self.processes[pid].terminate() return self.processes[pid].terminate()
def remove(self, pid): def remove(self, pid):
self._cleanup_tmpdir(pid)
del self.processes[pid] del self.processes[pid]
def get_info(self):
"""Get info about all running PIDs"""
info = { "total" : Process.get_empty_info(),
"pids" : {},
"system" : {}
}
# Trigger CPU usage collection
for pid in self:
self[pid].get_info_prepare()
psutil.cpu_percent(0, percpu = True)
# Give it some time
time.sleep(1)
# Retrieve info for system
info["system"]["cpu_percent"] = sum(psutil.cpu_percent(0, percpu=True))
info["system"]["cpu_max"] = 100.0 * psutil.NUM_CPUS
info["system"]["procs"] = len(psutil.get_pid_list())
# psutil > 0.6.0's psutil.virtual_memory() would be better here,
# but this should give the same info.
meminfo = psutil.phymem_usage()
info["system"]["mem_total"] = meminfo.total
info["system"]["mem_used"] = int(meminfo.total * meminfo.percent / 100)
# Retrieve info for each PID
for pid in self:
info["pids"][pid] = self[pid].get_info()
# Update totals
for key in info["total"]:
info["total"][key] += info["pids"][pid][key]
return info

View File

@@ -5,10 +5,7 @@ import sys
import os import os
import socket import socket
import simplejson as json import simplejson as json
import decorator
import psutil
import traceback import traceback
import argparse
import time import time
import nilmdb import nilmdb
@@ -23,10 +20,11 @@ from nilmdb.server.serverutil import (
json_error_page, json_error_page,
cherrypy_start, cherrypy_start,
cherrypy_stop, cherrypy_stop,
bool_param,
) )
from nilmdb.utils import serializer_proxy
import nilmrun import nilmrun
import nilmrun.filters.trainola import nilmrun.testfilter
import nilmrun.filters.dummy
# Add CORS_allow tool # Add CORS_allow tool
cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow) cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)
@@ -63,14 +61,16 @@ class AppProcess(object):
self.manager = manager self.manager = manager
def process_status(self, pid): def process_status(self, pid):
# We need to convert the log (which is bytes) to Unicode
# characters, in order to send it via JSON. Treat it as UTF-8
# but replace invalid characters with markers.
log = self.manager[pid].log.decode('utf-8', errors='replace')
return { return {
"pid": pid, "pid": pid,
"alive": self.manager[pid].alive, "alive": self.manager[pid].alive,
"exitcode": self.manager[pid].exitcode, "exitcode": self.manager[pid].exitcode,
"name": self.manager[pid].name,
"start_time": self.manager[pid].start_time, "start_time": self.manager[pid].start_time,
"parameters": self.manager[pid].parameters, "log": log
"log": self.manager[pid].log,
} }
# /process/status # /process/status
@@ -79,6 +79,7 @@ class AppProcess(object):
def status(self, pid, clear = False): def status(self, pid, clear = False):
"""Return status about a process. If clear = True, also clear """Return status about a process. If clear = True, also clear
the log.""" the log."""
clear = bool_param(clear)
if pid not in self.manager: if pid not in self.manager:
raise cherrypy.HTTPError("404 Not Found", "No such PID") raise cherrypy.HTTPError("404 Not Found", "No such PID")
status = self.process_status(pid) status = self.process_status(pid)
@@ -93,6 +94,14 @@ class AppProcess(object):
"""Return a list of processes in the manager.""" """Return a list of processes in the manager."""
return list(self.manager) return list(self.manager)
# /process/info
@cherrypy.expose
@cherrypy.tools.json_out()
def info(self):
"""Return detailed CPU and memory info about the system and
all processes"""
return self.manager.get_info()
# /process/remove # /process/remove
@cherrypy.expose @cherrypy.expose
@cherrypy.tools.json_in() @cherrypy.tools.json_in()
@@ -109,41 +118,43 @@ class AppProcess(object):
self.manager.remove(pid) self.manager.remove(pid)
return status return status
# /process/command class AppRun(object):
@cherrypy.expose
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
@cherrypy.tools.CORS_allow(methods = ["POST"])
def command(self, args):
"""Execute an arbitrary program on the server. 'args' is the
argument list, with 'args[0]' being the program and 'args[1]',
'args[2]', etc as arguments."""
return self.manager.run_command("command", args)
class AppFilter(object):
def __init__(self, manager): def __init__(self, manager):
self.manager = manager self.manager = manager
# /filter/trainola # /run/command
@cherrypy.expose @cherrypy.expose
@cherrypy.tools.json_in() @cherrypy.tools.json_in()
@cherrypy.tools.json_out() @cherrypy.tools.json_out()
@exception_to_httperror(KeyError, ValueError) @exception_to_httperror(nilmrun.processmanager.ProcessError)
@cherrypy.tools.CORS_allow(methods = ["POST"]) @cherrypy.tools.CORS_allow(methods = ["POST"])
def trainola(self, data): def command(self, argv):
return self.manager.run_python( """Execute an arbitrary program on the server. argv is a
"trainola", nilmrun.filters.trainola.filterfunc, data) list of the program and its arguments: 'argv[0]' is the program
and 'argv[1:]' are arguments"""
if not isinstance(argv, list):
raise cherrypy.HTTPError("400 Bad Request",
"argv must be a list of strings")
return self.manager.run_command(argv)
# /filter/dummy # /run/code
@cherrypy.expose @cherrypy.expose
@cherrypy.tools.json_in() @cherrypy.tools.json_in()
@cherrypy.tools.json_out() @cherrypy.tools.json_out()
@exception_to_httperror(KeyError, ValueError) @exception_to_httperror(nilmrun.processmanager.ProcessError)
@cherrypy.tools.CORS_allow(methods = ["POST"]) @cherrypy.tools.CORS_allow(methods = ["POST"])
def dummy(self, data): def code(self, code, args = None):
return self.manager.run_python( """Execute arbitrary Python code. 'code' is a formatted string.
"dummy", nilmrun.filters.dummy.filterfunc, data) It will be run as if it were written into a Python file and
executed. 'args' is a list of strings, and they are passed
on the command line as additional arguments (i.e., they end up
in sys.argv[1:])"""
if args is None:
args = []
if not isinstance(args, list):
raise cherrypy.HTTPError("400 Bad Request",
"args must be a list of strings")
return self.manager.run_code(code, args)
class Server(object): class Server(object):
def __init__(self, host = '127.0.0.1', port = 8080, def __init__(self, host = '127.0.0.1', port = 8080,
@@ -194,11 +205,15 @@ class Server(object):
# error messages. # error messages.
cherrypy._cperror._ie_friendly_error_sizes = {} cherrypy._cperror._ie_friendly_error_sizes = {}
# The manager maintains internal state and isn't necessarily
# thread-safe, so wrap it in the serializer.
manager = serializer_proxy(nilmrun.processmanager.ProcessManager)()
# Build up the application and mount it # Build up the application and mount it
manager = nilmrun.processmanager.ProcessManager() self._manager = manager
root = App() root = App()
root.process = AppProcess(manager) root.process = AppProcess(manager)
root.filter = AppFilter(manager) root.run = AppRun(manager)
cherrypy.tree.apps = {} cherrypy.tree.apps = {}
cherrypy.tree.mount(root, basepath, config = { "/" : app_config }) cherrypy.tree.mount(root, basepath, config = { "/" : app_config })

View File

@@ -6,7 +6,7 @@ import signal
import sys import sys
# This is just for testing the process management. # This is just for testing the process management.
def filterfunc(n): def test(n):
n = int(n) n = int(n)
if n < 0: # raise an exception if n < 0: # raise an exception
raise Exception("test exception") raise Exception("test exception")

View File

@@ -61,10 +61,10 @@ setup(name='nilmrun',
long_description = "NILM Database Filter Runner", long_description = "NILM Database Filter Runner",
license = "Proprietary", license = "Proprietary",
author_email = 'jim@jtan.com', author_email = 'jim@jtan.com',
install_requires = [ 'nilmdb >= 1.8.0', install_requires = [ 'nilmdb >= 1.8.3',
'nilmtools >= 1.2.2', 'psutil >= 0.3.0',
'numpy', 'cherrypy >= 3.2',
'scipy', 'simplejson',
], ],
packages = [ 'nilmrun', packages = [ 'nilmrun',
'nilmrun.scripts', 'nilmrun.scripts',
@@ -75,7 +75,6 @@ setup(name='nilmrun',
entry_points = { entry_points = {
'console_scripts': [ 'console_scripts': [
'nilmrun-server = nilmrun.scripts.nilmrun_server:main', 'nilmrun-server = nilmrun.scripts.nilmrun_server:main',
'nilm-trainola = nilmrun.trainola:main',
], ],
}, },
zip_safe = False, zip_safe = False,

View File

@@ -1,3 +1,3 @@
test_client.py test_nilmrun.py
test_*.py test_*.py

View File

@@ -1,252 +0,0 @@
# -*- coding: utf-8 -*-
import nilmrun.server
from nilmdb.client.httpclient import HTTPClient, ClientError, ServerError
from nilmdb.utils.printf import *
from nose.plugins.skip import SkipTest
from nose.tools import *
from nose.tools import assert_raises
import itertools
import distutils.version
import os
import sys
import threading
import cStringIO
import simplejson as json
import unittest
import warnings
import time
import re
import urllib2
from urllib2 import urlopen, HTTPError
import requests
import pprint
from testutil.helpers import *
testurl = "http://localhost:32181/"
def setup_module():
    """Start a private NilmRun server for this test module (nose
    module-level setup)."""
    global test_server

    # Start web app on a custom port
    test_server = nilmrun.server.Server(host = "127.0.0.1",
                                        port = 32181,
                                        force_traceback = True)
    test_server.start(blocking = False)
def teardown_module():
    """Stop the server started by setup_module (nose module-level
    teardown)."""
    global test_server

    # Close web app
    test_server.stop()
class TestClient(object):
    def wait_end(self, client, pid, timeout = 5):
        """Poll /process/status until the process dies, returning the
        final status dict.  Raises AssertionError if it is still alive
        after 'timeout' seconds.

        NOTE(review): polls in a tight loop with no sleep between
        requests -- hammers the server while waiting.
        """
        start = time.time()
        status = None
        while (time.time() - start) < timeout:
            status = client.get("/process/status", { "pid": pid })
            if status["alive"] == False:
                return status
        raise AssertionError("process " + str(pid) + " didn't die in " +
                             str(timeout) + " seconds: " + repr(status))
    def test_client_01_basic(self):
        """Server responds, reports the right version, 404s unknowns."""
        client = HTTPClient(baseurl = testurl)
        version = client.get("/version")
        eq_(distutils.version.LooseVersion(version),
            distutils.version.LooseVersion(nilmrun.__version__))
        in_("This is NilmRun", client.get("/"))
        with assert_raises(ClientError):
            client.get("/favicon.ico")
    def test_client_02_manager(self):
        """Empty manager: process list is empty; unknown PIDs error."""
        client = HTTPClient(baseurl = testurl)
        eq_(client.get("/process/list"), [])
        with assert_raises(ClientError) as e:
            client.get("/process/status", { "pid": 12345 })
        in_("No such PID", str(e.exception))
        # NOTE(review): the second assert_raises has no "as e", so the
        # in_() below re-checks the previous exception object.
        with assert_raises(ClientError):
            client.get("/process/remove", { "pid": 12345 })
        in_("No such PID", str(e.exception))
    def test_client_03_process_basic(self):
        """Full lifecycle of the dummy filter: start, status fields,
        log clearing, clean exit, removal."""
        client = HTTPClient(baseurl = testurl, post_json = True)

        # start dummy filter
        pid = client.post("/filter/dummy", { "data": 30 })
        eq_(client.get("/process/list"), [pid])
        time.sleep(1)

        # Verify that status looks OK
        status = client.get("/process/status", { "pid": pid, "clear": True })
        for x in [ "pid", "alive", "exitcode", "name",
                   "start_time", "parameters", "log" ]:
            in_(x, status)
        in_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"])
        eq_(status["alive"], True)
        eq_(status["exitcode"], None)

        # Check that the log got cleared
        status = client.get("/process/status", { "pid": pid })
        nin_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"])

        # See that it ended properly
        status = self.wait_end(client, pid)
        in_("dummy 27\ndummy 28\ndummy 29\n", status["log"])
        eq_(status["exitcode"], 0)

        # Remove it; removal returns the final status
        killstatus = client.post("/process/remove", { "pid": pid })
        eq_(status, killstatus)
        eq_(client.get("/process/list"), [])
        with assert_raises(ClientError) as e:
            client.post("/process/remove", { "pid": pid })
        in_("No such PID", str(e.exception))
    def test_client_04_process_terminate(self):
        """Exception exit codes, early removal (SIGTERM), and removal
        of a process that ignores SIGTERM (SIGKILL path)."""
        client = HTTPClient(baseurl = testurl, post_json = True)

        # Trigger exception in filter (dummy treats -1 as "raise")
        pid = client.post("/filter/dummy", { "data": -1 })
        time.sleep(0.5)
        status = client.get("/process/status", { "pid": pid })
        eq_(status["alive"], False)
        eq_(status["exitcode"], 1)
        in_("Exception: test exception", status["log"])
        client.post("/process/remove", { "pid": pid })

        # Kill a running filter by removing it early
        newpid = client.post("/filter/dummy", { "data": 50 })
        ne_(newpid, pid)
        time.sleep(0.5)
        start = time.time()
        status = client.post("/process/remove", { "pid": newpid })
        elapsed = time.time() - start
        # Should have died in slightly over 1 second
        assert(0.5 < elapsed < 2)
        eq_(status["alive"], False)
        ne_(status["exitcode"], 0)

        # No more
        eq_(client.get("/process/list"), [])

        # Try to remove a running filter that ignored SIGTERM
        # (dummy treats 0 as "ignore SIGTERM")
        pid = client.post("/filter/dummy", { "data": 0 })
        start = time.time()
        status = client.post("/process/remove", { "pid": pid })
        elapsed = time.time() - start
        # Should have died in slightly over 2 seconds
        assert(1.5 < elapsed < 3)
        eq_(status["alive"], False)
        ne_(status["exitcode"], 0)
def test_client_05_trainola_simple(self):
    """Starting trainola with an empty config should fail cleanly
    with a nonzero exit code."""
    client = HTTPClient(baseurl = testurl, post_json = True)
    pid = client.post("/filter/trainola", { "data": {} })
    result = self.wait_end(client, pid)
    ne_(result["exitcode"], 0)
    result = client.post("/process/remove", { "pid": pid })
# Needs access to an external NilmDB server, so this test is skipped
# during normal runs.
@unittest.skip("needs a running nilmdb")
def test_client_06_trainola(self):
    """Run the trainola filter against real prep data and verify that
    its log output shows up and that 'clear' empties the log."""
    client = HTTPClient(baseurl = testurl, post_json = True)
    # Trainola config: the stream to search, plus two exemplars
    # ("Boiler Pump ON"/"OFF") taken from the same stream.
    data = { "url": "http://bucket.mit.edu/nilmdb",
             "stream": "/sharon/prep-a",
             "start": 1366111383280463,
             "end": 1366126163457797,
             "columns": [ { "name": "P1", "index": 0 },
                          { "name": "Q1", "index": 1 },
                          { "name": "P3", "index": 2 } ],
             "exemplars": [
                 { "name": "Boiler Pump ON",
                   "url": "http://bucket.mit.edu/nilmdb",
                   "stream": "/sharon/prep-a",
                   "start": 1366260494269078,
                   "end": 1366260608185031,
                   "columns": [ { "name": "P1", "index": 0 },
                                { "name": "Q1", "index": 1 }
                                ]
                   },
                 { "name": "Boiler Pump OFF",
                   "url": "http://bucket.mit.edu/nilmdb",
                   "stream": "/sharon/prep-a",
                   "start": 1366260864215764,
                   "end": 1366260870882998,
                   "columns": [ { "name": "P1", "index": 0 },
                                { "name": "Q1", "index": 1 }
                                ]
                   }
                 ]
             }
    # start trainola
    pid = client.post("/filter/trainola", { "data": data })
    # wait for it to finish, polling once per second for up to a minute
    for i in range(60):
        time.sleep(1)
        if i == 2:
            # Early on, the log should mention loading; fetch with
            # "clear" so the next poll sees an empty log.
            status = client.get("/process/status", { "pid": pid,
                                                     "clear": True })
            in_("Loading stream data", status['log'])
        elif i == 3:
            # The "clear" on the previous poll should have worked.
            status = client.get("/process/status", { "pid": pid })
            nin_("Loading stream data", status['log'])
        else:
            status = client.get("/process/status", { "pid": pid })
        if status["alive"] == False:
            break
    else:
        # Still alive after 60 polls: clean up and fail.
        client.post("/process/remove", {"pid": pid })
        raise AssertionError("took too long")
    if i < 3:
        # If it ended before the log checks above ran, the checks
        # were meaningless.
        raise AssertionError("too fast?")
def test_client_07_process_command(self):
    """Exercise /process/command: output capture, argument passing,
    a missing executable, and killing a long-running command."""
    client = HTTPClient(baseurl = testurl, post_json = True)
    eq_(client.get("/process/list"), [])

    def run(args, kill):
        # Spawn the command; either kill it mid-run or let it finish.
        # Either way, remove it and hand back the final status.
        pid = client.post("/process/command", { "args": args } )
        eq_(client.get("/process/list"), [pid])
        if kill:
            time.sleep(1)
            st = client.get("/process/status", { "pid": pid })
            if not st["alive"]:
                raise AssertionError("died before we could kill it")
            st = client.post("/process/remove", { "pid": pid })
            if st["alive"]:
                raise AssertionError("didn't get killed")
        else:
            self.wait_end(client, pid)
            st = client.post("/process/remove", { "pid": pid })
        return st

    # Simple command
    status = run(["pwd"], False)
    eq_(status["exitcode"], 0)
    eq_("/tmp\n", status["log"])
    # Command with args
    status = run(["expr", "1", "+", "2"], False)
    eq_(status["exitcode"], 0)
    eq_("3\n", status["log"])
    # Missing command
    status = run(["/no-such-command-blah-blah"], False)
    ne_(status["exitcode"], 0)
    # Kill a slow command
    status = run(["sleep", "60"], True)
    ne_(status["exitcode"], 0)

409
tests/test_nilmrun.py Normal file
View File

@@ -0,0 +1,409 @@
# -*- coding: utf-8 -*-
import nilmrun.server
from nilmdb.client.httpclient import HTTPClient, ClientError, ServerError
from nilmdb.utils.printf import *
from nose.plugins.skip import SkipTest
from nose.tools import *
from nose.tools import assert_raises
import itertools
import distutils.version
import os
import sys
import threading
import cStringIO
import simplejson as json
import unittest
import warnings
import time
import re
import urllib2
from urllib2 import urlopen, HTTPError
import requests
import pprint
import textwrap
from testutil.helpers import *
testurl = "http://localhost:32181/"
#testurl = "http://bucket.mit.edu/nilmrun/"
def setup_module():
    """Launch the NilmRun web app on a private local port, shared by
    every test in this module."""
    global test_server
    server = nilmrun.server.Server(host = "127.0.0.1",
                                   port = 32181,
                                   force_traceback = True)
    # Non-blocking start: the tests talk to it over HTTP.
    server.start(blocking = False)
    test_server = server
def teardown_module():
    """Shut down the web app started by setup_module()."""
    test_server.stop()
class TestClient(object):
def wait_kill(self, client, pid, timeout = 1):
    """Sleep for `timeout` seconds, verify the process is still
    running, then remove (kill) it and return the final status.
    Raises AssertionError if it died early or survived removal."""
    time.sleep(timeout)
    check = client.get("process/status", { "pid": pid })
    if not check["alive"]:
        raise AssertionError("died before we could kill it")
    result = client.post("process/remove", { "pid": pid })
    if result["alive"]:
        raise AssertionError("didn't get killed")
    return result
def wait_end(self, client, pid, timeout = 5, remove = True):
    """Poll the process every 0.1 s until it exits, for at most
    `timeout` seconds; raise AssertionError if it never dies.
    If `remove` is true, also remove it.  Returns the last status."""
    deadline = time.time() + timeout
    status = None
    finished = False
    while time.time() < deadline:
        status = client.get("process/status", { "pid": pid })
        if status["alive"] == False:
            finished = True
            break
        time.sleep(0.1)
    if not finished:
        raise AssertionError("process " + str(pid) + " didn't die in " +
                             str(timeout) + " seconds: " + repr(status))
    if remove:
        status = client.post("process/remove", { "pid": pid })
    return status
def test_client_01_basic(self):
    """The server should report a version matching the package,
    serve a banner page at the root, and 404 unknown URLs."""
    client = HTTPClient(baseurl = testurl)
    server_version = client.get("version")
    eq_(distutils.version.LooseVersion(server_version),
        distutils.version.LooseVersion(nilmrun.__version__))
    in_("This is NilmRun", client.get(""))
    with assert_raises(ClientError):
        client.get("favicon.ico")
def test_client_02_manager(self):
    """Process manager sanity checks: the process list starts empty,
    and status/remove of a nonexistent PID return a proper error.

    Bug fix: the second `with assert_raises(ClientError)` was missing
    `as e`, so the following in_() silently re-checked the exception
    captured by the FIRST with-block instead of verifying the error
    message from the "process/remove" request.
    """
    client = HTTPClient(baseurl = testurl)
    eq_(client.get("process/list"), [])
    with assert_raises(ClientError) as e:
        client.get("process/status", { "pid": 12345 })
    in_("No such PID", str(e.exception))
    with assert_raises(ClientError) as e:
        client.get("process/remove", { "pid": 12345 })
    in_("No such PID", str(e.exception))
def test_client_03_run_command(self):
    """Exercise run/command: output capture, argument passing, a
    missing executable, and killing a long-running command."""
    client = HTTPClient(baseurl = testurl, post_json = True)
    eq_(client.get("process/list"), [])

    def run(argv, kill):
        # Start the command, then either kill it midway or wait for
        # it to finish; return the final status either way.
        pid = client.post("run/command", { "argv": argv } )
        eq_(client.get("process/list"), [pid])
        if kill:
            return self.wait_kill(client, pid)
        return self.wait_end(client, pid)

    # Simple command
    status = run(["pwd"], False)
    eq_(status["exitcode"], 0)
    eq_("/tmp\n", status["log"])
    # Command with args
    status = run(["expr", "1", "+", "2"], False)
    eq_(status["exitcode"], 0)
    eq_("3\n", status["log"])
    # Missing command is rejected up front with an error
    with assert_raises(ClientError) as e:
        run(["/no-such-command-blah-blah"], False)
    in_("No such file or directory", str(e.exception))
    # Kill a slow command
    status = run(["sleep", "60"], True)
    ne_(status["exitcode"], 0)
def _run_testfilter(self, client, args):
code = textwrap.dedent("""
import nilmrun.testfilter
import simplejson as json
import sys
nilmrun.testfilter.test(json.loads(sys.argv[1]))
""")
jsonargs = json.dumps(args)
return client.post("run/code", { "code": code, "args": [ jsonargs ] })
def test_client_04_process_basic(self):
    """Full lifecycle of the test filter: start it, inspect and clear
    its status/log, wait for a clean exit, and remove it."""
    client = HTTPClient(baseurl = testurl, post_json = True)
    # start dummy filter
    pid = self._run_testfilter(client, 30)
    eq_(client.get("process/list"), [pid])
    time.sleep(1)
    # Status should carry the expected fields, show early log output,
    # and report the process as still running.
    status = client.get("process/status", { "pid": pid, "clear": True })
    for key in [ "pid", "alive", "exitcode", "start_time", "log" ]:
        in_(key, status)
    in_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"])
    eq_(status["alive"], True)
    eq_(status["exitcode"], None)
    # The "clear" above should have emptied the log
    status = client.get("process/status", { "pid": pid })
    nin_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"])
    # Wait for a clean exit
    status = self.wait_end(client, pid, remove = False)
    in_("dummy 27\ndummy 28\ndummy 29\n", status["log"])
    eq_(status["exitcode"], 0)
    # Removal reports the same final status; a second removal fails
    removed = client.post("process/remove", { "pid": pid })
    eq_(status, removed)
    eq_(client.get("process/list"), [])
    with assert_raises(ClientError) as e:
        client.post("process/remove", { "pid": pid })
    in_("No such PID", str(e.exception))
def test_client_05_process_terminate(self):
    """Termination paths for the test filter: one that raises, one
    killed via early removal, and one that ignores SIGTERM."""
    client = HTTPClient(baseurl = testurl, post_json = True)

    # A -1 argument makes the filter raise; it should exit with
    # code 1 and a logged traceback.
    pid = self._run_testfilter(client, -1)
    time.sleep(0.5)
    status = client.get("process/status", { "pid": pid })
    eq_(status["alive"], False)
    eq_(status["exitcode"], 1)
    in_("Exception: test exception", status["log"])
    client.post("process/remove", { "pid": pid })

    # Removing a still-running filter kills it.
    newpid = self._run_testfilter(client, 50)
    ne_(newpid, pid)
    time.sleep(0.5)
    begin = time.time()
    status = client.post("process/remove", { "pid": newpid })
    took = time.time() - begin
    # Should have died in slightly over 1 second
    assert(0.5 < took < 2)
    eq_(status["alive"], False)
    ne_(status["exitcode"], 0)
    # Nothing should be left running
    eq_(client.get("process/list"), [])

    # A filter started with 0 ignores SIGTERM, so removal takes
    # roughly a second longer.
    pid = self._run_testfilter(client, 0)
    begin = time.time()
    status = client.post("process/remove", { "pid": pid })
    took = time.time() - begin
    # Should have died in slightly over 2 seconds
    assert(1.5 < took < 3)
    eq_(status["alive"], False)
    ne_(status["exitcode"], 0)
# Needs access to an external NilmDB server (and nilmtools), so this
# test is skipped during normal runs.
@unittest.skip("needs a running nilmdb; trainola moved to nilmtools")
def test_client_06_trainola(self):
    """Run nilmtools.trainola against real prep data via run/code,
    streaming its log to stdout until it finishes."""
    client = HTTPClient(baseurl = testurl, post_json = True)
    # Trainola config: source stream, destination stream, and two
    # exemplars ("Boiler Pump ON"/"OFF") with their output columns.
    data = { "url": "http://bucket.mit.edu/nilmdb",
             "dest_stream": "/sharon/prep-a-matches",
             "stream": "/sharon/prep-a",
             "start": 1366111383280463,
             "end": 1366126163457797,
             "columns": [ { "name": "P1", "index": 0 },
                          { "name": "Q1", "index": 1 },
                          { "name": "P3", "index": 2 } ],
             "exemplars": [
                 { "name": "Boiler Pump ON",
                   "url": "http://bucket.mit.edu/nilmdb",
                   "stream": "/sharon/prep-a",
                   "start": 1366260494269078,
                   "end": 1366260608185031,
                   "dest_column": 0,
                   "columns": [ { "name": "P1", "index": 0 },
                                { "name": "Q1", "index": 1 }
                                ]
                   },
                 { "name": "Boiler Pump OFF",
                   "url": "http://bucket.mit.edu/nilmdb",
                   "stream": "/sharon/prep-a",
                   "start": 1366260864215764,
                   "end": 1366260870882998,
                   "dest_column": 1,
                   "columns": [ { "name": "P1", "index": 0 },
                                { "name": "Q1", "index": 1 }
                                ]
                   }
                 ]
             }
    # Start trainola as a run/code process with the config as its
    # single argument.
    pid = client.post("run/code", { "code": "import nilmtools.trainola\n" +
                                    "nilmtools.trainola.main()",
                      "args": [ data ] })
    # Poll until it exits, echoing (and clearing) the log each time.
    while True:
        status = client.get("process/status", { "pid": pid, "clear": 1 })
        sys.stdout.write(status["log"])
        sys.stdout.flush()
        if status["alive"] == False:
            break
        time.sleep(0.1)
    status = client.post("process/remove", { "pid": pid })
    # NOTE(review): os._exit terminates the whole interpreter
    # immediately, ending the test run with trainola's exit code.
    # Acceptable only because this test is skipped by default.
    os._exit(int(status["exitcode"]))
def test_client_07_run_code(self):
    """Exercise run/code: output capture, compile errors, traceback
    formatting, argument handling, kill, and default arguments."""
    client = HTTPClient(baseurl = testurl, post_json = True)
    eq_(client.get("process/list"), [])
    def do(code, args, kill):
        # Start the code (omitting "args" entirely when None, to test
        # the server-side default); then kill midway or wait for the
        # end, returning the final status.
        if args is not None:
            pid = client.post("run/code", { "code": code, "args": args } )
        else:
            pid = client.post("run/code", { "code": code } )
        eq_(client.get("process/list"), [pid])
        if kill:
            return self.wait_kill(client, pid)
        return self.wait_end(client, pid)
    # basic code snippet
    code = textwrap.dedent("""
    print 'hello'
    def foo(arg):
        print 'world'
    """)
    status = do(code, [], False)
    eq_("hello\n", status["log"])
    eq_(status["exitcode"], 0)
    # compile error
    code = textwrap.dedent("""
    def foo(arg:
        print 'hello'
    """)
    status = do(code, [], False)
    in_("SyntaxError", status["log"])
    eq_(status["exitcode"], 1)
    # traceback in user code should be formatted nicely
    code = textwrap.dedent("""
    def foo(arg):
        raise Exception(arg)
    foo(123)
    """)
    status = do(code, [], False)
    # Strip the varying temp filename before comparing the traceback
    cleaned_log = re.sub('File "[^"]*",', 'File "",', status["log"])
    eq_('Traceback (most recent call last):\n' +
        '  File "", line 4, in <module>\n' +
        '    foo(123)\n' +
        '  File "", line 3, in foo\n' +
        '    raise Exception(arg)\n' +
        'Exception: 123\n', cleaned_log)
    eq_(status["exitcode"], 1)
    # argument handling (strings come in as unicode)
    code = textwrap.dedent("""
    import sys
    print sys.argv[1].encode('ascii'), sys.argv[2]
    sys.exit(0) # also test raising SystemExit
    """)
    # Non-string args are rejected before the process starts
    with assert_raises(ClientError) as e:
        do(code, ["hello", 123], False)
    in_("400 Bad Request", str(e.exception))
    status = do(code, ["hello", "123"], False)
    eq_(status["log"], "hello 123\n")
    eq_(status["exitcode"], 0)
    # try killing a long-running process
    code = textwrap.dedent("""
    import time
    print 'hello'
    time.sleep(60)
    print 'world'
    """)
    status = do(code, [], True)
    eq_(status["log"], "hello\n")
    ne_(status["exitcode"], 0)
    # default arguments are empty
    code = textwrap.dedent("""
    import sys
    print 'args:', len(sys.argv[1:])
    """)
    status = do(code, None, False)
    eq_(status["log"], "args: 0\n")
    eq_(status["exitcode"], 0)
def test_client_08_bad_types(self):
    """Non-list "args"/"argv" parameters must be rejected with a
    client error."""
    client = HTTPClient(baseurl = testurl, post_json = True)
    with assert_raises(ClientError) as e:
        client.post("run/code", { "code": "asdf", "args": "qwer" })
    in_("must be a list", str(e.exception))
    with assert_raises(ClientError) as e:
        client.post("run/command", { "argv": "asdf" })
    in_("must be a list", str(e.exception))
def test_client_09_info(self):
    """Check process/info accounting of per-PID subprocess counts
    and CPU percentages."""
    client = HTTPClient(baseurl = testurl, post_json = True)
    # Start a mix of one- and two-process commands; burnP5 spins.
    a = client.post("run/command", { "argv": ["sleep","60"] } )
    b = client.post("run/command", { "argv": ["sh","-c","sleep 2;true"] } )
    c = client.post("run/command", { "argv": ["sh","-c","burnP5;true"] } )
    d = client.post("run/command", { "argv": ["burnP5" ] } )
    info = client.get("process/info")
    # Shell wrappers count as an extra process each
    for pid, nprocs in [ (a, 1), (b, 2), (c, 2), (d, 1) ]:
        eq_(info["pids"][pid]["procs"], nprocs)
    eq_(info["total"]["procs"], 6)
    # sleep should be mostly idle; the burnP5 jobs should be busy
    lt_(info["pids"][a]["cpu_percent"], 50)
    lt_(20, info["pids"][c]["cpu_percent"])
    lt_(80, info["system"]["cpu_percent"])
    # Once the "sleep 2" pipeline ends, its process count drops to 0
    time.sleep(2)
    info = client.get("process/info")
    eq_(info["pids"][b]["procs"], 0)
    # kill all processes
    for pid in client.get("process/list"):
        client.post("process/remove", { "pid": pid })
def test_client_10_unicode(self):
    """Command output should be decoded as UTF-8, with invalid byte
    sequences replaced by U+FFFD replacement markers."""
    client = HTTPClient(baseurl = testurl, post_json = True)
    eq_(client.get("process/list"), [])
    def verify(cmd, result):
        # Run `cmd` under sh and compare its captured log to `result`.
        pid = client.post("run/command", { "argv": [ "sh", "-c", cmd ] })
        eq_(client.get("process/list"), [pid])
        status = self.wait_end(client, pid)
        eq_(result, status["log"])
    # Unicode should work
    verify("echo -n hello", "hello")
    # NOTE(review): the expected values on the next two lines look
    # garbled (u"" for commands that emit the UTF-8 skull character);
    # the non-ASCII expected text was likely lost in transit --
    # confirm against the upstream source before relying on them.
    verify(u"echo -n ☠", u"")
    verify("echo -ne \\\\xe2\\\\x98\\\\xa0", u"")
    # Programs that spit out invalid UTF-8 should get replacement
    # markers
    verify("echo -ne \\\\xae", u"\ufffd")
def test_client_11_atexit(self):
    """Check the manager's atexit cleanup of leftover processes."""
    # Leave a directory and running process behind, for the atexit
    # handler to clean up.  Here we trigger the atexit manually,
    # since it's hard to trigger it as part of the test suite.
    client = HTTPClient(baseurl = testurl, post_json = True)
    code = textwrap.dedent("""
    import time
    time.sleep(10)
    """)
    client.post("run/code", { "code": code, "args": [ "hello"] })
    # Trigger atexit function (reaching into server internals)
    test_server._manager._atexit()
    # Ensure no processes remain afterwards
    eq_(client.get("process/list"), [])