6 Commits

8 changed files with 53 additions and 338 deletions

View File

@@ -21,13 +21,13 @@ arbitrary commands.
SSLEngine On
WSGIScriptAlias /nilmrun /home/nilm/nilmrun.wsgi
WSGIApplicationGroup nilmrun-appgroup
WSGIProcessGroup nilmrun-procgroup
WSGIDaemonProcess nilmrun-procgroup threads=32 user=nilm group=nilm
# Access control example:
<Location /nilmrun>
WSGIApplicationGroup nilmrun-appgroup
SSLRequireSSL
# Access control example:
Order deny,allow
Deny from all
Allow from 1.2.3.4

View File

@@ -1 +0,0 @@
# Filters

View File

@@ -1,260 +0,0 @@
#!/usr/bin/python
from nilmdb.utils.printf import *
import nilmdb.client
import nilmtools.filter
from nilmdb.utils.time import (timestamp_to_human,
timestamp_to_seconds,
seconds_to_timestamp)
from nilmdb.utils.interval import Interval
import numpy as np
import scipy
import scipy.signal
from numpy.core.umath_tests import inner1d
import nilmrun
from collections import OrderedDict
class DataError(ValueError):
pass
class Data(object):
def __init__(self, name, url, stream, start, end, columns):
"""Initialize, get stream info, check columns"""
self.name = name
self.url = url
self.stream = stream
self.start = start
self.end = end
# Get stream info
self.client = nilmdb.client.numpyclient.NumpyClient(url)
self.info = nilmtools.filter.get_stream_info(self.client, stream)
# Build up name => index mapping for the columns
self.columns = OrderedDict()
for c in columns:
if (c['name'] in self.columns.keys() or
c['index'] in self.columns.values()):
raise DataError("duplicated columns")
if (c['index'] < 0 or c['index'] >= self.info.layout_count):
raise DataError("bad column number")
self.columns[c['name']] = c['index']
if not len(self.columns):
raise DataError("no columns")
# Count points
self.count = self.client.stream_count(self.stream, self.start, self.end)
def __str__(self):
return sprintf("%-20s: %s%s, %s rows",
self.name, self.stream, str(self.columns.keys()),
self.count)
def fetch(self, min_rows = 10, max_rows = 100000):
"""Fetch all the data into self.data. This is intended for
exemplars, and can only handle a relatively small number of
rows"""
# Verify count
if self.count == 0:
raise DataError("No data in this exemplar!")
if self.count < min_rows:
raise DataError("Too few data points: " + str(self.count))
if self.count > max_rows:
raise DataError("Too many data points: " + str(self.count))
# Extract the data
datagen = self.client.stream_extract_numpy(self.stream,
self.start, self.end,
self.info.layout,
maxrows = self.count)
self.data = list(datagen)[0]
# Discard timestamp
self.data = self.data[:,1:]
# Subtract the mean from each column
self.data = self.data - self.data.mean(axis=0)
# Get scale factors for each column by computing dot product
# of each column with itself.
self.scale = inner1d(self.data.T, self.data.T)
# Ensure a minimum (nonzero) scale and convert to list
self.scale = np.maximum(self.scale, [1e-9]).tolist()
def process(main, function, args = None, rows = 200000):
"""Process through the data; similar to nilmtools.Filter.process_numpy"""
if args is None:
args = []
extractor = main.client.stream_extract_numpy
old_array = np.array([])
for new_array in extractor(main.stream, main.start, main.end,
layout = main.info.layout, maxrows = rows):
# If we still had old data left, combine it
if old_array.shape[0] != 0:
array = np.vstack((old_array, new_array))
else:
array = new_array
# Process it
processed = function(array, args)
# Save the unprocessed parts
if processed >= 0:
old_array = array[processed:]
else:
raise Exception(sprintf("%s return value %s must be >= 0",
str(function), str(processed)))
# Warn if there's too much data remaining
if old_array.shape[0] > 3 * rows:
printf("warning: %d unprocessed rows in buffer\n",
old_array.shape[0])
# Handle leftover data
if old_array.shape[0] != 0:
processed = function(array, args)
def peak_detect(data, delta):
"""Simple min/max peak detection algorithm, taken from my code
in the disagg.m from the 10-8-5 paper"""
mins = [];
maxs = [];
cur_min = (None, np.inf)
cur_max = (None, -np.inf)
lookformax = False
for (n, p) in enumerate(data):
if p > cur_max[1]:
cur_max = (n, p)
if p < cur_min[1]:
cur_min = (n, p)
if lookformax:
if p < (cur_max[1] - delta):
maxs.append(cur_max)
cur_min = (n, p)
lookformax = False
else:
if p > (cur_min[1] + delta):
mins.append(cur_min)
cur_max = (n, p)
lookformax = True
return (mins, maxs)
def match(data, args):
"""Perform cross-correlation match"""
( columns, exemplars ) = args
nrows = data.shape[0]
# We want at least 10% more points than the widest exemplar.
widest = max([ x.count for x in exemplars ])
if (widest * 1.1) > nrows:
return 0
# This is how many points we'll consider valid in the
# cross-correlation.
valid = nrows + 1 - widest
matches = []
# Try matching against each of the exemplars
for e_num, e in enumerate(exemplars):
corrs = []
# Compute cross-correlation for each column
for c in e.columns:
a = data[:,columns[c] + 1]
b = e.data[:,e.columns[c]]
corr = scipy.signal.fftconvolve(a, np.flipud(b), 'valid')[0:valid]
# Scale by the norm of the exemplar
corr = corr / e.scale[columns[c]]
corrs.append(corr)
# Find the peaks using the column with the largest amplitude
biggest = e.scale.index(max(e.scale))
peaks_minmax = peak_detect(corrs[biggest], 0.1)
peaks = [ p[0] for p in peaks_minmax[1] ]
# Now look at every peak
for p in peaks:
# Correlation for each column must be close enough to 1.
for (corr, scale) in zip(corrs, e.scale):
# The accepted distance from 1 is based on the relative
# amplitude of the column. Use a linear mapping:
# scale 1.0 -> distance 0.1
# scale 0.0 -> distance 1.0
distance = 1 - 0.9 * (scale / e.scale[biggest])
if abs(corr[p] - 1) > distance:
# No match
break
else:
# Successful match
matches.append((p, e_num))
# Print matches
for (point, e_num) in sorted(matches):
# Ignore matches that showed up at the very tail of the window,
# and shorten the window accordingly. This is an attempt to avoid
# problems at chunk boundaries.
if point > (valid - 50):
valid -= 50
break
print "matched", data[point,0], "exemplar", exemplars[e_num].name
#from matplotlib import pyplot as p
#p.plot(data[:,1:3])
#p.show()
return max(valid, 0)
def trainola(conf):
# Load main stream data
print "Loading stream data"
main = Data(None, conf['url'], conf['stream'],
conf['start'], conf['end'], conf['columns'])
# Pull in the exemplar data
exemplars = []
for n, e in enumerate(conf['exemplars']):
print sprintf("Loading exemplar %d: %s", n, e['name'])
ex = Data(e['name'], e['url'], e['stream'],
e['start'], e['end'], e['columns'])
ex.fetch()
exemplars.append(ex)
# Verify that the exemplar columns are all represented in the main data
for n, ex in enumerate(exemplars):
for col in ex.columns:
if col not in main.columns:
raise DataError(sprintf("Exemplar %d column %s is not "
"available in main data", n, col))
# Process the main data
process(main, match, (main.columns, exemplars))
return "done"
filterfunc = trainola
def main(argv = None):
import simplejson as json
import argparse
import sys
parser = argparse.ArgumentParser(
formatter_class = argparse.RawDescriptionHelpFormatter,
version = nilmrun.__version__,
description = """Run Trainola using parameters passed in as
JSON-formatted data.""")
parser.add_argument("file", metavar="FILE", nargs="?",
type=argparse.FileType('r'), default=sys.stdin)
args = parser.parse_args(argv)
conf = json.loads(args.file.read())
result = trainola(conf)
print json.dumps(result, sort_keys = True, indent = 2 * ' ')
if __name__ == "__main__":
main()

