Compare commits
8 Commits
nilmrun-0.
...
nilmrun-1.
Author | SHA1 | Date | |
---|---|---|---|
2e9ec63675 | |||
6d295b840a | |||
74a05d05d6 | |||
35b20c90a5 | |||
6d9ee7b405 | |||
721d6c4936 | |||
973d328e1e | |||
0338d40226 |
@@ -1 +0,0 @@
|
|||||||
# Filters
|
|
@@ -1,260 +0,0 @@
|
|||||||
#!/usr/bin/python
|
|
||||||
|
|
||||||
from nilmdb.utils.printf import *
|
|
||||||
import nilmdb.client
|
|
||||||
import nilmtools.filter
|
|
||||||
from nilmdb.utils.time import (timestamp_to_human,
|
|
||||||
timestamp_to_seconds,
|
|
||||||
seconds_to_timestamp)
|
|
||||||
from nilmdb.utils.interval import Interval
|
|
||||||
|
|
||||||
import numpy as np
|
|
||||||
import scipy
|
|
||||||
import scipy.signal
|
|
||||||
from numpy.core.umath_tests import inner1d
|
|
||||||
import nilmrun
|
|
||||||
from collections import OrderedDict
|
|
||||||
|
|
||||||
class DataError(ValueError):
|
|
||||||
pass
|
|
||||||
|
|
||||||
class Data(object):
|
|
||||||
def __init__(self, name, url, stream, start, end, columns):
|
|
||||||
"""Initialize, get stream info, check columns"""
|
|
||||||
self.name = name
|
|
||||||
self.url = url
|
|
||||||
self.stream = stream
|
|
||||||
self.start = start
|
|
||||||
self.end = end
|
|
||||||
|
|
||||||
# Get stream info
|
|
||||||
self.client = nilmdb.client.numpyclient.NumpyClient(url)
|
|
||||||
self.info = nilmtools.filter.get_stream_info(self.client, stream)
|
|
||||||
|
|
||||||
# Build up name => index mapping for the columns
|
|
||||||
self.columns = OrderedDict()
|
|
||||||
for c in columns:
|
|
||||||
if (c['name'] in self.columns.keys() or
|
|
||||||
c['index'] in self.columns.values()):
|
|
||||||
raise DataError("duplicated columns")
|
|
||||||
if (c['index'] < 0 or c['index'] >= self.info.layout_count):
|
|
||||||
raise DataError("bad column number")
|
|
||||||
self.columns[c['name']] = c['index']
|
|
||||||
if not len(self.columns):
|
|
||||||
raise DataError("no columns")
|
|
||||||
|
|
||||||
# Count points
|
|
||||||
self.count = self.client.stream_count(self.stream, self.start, self.end)
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
return sprintf("%-20s: %s%s, %s rows",
|
|
||||||
self.name, self.stream, str(self.columns.keys()),
|
|
||||||
self.count)
|
|
||||||
|
|
||||||
def fetch(self, min_rows = 10, max_rows = 100000):
|
|
||||||
"""Fetch all the data into self.data. This is intended for
|
|
||||||
exemplars, and can only handle a relatively small number of
|
|
||||||
rows"""
|
|
||||||
# Verify count
|
|
||||||
if self.count == 0:
|
|
||||||
raise DataError("No data in this exemplar!")
|
|
||||||
if self.count < min_rows:
|
|
||||||
raise DataError("Too few data points: " + str(self.count))
|
|
||||||
if self.count > max_rows:
|
|
||||||
raise DataError("Too many data points: " + str(self.count))
|
|
||||||
|
|
||||||
# Extract the data
|
|
||||||
datagen = self.client.stream_extract_numpy(self.stream,
|
|
||||||
self.start, self.end,
|
|
||||||
self.info.layout,
|
|
||||||
maxrows = self.count)
|
|
||||||
self.data = list(datagen)[0]
|
|
||||||
|
|
||||||
# Discard timestamp
|
|
||||||
self.data = self.data[:,1:]
|
|
||||||
|
|
||||||
# Subtract the mean from each column
|
|
||||||
self.data = self.data - self.data.mean(axis=0)
|
|
||||||
|
|
||||||
# Get scale factors for each column by computing dot product
|
|
||||||
# of each column with itself.
|
|
||||||
self.scale = inner1d(self.data.T, self.data.T)
|
|
||||||
|
|
||||||
# Ensure a minimum (nonzero) scale and convert to list
|
|
||||||
self.scale = np.maximum(self.scale, [1e-9]).tolist()
|
|
||||||
|
|
||||||
def process(main, function, args = None, rows = 200000):
|
|
||||||
"""Process through the data; similar to nilmtools.Filter.process_numpy"""
|
|
||||||
if args is None:
|
|
||||||
args = []
|
|
||||||
|
|
||||||
extractor = main.client.stream_extract_numpy
|
|
||||||
old_array = np.array([])
|
|
||||||
for new_array in extractor(main.stream, main.start, main.end,
|
|
||||||
layout = main.info.layout, maxrows = rows):
|
|
||||||
# If we still had old data left, combine it
|
|
||||||
if old_array.shape[0] != 0:
|
|
||||||
array = np.vstack((old_array, new_array))
|
|
||||||
else:
|
|
||||||
array = new_array
|
|
||||||
|
|
||||||
# Process it
|
|
||||||
processed = function(array, args)
|
|
||||||
|
|
||||||
# Save the unprocessed parts
|
|
||||||
if processed >= 0:
|
|
||||||
old_array = array[processed:]
|
|
||||||
else:
|
|
||||||
raise Exception(sprintf("%s return value %s must be >= 0",
|
|
||||||
str(function), str(processed)))
|
|
||||||
|
|
||||||
# Warn if there's too much data remaining
|
|
||||||
if old_array.shape[0] > 3 * rows:
|
|
||||||
printf("warning: %d unprocessed rows in buffer\n",
|
|
||||||
old_array.shape[0])
|
|
||||||
|
|
||||||
# Handle leftover data
|
|
||||||
if old_array.shape[0] != 0:
|
|
||||||
processed = function(array, args)
|
|
||||||
|
|
||||||
def peak_detect(data, delta):
|
|
||||||
"""Simple min/max peak detection algorithm, taken from my code
|
|
||||||
in the disagg.m from the 10-8-5 paper"""
|
|
||||||
mins = [];
|
|
||||||
maxs = [];
|
|
||||||
cur_min = (None, np.inf)
|
|
||||||
cur_max = (None, -np.inf)
|
|
||||||
lookformax = False
|
|
||||||
for (n, p) in enumerate(data):
|
|
||||||
if p > cur_max[1]:
|
|
||||||
cur_max = (n, p)
|
|
||||||
if p < cur_min[1]:
|
|
||||||
cur_min = (n, p)
|
|
||||||
if lookformax:
|
|
||||||
if p < (cur_max[1] - delta):
|
|
||||||
maxs.append(cur_max)
|
|
||||||
cur_min = (n, p)
|
|
||||||
lookformax = False
|
|
||||||
else:
|
|
||||||
if p > (cur_min[1] + delta):
|
|
||||||
mins.append(cur_min)
|
|
||||||
cur_max = (n, p)
|
|
||||||
lookformax = True
|
|
||||||
return (mins, maxs)
|
|
||||||
|
|
||||||
def match(data, args):
|
|
||||||
"""Perform cross-correlation match"""
|
|
||||||
( columns, exemplars ) = args
|
|
||||||
nrows = data.shape[0]
|
|
||||||
|
|
||||||
# We want at least 10% more points than the widest exemplar.
|
|
||||||
widest = max([ x.count for x in exemplars ])
|
|
||||||
if (widest * 1.1) > nrows:
|
|
||||||
return 0
|
|
||||||
|
|
||||||
# This is how many points we'll consider valid in the
|
|
||||||
# cross-correlation.
|
|
||||||
valid = nrows + 1 - widest
|
|
||||||
matches = []
|
|
||||||
|
|
||||||
# Try matching against each of the exemplars
|
|
||||||
for e_num, e in enumerate(exemplars):
|
|
||||||
corrs = []
|
|
||||||
|
|
||||||
# Compute cross-correlation for each column
|
|
||||||
for c in e.columns:
|
|
||||||
a = data[:,columns[c] + 1]
|
|
||||||
b = e.data[:,e.columns[c]]
|
|
||||||
corr = scipy.signal.fftconvolve(a, np.flipud(b), 'valid')[0:valid]
|
|
||||||
|
|
||||||
# Scale by the norm of the exemplar
|
|
||||||
corr = corr / e.scale[columns[c]]
|
|
||||||
corrs.append(corr)
|
|
||||||
|
|
||||||
# Find the peaks using the column with the largest amplitude
|
|
||||||
biggest = e.scale.index(max(e.scale))
|
|
||||||
peaks_minmax = peak_detect(corrs[biggest], 0.1)
|
|
||||||
peaks = [ p[0] for p in peaks_minmax[1] ]
|
|
||||||
|
|
||||||
# Now look at every peak
|
|
||||||
for p in peaks:
|
|
||||||
# Correlation for each column must be close enough to 1.
|
|
||||||
for (corr, scale) in zip(corrs, e.scale):
|
|
||||||
# The accepted distance from 1 is based on the relative
|
|
||||||
# amplitude of the column. Use a linear mapping:
|
|
||||||
# scale 1.0 -> distance 0.1
|
|
||||||
# scale 0.0 -> distance 1.0
|
|
||||||
distance = 1 - 0.9 * (scale / e.scale[biggest])
|
|
||||||
if abs(corr[p] - 1) > distance:
|
|
||||||
# No match
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
# Successful match
|
|
||||||
matches.append((p, e_num))
|
|
||||||
|
|
||||||
# Print matches
|
|
||||||
for (point, e_num) in sorted(matches):
|
|
||||||
# Ignore matches that showed up at the very tail of the window,
|
|
||||||
# and shorten the window accordingly. This is an attempt to avoid
|
|
||||||
# problems at chunk boundaries.
|
|
||||||
if point > (valid - 50):
|
|
||||||
valid -= 50
|
|
||||||
break
|
|
||||||
print "matched", data[point,0], "exemplar", exemplars[e_num].name
|
|
||||||
|
|
||||||
#from matplotlib import pyplot as p
|
|
||||||
#p.plot(data[:,1:3])
|
|
||||||
#p.show()
|
|
||||||
|
|
||||||
return max(valid, 0)
|
|
||||||
|
|
||||||
def trainola(conf):
|
|
||||||
# Load main stream data
|
|
||||||
print "Loading stream data"
|
|
||||||
main = Data(None, conf['url'], conf['stream'],
|
|
||||||
conf['start'], conf['end'], conf['columns'])
|
|
||||||
|
|
||||||
# Pull in the exemplar data
|
|
||||||
exemplars = []
|
|
||||||
for n, e in enumerate(conf['exemplars']):
|
|
||||||
print sprintf("Loading exemplar %d: %s", n, e['name'])
|
|
||||||
ex = Data(e['name'], e['url'], e['stream'],
|
|
||||||
e['start'], e['end'], e['columns'])
|
|
||||||
ex.fetch()
|
|
||||||
exemplars.append(ex)
|
|
||||||
|
|
||||||
# Verify that the exemplar columns are all represented in the main data
|
|
||||||
for n, ex in enumerate(exemplars):
|
|
||||||
for col in ex.columns:
|
|
||||||
if col not in main.columns:
|
|
||||||
raise DataError(sprintf("Exemplar %d column %s is not "
|
|
||||||
"available in main data", n, col))
|
|
||||||
|
|
||||||
# Process the main data
|
|
||||||
process(main, match, (main.columns, exemplars))
|
|
||||||
|
|
||||||
return "done"
|
|
||||||
|
|
||||||
filterfunc = trainola
|
|
||||||
|
|
||||||
def main(argv = None):
|
|
||||||
import simplejson as json
|
|
||||||
import argparse
|
|
||||||
import sys
|
|
||||||
|
|
||||||
parser = argparse.ArgumentParser(
|
|
||||||
formatter_class = argparse.RawDescriptionHelpFormatter,
|
|
||||||
version = nilmrun.__version__,
|
|
||||||
description = """Run Trainola using parameters passed in as
|
|
||||||
JSON-formatted data.""")
|
|
||||||
parser.add_argument("file", metavar="FILE", nargs="?",
|
|
||||||
type=argparse.FileType('r'), default=sys.stdin)
|
|
||||||
args = parser.parse_args(argv)
|
|
||||||
|
|
||||||
conf = json.loads(args.file.read())
|
|
||||||
result = trainola(conf)
|
|
||||||
print json.dumps(result, sort_keys = True, indent = 2 * ' ')
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
|
|
@@ -10,8 +10,9 @@ import os
|
|||||||
import signal
|
import signal
|
||||||
import time
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
import subprocess
|
|
||||||
import psutil
|
import psutil
|
||||||
|
import imp
|
||||||
|
import traceback
|
||||||
|
|
||||||
class LogReceiver(object):
|
class LogReceiver(object):
|
||||||
"""Spawn a thread that listens to a pipe for log messages,
|
"""Spawn a thread that listens to a pipe for log messages,
|
||||||
@@ -96,8 +97,9 @@ class Process(object):
|
|||||||
# Don't need this extra fd
|
# Don't need this extra fd
|
||||||
os.close(wpipe)
|
os.close(wpipe)
|
||||||
|
|
||||||
# Ready to go -- call the function
|
# Ready to go -- call the function, exit when it's done
|
||||||
func(param)
|
func(param)
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
def terminate(self, timeout = 1.0):
|
def terminate(self, timeout = 1.0):
|
||||||
"""Terminate a process, and all of its children that are in the same
|
"""Terminate a process, and all of its children that are in the same
|
||||||
@@ -152,6 +154,54 @@ class Process(object):
|
|||||||
def exitcode(self):
|
def exitcode(self):
|
||||||
return self._process.exitcode
|
return self._process.exitcode
|
||||||
|
|
||||||
|
def _exec_user_code(codeargs): # pragma: no cover (runs in subprocess)
|
||||||
|
"""Execute 'code' as if it were placed into a file and executed"""
|
||||||
|
(code, args) = codeargs
|
||||||
|
# This is split off into a separate function because the Python3
|
||||||
|
# syntax of "exec" triggers a SyntaxError in Python2, if it's within
|
||||||
|
# a nested function.
|
||||||
|
imp.acquire_lock()
|
||||||
|
try:
|
||||||
|
module = imp.new_module("__main__")
|
||||||
|
finally:
|
||||||
|
imp.release_lock()
|
||||||
|
module.__file__ = "<user-code>"
|
||||||
|
sys.argv = [''] + args
|
||||||
|
# Wrap the compile and exec in a try/except so we can format the
|
||||||
|
# exception more nicely.
|
||||||
|
try:
|
||||||
|
codeobj = compile(code, '<user-code>', 'exec',
|
||||||
|
flags = 0, dont_inherit = 1)
|
||||||
|
exec(codeobj, module.__dict__, {})
|
||||||
|
except Exception:
|
||||||
|
try:
|
||||||
|
# Pull out the exception
|
||||||
|
info = sys.exc_info()
|
||||||
|
tblist = traceback.extract_tb(info[2])
|
||||||
|
|
||||||
|
# First entry is probably this code; get rid of it
|
||||||
|
if len(tblist) and tblist[0][2] == '_exec_user_code':
|
||||||
|
tblist = tblist[1:]
|
||||||
|
|
||||||
|
# Add the user's source code to every line that's missing it
|
||||||
|
lines = code.splitlines()
|
||||||
|
maxline = len(lines)
|
||||||
|
for (n, (name, line, func, text)) in enumerate(tblist):
|
||||||
|
if name == '<user-code>' and text is None and line <= maxline:
|
||||||
|
tblist[n] = (name, line, func, lines[line-1].strip())
|
||||||
|
|
||||||
|
# Format it in the usual manner
|
||||||
|
out = ['Traceback (most recent call last):\n']
|
||||||
|
out.extend(traceback.format_list(tblist))
|
||||||
|
out.extend(traceback.format_exception_only(info[0], info[1]))
|
||||||
|
finally:
|
||||||
|
# Need to explicitly delete traceback object to avoid ref cycle
|
||||||
|
del info
|
||||||
|
sys.stderr.write("".join(out))
|
||||||
|
sys.stderr.flush()
|
||||||
|
sys.exit(1)
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
class ProcessManager(object):
|
class ProcessManager(object):
|
||||||
"""Track and manage a collection of Process objects"""
|
"""Track and manage a collection of Process objects"""
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
@@ -163,16 +213,32 @@ class ProcessManager(object):
|
|||||||
def __getitem__(self, key):
|
def __getitem__(self, key):
|
||||||
return self.processes[key]
|
return self.processes[key]
|
||||||
|
|
||||||
def run_python(self, name, function, parameters):
|
def run_function(self, procname, function, parameters):
|
||||||
new = Process(name, function, parameters)
|
"""Run a Python function that already exists"""
|
||||||
|
new = Process(procname, function, parameters)
|
||||||
self.processes[new.pid] = new
|
self.processes[new.pid] = new
|
||||||
return new.pid
|
return new.pid
|
||||||
|
|
||||||
def run_command(self, name, args):
|
def run_code(self, procname, code, args):
|
||||||
def spwan_user_command(args): # pragma: no cover (runs in subprocess)
|
"""Evaluate 'code' as if it were placed into a Python file and
|
||||||
p = subprocess.Popen(args, close_fds = True, cwd = "/tmp")
|
executed. The arguments will be accessible in the code as
|
||||||
sys.exit(p.wait())
|
sys.argv[1:]."""
|
||||||
return self.run_python(name, spwan_user_command, args)
|
return self.run_function(procname, _exec_user_code, (code, args))
|
||||||
|
|
||||||
|
def run_command(self, procname, argv):
|
||||||
|
"""Execute a command line program"""
|
||||||
|
def spwan_user_command(argv): # pragma: no cover (runs in subprocess)
|
||||||
|
try:
|
||||||
|
maxfd = os.sysconf("SC_OPEN_MAX")
|
||||||
|
except Exception:
|
||||||
|
maxfd = 256
|
||||||
|
os.closerange(3, maxfd)
|
||||||
|
try:
|
||||||
|
os.chdir("/tmp")
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
os.execvp(argv[0], argv)
|
||||||
|
return self.run_function(procname, spwan_user_command, argv)
|
||||||
|
|
||||||
def terminate(self, pid):
|
def terminate(self, pid):
|
||||||
return self.processes[pid].terminate()
|
return self.processes[pid].terminate()
|
||||||
|
@@ -25,8 +25,7 @@ from nilmdb.server.serverutil import (
|
|||||||
cherrypy_stop,
|
cherrypy_stop,
|
||||||
)
|
)
|
||||||
import nilmrun
|
import nilmrun
|
||||||
import nilmrun.filters.trainola
|
import nilmrun.testfilter
|
||||||
import nilmrun.filters.dummy
|
|
||||||
|
|
||||||
# Add CORS_allow tool
|
# Add CORS_allow tool
|
||||||
cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)
|
cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)
|
||||||
@@ -109,41 +108,42 @@ class AppProcess(object):
|
|||||||
self.manager.remove(pid)
|
self.manager.remove(pid)
|
||||||
return status
|
return status
|
||||||
|
|
||||||
# /process/command
|
class AppRun(object):
|
||||||
@cherrypy.expose
|
|
||||||
@cherrypy.tools.json_in()
|
|
||||||
@cherrypy.tools.json_out()
|
|
||||||
@cherrypy.tools.CORS_allow(methods = ["POST"])
|
|
||||||
def command(self, args):
|
|
||||||
"""Execute an arbitrary program on the server. 'args' is the
|
|
||||||
argument list, with 'args[0]' being the program and 'args[1]',
|
|
||||||
'args[2]', etc as arguments."""
|
|
||||||
return self.manager.run_command("command", args)
|
|
||||||
|
|
||||||
class AppFilter(object):
|
|
||||||
|
|
||||||
def __init__(self, manager):
|
def __init__(self, manager):
|
||||||
self.manager = manager
|
self.manager = manager
|
||||||
|
|
||||||
# /filter/trainola
|
# /run/command
|
||||||
@cherrypy.expose
|
@cherrypy.expose
|
||||||
@cherrypy.tools.json_in()
|
@cherrypy.tools.json_in()
|
||||||
@cherrypy.tools.json_out()
|
@cherrypy.tools.json_out()
|
||||||
@exception_to_httperror(KeyError, ValueError)
|
|
||||||
@cherrypy.tools.CORS_allow(methods = ["POST"])
|
@cherrypy.tools.CORS_allow(methods = ["POST"])
|
||||||
def trainola(self, data):
|
def command(self, argv):
|
||||||
return self.manager.run_python(
|
"""Execute an arbitrary program on the server. argv is a
|
||||||
"trainola", nilmrun.filters.trainola.filterfunc, data)
|
list of the program and its arguments: 'argv[0]' is the program
|
||||||
|
and 'argv[1:]' are arguments"""
|
||||||
|
return self.manager.run_command("command", argv)
|
||||||
|
|
||||||
# /filter/dummy
|
# /run/code
|
||||||
|
@cherrypy.expose
|
||||||
|
@cherrypy.tools.json_in()
|
||||||
|
@cherrypy.tools.json_out()
|
||||||
|
@cherrypy.tools.CORS_allow(methods = ["POST"])
|
||||||
|
def code(self, code, args):
|
||||||
|
"""Execute arbitrary Python code. 'code' is a formatted string.
|
||||||
|
It will be run as if it were written into a Python file and
|
||||||
|
executed, with the arguments in 'args' passed on the command line
|
||||||
|
(i.e., they end up in sys.argv[1:])"""
|
||||||
|
return self.manager.run_code("usercode", code, args)
|
||||||
|
|
||||||
|
# /run/testfilter
|
||||||
@cherrypy.expose
|
@cherrypy.expose
|
||||||
@cherrypy.tools.json_in()
|
@cherrypy.tools.json_in()
|
||||||
@cherrypy.tools.json_out()
|
@cherrypy.tools.json_out()
|
||||||
@exception_to_httperror(KeyError, ValueError)
|
@exception_to_httperror(KeyError, ValueError)
|
||||||
@cherrypy.tools.CORS_allow(methods = ["POST"])
|
@cherrypy.tools.CORS_allow(methods = ["POST"])
|
||||||
def dummy(self, data):
|
def testfilter(self, data):
|
||||||
return self.manager.run_python(
|
return self.manager.run_function(
|
||||||
"dummy", nilmrun.filters.dummy.filterfunc, data)
|
"dummy", nilmrun.testfilter.test, data)
|
||||||
|
|
||||||
class Server(object):
|
class Server(object):
|
||||||
def __init__(self, host = '127.0.0.1', port = 8080,
|
def __init__(self, host = '127.0.0.1', port = 8080,
|
||||||
@@ -198,7 +198,7 @@ class Server(object):
|
|||||||
manager = nilmrun.processmanager.ProcessManager()
|
manager = nilmrun.processmanager.ProcessManager()
|
||||||
root = App()
|
root = App()
|
||||||
root.process = AppProcess(manager)
|
root.process = AppProcess(manager)
|
||||||
root.filter = AppFilter(manager)
|
root.run = AppRun(manager)
|
||||||
cherrypy.tree.apps = {}
|
cherrypy.tree.apps = {}
|
||||||
cherrypy.tree.mount(root, basepath, config = { "/" : app_config })
|
cherrypy.tree.mount(root, basepath, config = { "/" : app_config })
|
||||||
|
|
||||||
|
@@ -6,7 +6,7 @@ import signal
|
|||||||
import sys
|
import sys
|
||||||
|
|
||||||
# This is just for testing the process management.
|
# This is just for testing the process management.
|
||||||
def filterfunc(n):
|
def test(n):
|
||||||
n = int(n)
|
n = int(n)
|
||||||
if n < 0: # raise an exception
|
if n < 0: # raise an exception
|
||||||
raise Exception("test exception")
|
raise Exception("test exception")
|
@@ -1,3 +1,3 @@
|
|||||||
test_client.py
|
test_nilmrun.py
|
||||||
|
|
||||||
test_*.py
|
test_*.py
|
||||||
|
@@ -25,6 +25,7 @@ import urllib2
|
|||||||
from urllib2 import urlopen, HTTPError
|
from urllib2 import urlopen, HTTPError
|
||||||
import requests
|
import requests
|
||||||
import pprint
|
import pprint
|
||||||
|
import textwrap
|
||||||
|
|
||||||
from testutil.helpers import *
|
from testutil.helpers import *
|
||||||
|
|
||||||
@@ -46,15 +47,29 @@ def teardown_module():
|
|||||||
|
|
||||||
class TestClient(object):
|
class TestClient(object):
|
||||||
|
|
||||||
def wait_end(self, client, pid, timeout = 5):
|
def wait_kill(self, client, pid, timeout = 1):
|
||||||
|
time.sleep(timeout)
|
||||||
|
status = client.get("/process/status", { "pid": pid })
|
||||||
|
if not status["alive"]:
|
||||||
|
raise AssertionError("died before we could kill it")
|
||||||
|
status = client.post("/process/remove", { "pid": pid })
|
||||||
|
if status["alive"]:
|
||||||
|
raise AssertionError("didn't get killed")
|
||||||
|
return status
|
||||||
|
|
||||||
|
def wait_end(self, client, pid, timeout = 5, remove = True):
|
||||||
start = time.time()
|
start = time.time()
|
||||||
status = None
|
status = None
|
||||||
while (time.time() - start) < timeout:
|
while (time.time() - start) < timeout:
|
||||||
status = client.get("/process/status", { "pid": pid })
|
status = client.get("/process/status", { "pid": pid })
|
||||||
if status["alive"] == False:
|
if status["alive"] == False:
|
||||||
return status
|
break
|
||||||
|
else:
|
||||||
raise AssertionError("process " + str(pid) + " didn't die in " +
|
raise AssertionError("process " + str(pid) + " didn't die in " +
|
||||||
str(timeout) + " seconds: " + repr(status))
|
str(timeout) + " seconds: " + repr(status))
|
||||||
|
if remove:
|
||||||
|
status = client.post("/process/remove", { "pid": pid })
|
||||||
|
return status
|
||||||
|
|
||||||
def test_client_01_basic(self):
|
def test_client_01_basic(self):
|
||||||
client = HTTPClient(baseurl = testurl)
|
client = HTTPClient(baseurl = testurl)
|
||||||
@@ -83,7 +98,7 @@ class TestClient(object):
|
|||||||
client = HTTPClient(baseurl = testurl, post_json = True)
|
client = HTTPClient(baseurl = testurl, post_json = True)
|
||||||
|
|
||||||
# start dummy filter
|
# start dummy filter
|
||||||
pid = client.post("/filter/dummy", { "data": 30 })
|
pid = client.post("/run/testfilter", { "data": 30 })
|
||||||
eq_(client.get("/process/list"), [pid])
|
eq_(client.get("/process/list"), [pid])
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
@@ -101,7 +116,7 @@ class TestClient(object):
|
|||||||
nin_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"])
|
nin_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"])
|
||||||
|
|
||||||
# See that it ended properly
|
# See that it ended properly
|
||||||
status = self.wait_end(client, pid)
|
status = self.wait_end(client, pid, remove = False)
|
||||||
in_("dummy 27\ndummy 28\ndummy 29\n", status["log"])
|
in_("dummy 27\ndummy 28\ndummy 29\n", status["log"])
|
||||||
eq_(status["exitcode"], 0)
|
eq_(status["exitcode"], 0)
|
||||||
|
|
||||||
@@ -117,7 +132,7 @@ class TestClient(object):
|
|||||||
client = HTTPClient(baseurl = testurl, post_json = True)
|
client = HTTPClient(baseurl = testurl, post_json = True)
|
||||||
|
|
||||||
# Trigger exception in filter
|
# Trigger exception in filter
|
||||||
pid = client.post("/filter/dummy", { "data": -1 })
|
pid = client.post("/run/testfilter", { "data": -1 })
|
||||||
time.sleep(0.5)
|
time.sleep(0.5)
|
||||||
status = client.get("/process/status", { "pid": pid })
|
status = client.get("/process/status", { "pid": pid })
|
||||||
eq_(status["alive"], False)
|
eq_(status["alive"], False)
|
||||||
@@ -126,7 +141,7 @@ class TestClient(object):
|
|||||||
client.post("/process/remove", { "pid": pid })
|
client.post("/process/remove", { "pid": pid })
|
||||||
|
|
||||||
# Kill a running filter by removing it early
|
# Kill a running filter by removing it early
|
||||||
newpid = client.post("/filter/dummy", { "data": 50 })
|
newpid = client.post("/run/testfilter", { "data": 50 })
|
||||||
ne_(newpid, pid)
|
ne_(newpid, pid)
|
||||||
time.sleep(0.5)
|
time.sleep(0.5)
|
||||||
start = time.time()
|
start = time.time()
|
||||||
@@ -141,7 +156,7 @@ class TestClient(object):
|
|||||||
eq_(client.get("/process/list"), [])
|
eq_(client.get("/process/list"), [])
|
||||||
|
|
||||||
# Try to remove a running filter that ignored SIGTERM
|
# Try to remove a running filter that ignored SIGTERM
|
||||||
pid = client.post("/filter/dummy", { "data": 0 })
|
pid = client.post("/run/testfilter", { "data": 0 })
|
||||||
start = time.time()
|
start = time.time()
|
||||||
status = client.post("/process/remove", { "pid": pid })
|
status = client.post("/process/remove", { "pid": pid })
|
||||||
elapsed = time.time() - start
|
elapsed = time.time() - start
|
||||||
@@ -150,14 +165,15 @@ class TestClient(object):
|
|||||||
eq_(status["alive"], False)
|
eq_(status["alive"], False)
|
||||||
ne_(status["exitcode"], 0)
|
ne_(status["exitcode"], 0)
|
||||||
|
|
||||||
|
@unittest.skip("trainola moving to nilmtools")
|
||||||
def test_client_05_trainola_simple(self):
|
def test_client_05_trainola_simple(self):
|
||||||
client = HTTPClient(baseurl = testurl, post_json = True)
|
client = HTTPClient(baseurl = testurl, post_json = True)
|
||||||
pid = client.post("/filter/trainola", { "data": {} })
|
pid = client.post("/run/trainola", { "data": {} })
|
||||||
status = self.wait_end(client, pid)
|
status = self.wait_end(client, pid, remove = False)
|
||||||
ne_(status["exitcode"], 0)
|
ne_(status["exitcode"], 0)
|
||||||
status = client.post("/process/remove", { "pid": pid })
|
status = client.post("/process/remove", { "pid": pid })
|
||||||
|
|
||||||
@unittest.skip("needs a running nilmdb")
|
@unittest.skip("needs a running nilmdb; trainola moved to nilmtools")
|
||||||
def test_client_06_trainola(self):
|
def test_client_06_trainola(self):
|
||||||
client = HTTPClient(baseurl = testurl, post_json = True)
|
client = HTTPClient(baseurl = testurl, post_json = True)
|
||||||
|
|
||||||
@@ -191,7 +207,7 @@ class TestClient(object):
|
|||||||
}
|
}
|
||||||
|
|
||||||
# start trainola
|
# start trainola
|
||||||
pid = client.post("/filter/trainola", { "data": data })
|
pid = client.post("/run/trainola", { "data": data })
|
||||||
|
|
||||||
# wait for it to finish
|
# wait for it to finish
|
||||||
for i in range(60):
|
for i in range(60):
|
||||||
@@ -213,25 +229,16 @@ class TestClient(object):
|
|||||||
if i < 3:
|
if i < 3:
|
||||||
raise AssertionError("too fast?")
|
raise AssertionError("too fast?")
|
||||||
|
|
||||||
def test_client_07_process_command(self):
|
def test_client_07_run_command(self):
|
||||||
client = HTTPClient(baseurl = testurl, post_json = True)
|
client = HTTPClient(baseurl = testurl, post_json = True)
|
||||||
eq_(client.get("/process/list"), [])
|
eq_(client.get("/process/list"), [])
|
||||||
|
|
||||||
def do(args, kill):
|
def do(argv, kill):
|
||||||
pid = client.post("/process/command", { "args": args } )
|
pid = client.post("/run/command", { "argv": argv } )
|
||||||
eq_(client.get("/process/list"), [pid])
|
eq_(client.get("/process/list"), [pid])
|
||||||
if kill:
|
if kill:
|
||||||
time.sleep(1)
|
return self.wait_kill(client, pid)
|
||||||
status = client.get("/process/status", { "pid": pid })
|
return self.wait_end(client, pid)
|
||||||
if not status["alive"]:
|
|
||||||
raise AssertionError("died before we could kill it")
|
|
||||||
status = client.post("/process/remove", { "pid": pid })
|
|
||||||
if status["alive"]:
|
|
||||||
raise AssertionError("didn't get killed")
|
|
||||||
else:
|
|
||||||
self.wait_end(client, pid)
|
|
||||||
status = client.post("/process/remove", { "pid": pid })
|
|
||||||
return status
|
|
||||||
|
|
||||||
# Simple command
|
# Simple command
|
||||||
status = do(["pwd"], False)
|
status = do(["pwd"], False)
|
||||||
@@ -250,3 +257,69 @@ class TestClient(object):
|
|||||||
# Kill a slow command
|
# Kill a slow command
|
||||||
status = do(["sleep", "60"], True)
|
status = do(["sleep", "60"], True)
|
||||||
ne_(status["exitcode"], 0)
|
ne_(status["exitcode"], 0)
|
||||||
|
|
||||||
|
def test_client_08_run_code(self):
|
||||||
|
client = HTTPClient(baseurl = testurl, post_json = True)
|
||||||
|
eq_(client.get("/process/list"), [])
|
||||||
|
|
||||||
|
def do(code, args, kill):
|
||||||
|
pid = client.post("/run/code", { "code": code, "args": args } )
|
||||||
|
eq_(client.get("/process/list"), [pid])
|
||||||
|
if kill:
|
||||||
|
return self.wait_kill(client, pid)
|
||||||
|
return self.wait_end(client, pid)
|
||||||
|
|
||||||
|
# basic code snippet
|
||||||
|
code=textwrap.dedent("""
|
||||||
|
print 'hello'
|
||||||
|
def foo(arg):
|
||||||
|
print 'world'
|
||||||
|
""")
|
||||||
|
status = do(code, [], False)
|
||||||
|
eq_("hello\n", status["log"])
|
||||||
|
eq_(status["exitcode"], 0)
|
||||||
|
|
||||||
|
# compile error
|
||||||
|
code=textwrap.dedent("""
|
||||||
|
def foo(arg:
|
||||||
|
print 'hello'
|
||||||
|
""")
|
||||||
|
status = do(code, [], False)
|
||||||
|
in_("SyntaxError", status["log"])
|
||||||
|
eq_(status["exitcode"], 1)
|
||||||
|
|
||||||
|
# traceback in user code should be formatted nicely
|
||||||
|
code=textwrap.dedent("""
|
||||||
|
def foo(arg):
|
||||||
|
raise Exception(arg)
|
||||||
|
foo(123)
|
||||||
|
""")
|
||||||
|
status = do(code, [], False)
|
||||||
|
eq_('Traceback (most recent call last):\n' +
|
||||||
|
' File "<user-code>", line 4, in <module>\n' +
|
||||||
|
' foo(123)\n' +
|
||||||
|
' File "<user-code>", line 3, in foo\n' +
|
||||||
|
' raise Exception(arg)\n' +
|
||||||
|
'Exception: 123\n', status["log"])
|
||||||
|
eq_(status["exitcode"], 1)
|
||||||
|
|
||||||
|
# argument handling (strings come in as unicode)
|
||||||
|
code=textwrap.dedent("""
|
||||||
|
import sys
|
||||||
|
print sys.argv[1].encode('ascii'), sys.argv[2]
|
||||||
|
sys.exit(0) # also test raising SystemExit
|
||||||
|
""")
|
||||||
|
status = do(code, ["hello", 123], False)
|
||||||
|
eq_(status["log"], "hello 123\n")
|
||||||
|
eq_(status["exitcode"], 0)
|
||||||
|
|
||||||
|
# try killing a long-running process
|
||||||
|
code=textwrap.dedent("""
|
||||||
|
import time
|
||||||
|
print 'hello'
|
||||||
|
time.sleep(60)
|
||||||
|
print 'world'
|
||||||
|
""")
|
||||||
|
status = do(code, [], True)
|
||||||
|
eq_(status["log"], "hello\n")
|
||||||
|
ne_(status["exitcode"], 0)
|
Reference in New Issue
Block a user