35 Commits

Author SHA1 Message Date
5b878378f3 Translate UTF-8 in command output more robustly 2013-07-22 13:03:09 -04:00
5cd38f1ba9 Don't spin so fast in tests while waiting 2013-07-21 19:49:44 -04:00
d7551bde0b Make 'args' optional to /run/code 2013-07-21 19:49:30 -04:00
40fd377a38 Remove 'name' from spawned processes 2013-07-21 19:49:15 -04:00
6e7f3ac704 Remove nilm-trainola script 2013-07-18 12:28:32 -04:00
29adb47a33 Fix test order 2013-07-18 11:01:27 -04:00
7c605a469a Cleanup dependencies 2013-07-18 11:00:53 -04:00
f5225f88f9 Add max CPU percentage 2013-07-17 18:48:55 -04:00
32e59310ef Fix for dead processes 2013-07-17 18:19:52 -04:00
5a33ef48cc Add /process/info request 2013-07-17 18:12:44 -04:00
18a5cd6334 Improve boolean parameter parsing 2013-07-15 14:39:28 -04:00
7ec4d60d38 Fix WSGI docs 2013-07-11 16:36:18 -04:00
b2bdf784ac Make test URLs relative 2013-07-11 11:46:02 -04:00
e0709f0d17 Remove multiprocessing due to mod_wsgi incompatibility; use subprocess
Multiprocessing and Apache's mod_wsgi don't play nicely.  Switch to
manually managing processes via subprocess.Popen etc instead.  When
running arbitrary code, we write it to an external file, and running
functions directly is no longer supported.
2013-07-11 11:39:22 -04:00
18d3cff772 Update WSGI docs 2013-07-10 14:16:35 -04:00
a7b9656916 Remove parameters from status output 2013-07-10 11:35:17 -04:00
2e9ec63675 Don't catch SystemExit from a subprocess 2013-07-09 13:15:27 -04:00
6d295b840a Delete trainola; it will live in nilmtools repo 2013-07-08 11:57:45 -04:00
74a05d05d6 Clear out traceback object to avoid reference cycles 2013-07-08 11:44:19 -04:00
35b20c90a5 Rename and reorganize stuff 2013-07-08 11:33:27 -04:00
6d9ee7b405 Ability to run user provided code now 2013-07-07 20:18:52 -04:00
721d6c4936 Add run_code_string function, untested 2013-07-07 15:36:34 -04:00
973d328e1e Rename some URLs (/process/command -> /run/command, etc) 2013-07-07 14:12:48 -04:00
0338d40226 One fewer process when spawning commands 2013-07-07 14:09:47 -04:00
734e1d9b52 Add /process/command; fix killing of forked processes
Now an entire process tree is killed in /process/remove, as long as
each child hasn't changed its process group.
2013-07-07 12:13:13 -04:00
30a3559253 Add "force" option for /process/remove 2013-07-06 16:03:18 -04:00
caad5dec04 Add warning about lack of auth 2013-07-06 15:41:29 -04:00
e89a48dbb7 Mention SSL stuff in docs 2013-07-06 15:37:10 -04:00
e2c9575937 More tests 2013-07-05 22:14:43 -04:00
258fe2358d Filled out test coverage, fixed lots of bugs 2013-07-05 21:58:30 -04:00
f73de35ee6 Work in progress on the process manager. Very finicky.
The multiprocessing environment is really finicky.
I'm seeing deadlocks in the process at exit, that are probably related
to tracebacks being printed and still getting redirected to the
logfile or something.
2013-07-05 19:35:09 -04:00
65e48caf5f Start test framework; work on server and manager 2013-07-05 16:47:50 -04:00
a9bac7d9a0 Start thread management stuff 2013-07-05 15:55:41 -04:00
afd21bfef2 Improve trainola matcher 2013-07-05 15:55:28 -04:00
b228c3e35f First implementation of trainola 2013-07-04 13:49:15 -04:00
19 changed files with 987 additions and 37 deletions

10
.coveragerc Normal file
View File