View File

@@ -40,7 +40,6 @@ class LogReceiver(object):
class Process(object):
"""Spawn and manage a process that calls a Python function"""
def __init__(self, name, function, parameters):
self.parameters = parameters
self.start_time = None
self.name = name
@@ -173,25 +172,30 @@ def _exec_user_code(codeargs): # pragma: no cover (runs in subprocess)
codeobj = compile(code, '<user-code>', 'exec',
flags = 0, dont_inherit = 1)
exec(codeobj, module.__dict__, {})
except:
# Pull out the exception
info = sys.exc_info()
tblist = traceback.extract_tb(info[2])
except Exception:
try:
# Pull out the exception
info = sys.exc_info()
tblist = traceback.extract_tb(info[2])
# First entry is probably this code; get rid of it
if len(tblist) and tblist[0][2] == '_exec_user_code':
tblist = tblist[1:]
# First entry is probably this code; get rid of it
if len(tblist) and tblist[0][2] == '_exec_user_code':
tblist = tblist[1:]
# Add the user's source code to every line that's missing it
lines = code.splitlines()
for (n, (name, line, func, text)) in enumerate(tblist):
if name == '<user-code>' and text is None and line <= len(lines):
tblist[n] = (name, line, func, lines[line-1].strip())
# Add the user's source code to every line that's missing it
lines = code.splitlines()
maxline = len(lines)
for (n, (name, line, func, text)) in enumerate(tblist):
if name == '<user-code>' and text is None and line <= maxline:
tblist[n] = (name, line, func, lines[line-1].strip())
# Print it to stderr in the usual format
out = ['Traceback (most recent call last):\n']
out.extend(traceback.format_list(tblist))
out.extend(traceback.format_exception_only(info[0], info[1]))
# Format it in the usual manner
out = ['Traceback (most recent call last):\n']
out.extend(traceback.format_list(tblist))
out.extend(traceback.format_exception_only(info[0], info[1]))
finally:
# Need to explicitly delete traceback object to avoid ref cycle
del info
sys.stderr.write("".join(out))
sys.stderr.flush()
sys.exit(1)

