Compare commits
11 Commits
nilmrun-0.
...
nilmrun-0.
Author | SHA1 | Date | |
---|---|---|---|
734e1d9b52 | |||
30a3559253 | |||
caad5dec04 | |||
e89a48dbb7 | |||
e2c9575937 | |||
258fe2358d | |||
f73de35ee6 | |||
65e48caf5f | |||
a9bac7d9a0 | |||
afd21bfef2 | |||
b228c3e35f |
10
.coveragerc
Normal file
10
.coveragerc
Normal file
@@ -0,0 +1,10 @@
|
|||||||
|
# -*- conf -*-
|
||||||
|
|
||||||
|
[run]
|
||||||
|
# branch = True
|
||||||
|
|
||||||
|
[report]
|
||||||
|
exclude_lines =
|
||||||
|
pragma: no cover
|
||||||
|
if 0:
|
||||||
|
omit = scripts,nilmrun/_version.py,nilmrun/filters/*
|
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,3 +1,4 @@
|
|||||||
|
.coverage
|
||||||
build/
|
build/
|
||||||
*.pyc
|
*.pyc
|
||||||
dist/
|
dist/
|
||||||
|
37
Makefile
37
Makefile
@@ -1,18 +1,15 @@
|
|||||||
URL="http://localhost/nilmdb"
|
# By default, run the tests.
|
||||||
|
all: test
|
||||||
|
|
||||||
all:
|
test2:
|
||||||
ifeq ($(INSIDE_EMACS), t)
|
nilmrun/trainola.py data.js
|
||||||
@make test
|
|
||||||
else
|
|
||||||
@echo "Try 'make install'"
|
|
||||||
endif
|
|
||||||
|
|
||||||
test:
|
|
||||||
echo No tests yet
|
|
||||||
|
|
||||||
version:
|
version:
|
||||||
python setup.py version
|
python setup.py version
|
||||||
|
|
||||||
|
build:
|
||||||
|
python setup.py build_ext --inplace
|
||||||
|
|
||||||
dist: sdist
|
dist: sdist
|
||||||
sdist:
|
sdist:
|
||||||
python setup.py sdist
|
python setup.py sdist
|
||||||
@@ -23,11 +20,29 @@ install:
|
|||||||
develop:
|
develop:
|
||||||
python setup.py develop
|
python setup.py develop
|
||||||
|
|
||||||
|
docs:
|
||||||
|
make -C docs
|
||||||
|
|
||||||
|
lint:
|
||||||
|
pylint --rcfile=.pylintrc nilmdb
|
||||||
|
|
||||||
|
test:
|
||||||
|
ifeq ($(INSIDE_EMACS), t)
|
||||||
|
# Use the slightly more flexible script
|
||||||
|
python setup.py build_ext --inplace
|
||||||
|
python tests/runtests.py
|
||||||
|
else
|
||||||
|
# Let setup.py check dependencies, build stuff, and run the test
|
||||||
|
python setup.py nosetests
|
||||||
|
endif
|
||||||
|
|
||||||
clean::
|
clean::
|
||||||
|
rm -f .coverage
|
||||||
find . -name '*pyc' | xargs rm -f
|
find . -name '*pyc' | xargs rm -f
|
||||||
rm -rf nilmtools.egg-info/ build/ MANIFEST.in
|
rm -rf nilmtools.egg-info/ build/ MANIFEST.in
|
||||||
|
make -C docs clean
|
||||||
|
|
||||||
gitclean::
|
gitclean::
|
||||||
git clean -dXf
|
git clean -dXf
|
||||||
|
|
||||||
.PHONY: all test version dist sdist install clean gitclean
|
.PHONY: all version dist sdist install docs lint test clean gitclean
|
||||||
|
28
data.js
Normal file
28
data.js
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
{ "url": "http://bucket.mit.edu/nilmdb",
|
||||||
|
"stream": "/sharon/prep-a",
|
||||||
|
"start": 1366111383280463,
|
||||||
|
"end": 1366126163457797,
|
||||||
|
"columns": [ { "name": "P1", "index": 0 },
|
||||||
|
{ "name": "Q1", "index": 1 },
|
||||||
|
{ "name": "P3", "index": 2 } ],
|
||||||
|
"exemplars": [
|
||||||
|
{ "name": "Boiler Pump ON",
|
||||||
|
"url": "http://bucket.mit.edu/nilmdb",
|
||||||
|
"stream": "/sharon/prep-a",
|
||||||
|
"start": 1366260494269078,
|
||||||
|
"end": 1366260608185031,
|
||||||
|
"columns": [ { "name": "P1", "index": 0 },
|
||||||
|
{ "name": "Q1", "index": 1 }
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{ "name": "Boiler Pump OFF",
|
||||||
|
"url": "http://bucket.mit.edu/nilmdb",
|
||||||
|
"stream": "/sharon/prep-a",
|
||||||
|
"start": 1366260864215764,
|
||||||
|
"end": 1366260870882998,
|
||||||
|
"columns": [ { "name": "P1", "index": 0 },
|
||||||
|
{ "name": "Q1", "index": 1 }
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
@@ -13,9 +13,13 @@ First, create a WSGI script `/home/nilm/nilmrun.wsgi` containing:
|
|||||||
|
|
||||||
The first parameter is the path part of the URL.
|
The first parameter is the path part of the URL.
|
||||||
|
|
||||||
Then, set up Apache with a configuration like:
|
Then, set up Apache with a configuration like below. SSL and access
|
||||||
|
control/authentication are strongly recommended since this can execute
|
||||||
|
arbitrary commands.
|
||||||
|
|
||||||
|
<VirtualHost *:443>
|
||||||
|
SSLEngine On
|
||||||
|
|
||||||
<VirtualHost>
|
|
||||||
WSGIScriptAlias /nilmrun /home/nilm/nilmrun.wsgi
|
WSGIScriptAlias /nilmrun /home/nilm/nilmrun.wsgi
|
||||||
WSGIApplicationGroup nilmrun-appgroup
|
WSGIApplicationGroup nilmrun-appgroup
|
||||||
WSGIProcessGroup nilmrun-procgroup
|
WSGIProcessGroup nilmrun-procgroup
|
||||||
@@ -23,6 +27,7 @@ Then, set up Apache with a configuration like:
|
|||||||
|
|
||||||
# Access control example:
|
# Access control example:
|
||||||
<Location /nilmrun>
|
<Location /nilmrun>
|
||||||
|
SSLRequireSSL
|
||||||
Order deny,allow
|
Order deny,allow
|
||||||
Deny from all
|
Deny from all
|
||||||
Allow from 1.2.3.4
|
Allow from 1.2.3.4
|
||||||
|
@@ -1,3 +1,4 @@
|
|||||||
|
import nilmrun.processmanager
|
||||||
|
|
||||||
from ._version import get_versions
|
from ._version import get_versions
|
||||||
__version__ = get_versions()['version']
|
__version__ = get_versions()['version']
|
@@ -181,7 +181,7 @@ def versions_from_parentdir(parentdir_prefix, versionfile_source, verbose=False)
|
|||||||
|
|
||||||
tag_prefix = "nilmrun-"
|
tag_prefix = "nilmrun-"
|
||||||
parentdir_prefix = "nilmrun-"
|
parentdir_prefix = "nilmrun-"
|
||||||
versionfile_source = "src/_version.py"
|
versionfile_source = "nilmrun/_version.py"
|
||||||
|
|
||||||
def get_versions(default={"version": "unknown", "full": ""}, verbose=False):
|
def get_versions(default={"version": "unknown", "full": ""}, verbose=False):
|
||||||
variables = { "refnames": git_refnames, "full": git_full }
|
variables = { "refnames": git_refnames, "full": git_full }
|
1
nilmrun/filters/__init__.py
Normal file
1
nilmrun/filters/__init__.py
Normal file
@@ -0,0 +1 @@
|
|||||||
|
# Filters
|
22
nilmrun/filters/dummy.py
Normal file
22
nilmrun/filters/dummy.py
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
#!/usr/bin/python
|
||||||
|
|
||||||
|
from nilmdb.utils.printf import *
|
||||||
|
import time
|
||||||
|
import signal
|
||||||
|
import sys
|
||||||
|
|
||||||
|
# This is just for testing the process management.
|
||||||
|
def filterfunc(n):
|
||||||
|
n = int(n)
|
||||||
|
if n < 0: # raise an exception
|
||||||
|
raise Exception("test exception")
|
||||||
|
if n == 0: # ignore SIGTERM and count to 100
|
||||||
|
n = 100
|
||||||
|
signal.signal(signal.SIGTERM, signal.SIG_IGN)
|
||||||
|
for x in range(n):
|
||||||
|
s = sprintf("dummy %d\n", x)
|
||||||
|
if x & 1:
|
||||||
|
sys.stdout.write(s)
|
||||||
|
else:
|
||||||
|
sys.stderr.write(s)
|
||||||
|
time.sleep(0.1)
|
260
nilmrun/filters/trainola.py
Normal file
260
nilmrun/filters/trainola.py
Normal file
@@ -0,0 +1,260 @@
|
|||||||
|
#!/usr/bin/python
|
||||||
|
|
||||||
|
from nilmdb.utils.printf import *
|
||||||
|
import nilmdb.client
|
||||||
|
import nilmtools.filter
|
||||||
|
from nilmdb.utils.time import (timestamp_to_human,
|
||||||
|
timestamp_to_seconds,
|
||||||
|
seconds_to_timestamp)
|
||||||
|
from nilmdb.utils.interval import Interval
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import scipy
|
||||||
|
import scipy.signal
|
||||||
|
from numpy.core.umath_tests import inner1d
|
||||||
|
import nilmrun
|
||||||
|
from collections import OrderedDict
|
||||||
|
|
||||||
|
class DataError(ValueError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class Data(object):
|
||||||
|
def __init__(self, name, url, stream, start, end, columns):
|
||||||
|
"""Initialize, get stream info, check columns"""
|
||||||
|
self.name = name
|
||||||
|
self.url = url
|
||||||
|
self.stream = stream
|
||||||
|
self.start = start
|
||||||
|
self.end = end
|
||||||
|
|
||||||
|
# Get stream info
|
||||||
|
self.client = nilmdb.client.numpyclient.NumpyClient(url)
|
||||||
|
self.info = nilmtools.filter.get_stream_info(self.client, stream)
|
||||||
|
|
||||||
|
# Build up name => index mapping for the columns
|
||||||
|
self.columns = OrderedDict()
|
||||||
|
for c in columns:
|
||||||
|
if (c['name'] in self.columns.keys() or
|
||||||
|
c['index'] in self.columns.values()):
|
||||||
|
raise DataError("duplicated columns")
|
||||||
|
if (c['index'] < 0 or c['index'] >= self.info.layout_count):
|
||||||
|
raise DataError("bad column number")
|
||||||
|
self.columns[c['name']] = c['index']
|
||||||
|
if not len(self.columns):
|
||||||
|
raise DataError("no columns")
|
||||||
|
|
||||||
|
# Count points
|
||||||
|
self.count = self.client.stream_count(self.stream, self.start, self.end)
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return sprintf("%-20s: %s%s, %s rows",
|
||||||
|
self.name, self.stream, str(self.columns.keys()),
|
||||||
|
self.count)
|
||||||
|
|
||||||
|
def fetch(self, min_rows = 10, max_rows = 100000):
|
||||||
|
"""Fetch all the data into self.data. This is intended for
|
||||||
|
exemplars, and can only handle a relatively small number of
|
||||||
|
rows"""
|
||||||
|
# Verify count
|
||||||
|
if self.count == 0:
|
||||||
|
raise DataError("No data in this exemplar!")
|
||||||
|
if self.count < min_rows:
|
||||||
|
raise DataError("Too few data points: " + str(self.count))
|
||||||
|
if self.count > max_rows:
|
||||||
|
raise DataError("Too many data points: " + str(self.count))
|
||||||
|
|
||||||
|
# Extract the data
|
||||||
|
datagen = self.client.stream_extract_numpy(self.stream,
|
||||||
|
self.start, self.end,
|
||||||
|
self.info.layout,
|
||||||
|
maxrows = self.count)
|
||||||
|
self.data = list(datagen)[0]
|
||||||
|
|
||||||
|
# Discard timestamp
|
||||||
|
self.data = self.data[:,1:]
|
||||||
|
|
||||||
|
# Subtract the mean from each column
|
||||||
|
self.data = self.data - self.data.mean(axis=0)
|
||||||
|
|
||||||
|
# Get scale factors for each column by computing dot product
|
||||||
|
# of each column with itself.
|
||||||
|
self.scale = inner1d(self.data.T, self.data.T)
|
||||||
|
|
||||||
|
# Ensure a minimum (nonzero) scale and convert to list
|
||||||
|
self.scale = np.maximum(self.scale, [1e-9]).tolist()
|
||||||
|
|
||||||
|
def process(main, function, args = None, rows = 200000):
|
||||||
|
"""Process through the data; similar to nilmtools.Filter.process_numpy"""
|
||||||
|
if args is None:
|
||||||
|
args = []
|
||||||
|
|
||||||
|
extractor = main.client.stream_extract_numpy
|
||||||
|
old_array = np.array([])
|
||||||
|
for new_array in extractor(main.stream, main.start, main.end,
|
||||||
|
layout = main.info.layout, maxrows = rows):
|
||||||
|
# If we still had old data left, combine it
|
||||||
|
if old_array.shape[0] != 0:
|
||||||
|
array = np.vstack((old_array, new_array))
|
||||||
|
else:
|
||||||
|
array = new_array
|
||||||
|
|
||||||
|
# Process it
|
||||||
|
processed = function(array, args)
|
||||||
|
|
||||||
|
# Save the unprocessed parts
|
||||||
|
if processed >= 0:
|
||||||
|
old_array = array[processed:]
|
||||||
|
else:
|
||||||
|
raise Exception(sprintf("%s return value %s must be >= 0",
|
||||||
|
str(function), str(processed)))
|
||||||
|
|
||||||
|
# Warn if there's too much data remaining
|
||||||
|
if old_array.shape[0] > 3 * rows:
|
||||||
|
printf("warning: %d unprocessed rows in buffer\n",
|
||||||
|
old_array.shape[0])
|
||||||
|
|
||||||
|
# Handle leftover data
|
||||||
|
if old_array.shape[0] != 0:
|
||||||
|
processed = function(array, args)
|
||||||
|
|
||||||
|
def peak_detect(data, delta):
|
||||||
|
"""Simple min/max peak detection algorithm, taken from my code
|
||||||
|
in the disagg.m from the 10-8-5 paper"""
|
||||||
|
mins = [];
|
||||||
|
maxs = [];
|
||||||
|
cur_min = (None, np.inf)
|
||||||
|
cur_max = (None, -np.inf)
|
||||||
|
lookformax = False
|
||||||
|
for (n, p) in enumerate(data):
|
||||||
|
if p > cur_max[1]:
|
||||||
|
cur_max = (n, p)
|
||||||
|
if p < cur_min[1]:
|
||||||
|
cur_min = (n, p)
|
||||||
|
if lookformax:
|
||||||
|
if p < (cur_max[1] - delta):
|
||||||
|
maxs.append(cur_max)
|
||||||
|
cur_min = (n, p)
|
||||||
|
lookformax = False
|
||||||
|
else:
|
||||||
|
if p > (cur_min[1] + delta):
|
||||||
|
mins.append(cur_min)
|
||||||
|
cur_max = (n, p)
|
||||||
|
lookformax = True
|
||||||
|
return (mins, maxs)
|
||||||
|
|
||||||
|
def match(data, args):
|
||||||
|
"""Perform cross-correlation match"""
|
||||||
|
( columns, exemplars ) = args
|
||||||
|
nrows = data.shape[0]
|
||||||
|
|
||||||
|
# We want at least 10% more points than the widest exemplar.
|
||||||
|
widest = max([ x.count for x in exemplars ])
|
||||||
|
if (widest * 1.1) > nrows:
|
||||||
|
return 0
|
||||||
|
|
||||||
|
# This is how many points we'll consider valid in the
|
||||||
|
# cross-correlation.
|
||||||
|
valid = nrows + 1 - widest
|
||||||
|
matches = []
|
||||||
|
|
||||||
|
# Try matching against each of the exemplars
|
||||||
|
for e_num, e in enumerate(exemplars):
|
||||||
|
corrs = []
|
||||||
|
|
||||||
|
# Compute cross-correlation for each column
|
||||||
|
for c in e.columns:
|
||||||
|
a = data[:,columns[c] + 1]
|
||||||
|
b = e.data[:,e.columns[c]]
|
||||||
|
corr = scipy.signal.fftconvolve(a, np.flipud(b), 'valid')[0:valid]
|
||||||
|
|
||||||
|
# Scale by the norm of the exemplar
|
||||||
|
corr = corr / e.scale[columns[c]]
|
||||||
|
corrs.append(corr)
|
||||||
|
|
||||||
|
# Find the peaks using the column with the largest amplitude
|
||||||
|
biggest = e.scale.index(max(e.scale))
|
||||||
|
peaks_minmax = peak_detect(corrs[biggest], 0.1)
|
||||||
|
peaks = [ p[0] for p in peaks_minmax[1] ]
|
||||||
|
|
||||||
|
# Now look at every peak
|
||||||
|
for p in peaks:
|
||||||
|
# Correlation for each column must be close enough to 1.
|
||||||
|
for (corr, scale) in zip(corrs, e.scale):
|
||||||
|
# The accepted distance from 1 is based on the relative
|
||||||
|
# amplitude of the column. Use a linear mapping:
|
||||||
|
# scale 1.0 -> distance 0.1
|
||||||
|
# scale 0.0 -> distance 1.0
|
||||||
|
distance = 1 - 0.9 * (scale / e.scale[biggest])
|
||||||
|
if abs(corr[p] - 1) > distance:
|
||||||
|
# No match
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
# Successful match
|
||||||
|
matches.append((p, e_num))
|
||||||
|
|
||||||
|
# Print matches
|
||||||
|
for (point, e_num) in sorted(matches):
|
||||||
|
# Ignore matches that showed up at the very tail of the window,
|
||||||
|
# and shorten the window accordingly. This is an attempt to avoid
|
||||||
|
# problems at chunk boundaries.
|
||||||
|
if point > (valid - 50):
|
||||||
|
valid -= 50
|
||||||
|
break
|
||||||
|
print "matched", data[point,0], "exemplar", exemplars[e_num].name
|
||||||
|
|
||||||
|
#from matplotlib import pyplot as p
|
||||||
|
#p.plot(data[:,1:3])
|
||||||
|
#p.show()
|
||||||
|
|
||||||
|
return max(valid, 0)
|
||||||
|
|
||||||
|
def trainola(conf):
|
||||||
|
# Load main stream data
|
||||||
|
print "Loading stream data"
|
||||||
|
main = Data(None, conf['url'], conf['stream'],
|
||||||
|
conf['start'], conf['end'], conf['columns'])
|
||||||
|
|
||||||
|
# Pull in the exemplar data
|
||||||
|
exemplars = []
|
||||||
|
for n, e in enumerate(conf['exemplars']):
|
||||||
|
print sprintf("Loading exemplar %d: %s", n, e['name'])
|
||||||
|
ex = Data(e['name'], e['url'], e['stream'],
|
||||||
|
e['start'], e['end'], e['columns'])
|
||||||
|
ex.fetch()
|
||||||
|
exemplars.append(ex)
|
||||||
|
|
||||||
|
# Verify that the exemplar columns are all represented in the main data
|
||||||
|
for n, ex in enumerate(exemplars):
|
||||||
|
for col in ex.columns:
|
||||||
|
if col not in main.columns:
|
||||||
|
raise DataError(sprintf("Exemplar %d column %s is not "
|
||||||
|
"available in main data", n, col))
|
||||||
|
|
||||||
|
# Process the main data
|
||||||
|
process(main, match, (main.columns, exemplars))
|
||||||
|
|
||||||
|
return "done"
|
||||||
|
|
||||||
|
filterfunc = trainola
|
||||||
|
|
||||||
|
def main(argv = None):
|
||||||
|
import simplejson as json
|
||||||
|
import argparse
|
||||||
|
import sys
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
formatter_class = argparse.RawDescriptionHelpFormatter,
|
||||||
|
version = nilmrun.__version__,
|
||||||
|
description = """Run Trainola using parameters passed in as
|
||||||
|
JSON-formatted data.""")
|
||||||
|
parser.add_argument("file", metavar="FILE", nargs="?",
|
||||||
|
type=argparse.FileType('r'), default=sys.stdin)
|
||||||
|
args = parser.parse_args(argv)
|
||||||
|
|
||||||
|
conf = json.loads(args.file.read())
|
||||||
|
result = trainola(conf)
|
||||||
|
print json.dumps(result, sort_keys = True, indent = 2 * ' ')
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|
181
nilmrun/processmanager.py
Normal file
181
nilmrun/processmanager.py
Normal file
@@ -0,0 +1,181 @@
|
|||||||
|
#!/usr/bin/python
|
||||||
|
|
||||||
|
from nilmdb.utils.printf import *
|
||||||
|
|
||||||
|
import threading
|
||||||
|
import multiprocessing
|
||||||
|
import cStringIO
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import signal
|
||||||
|
import time
|
||||||
|
import uuid
|
||||||
|
import subprocess
|
||||||
|
import psutil
|
||||||
|
|
||||||
|
class LogReceiver(object):
|
||||||
|
"""Spawn a thread that listens to a pipe for log messages,
|
||||||
|
and stores them locally."""
|
||||||
|
def __init__(self, pipe):
|
||||||
|
self.pipe = pipe
|
||||||
|
self.log = cStringIO.StringIO()
|
||||||
|
self.thread = threading.Thread(target = self.run)
|
||||||
|
self.thread.start()
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
while True:
|
||||||
|
data = os.read(self.pipe, 65536)
|
||||||
|
if not data:
|
||||||
|
os.close(self.pipe)
|
||||||
|
return
|
||||||
|
self.log.write(data)
|
||||||
|
|
||||||
|
def getvalue(self):
|
||||||
|
return self.log.getvalue()
|
||||||
|
|
||||||
|
def clear(self):
|
||||||
|
self.log = cStringIO.StringIO()
|
||||||
|
|
||||||
|
class Process(object):
|
||||||
|
"""Spawn and manage a process that calls a Python function"""
|
||||||
|
def __init__(self, name, function, parameters):
|
||||||
|
self.parameters = parameters
|
||||||
|
self.start_time = None
|
||||||
|
self.name = name
|
||||||
|
|
||||||
|
# Use a pipe for communicating log data
|
||||||
|
(rpipe, wpipe) = os.pipe()
|
||||||
|
self._log = LogReceiver(rpipe)
|
||||||
|
|
||||||
|
# Start the function in a new process
|
||||||
|
self._process = multiprocessing.Process(
|
||||||
|
target = self._trampoline, name = name,
|
||||||
|
args = (function, rpipe, wpipe, parameters))
|
||||||
|
self._process.daemon = True
|
||||||
|
self._process.start()
|
||||||
|
|
||||||
|
# Close the writer end of the pipe, get process info
|
||||||
|
os.close(wpipe)
|
||||||
|
self.start_time = time.time()
|
||||||
|
self.pid = str(uuid.uuid1(self._process.pid or 0))
|
||||||
|
|
||||||
|
def _trampoline(self, func, rpipe, wpipe, param): # pragma: no cover
|
||||||
|
# No coverage report for this, because it's executed in a subprocess
|
||||||
|
"""Trampoline function to set up stdio and call the real function."""
|
||||||
|
# Close the reader end of the pipe
|
||||||
|
os.close(rpipe)
|
||||||
|
|
||||||
|
# Like os.close() but ignores errors
|
||||||
|
def tryclose(fd):
|
||||||
|
try:
|
||||||
|
os.close(fd)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Remap stdio to go to the pipe. We do this at the OS level,
|
||||||
|
# replacing FDs, so that future spawned processes do the right thing.
|
||||||
|
|
||||||
|
# stdin
|
||||||
|
sys.stdin.close()
|
||||||
|
tryclose(0)
|
||||||
|
fd = os.open(os.devnull, os.O_RDONLY) # 0
|
||||||
|
sys.stdin = os.fdopen(fd, 'r', 0)
|
||||||
|
|
||||||
|
# stdout
|
||||||
|
sys.stdout.close()
|
||||||
|
tryclose(1)
|
||||||
|
fd = os.dup(wpipe) # 1
|
||||||
|
sys.stdout = os.fdopen(fd, 'w', 0)
|
||||||
|
|
||||||
|
# stdout
|
||||||
|
sys.stderr.close()
|
||||||
|
tryclose(2)
|
||||||
|
fd = os.dup(wpipe) # 2
|
||||||
|
sys.stderr = os.fdopen(fd, 'w', 0)
|
||||||
|
|
||||||
|
# Don't need this extra fd
|
||||||
|
os.close(wpipe)
|
||||||
|
|
||||||
|
# Ready to go -- call the function
|
||||||
|
func(param)
|
||||||
|
|
||||||
|
def terminate(self, timeout = 1.0):
|
||||||
|
"""Terminate a process, and all of its children that are in the same
|
||||||
|
process group."""
|
||||||
|
# First give it some time to die on its own
|
||||||
|
self._process.join(timeout)
|
||||||
|
if not self.alive:
|
||||||
|
return True
|
||||||
|
|
||||||
|
def getpgid(pid):
|
||||||
|
try:
|
||||||
|
return os.getpgid(pid)
|
||||||
|
except OSError: # pragma: no cover
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Find all children
|
||||||
|
group = getpgid(self._process.pid)
|
||||||
|
main = psutil.Process(self._process.pid)
|
||||||
|
allproc = [ main ] + main.get_children(recursive = True)
|
||||||
|
|
||||||
|
# Kill with SIGTERM, if they're still in this process group
|
||||||
|
for proc in allproc:
|
||||||
|
if getpgid(proc.pid) == group:
|
||||||
|
os.kill(proc.pid, signal.SIGTERM)
|
||||||
|
|
||||||
|
# Wait for it to die again
|
||||||
|
self._process.join(timeout)
|
||||||
|
if not self.alive:
|
||||||
|
return True
|
||||||
|
|
||||||
|
# One more try with SIGKILL
|
||||||
|
for proc in allproc:
|
||||||
|
if getpgid(proc.pid) == group:
|
||||||
|
os.kill(proc.pid, signal.SIGKILL)
|
||||||
|
|
||||||
|
# See if it worked
|
||||||
|
self._process.join(timeout)
|
||||||
|
return not self.alive
|
||||||
|
|
||||||
|
def clear_log(self):
|
||||||
|
self._log.clear()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def log(self):
|
||||||
|
return self._log.getvalue()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def alive(self):
|
||||||
|
return self._process.is_alive()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def exitcode(self):
|
||||||
|
return self._process.exitcode
|
||||||
|
|
||||||
|
class ProcessManager(object):
|
||||||
|
"""Track and manage a collection of Process objects"""
|
||||||
|
def __init__(self):
|
||||||
|
self.processes = {}
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
return iter(self.processes.keys())
|
||||||
|
|
||||||
|
def __getitem__(self, key):
|
||||||
|
return self.processes[key]
|
||||||
|
|
||||||
|
def run_python(self, name, function, parameters):
|
||||||
|
new = Process(name, function, parameters)
|
||||||
|
self.processes[new.pid] = new
|
||||||
|
return new.pid
|
||||||
|
|
||||||
|
def run_command(self, name, args):
|
||||||
|
def spwan_user_command(args): # pragma: no cover (runs in subprocess)
|
||||||
|
p = subprocess.Popen(args, close_fds = True, cwd = "/tmp")
|
||||||
|
sys.exit(p.wait())
|
||||||
|
return self.run_python(name, spwan_user_command, args)
|
||||||
|
|
||||||
|
def terminate(self, pid):
|
||||||
|
return self.processes[pid].terminate()
|
||||||
|
|
||||||
|
def remove(self, pid):
|
||||||
|
del self.processes[pid]
|
@@ -9,6 +9,7 @@ import decorator
|
|||||||
import psutil
|
import psutil
|
||||||
import traceback
|
import traceback
|
||||||
import argparse
|
import argparse
|
||||||
|
import time
|
||||||
|
|
||||||
import nilmdb
|
import nilmdb
|
||||||
from nilmdb.utils.printf import *
|
from nilmdb.utils.printf import *
|
||||||
@@ -24,12 +25,14 @@ from nilmdb.server.serverutil import (
|
|||||||
cherrypy_stop,
|
cherrypy_stop,
|
||||||
)
|
)
|
||||||
import nilmrun
|
import nilmrun
|
||||||
|
import nilmrun.filters.trainola
|
||||||
|
import nilmrun.filters.dummy
|
||||||
|
|
||||||
# Add CORS_allow tool
|
# Add CORS_allow tool
|
||||||
cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)
|
cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)
|
||||||
|
|
||||||
# CherryPy apps
|
# CherryPy apps
|
||||||
class NilmRunApp(object):
|
class App(object):
|
||||||
"""Root application for NILM runner"""
|
"""Root application for NILM runner"""
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
@@ -40,7 +43,7 @@ class NilmRunApp(object):
|
|||||||
def index(self):
|
def index(self):
|
||||||
cherrypy.response.headers['Content-Type'] = 'text/plain'
|
cherrypy.response.headers['Content-Type'] = 'text/plain'
|
||||||
msg = sprintf("This is NilmRun version %s, running on host %s.\n",
|
msg = sprintf("This is NilmRun version %s, running on host %s.\n",
|
||||||
nilmdb.__version__, socket.getfqdn())
|
nilmrun.__version__, socket.getfqdn())
|
||||||
return msg
|
return msg
|
||||||
|
|
||||||
# /favicon.ico
|
# /favicon.ico
|
||||||
@@ -54,6 +57,94 @@ class NilmRunApp(object):
|
|||||||
def version(self):
|
def version(self):
|
||||||
return nilmrun.__version__
|
return nilmrun.__version__
|
||||||
|
|
||||||
|
class AppProcess(object):
|
||||||
|
|
||||||
|
def __init__(self, manager):
|
||||||
|
self.manager = manager
|
||||||
|
|
||||||
|
def process_status(self, pid):
|
||||||
|
return {
|
||||||
|
"pid": pid,
|
||||||
|
"alive": self.manager[pid].alive,
|
||||||
|
"exitcode": self.manager[pid].exitcode,
|
||||||
|
"name": self.manager[pid].name,
|
||||||
|
"start_time": self.manager[pid].start_time,
|
||||||
|
"parameters": self.manager[pid].parameters,
|
||||||
|
"log": self.manager[pid].log,
|
||||||
|
}
|
||||||
|
|
||||||
|
# /process/status
|
||||||
|
@cherrypy.expose
|
||||||
|
@cherrypy.tools.json_out()
|
||||||
|
def status(self, pid, clear = False):
|
||||||
|
"""Return status about a process. If clear = True, also clear
|
||||||
|
the log."""
|
||||||
|
if pid not in self.manager:
|
||||||
|
raise cherrypy.HTTPError("404 Not Found", "No such PID")
|
||||||
|
status = self.process_status(pid)
|
||||||
|
if clear:
|
||||||
|
self.manager[pid].clear_log()
|
||||||
|
return status
|
||||||
|
|
||||||
|
# /process/list
|
||||||
|
@cherrypy.expose
|
||||||
|
@cherrypy.tools.json_out()
|
||||||
|
def list(self):
|
||||||
|
"""Return a list of processes in the manager."""
|
||||||
|
return list(self.manager)
|
||||||
|
|
||||||
|
# /process/remove
|
||||||
|
@cherrypy.expose
|
||||||
|
@cherrypy.tools.json_in()
|
||||||
|
@cherrypy.tools.json_out()
|
||||||
|
@cherrypy.tools.CORS_allow(methods = ["POST"])
|
||||||
|
def remove(self, pid):
|
||||||
|
"""Remove a process from the manager, killing it if necessary."""
|
||||||
|
if pid not in self.manager:
|
||||||
|
raise cherrypy.HTTPError("404 Not Found", "No such PID")
|
||||||
|
if not self.manager.terminate(pid): # pragma: no cover
|
||||||
|
raise cherrypy.HTTPError("503 Service Unavailable",
|
||||||
|
"Failed to stop process")
|
||||||
|
status = self.process_status(pid)
|
||||||
|
self.manager.remove(pid)
|
||||||
|
return status
|
||||||
|
|
||||||
|
# /process/command
|
||||||
|
@cherrypy.expose
|
||||||
|
@cherrypy.tools.json_in()
|
||||||
|
@cherrypy.tools.json_out()
|
||||||
|
@cherrypy.tools.CORS_allow(methods = ["POST"])
|
||||||
|
def command(self, args):
|
||||||
|
"""Execute an arbitrary program on the server. 'args' is the
|
||||||
|
argument list, with 'args[0]' being the program and 'args[1]',
|
||||||
|
'args[2]', etc as arguments."""
|
||||||
|
return self.manager.run_command("command", args)
|
||||||
|
|
||||||
|
class AppFilter(object):
|
||||||
|
|
||||||
|
def __init__(self, manager):
|
||||||
|
self.manager = manager
|
||||||
|
|
||||||
|
# /filter/trainola
|
||||||
|
@cherrypy.expose
|
||||||
|
@cherrypy.tools.json_in()
|
||||||
|
@cherrypy.tools.json_out()
|
||||||
|
@exception_to_httperror(KeyError, ValueError)
|
||||||
|
@cherrypy.tools.CORS_allow(methods = ["POST"])
|
||||||
|
def trainola(self, data):
|
||||||
|
return self.manager.run_python(
|
||||||
|
"trainola", nilmrun.filters.trainola.filterfunc, data)
|
||||||
|
|
||||||
|
# /filter/dummy
|
||||||
|
@cherrypy.expose
|
||||||
|
@cherrypy.tools.json_in()
|
||||||
|
@cherrypy.tools.json_out()
|
||||||
|
@exception_to_httperror(KeyError, ValueError)
|
||||||
|
@cherrypy.tools.CORS_allow(methods = ["POST"])
|
||||||
|
def dummy(self, data):
|
||||||
|
return self.manager.run_python(
|
||||||
|
"dummy", nilmrun.filters.dummy.filterfunc, data)
|
||||||
|
|
||||||
class Server(object):
|
class Server(object):
|
||||||
def __init__(self, host = '127.0.0.1', port = 8080,
|
def __init__(self, host = '127.0.0.1', port = 8080,
|
||||||
embedded = True, # hide diagnostics and output, etc
|
embedded = True, # hide diagnostics and output, etc
|
||||||
@@ -104,7 +195,10 @@ class Server(object):
|
|||||||
cherrypy._cperror._ie_friendly_error_sizes = {}
|
cherrypy._cperror._ie_friendly_error_sizes = {}
|
||||||
|
|
||||||
# Build up the application and mount it
|
# Build up the application and mount it
|
||||||
root = NilmRunApp()
|
manager = nilmrun.processmanager.ProcessManager()
|
||||||
|
root = App()
|
||||||
|
root.process = AppProcess(manager)
|
||||||
|
root.filter = AppFilter(manager)
|
||||||
cherrypy.tree.apps = {}
|
cherrypy.tree.apps = {}
|
||||||
cherrypy.tree.mount(root, basepath, config = { "/" : app_config })
|
cherrypy.tree.mount(root, basepath, config = { "/" : app_config })
|
||||||
|
|
@@ -20,7 +20,7 @@ def main():
|
|||||||
help = 'Only listen on the given address',
|
help = 'Only listen on the given address',
|
||||||
default = '0.0.0.0')
|
default = '0.0.0.0')
|
||||||
group.add_argument('-p', '--port', help = 'Listen on the given port',
|
group.add_argument('-p', '--port', help = 'Listen on the given port',
|
||||||
type = int, default = 12380)
|
type = int, default = 12381)
|
||||||
group.add_argument('-q', '--quiet', help = 'Silence output',
|
group.add_argument('-q', '--quiet', help = 'Silence output',
|
||||||
action = 'store_true')
|
action = 'store_true')
|
||||||
group.add_argument('-t', '--traceback',
|
group.add_argument('-t', '--traceback',
|
||||||
@@ -41,12 +41,14 @@ def main():
|
|||||||
|
|
||||||
# Print info
|
# Print info
|
||||||
if not args.quiet:
|
if not args.quiet:
|
||||||
print "Version: %s" % nilmrun.__version__
|
print "NilmRun version: %s" % nilmrun.__version__
|
||||||
|
print ("Note: This server does not do any authentication! " +
|
||||||
|
"Anyone who can connect can run arbitrary commands.")
|
||||||
if args.address == '0.0.0.0' or args.address == '::':
|
if args.address == '0.0.0.0' or args.address == '::':
|
||||||
host = socket.getfqdn()
|
host = socket.getfqdn()
|
||||||
else:
|
else:
|
||||||
host = args.address
|
host = args.address
|
||||||
print "NilmRun Server URL: http://%s:%d/" % ( host, args.port)
|
print "Server URL: http://%s:%d/" % ( host, args.port)
|
||||||
print "----"
|
print "----"
|
||||||
|
|
||||||
server.start(blocking = True)
|
server.start(blocking = True)
|
||||||
|
22
setup.cfg
Normal file
22
setup.cfg
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
[aliases]
|
||||||
|
test = nosetests
|
||||||
|
|
||||||
|
[nosetests]
|
||||||
|
# Note: values must be set to 1, and have no comments on the same line,
|
||||||
|
# for "python setup.py nosetests" to work correctly.
|
||||||
|
nocapture=1
|
||||||
|
# Comment this out to see CherryPy logs on failure:
|
||||||
|
nologcapture=1
|
||||||
|
with-coverage=1
|
||||||
|
cover-inclusive=1
|
||||||
|
cover-package=nilmrun
|
||||||
|
cover-erase=1
|
||||||
|
# this works, puts html output in cover/ dir:
|
||||||
|
# cover-html=1
|
||||||
|
# need nose 1.1.3 for this:
|
||||||
|
# cover-branches=1
|
||||||
|
#debug=nose
|
||||||
|
#debug-log=nose.log
|
||||||
|
stop=1
|
||||||
|
verbosity=2
|
||||||
|
tests=tests
|
5
setup.py
5
setup.py
@@ -30,7 +30,7 @@ except ImportError:
|
|||||||
# Versioneer manages version numbers from git tags.
|
# Versioneer manages version numbers from git tags.
|
||||||
# https://github.com/warner/python-versioneer
|
# https://github.com/warner/python-versioneer
|
||||||
import versioneer
|
import versioneer
|
||||||
versioneer.versionfile_source = 'src/_version.py'
|
versioneer.versionfile_source = 'nilmrun/_version.py'
|
||||||
versioneer.versionfile_build = 'nilmrun/_version.py'
|
versioneer.versionfile_build = 'nilmrun/_version.py'
|
||||||
versioneer.tag_prefix = 'nilmrun-'
|
versioneer.tag_prefix = 'nilmrun-'
|
||||||
versioneer.parentdir_prefix = 'nilmrun-'
|
versioneer.parentdir_prefix = 'nilmrun-'
|
||||||
@@ -69,12 +69,13 @@ setup(name='nilmrun',
|
|||||||
packages = [ 'nilmrun',
|
packages = [ 'nilmrun',
|
||||||
'nilmrun.scripts',
|
'nilmrun.scripts',
|
||||||
],
|
],
|
||||||
package_dir = { 'nilmrun': 'src',
|
package_dir = { 'nilmrun': 'nilmrun',
|
||||||
'nilmrun.scripts': 'scripts',
|
'nilmrun.scripts': 'scripts',
|
||||||
},
|
},
|
||||||
entry_points = {
|
entry_points = {
|
||||||
'console_scripts': [
|
'console_scripts': [
|
||||||
'nilmrun-server = nilmrun.scripts.nilmrun_server:main',
|
'nilmrun-server = nilmrun.scripts.nilmrun_server:main',
|
||||||
|
'nilm-trainola = nilmrun.trainola:main',
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
zip_safe = False,
|
zip_safe = False,
|
||||||
|
49
tests/runtests.py
Executable file
49
tests/runtests.py
Executable file
@@ -0,0 +1,49 @@
|
|||||||
|
#!/usr/bin/python
|
||||||
|
|
||||||
|
import nose
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import glob
|
||||||
|
from collections import OrderedDict
|
||||||
|
|
||||||
|
# Change into parent dir
|
||||||
|
os.chdir(os.path.dirname(os.path.realpath(__file__)) + "/..")
|
||||||
|
|
||||||
|
class JimOrderPlugin(nose.plugins.Plugin):
|
||||||
|
"""When searching for tests and encountering a directory that
|
||||||
|
contains a 'test.order' file, run tests listed in that file, in the
|
||||||
|
order that they're listed. Globs are OK in that file and duplicates
|
||||||
|
are removed."""
|
||||||
|
name = 'jimorder'
|
||||||
|
score = 10000
|
||||||
|
|
||||||
|
def prepareTestLoader(self, loader):
|
||||||
|
def wrap(func):
|
||||||
|
def wrapper(name, *args, **kwargs):
|
||||||
|
addr = nose.selector.TestAddress(
|
||||||
|
name, workingDir=loader.workingDir)
|
||||||
|
try:
|
||||||
|
order = os.path.join(addr.filename, "test.order")
|
||||||
|
except Exception:
|
||||||
|
order = None
|
||||||
|
if order and os.path.exists(order):
|
||||||
|
files = []
|
||||||
|
for line in open(order):
|
||||||
|
line = line.split('#')[0].strip()
|
||||||
|
if not line:
|
||||||
|
continue
|
||||||
|
fn = os.path.join(addr.filename, line.strip())
|
||||||
|
files.extend(sorted(glob.glob(fn)) or [fn])
|
||||||
|
files = list(OrderedDict.fromkeys(files))
|
||||||
|
tests = [ wrapper(fn, *args, **kwargs) for fn in files ]
|
||||||
|
return loader.suiteClass(tests)
|
||||||
|
return func(name, *args, **kwargs)
|
||||||
|
return wrapper
|
||||||
|
loader.loadTestsFromName = wrap(loader.loadTestsFromName)
|
||||||
|
return loader
|
||||||
|
|
||||||
|
# Use setup.cfg for most of the test configuration. Adding
|
||||||
|
# --with-jimorder here means that a normal "nosetests" run will
|
||||||
|
# still work, it just won't support test.order.
|
||||||
|
nose.main(addplugins = [ JimOrderPlugin() ],
|
||||||
|
argv = sys.argv + ["--with-jimorder"])
|
3
tests/test.order
Normal file
3
tests/test.order
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
test_client.py
|
||||||
|
|
||||||
|
test_*.py
|
252
tests/test_client.py
Normal file
252
tests/test_client.py
Normal file
@@ -0,0 +1,252 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
import nilmrun.server
|
||||||
|
|
||||||
|
from nilmdb.client.httpclient import HTTPClient, ClientError, ServerError
|
||||||
|
|
||||||
|
from nilmdb.utils.printf import *
|
||||||
|
|
||||||
|
from nose.plugins.skip import SkipTest
|
||||||
|
from nose.tools import *
|
||||||
|
from nose.tools import assert_raises
|
||||||
|
|
||||||
|
import itertools
|
||||||
|
import distutils.version
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import threading
|
||||||
|
import cStringIO
|
||||||
|
import simplejson as json
|
||||||
|
import unittest
|
||||||
|
import warnings
|
||||||
|
import time
|
||||||
|
import re
|
||||||
|
import urllib2
|
||||||
|
from urllib2 import urlopen, HTTPError
|
||||||
|
import requests
|
||||||
|
import pprint
|
||||||
|
|
||||||
|
from testutil.helpers import *
|
||||||
|
|
||||||
|
testurl = "http://localhost:32181/"
|
||||||
|
|
||||||
|
def setup_module():
|
||||||
|
global test_server
|
||||||
|
|
||||||
|
# Start web app on a custom port
|
||||||
|
test_server = nilmrun.server.Server(host = "127.0.0.1",
|
||||||
|
port = 32181,
|
||||||
|
force_traceback = True)
|
||||||
|
test_server.start(blocking = False)
|
||||||
|
|
||||||
|
def teardown_module():
|
||||||
|
global test_server
|
||||||
|
# Close web app
|
||||||
|
test_server.stop()
|
||||||
|
|
||||||
|
class TestClient(object):
|
||||||
|
|
||||||
|
def wait_end(self, client, pid, timeout = 5):
|
||||||
|
start = time.time()
|
||||||
|
status = None
|
||||||
|
while (time.time() - start) < timeout:
|
||||||
|
status = client.get("/process/status", { "pid": pid })
|
||||||
|
if status["alive"] == False:
|
||||||
|
return status
|
||||||
|
raise AssertionError("process " + str(pid) + " didn't die in " +
|
||||||
|
str(timeout) + " seconds: " + repr(status))
|
||||||
|
|
||||||
|
def test_client_01_basic(self):
|
||||||
|
client = HTTPClient(baseurl = testurl)
|
||||||
|
version = client.get("/version")
|
||||||
|
eq_(distutils.version.LooseVersion(version),
|
||||||
|
distutils.version.LooseVersion(nilmrun.__version__))
|
||||||
|
|
||||||
|
in_("This is NilmRun", client.get("/"))
|
||||||
|
|
||||||
|
with assert_raises(ClientError):
|
||||||
|
client.get("/favicon.ico")
|
||||||
|
|
||||||
|
def test_client_02_manager(self):
|
||||||
|
client = HTTPClient(baseurl = testurl)
|
||||||
|
|
||||||
|
eq_(client.get("/process/list"), [])
|
||||||
|
|
||||||
|
with assert_raises(ClientError) as e:
|
||||||
|
client.get("/process/status", { "pid": 12345 })
|
||||||
|
in_("No such PID", str(e.exception))
|
||||||
|
with assert_raises(ClientError):
|
||||||
|
client.get("/process/remove", { "pid": 12345 })
|
||||||
|
in_("No such PID", str(e.exception))
|
||||||
|
|
||||||
|
def test_client_03_process_basic(self):
|
||||||
|
client = HTTPClient(baseurl = testurl, post_json = True)
|
||||||
|
|
||||||
|
# start dummy filter
|
||||||
|
pid = client.post("/filter/dummy", { "data": 30 })
|
||||||
|
eq_(client.get("/process/list"), [pid])
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
# Verify that status looks OK
|
||||||
|
status = client.get("/process/status", { "pid": pid, "clear": True })
|
||||||
|
for x in [ "pid", "alive", "exitcode", "name",
|
||||||
|
"start_time", "parameters", "log" ]:
|
||||||
|
in_(x, status)
|
||||||
|
in_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"])
|
||||||
|
eq_(status["alive"], True)
|
||||||
|
eq_(status["exitcode"], None)
|
||||||
|
|
||||||
|
# Check that the log got cleared
|
||||||
|
status = client.get("/process/status", { "pid": pid })
|
||||||
|
nin_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"])
|
||||||
|
|
||||||
|
# See that it ended properly
|
||||||
|
status = self.wait_end(client, pid)
|
||||||
|
in_("dummy 27\ndummy 28\ndummy 29\n", status["log"])
|
||||||
|
eq_(status["exitcode"], 0)
|
||||||
|
|
||||||
|
# Remove it
|
||||||
|
killstatus = client.post("/process/remove", { "pid": pid })
|
||||||
|
eq_(status, killstatus)
|
||||||
|
eq_(client.get("/process/list"), [])
|
||||||
|
with assert_raises(ClientError) as e:
|
||||||
|
client.post("/process/remove", { "pid": pid })
|
||||||
|
in_("No such PID", str(e.exception))
|
||||||
|
|
||||||
|
def test_client_04_process_terminate(self):
|
||||||
|
client = HTTPClient(baseurl = testurl, post_json = True)
|
||||||
|
|
||||||
|
# Trigger exception in filter
|
||||||
|
pid = client.post("/filter/dummy", { "data": -1 })
|
||||||
|
time.sleep(0.5)
|
||||||
|
status = client.get("/process/status", { "pid": pid })
|
||||||
|
eq_(status["alive"], False)
|
||||||
|
eq_(status["exitcode"], 1)
|
||||||
|
in_("Exception: test exception", status["log"])
|
||||||
|
client.post("/process/remove", { "pid": pid })
|
||||||
|
|
||||||
|
# Kill a running filter by removing it early
|
||||||
|
newpid = client.post("/filter/dummy", { "data": 50 })
|
||||||
|
ne_(newpid, pid)
|
||||||
|
time.sleep(0.5)
|
||||||
|
start = time.time()
|
||||||
|
status = client.post("/process/remove", { "pid": newpid })
|
||||||
|
elapsed = time.time() - start
|
||||||
|
# Should have died in slightly over 1 second
|
||||||
|
assert(0.5 < elapsed < 2)
|
||||||
|
eq_(status["alive"], False)
|
||||||
|
ne_(status["exitcode"], 0)
|
||||||
|
|
||||||
|
# No more
|
||||||
|
eq_(client.get("/process/list"), [])
|
||||||
|
|
||||||
|
# Try to remove a running filter that ignored SIGTERM
|
||||||
|
pid = client.post("/filter/dummy", { "data": 0 })
|
||||||
|
start = time.time()
|
||||||
|
status = client.post("/process/remove", { "pid": pid })
|
||||||
|
elapsed = time.time() - start
|
||||||
|
# Should have died in slightly over 2 seconds
|
||||||
|
assert(1.5 < elapsed < 3)
|
||||||
|
eq_(status["alive"], False)
|
||||||
|
ne_(status["exitcode"], 0)
|
||||||
|
|
||||||
|
def test_client_05_trainola_simple(self):
|
||||||
|
client = HTTPClient(baseurl = testurl, post_json = True)
|
||||||
|
pid = client.post("/filter/trainola", { "data": {} })
|
||||||
|
status = self.wait_end(client, pid)
|
||||||
|
ne_(status["exitcode"], 0)
|
||||||
|
status = client.post("/process/remove", { "pid": pid })
|
||||||
|
|
||||||
|
@unittest.skip("needs a running nilmdb")
|
||||||
|
def test_client_06_trainola(self):
|
||||||
|
client = HTTPClient(baseurl = testurl, post_json = True)
|
||||||
|
|
||||||
|
data = { "url": "http://bucket.mit.edu/nilmdb",
|
||||||
|
"stream": "/sharon/prep-a",
|
||||||
|
"start": 1366111383280463,
|
||||||
|
"end": 1366126163457797,
|
||||||
|
"columns": [ { "name": "P1", "index": 0 },
|
||||||
|
{ "name": "Q1", "index": 1 },
|
||||||
|
{ "name": "P3", "index": 2 } ],
|
||||||
|
"exemplars": [
|
||||||
|
{ "name": "Boiler Pump ON",
|
||||||
|
"url": "http://bucket.mit.edu/nilmdb",
|
||||||
|
"stream": "/sharon/prep-a",
|
||||||
|
"start": 1366260494269078,
|
||||||
|
"end": 1366260608185031,
|
||||||
|
"columns": [ { "name": "P1", "index": 0 },
|
||||||
|
{ "name": "Q1", "index": 1 }
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{ "name": "Boiler Pump OFF",
|
||||||
|
"url": "http://bucket.mit.edu/nilmdb",
|
||||||
|
"stream": "/sharon/prep-a",
|
||||||
|
"start": 1366260864215764,
|
||||||
|
"end": 1366260870882998,
|
||||||
|
"columns": [ { "name": "P1", "index": 0 },
|
||||||
|
{ "name": "Q1", "index": 1 }
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
# start trainola
|
||||||
|
pid = client.post("/filter/trainola", { "data": data })
|
||||||
|
|
||||||
|
# wait for it to finish
|
||||||
|
for i in range(60):
|
||||||
|
time.sleep(1)
|
||||||
|
if i == 2:
|
||||||
|
status = client.get("/process/status", { "pid": pid,
|
||||||
|
"clear": True })
|
||||||
|
in_("Loading stream data", status['log'])
|
||||||
|
elif i == 3:
|
||||||
|
status = client.get("/process/status", { "pid": pid })
|
||||||
|
nin_("Loading stream data", status['log'])
|
||||||
|
else:
|
||||||
|
status = client.get("/process/status", { "pid": pid })
|
||||||
|
if status["alive"] == False:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
client.post("/process/remove", {"pid": pid })
|
||||||
|
raise AssertionError("took too long")
|
||||||
|
if i < 3:
|
||||||
|
raise AssertionError("too fast?")
|
||||||
|
|
||||||
|
def test_client_07_process_command(self):
|
||||||
|
client = HTTPClient(baseurl = testurl, post_json = True)
|
||||||
|
eq_(client.get("/process/list"), [])
|
||||||
|
|
||||||
|
def do(args, kill):
|
||||||
|
pid = client.post("/process/command", { "args": args } )
|
||||||
|
eq_(client.get("/process/list"), [pid])
|
||||||
|
if kill:
|
||||||
|
time.sleep(1)
|
||||||
|
status = client.get("/process/status", { "pid": pid })
|
||||||
|
if not status["alive"]:
|
||||||
|
raise AssertionError("died before we could kill it")
|
||||||
|
status = client.post("/process/remove", { "pid": pid })
|
||||||
|
if status["alive"]:
|
||||||
|
raise AssertionError("didn't get killed")
|
||||||
|
else:
|
||||||
|
self.wait_end(client, pid)
|
||||||
|
status = client.post("/process/remove", { "pid": pid })
|
||||||
|
return status
|
||||||
|
|
||||||
|
# Simple command
|
||||||
|
status = do(["pwd"], False)
|
||||||
|
eq_(status["exitcode"], 0)
|
||||||
|
eq_("/tmp\n", status["log"])
|
||||||
|
|
||||||
|
# Command with args
|
||||||
|
status = do(["expr", "1", "+", "2"], False)
|
||||||
|
eq_(status["exitcode"], 0)
|
||||||
|
eq_("3\n", status["log"])
|
||||||
|
|
||||||
|
# Missing command
|
||||||
|
status = do(["/no-such-command-blah-blah"], False)
|
||||||
|
ne_(status["exitcode"], 0)
|
||||||
|
|
||||||
|
# Kill a slow command
|
||||||
|
status = do(["sleep", "60"], True)
|
||||||
|
ne_(status["exitcode"], 0)
|
1
tests/testutil/__init__.py
Normal file
1
tests/testutil/__init__.py
Normal file
@@ -0,0 +1 @@
|
|||||||
|
# empty
|
41
tests/testutil/helpers.py
Normal file
41
tests/testutil/helpers.py
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
# Just some helpers for test functions
|
||||||
|
|
||||||
|
def myrepr(x):
|
||||||
|
if isinstance(x, basestring):
|
||||||
|
return '"' + x + '"'
|
||||||
|
else:
|
||||||
|
return repr(x)
|
||||||
|
|
||||||
|
def eq_(a, b):
|
||||||
|
if not a == b:
|
||||||
|
raise AssertionError("%s != %s" % (myrepr(a), myrepr(b)))
|
||||||
|
|
||||||
|
def lt_(a, b):
|
||||||
|
if not a < b:
|
||||||
|
raise AssertionError("%s is not less than %s" % (myrepr(a), myrepr(b)))
|
||||||
|
|
||||||
|
def in_(a, b):
|
||||||
|
if a not in b:
|
||||||
|
raise AssertionError("%s not in %s" % (myrepr(a), myrepr(b)))
|
||||||
|
|
||||||
|
def nin_(a, b):
|
||||||
|
if a in b:
|
||||||
|
raise AssertionError("unexpected %s in %s" % (myrepr(a), myrepr(b)))
|
||||||
|
|
||||||
|
def in2_(a1, a2, b):
|
||||||
|
if a1 not in b and a2 not in b:
|
||||||
|
raise AssertionError("(%s or %s) not in %s" % (myrepr(a1), myrepr(a2),
|
||||||
|
myrepr(b)))
|
||||||
|
|
||||||
|
def ne_(a, b):
|
||||||
|
if not a != b:
|
||||||
|
raise AssertionError("unexpected %s == %s" % (myrepr(a), myrepr(b)))
|
||||||
|
|
||||||
|
def lines_(a, n):
|
||||||
|
l = a.count('\n')
|
||||||
|
if not l == n:
|
||||||
|
if len(a) > 5000:
|
||||||
|
a = a[0:5000] + " ... truncated"
|
||||||
|
raise AssertionError("wanted %d lines, got %d in output: '%s'"
|
||||||
|
% (n, l, a))
|
||||||
|
|
Reference in New Issue
Block a user