@@ -0,0 +1,10 @@
# -*- conf -*-
[run]
# branch = True
[report]
exclude_lines =
pragma: no cover
if 0:
omit = scripts,nilmrun/_version.py,nilmrun/filters/*

1
.gitignore vendored
View File

@@ -1,3 +1,4 @@
.coverage
build/
*.pyc
dist/

View File

@@ -1,18 +1,15 @@
URL="http://localhost/nilmdb"
# By default, run the tests.
all: test
all:
ifeq ($(INSIDE_EMACS), t)
@make test
else
@echo "Try 'make install'"
endif
test:
echo No tests yet
test2:
nilmrun/trainola.py data.js
version:
python setup.py version
build:
python setup.py build_ext --inplace
dist: sdist
sdist:
python setup.py sdist
@@ -23,11 +20,29 @@ install:
develop:
python setup.py develop
docs:
make -C docs
lint:
pylint --rcfile=.pylintrc nilmdb
test:
ifeq ($(INSIDE_EMACS), t)
# Use the slightly more flexible script
python setup.py build_ext --inplace
python tests/runtests.py
else
# Let setup.py check dependencies, build stuff, and run the test
python setup.py nosetests
endif
clean::
rm -f .coverage
find . -name '*pyc' | xargs rm -f
rm -rf nilmtools.egg-info/ build/ MANIFEST.in
make -C docs clean
gitclean::
git clean -dXf
.PHONY: all test version dist sdist install clean gitclean
.PHONY: all version dist sdist install docs lint test clean gitclean

View File

@@ -6,10 +6,8 @@ Prerequisites:
# Runtime and build environments
sudo apt-get install python2.7 python-setuptools
# Base dependencies
sudo apt-get install python-numpy python-scipy
nilmdb (1.8.0+)
# Plus nilmdb and its dependencies
nilmdb (1.8.2+)
Install:

28
data.js Normal file
View File

@@ -0,0 +1,28 @@
{ "url": "http://bucket.mit.edu/nilmdb",
"stream": "/sharon/prep-a",
"start": 1366111383280463,
"end": 1366126163457797,
"columns": [ { "name": "P1", "index": 0 },
{ "name": "Q1", "index": 1 },
{ "name": "P3", "index": 2 } ],
"exemplars": [
{ "name": "Boiler Pump ON",
"url": "http://bucket.mit.edu/nilmdb",
"stream": "/sharon/prep-a",
"start": 1366260494269078,
"end": 1366260608185031,
"columns": [ { "name": "P1", "index": 0 },
{ "name": "Q1", "index": 1 }
]
},
{ "name": "Boiler Pump OFF",
"url": "http://bucket.mit.edu/nilmdb",
"stream": "/sharon/prep-a",
"start": 1366260864215764,
"end": 1366260870882998,
"columns": [ { "name": "P1", "index": 0 },
{ "name": "Q1", "index": 1 }
]
}
]
}

View File

@@ -13,16 +13,21 @@ First, create a WSGI script `/home/nilm/nilmrun.wsgi` containing:
The first parameter is the path part of the URL.
Then, set up Apache with a configuration like:
Then, set up Apache with a configuration like below. SSL and access
control/authentication are strongly recommended since this can execute
arbitrary commands.
<VirtualHost *:443>
SSLEngine On
<VirtualHost>
WSGIScriptAlias /nilmrun /home/nilm/nilmrun.wsgi
WSGIApplicationGroup nilmrun-appgroup
WSGIProcessGroup nilmrun-procgroup
WSGIDaemonProcess nilmrun-procgroup threads=32 user=nilm group=nilm
<Location /nilmrun>
WSGIProcessGroup nilmrun-procgroup
WSGIApplicationGroup nilmrun-appgroup
SSLRequireSSL
# Access control example:
<Location /nilmrun>
Order deny,allow
Deny from all
Allow from 1.2.3.4

View File

@@ -1,3 +1,4 @@
import nilmrun.processmanager
from ._version import get_versions
__version__ = get_versions()['version']

View File

@@ -181,7 +181,7 @@ def versions_from_parentdir(parentdir_prefix, versionfile_source, verbose=False)
tag_prefix = "nilmrun-"
parentdir_prefix = "nilmrun-"
versionfile_source = "src/_version.py"
versionfile_source = "nilmrun/_version.py"
def get_versions(default={"version": "unknown", "full": ""}, verbose=False):
variables = { "refnames": git_refnames, "full": git_full }

256
nilmrun/processmanager.py Normal file
View File

@@ -0,0 +1,256 @@
#!/usr/bin/python
from nilmdb.utils.printf import *
import threading
import subprocess
import cStringIO
import sys
import os
import signal
import time
import uuid
import psutil
import tempfile
import atexit
import shutil
class ProcessError(Exception):
pass
class LogReceiver(object):
"""Spawn a thread that listens to a pipe for log messages,
and stores them locally."""
def __init__(self, pipe):
self.pipe = pipe
self.log = cStringIO.StringIO()
self.thread = threading.Thread(target = self.run)
self.thread.start()
def run(self):
while True:
data = os.read(self.pipe, 65536)
if not data:
os.close(self.pipe)
return
self.log.write(data)
def getvalue(self):
return self.log.getvalue()
def clear(self):
self.log = cStringIO.StringIO()
class Process(object):
"""Spawn and manage a subprocess, and capture its output."""
def __init__(self, argv, tempfile = None):
self.start_time = None
# Use a pipe for communicating log data
(rpipe, wpipe) = os.pipe()
self._log = LogReceiver(rpipe)
# Stdin is null
nullfd = os.open(os.devnull, os.O_RDONLY)
# Spawn the new process
try:
self._process = subprocess.Popen(args = argv, stdin = nullfd,
stdout = wpipe, stderr = wpipe,
close_fds = True, cwd = "/tmp")
except (OSError, TypeError) as e:
raise ProcessError(str(e))
finally:
# Close the FDs we don't need
os.close(wpipe)
os.close(nullfd)
# Get process info
self.start_time = time.time()
self.pid = str(uuid.uuid1(self._process.pid or 0))
def _join(self, timeout = 1.0):
start = time.time()
while True:
if self._process.poll() is not None:
return True
if (time.time() - start) >= timeout:
return False
time.sleep(0.1)
def terminate(self, timeout = 1.0):
"""Terminate a process, and all of its children that are in the same
process group."""
try:
# First give it some time to die on its own
if self._join(timeout):
return True
def getpgid(pid):
try:
return os.getpgid(pid)
except OSError: # pragma: no cover
return None
# Find all children
group = getpgid(self._process.pid)
main = psutil.Process(self._process.pid)
allproc = [ main ] + main.get_children(recursive = True)
# Kill with SIGTERM, if they're still in this process group
for proc in allproc:
if getpgid(proc.pid) == group:
os.kill(proc.pid, signal.SIGTERM)
# Wait for it to die again
if self._join(timeout):
return True
# One more try with SIGKILL
for proc in allproc:
if getpgid(proc.pid) == group:
os.kill(proc.pid, signal.SIGKILL)
# See if it worked
return self._join(timeout)
except psutil.Error: # pragma: no cover (race condition)
return True
def clear_log(self):
self._log.clear()
@property
def log(self):
return self._log.getvalue()
@property
def alive(self):
return self._process.poll() is None
@property
def exitcode(self):
return self._process.returncode
def get_info_prepare(self):
"""Prepare the process list and measurement for .get_info.
Call .get_info() about a second later."""
try:
main = psutil.Process(self._process.pid)
self._process_list = [ main ] + main.get_children(recursive = True)
for proc in self._process_list:
proc.get_cpu_percent(0)
except psutil.Error: # pragma: no cover (race condition)
self._process_list = [ ]
@staticmethod
def get_empty_info():
return { "cpu_percent": 0,
"cpu_user": 0,
"cpu_sys": 0,
"mem_phys": 0,
"mem_virt": 0,
"io_read": 0,
"io_write": 0,
"procs": 0 }
def get_info(self):
"""Return a dictionary with info about the process CPU and memory
usage. Call .get_info_prepare() about a second before this."""
d = self.get_empty_info()
for proc in self._process_list:
try:
d["cpu_percent"] += proc.get_cpu_percent(0)
cpuinfo = proc.get_cpu_times()
d["cpu_user"] += cpuinfo.user
d["cpu_sys"] += cpuinfo.system
meminfo = proc.get_memory_info()
d["mem_phys"] += meminfo.rss
d["mem_virt"] += meminfo.vms
ioinfo = proc.get_io_counters()
d["io_read"] += ioinfo.read_bytes
d["io_write"] += ioinfo.write_bytes
d["procs"] += 1
except psutil.Error:
pass
return d
class ProcessManager(object):
"""Track and manage a collection of Process objects"""
def __init__(self):
self.processes = {}
self.tmpfiles = {}
self.tmpdir = tempfile.mkdtemp(prefix = "nilmrun-usercode-")
atexit.register(shutil.rmtree, self.tmpdir)
def __iter__(self):
return iter(self.processes.keys())
def __getitem__(self, key):
return self.processes[key]
def run_code(self, code, args):
"""Evaluate 'code' as if it were placed into a Python file and
executed. The arguments, which must be strings, will be
accessible in the code as sys.argv[1:]."""
# The easiest way to do this, by far, is to just write the
# code to a file.
(fd, path) = tempfile.mkstemp(prefix = "nilmrun-usercode-",
suffix = ".py", dir=self.tmpdir)
with os.fdopen(fd, 'w') as f:
f.write(code)
argv = [ sys.executable, "-B", "-s", "-u", path ] + args
pid = self.run_command(argv)
self.tmpfiles[pid] = path
return pid
def run_command(self, argv):
"""Execute a command line program"""
new = Process(argv)
self.processes[new.pid] = new
return new.pid
def terminate(self, pid):
return self.processes[pid].terminate()
def remove(self, pid):
if pid in self.tmpfiles:
try:
os.unlink(self.tmpfiles[pid])
except OSError: # pragma: no cover
pass
del self.tmpfiles[pid]
del self.processes[pid]
def get_info(self):
"""Get info about all running PIDs"""
info = { "total" : Process.get_empty_info(),
"pids" : {},
"system" : {}
}
# Trigger CPU usage collection
for pid in self:
self[pid].get_info_prepare()
psutil.cpu_percent(0, percpu = True)
# Give it some time
time.sleep(1)
# Retrieve info for system
info["system"]["cpu_percent"] = sum(psutil.cpu_percent(0, percpu=True))
info["system"]["cpu_max"] = 100.0 * psutil.NUM_CPUS
info["system"]["procs"] = len(psutil.get_pid_list())
# psutil > 0.6.0's psutil.virtual_memory() would be better here,
# but this should give the same info.
meminfo = psutil.phymem_usage()
info["system"]["mem_total"] = meminfo.total
info["system"]["mem_used"] = int(meminfo.total * meminfo.percent / 100)
# Retrieve info for each PID
for pid in self:
info["pids"][pid] = self[pid].get_info()
# Update totals
for key in info["total"]:
info["total"][key] += info["pids"][pid][key]
return info

View File

@@ -5,10 +5,8 @@ import sys
import os
import socket
import simplejson as json
import decorator
import psutil
import traceback
import argparse
import time
import nilmdb
from nilmdb.utils.printf import *
@@ -22,14 +20,16 @@ from nilmdb.server.serverutil import (
json_error_page,
cherrypy_start,
cherrypy_stop,
bool_param,
)
import nilmrun
import nilmrun.testfilter
# Add CORS_allow tool
cherrypy.tools.CORS_allow = cherrypy.Tool('on_start_resource', CORS_allow)
# CherryPy apps
class NilmRunApp(object):
class App(object):
"""Root application for NILM runner"""
def __init__(self):
@@ -40,7 +40,7 @@ class NilmRunApp(object):
def index(self):
cherrypy.response.headers['Content-Type'] = 'text/plain'
msg = sprintf("This is NilmRun version %s, running on host %s.\n",
nilmdb.__version__, socket.getfqdn())
nilmrun.__version__, socket.getfqdn())
return msg
# /favicon.ico
@@ -54,6 +54,107 @@ class NilmRunApp(object):
def version(self):
return nilmrun.__version__
class AppProcess(object):
def __init__(self, manager):
self.manager = manager
def process_status(self, pid):
# We need to convert the log (which is bytes) to Unicode
# characters, in order to send it via JSON. Treat it as UTF-8
# but replace invalid characters with markers.
log = self.manager[pid].log.decode('utf-8', errors='replace')
return {
"pid": pid,
"alive": self.manager[pid].alive,
"exitcode": self.manager[pid].exitcode,
"start_time": self.manager[pid].start_time,
"log": log
}
# /process/status
@cherrypy.expose
@cherrypy.tools.json_out()
def status(self, pid, clear = False):
"""Return status about a process. If clear = True, also clear
the log."""
clear = bool_param(clear)
if pid not in self.manager:
raise cherrypy.HTTPError("404 Not Found", "No such PID")
status = self.process_status(pid)
if clear:
self.manager[pid].clear_log()
return status
# /process/list
@cherrypy.expose
@cherrypy.tools.json_out()
def list(self):
"""Return a list of processes in the manager."""
return list(self.manager)
# /process/info
@cherrypy.expose
@cherrypy.tools.json_out()
def info(self):
"""Return detailed CPU and memory info about the system and
all processes"""
return self.manager.get_info()
# /process/remove
@cherrypy.expose
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
@cherrypy.tools.CORS_allow(methods = ["POST"])
def remove(self, pid):
"""Remove a process from the manager, killing it if necessary."""
if pid not in self.manager:
raise cherrypy.HTTPError("404 Not Found", "No such PID")
if not self.manager.terminate(pid): # pragma: no cover
raise cherrypy.HTTPError("503 Service Unavailable",
"Failed to stop process")
status = self.process_status(pid)
self.manager.remove(pid)
return status
class AppRun(object):
def __init__(self, manager):
self.manager = manager
# /run/command
@cherrypy.expose
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
@exception_to_httperror(nilmrun.processmanager.ProcessError)
@cherrypy.tools.CORS_allow(methods = ["POST"])
def command(self, argv):
"""Execute an arbitrary program on the server. argv is a
list of the program and its arguments: 'argv[0]' is the program
and 'argv[1:]' are arguments"""
if not isinstance(argv, list):
raise cherrypy.HTTPError("400 Bad Request",
"argv must be a list of strings")
return self.manager.run_command(argv)
# /run/code
@cherrypy.expose
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
@exception_to_httperror(nilmrun.processmanager.ProcessError)
@cherrypy.tools.CORS_allow(methods = ["POST"])
def code(self, code, args = None):
"""Execute arbitrary Python code. 'code' is a formatted string.
It will be run as if it were written into a Python file and
executed. 'args' is a list of strings, and they are passed
on the command line as additional arguments (i.e., they end up
in sys.argv[1:])"""
if args is None:
args = []
if not isinstance(args, list):
raise cherrypy.HTTPError("400 Bad Request",
"args must be a list of strings")
return self.manager.run_code(code, args)
class Server(object):
def __init__(self, host = '127.0.0.1', port = 8080,
embedded = True, # hide diagnostics and output, etc
@@ -104,7 +205,10 @@ class Server(object):
cherrypy._cperror._ie_friendly_error_sizes = {}
# Build up the application and mount it
root = NilmRunApp()
manager = nilmrun.processmanager.ProcessManager()
root = App()
root.process = AppProcess(manager)
root.run = AppRun(manager)
cherrypy.tree.apps = {}
cherrypy.tree.mount(root, basepath, config = { "/" : app_config })

22
nilmrun/testfilter.py Normal file
View File

@@ -0,0 +1,22 @@
#!/usr/bin/python
from nilmdb.utils.printf import *
import time
import signal
import sys
# This is just for testing the process management.
def test(n):
n = int(n)
if n < 0: # raise an exception
raise Exception("test exception")
if n == 0: # ignore SIGTERM and count to 100
n = 100
signal.signal(signal.SIGTERM, signal.SIG_IGN)
for x in range(n):
s = sprintf("dummy %d\n", x)
if x & 1:
sys.stdout.write(s)
else:
sys.stderr.write(s)
time.sleep(0.1)

View File

@@ -20,7 +20,7 @@ def main():
help = 'Only listen on the given address',
default = '0.0.0.0')
group.add_argument('-p', '--port', help = 'Listen on the given port',
type = int, default = 12380)
type = int, default = 12381)
group.add_argument('-q', '--quiet', help = 'Silence output',
action = 'store_true')
group.add_argument('-t', '--traceback',
@@ -41,12 +41,14 @@ def main():
# Print info
if not args.quiet:
print "Version: %s" % nilmrun.__version__
print "NilmRun version: %s" % nilmrun.__version__
print ("Note: This server does not do any authentication! " +
"Anyone who can connect can run arbitrary commands.")
if args.address == '0.0.0.0' or args.address == '::':
host = socket.getfqdn()
else:
host = args.address
print "NilmRun Server URL: http://%s:%d/" % ( host, args.port)
print "Server URL: http://%s:%d/" % ( host, args.port)
print "----"
server.start(blocking = True)

22
setup.cfg Normal file
View File

@@ -0,0 +1,22 @@
[aliases]
test = nosetests
[nosetests]
# Note: values must be set to 1, and have no comments on the same line,
# for "python setup.py nosetests" to work correctly.
nocapture=1
# Comment this out to see CherryPy logs on failure:
nologcapture=1
with-coverage=1
cover-inclusive=1
cover-package=nilmrun
cover-erase=1
# this works, puts html output in cover/ dir:
# cover-html=1
# need nose 1.1.3 for this:
# cover-branches=1
#debug=nose
#debug-log=nose.log
stop=1
verbosity=2
tests=tests

View File

@@ -30,7 +30,7 @@ except ImportError:
# Versioneer manages version numbers from git tags.
# https://github.com/warner/python-versioneer
import versioneer
versioneer.versionfile_source = 'src/_version.py'
versioneer.versionfile_source = 'nilmrun/_version.py'
versioneer.versionfile_build = 'nilmrun/_version.py'
versioneer.tag_prefix = 'nilmrun-'
versioneer.parentdir_prefix = 'nilmrun-'
@@ -61,15 +61,15 @@ setup(name='nilmrun',
long_description = "NILM Database Filter Runner",
license = "Proprietary",
author_email = 'jim@jtan.com',
install_requires = [ 'nilmdb >= 1.8.0',
'nilmtools >= 1.2.2',
'numpy',
'scipy',
install_requires = [ 'nilmdb >= 1.8.2',
'psutil >= 0.3.0',
'cherrypy >= 3.2',
'simplejson',
],
packages = [ 'nilmrun',
'nilmrun.scripts',
],
package_dir = { 'nilmrun': 'src',
package_dir = { 'nilmrun': 'nilmrun',
'nilmrun.scripts': 'scripts',
},
entry_points = {

49
tests/runtests.py Executable file
View File

@@ -0,0 +1,49 @@
#!/usr/bin/python
import nose
import os
import sys
import glob
from collections import OrderedDict
# Change into parent dir
os.chdir(os.path.dirname(os.path.realpath(__file__)) + "/..")
class JimOrderPlugin(nose.plugins.Plugin):
"""When searching for tests and encountering a directory that
contains a 'test.order' file, run tests listed in that file, in the
order that they're listed. Globs are OK in that file and duplicates
are removed."""
name = 'jimorder'
score = 10000
def prepareTestLoader(self, loader):
def wrap(func):
def wrapper(name, *args, **kwargs):
addr = nose.selector.TestAddress(
name, workingDir=loader.workingDir)
try:
order = os.path.join(addr.filename, "test.order")
except Exception:
order = None
if order and os.path.exists(order):
files = []
for line in open(order):
line = line.split('#')[0].strip()
if not line:
continue
fn = os.path.join(addr.filename, line.strip())
files.extend(sorted(glob.glob(fn)) or [fn])
files = list(OrderedDict.fromkeys(files))
tests = [ wrapper(fn, *args, **kwargs) for fn in files ]
return loader.suiteClass(tests)
return func(name, *args, **kwargs)
return wrapper
loader.loadTestsFromName = wrap(loader.loadTestsFromName)
return loader
# Use setup.cfg for most of the test configuration. Adding
# --with-jimorder here means that a normal "nosetests" run will
# still work, it just won't support test.order.
nose.main(addplugins = [ JimOrderPlugin() ],
argv = sys.argv + ["--with-jimorder"])

3
tests/test.order Normal file
View File

@@ -0,0 +1,3 @@
test_nilmrun.py
test_*.py

392
tests/test_nilmrun.py Normal file
View File

@@ -0,0 +1,392 @@
# -*- coding: utf-8 -*-
import nilmrun.server
from nilmdb.client.httpclient import HTTPClient, ClientError, ServerError
from nilmdb.utils.printf import *
from nose.plugins.skip import SkipTest
from nose.tools import *
from nose.tools import assert_raises
import itertools
import distutils.version
import os
import sys
import threading
import cStringIO
import simplejson as json
import unittest
import warnings
import time
import re
import urllib2
from urllib2 import urlopen, HTTPError
import requests
import pprint
import textwrap
from testutil.helpers import *
testurl = "http://localhost:32181/"
#testurl = "http://bucket.mit.edu/nilmrun/"
def setup_module():
global test_server
# Start web app on a custom port
test_server = nilmrun.server.Server(host = "127.0.0.1",
port = 32181,
force_traceback = True)
test_server.start(blocking = False)
def teardown_module():
global test_server
# Close web app
test_server.stop()
class TestClient(object):
def wait_kill(self, client, pid, timeout = 1):
time.sleep(timeout)
status = client.get("process/status", { "pid": pid })
if not status["alive"]:
raise AssertionError("died before we could kill it")
status = client.post("process/remove", { "pid": pid })
if status["alive"]:
raise AssertionError("didn't get killed")
return status
def wait_end(self, client, pid, timeout = 5, remove = True):
start = time.time()
status = None
while (time.time() - start) < timeout:
status = client.get("process/status", { "pid": pid })
if status["alive"] == False:
break
time.sleep(0.1)
else:
raise AssertionError("process " + str(pid) + " didn't die in " +
str(timeout) + " seconds: " + repr(status))
if remove:
status = client.post("process/remove", { "pid": pid })
return status
def test_client_01_basic(self):
client = HTTPClient(baseurl = testurl)
version = client.get("version")
eq_(distutils.version.LooseVersion(version),
distutils.version.LooseVersion(nilmrun.__version__))
in_("This is NilmRun", client.get(""))
with assert_raises(ClientError):
client.get("favicon.ico")
def test_client_02_manager(self):
client = HTTPClient(baseurl = testurl)
eq_(client.get("process/list"), [])
with assert_raises(ClientError) as e:
client.get("process/status", { "pid": 12345 })
in_("No such PID", str(e.exception))
with assert_raises(ClientError):
client.get("process/remove", { "pid": 12345 })
in_("No such PID", str(e.exception))
def test_client_03_run_command(self):
client = HTTPClient(baseurl = testurl, post_json = True)
eq_(client.get("process/list"), [])
def do(argv, kill):
pid = client.post("run/command", { "argv": argv } )
eq_(client.get("process/list"), [pid])
if kill:
return self.wait_kill(client, pid)
return self.wait_end(client, pid)
# Simple command
status = do(["pwd"], False)
eq_(status["exitcode"], 0)
eq_("/tmp\n", status["log"])
# Command with args
status = do(["expr", "1", "+", "2"], False)
eq_(status["exitcode"], 0)
eq_("3\n", status["log"])
# Missing command
with assert_raises(ClientError) as e:
do(["/no-such-command-blah-blah"], False)
in_("No such file or directory", str(e.exception))
# Kill a slow command
status = do(["sleep", "60"], True)
ne_(status["exitcode"], 0)
def _run_testfilter(self, client, args):
code = textwrap.dedent("""
import nilmrun.testfilter
import simplejson as json
import sys
nilmrun.testfilter.test(json.loads(sys.argv[1]))
""")
jsonargs = json.dumps(args)
return client.post("run/code", { "code": code, "args": [ jsonargs ] })
def test_client_04_process_basic(self):
client = HTTPClient(baseurl = testurl, post_json = True)
# start dummy filter
pid = self._run_testfilter(client, 30)
eq_(client.get("process/list"), [pid])
time.sleep(1)
# Verify that status looks OK
status = client.get("process/status", { "pid": pid, "clear": True })
for x in [ "pid", "alive", "exitcode", "start_time", "log" ]:
in_(x, status)
in_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"])
eq_(status["alive"], True)
eq_(status["exitcode"], None)
# Check that the log got cleared
status = client.get("process/status", { "pid": pid })
nin_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"])
# See that it ended properly
status = self.wait_end(client, pid, remove = False)
in_("dummy 27\ndummy 28\ndummy 29\n", status["log"])
eq_(status["exitcode"], 0)
# Remove it
killstatus = client.post("process/remove", { "pid": pid })
eq_(status, killstatus)
eq_(client.get("process/list"), [])
with assert_raises(ClientError) as e:
client.post("process/remove", { "pid": pid })
in_("No such PID", str(e.exception))
def test_client_05_process_terminate(self):
client = HTTPClient(baseurl = testurl, post_json = True)
# Trigger exception in filter
pid = self._run_testfilter(client, -1)
time.sleep(0.5)
status = client.get("process/status", { "pid": pid })
eq_(status["alive"], False)
eq_(status["exitcode"], 1)
in_("Exception: test exception", status["log"])
client.post("process/remove", { "pid": pid })
# Kill a running filter by removing it early
newpid = self._run_testfilter(client, 50)
ne_(newpid, pid)
time.sleep(0.5)
start = time.time()
status = client.post("process/remove", { "pid": newpid })
elapsed = time.time() - start
# Should have died in slightly over 1 second
assert(0.5 < elapsed < 2)
eq_(status["alive"], False)
ne_(status["exitcode"], 0)
# No more
eq_(client.get("process/list"), [])
# Try to remove a running filter that ignored SIGTERM
pid = self._run_testfilter(client, 0)
start = time.time()
status = client.post("process/remove", { "pid": pid })
elapsed = time.time() - start
# Should have died in slightly over 2 seconds
assert(1.5 < elapsed < 3)
eq_(status["alive"], False)
ne_(status["exitcode"], 0)
@unittest.skip("needs a running nilmdb; trainola moved to nilmtools")
def test_client_06_trainola(self):
client = HTTPClient(baseurl = testurl, post_json = True)
data = { "url": "http://bucket.mit.edu/nilmdb",
"dest_stream": "/sharon/prep-a-matches",
"stream": "/sharon/prep-a",
"start": 1366111383280463,
"end": 1366126163457797,
"columns": [ { "name": "P1", "index": 0 },
{ "name": "Q1", "index": 1 },
{ "name": "P3", "index": 2 } ],
"exemplars": [
{ "name": "Boiler Pump ON",
"url": "http://bucket.mit.edu/nilmdb",
"stream": "/sharon/prep-a",
"start": 1366260494269078,
"end": 1366260608185031,
"dest_column": 0,
"columns": [ { "name": "P1", "index": 0 },
{ "name": "Q1", "index": 1 }
]
},
{ "name": "Boiler Pump OFF",
"url": "http://bucket.mit.edu/nilmdb",
"stream": "/sharon/prep-a",
"start": 1366260864215764,
"end": 1366260870882998,
"dest_column": 1,
"columns": [ { "name": "P1", "index": 0 },
{ "name": "Q1", "index": 1 }
]
}
]
}
pid = client.post("run/code", { "code": "import nilmtools.trainola\n" +
"nilmtools.trainola.main()",
"args": [ data ] })
while True:
status = client.get("process/status", { "pid": pid, "clear": 1 })
sys.stdout.write(status["log"])
sys.stdout.flush()
if status["alive"] == False:
break
time.sleep(0.1)
status = client.post("process/remove", { "pid": pid })
os._exit(int(status["exitcode"]))
def test_client_07_run_code(self):
client = HTTPClient(baseurl = testurl, post_json = True)
eq_(client.get("process/list"), [])
def do(code, args, kill):
if args is not None:
pid = client.post("run/code", { "code": code, "args": args } )
else:
pid = client.post("run/code", { "code": code } )
eq_(client.get("process/list"), [pid])
if kill:
return self.wait_kill(client, pid)
return self.wait_end(client, pid)
# basic code snippet
code = textwrap.dedent("""
print 'hello'
def foo(arg):
print 'world'
""")
status = do(code, [], False)
eq_("hello\n", status["log"])
eq_(status["exitcode"], 0)
# compile error
code = textwrap.dedent("""
def foo(arg:
print 'hello'
""")
status = do(code, [], False)
in_("SyntaxError", status["log"])
eq_(status["exitcode"], 1)
# traceback in user code should be formatted nicely
code = textwrap.dedent("""
def foo(arg):
raise Exception(arg)
foo(123)
""")
status = do(code, [], False)
cleaned_log = re.sub('File "[^"]*",', 'File "",', status["log"])
eq_('Traceback (most recent call last):\n' +
' File "", line 4, in <module>\n' +
' foo(123)\n' +
' File "", line 3, in foo\n' +
' raise Exception(arg)\n' +
'Exception: 123\n', cleaned_log)
eq_(status["exitcode"], 1)
# argument handling (strings come in as unicode)
code = textwrap.dedent("""
import sys
print sys.argv[1].encode('ascii'), sys.argv[2]
sys.exit(0) # also test raising SystemExit
""")
with assert_raises(ClientError) as e:
do(code, ["hello", 123], False)
in_("400 Bad Request", str(e.exception))
status = do(code, ["hello", "123"], False)
eq_(status["log"], "hello 123\n")
eq_(status["exitcode"], 0)
# try killing a long-running process
code = textwrap.dedent("""
import time
print 'hello'
time.sleep(60)
print 'world'
""")
status = do(code, [], True)
eq_(status["log"], "hello\n")
ne_(status["exitcode"], 0)
# default arguments are empty
code = textwrap.dedent("""
import sys
print 'args:', len(sys.argv[1:])
""")
status = do(code, None, False)
eq_(status["log"], "args: 0\n")
eq_(status["exitcode"], 0)
def test_client_08_bad_types(self):
client = HTTPClient(baseurl = testurl, post_json = True)
with assert_raises(ClientError) as e:
client.post("run/code", { "code": "asdf", "args": "qwer" })
in_("must be a list", str(e.exception))
with assert_raises(ClientError) as e:
client.post("run/command", { "argv": "asdf" })
in_("must be a list", str(e.exception))
def test_client_09_info(self):
client = HTTPClient(baseurl = testurl, post_json = True)
# start some processes
a = client.post("run/command", { "argv": ["sleep","60"] } )
b = client.post("run/command", { "argv": ["sh","-c","sleep 2;true"] } )
c = client.post("run/command", { "argv": ["sh","-c","burnP5;true"] } )
d = client.post("run/command", { "argv": ["burnP5" ] } )
info = client.get("process/info")
eq_(info["pids"][a]["procs"], 1)
eq_(info["pids"][b]["procs"], 2)
eq_(info["pids"][c]["procs"], 2)
eq_(info["pids"][d]["procs"], 1)
eq_(info["total"]["procs"], 6)
lt_(info["pids"][a]["cpu_percent"], 50)
lt_(20, info["pids"][c]["cpu_percent"])
lt_(80, info["system"]["cpu_percent"])
time.sleep(2)
info = client.get("process/info")
eq_(info["pids"][b]["procs"], 0)
# kill all processes
for pid in client.get("process/list"):
client.post("process/remove", { "pid": pid })
def test_client_10_unicode(self):
client = HTTPClient(baseurl = testurl, post_json = True)
eq_(client.get("process/list"), [])
def verify(cmd, result):
pid = client.post("run/command", { "argv": [ "sh", "-c", cmd ] })
eq_(client.get("process/list"), [pid])
status = self.wait_end(client, pid)
eq_(result, status["log"])
# Unicode should work
verify("echo -n hello", "hello")
verify(u"echo -n ☠", u"")
verify("echo -ne \\\\xe2\\\\x98\\\\xa0", u"")
# Programs that spit out invalid UTF-8 should get replacement
# markers
verify("echo -ne \\\\xae", u"\ufffd")

View File

@@ -0,0 +1 @@
# empty

41
tests/testutil/helpers.py Normal file
View File

@@ -0,0 +1,41 @@
# Just some helpers for test functions
def myrepr(x):
if isinstance(x, basestring):
return '"' + x + '"'
else:
return repr(x)
def eq_(a, b):
if not a == b:
raise AssertionError("%s != %s" % (myrepr(a), myrepr(b)))
def lt_(a, b):
if not a < b:
raise AssertionError("%s is not less than %s" % (myrepr(a), myrepr(b)))
def in_(a, b):
if a not in b:
raise AssertionError("%s not in %s" % (myrepr(a), myrepr(b)))
def nin_(a, b):
if a in b:
raise AssertionError("unexpected %s in %s" % (myrepr(a), myrepr(b)))
def in2_(a1, a2, b):
if a1 not in b and a2 not in b:
raise AssertionError("(%s or %s) not in %s" % (myrepr(a1), myrepr(a2),
myrepr(b)))
def ne_(a, b):
if not a != b:
raise AssertionError("unexpected %s == %s" % (myrepr(a), myrepr(b)))
def lines_(a, n):
l = a.count('\n')
if not l == n:
if len(a) > 5000:
a = a[0:5000] + " ... truncated"
raise AssertionError("wanted %d lines, got %d in output: '%s'"
% (n, l, a))