View File

@@ -25,8 +25,7 @@ from nilmdb.server.serverutil import (
cherrypy_stop,
)
import nilmrun
import nilmrun.filters.trainola
import nilmrun.filters.dummy
import nilmrun.testfilter
# Add CORS_allow tool
cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)
@@ -69,7 +68,6 @@ class AppProcess(object):
"exitcode": self.manager[pid].exitcode,
"name": self.manager[pid].name,
"start_time": self.manager[pid].start_time,
"parameters": self.manager[pid].parameters,
"log": self.manager[pid].log,
}
@@ -124,6 +122,7 @@ class AppRun(object):
and 'argv[1:]' are arguments"""
return self.manager.run_command("command", argv)
# /run/code
@cherrypy.expose
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
@@ -135,25 +134,15 @@ class AppRun(object):
(i.e., they end up in sys.argv[1:])"""
return self.manager.run_code("usercode", code, args)
# /run/trainola
# /run/testfilter
@cherrypy.expose
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
@exception_to_httperror(KeyError, ValueError)
@cherrypy.tools.CORS_allow(methods = ["POST"])
def trainola(self, data):
def testfilter(self, data):
return self.manager.run_function(
"trainola", nilmrun.filters.trainola.filterfunc, data)
# /run/dummy
@cherrypy.expose
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
@exception_to_httperror(KeyError, ValueError)
@cherrypy.tools.CORS_allow(methods = ["POST"])
def dummy(self, data):
return self.manager.run_function(
"dummy", nilmrun.filters.dummy.filterfunc, data)
"dummy", nilmrun.testfilter.test, data)
class Server(object):
def __init__(self, host = '127.0.0.1', port = 8080,

View File

@@ -6,7 +6,7 @@ import signal
import sys
# This is just for testing the process management.
def filterfunc(n):
def test(n):
n = int(n)
if n < 0: # raise an exception
raise Exception("test exception")

View File

@@ -1,3 +1,3 @@
test_client.py
test_nilmrun.py
test_*.py

View File

@@ -98,14 +98,13 @@ class TestClient(object):
client = HTTPClient(baseurl = testurl, post_json = True)
# start dummy filter
pid = client.post("/run/dummy", { "data": 30 })
pid = client.post("/run/testfilter", { "data": 30 })
eq_(client.get("/process/list"), [pid])
time.sleep(1)
# Verify that status looks OK
status = client.get("/process/status", { "pid": pid, "clear": True })
for x in [ "pid", "alive", "exitcode", "name",
"start_time", "parameters", "log" ]:
for x in [ "pid", "alive", "exitcode", "name", "start_time", "log" ]:
in_(x, status)
in_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"])
eq_(status["alive"], True)
@@ -132,7 +131,7 @@ class TestClient(object):
client = HTTPClient(baseurl = testurl, post_json = True)
# Trigger exception in filter
pid = client.post("/run/dummy", { "data": -1 })
pid = client.post("/run/testfilter", { "data": -1 })
time.sleep(0.5)
status = client.get("/process/status", { "pid": pid })
eq_(status["alive"], False)
@@ -141,7 +140,7 @@ class TestClient(object):
client.post("/process/remove", { "pid": pid })
# Kill a running filter by removing it early
newpid = client.post("/run/dummy", { "data": 50 })
newpid = client.post("/run/testfilter", { "data": 50 })
ne_(newpid, pid)
time.sleep(0.5)
start = time.time()
@@ -156,7 +155,7 @@ class TestClient(object):
eq_(client.get("/process/list"), [])
# Try to remove a running filter that ignored SIGTERM
pid = client.post("/run/dummy", { "data": 0 })
pid = client.post("/run/testfilter", { "data": 0 })
start = time.time()
status = client.post("/process/remove", { "pid": pid })
elapsed = time.time() - start
@@ -165,18 +164,11 @@ class TestClient(object):
eq_(status["alive"], False)
ne_(status["exitcode"], 0)
def test_client_05_trainola_simple(self):
@unittest.skip("needs a running nilmdb; trainola moved to nilmtools")
def test_client_05_trainola(self):
client = HTTPClient(baseurl = testurl, post_json = True)
pid = client.post("/run/trainola", { "data": {} })
status = self.wait_end(client, pid, remove = False)
ne_(status["exitcode"], 0)
status = client.post("/process/remove", { "pid": pid })
@unittest.skip("needs a running nilmdb")
def test_client_06_trainola(self):
client = HTTPClient(baseurl = testurl, post_json = True)
data = { "url": "http://bucket.mit.edu/nilmdb",
"dest_stream": "/sharon/prep-a-matches",
"stream": "/sharon/prep-a",
"start": 1366111383280463,
"end": 1366126163457797,
@@ -189,6 +181,7 @@ class TestClient(object):
"stream": "/sharon/prep-a",
"start": 1366260494269078,
"end": 1366260608185031,
"dest_column": 0,
"columns": [ { "name": "P1", "index": 0 },
{ "name": "Q1", "index": 1 }
]
@@ -198,37 +191,26 @@ class TestClient(object):
"stream": "/sharon/prep-a",
"start": 1366260864215764,
"end": 1366260870882998,
"dest_column": 1,
"columns": [ { "name": "P1", "index": 0 },
{ "name": "Q1", "index": 1 }
]
}
]
}
# start trainola
pid = client.post("/run/trainola", { "data": data })
# wait for it to finish
for i in range(60):
time.sleep(1)
if i == 2:
status = client.get("/process/status", { "pid": pid,
"clear": True })
in_("Loading stream data", status['log'])
elif i == 3:
status = client.get("/process/status", { "pid": pid })
nin_("Loading stream data", status['log'])
else:
status = client.get("/process/status", { "pid": pid })
pid = client.post("/run/code", { "code": "import nilmtools.trainola\n" +
"nilmtools.trainola.main()",
"args": [ data ] })
while True:
status = client.get("/process/status", { "pid": pid, "clear": 1 })
sys.stdout.write(status["log"])
sys.stdout.flush()
if status["alive"] == False:
break
else:
client.post("/process/remove", {"pid": pid })
raise AssertionError("took too long")
if i < 3:
raise AssertionError("too fast?")
status = client.post("/process/remove", { "pid": pid })
os._exit(int(status["exitcode"]))
def test_client_07_run_command(self):
def test_client_06_run_command(self):
client = HTTPClient(baseurl = testurl, post_json = True)
eq_(client.get("/process/list"), [])
@@ -257,7 +239,7 @@ class TestClient(object):
status = do(["sleep", "60"], True)
ne_(status["exitcode"], 0)
def test_client_08_run_code(self):
def test_client_07_run_code(self):
client = HTTPClient(baseurl = testurl, post_json = True)
eq_(client.get("/process/list"), [])
@@ -306,6 +288,7 @@ class TestClient(object):
code=textwrap.dedent("""
import sys
print sys.argv[1].encode('ascii'), sys.argv[2]
sys.exit(0) # also test raising SystemExit
""")
status = do(code, ["hello", 123], False)
eq_(status["log"], "hello 123\n")