# nilmrun/tests/test_nilmrun.py (410 lines, 14 KiB, Python)
# -*- coding: utf-8 -*-
import nilmrun.server
from nilmdb.client.httpclient import HTTPClient, ClientError, ServerError
from nilmdb.utils.printf import *
from nose.plugins.skip import SkipTest
from nose.tools import *
from nose.tools import assert_raises
import itertools
import distutils.version
import os
import sys
import threading
import cStringIO
import simplejson as json
import unittest
import warnings
import time
import re
import urllib2
from urllib2 import urlopen, HTTPError
import requests
import pprint
2013-07-07 20:18:52 -04:00
import textwrap
from testutil.helpers import *
# Base URL of the NilmRun server under test.  The port must match the one
# that setup_module() passes to nilmrun.server.Server.
testurl = "http://localhost:32181/"
# Alternate target, left disabled; uncomment to run against this server
# instead of the locally started one.
#testurl = "http://bucket.mit.edu/nilmrun/"
def setup_module():
    """Module-level fixture: launch the NilmRun web app for the tests.

    The server is bound to 127.0.0.1 on the custom port 32181 with
    force_traceback enabled, and is started in non-blocking mode so
    that this function returns and the tests can run against it.  The
    instance is published as the module global `test_server`.
    """
    global test_server
    server_options = dict(host="127.0.0.1",
                          port=32181,
                          force_traceback=True)
    test_server = nilmrun.server.Server(**server_options)
    test_server.start(blocking=False)
def teardown_module():
    """Module-level fixture: shut down the web app from setup_module()."""
    # `test_server` is only read here, so no `global` declaration is
    # needed to reach the module-level instance.
    test_server.stop()
class TestClient(object):
    """End-to-end tests that drive a NilmRun server over HTTP.

    Every test builds its own HTTPClient against `testurl`.  Most tests
    assert that "process/list" is empty on entry, so each test must
    remove any process it started before returning.

    Test methods are described with comments rather than docstrings so
    that the test runner's verbose output keeps showing method names.
    """

    def wait_kill(self, client, pid, timeout = 1):
        """Let process `pid` run for `timeout` seconds, then kill it.

        The process is killed by posting to "process/remove".  Returns
        the status dictionary from that request.

        Raises AssertionError if the process was already dead when we
        went to kill it, or if it is still reported alive afterwards.
        """
        time.sleep(timeout)
        status = client.get("process/status", { "pid": pid })
        if not status["alive"]:
            raise AssertionError("died before we could kill it")
        status = client.post("process/remove", { "pid": pid })
        if status["alive"]:
            raise AssertionError("didn't get killed")
        return status

    def wait_end(self, client, pid, timeout = 5, remove = True):
        """Poll until process `pid` exits on its own.

        The status is polled every 0.1 seconds for up to `timeout`
        seconds.  If `remove` is true, the finished process is then
        removed from the server and the status returned by that
        request is returned; otherwise the last polled status is.

        Raises AssertionError if the process is still alive when the
        timeout expires.
        """
        start = time.time()
        status = None
        while (time.time() - start) < timeout:
            status = client.get("process/status", { "pid": pid })
            if not status["alive"]:
                break
            time.sleep(0.1)
        else:
            # Only reached when the loop ran out of time without a break
            raise AssertionError("process " + str(pid) + " didn't die in " +
                                 str(timeout) + " seconds: " + repr(status))
        if remove:
            status = client.post("process/remove", { "pid": pid })
        return status

    def test_client_01_basic(self):
        # Version, landing page, and a request for a missing resource
        client = HTTPClient(baseurl = testurl)
        version = client.get("version")
        eq_(distutils.version.LooseVersion(version),
            distutils.version.LooseVersion(nilmrun.__version__))
        in_("This is NilmRun", client.get(""))
        with assert_raises(ClientError):
            client.get("favicon.ico")

    def test_client_02_manager(self):
        # Process manager queries against a PID that doesn't exist
        client = HTTPClient(baseurl = testurl)
        eq_(client.get("process/list"), [])
        with assert_raises(ClientError) as e:
            client.get("process/status", { "pid": 12345 })
        in_("No such PID", str(e.exception))
        # NOTE(review): this issues a GET where every other test POSTs
        # to "process/remove".  The original code followed this block
        # with a second `in_("No such PID", str(e.exception))`, but no
        # exception was bound here, so that check only re-tested the
        # exception from the status request above.  It has been dropped;
        # confirm what error the server returns for this request before
        # asserting on its message.
        with assert_raises(ClientError):
            client.get("process/remove", { "pid": 12345 })

    def test_client_03_run_command(self):
        # Run external commands through "run/command"
        client = HTTPClient(baseurl = testurl, post_json = True)
        eq_(client.get("process/list"), [])

        def do(argv, kill):
            # Start `argv`, then either kill it or wait for it to end
            pid = client.post("run/command", { "argv": argv } )
            eq_(client.get("process/list"), [pid])
            if kill:
                return self.wait_kill(client, pid)
            return self.wait_end(client, pid)

        # Simple command
        status = do(["pwd"], False)
        eq_(status["exitcode"], 0)
        eq_("/tmp\n", status["log"])

        # Command with args
        status = do(["expr", "1", "+", "2"], False)
        eq_(status["exitcode"], 0)
        eq_("3\n", status["log"])

        # Missing command
        with assert_raises(ClientError) as e:
            do(["/no-such-command-blah-blah"], False)
        in_("No such file or directory", str(e.exception))

        # Kill a slow command
        status = do(["sleep", "60"], True)
        ne_(status["exitcode"], 0)

    def _run_testfilter(self, client, args):
        """Start nilmrun.testfilter.test(args) on the server.

        `args` is JSON-encoded and passed as the single command-line
        argument of a "run/code" snippet, which decodes it again and
        hands it to nilmrun.testfilter.test.  Returns whatever the
        "run/code" request returns (used as the PID by the callers).
        """
        code = textwrap.dedent("""
            import nilmrun.testfilter
            import simplejson as json
            import sys
            nilmrun.testfilter.test(json.loads(sys.argv[1]))
        """)
        jsonargs = json.dumps(args)
        return client.post("run/code", { "code": code, "args": [ jsonargs ] })

    def test_client_04_process_basic(self):
        # Status reporting, log clearing, and removal of a normal process
        client = HTTPClient(baseurl = testurl, post_json = True)

        # start dummy filter
        pid = self._run_testfilter(client, 30)
        eq_(client.get("process/list"), [pid])
        time.sleep(1)

        # Verify that status looks OK
        status = client.get("process/status", { "pid": pid, "clear": True })
        for x in [ "pid", "alive", "exitcode", "start_time", "log" ]:
            in_(x, status)
        in_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"])
        eq_(status["alive"], True)
        eq_(status["exitcode"], None)

        # Check that the log got cleared
        status = client.get("process/status", { "pid": pid })
        nin_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"])

        # See that it ended properly
        status = self.wait_end(client, pid, remove = False)
        in_("dummy 27\ndummy 28\ndummy 29\n", status["log"])
        eq_(status["exitcode"], 0)

        # Remove it
        killstatus = client.post("process/remove", { "pid": pid })
        eq_(status, killstatus)
        eq_(client.get("process/list"), [])
        with assert_raises(ClientError) as e:
            client.post("process/remove", { "pid": pid })
        in_("No such PID", str(e.exception))

    def test_client_05_process_terminate(self):
        # Processes that crash, get killed early, or resist termination
        client = HTTPClient(baseurl = testurl, post_json = True)

        # Trigger exception in filter
        pid = self._run_testfilter(client, -1)
        time.sleep(0.5)
        status = client.get("process/status", { "pid": pid })
        eq_(status["alive"], False)
        eq_(status["exitcode"], 1)
        in_("Exception: test exception", status["log"])
        client.post("process/remove", { "pid": pid })

        # Kill a running filter by removing it early
        newpid = self._run_testfilter(client, 50)
        ne_(newpid, pid)
        time.sleep(0.5)
        start = time.time()
        status = client.post("process/remove", { "pid": newpid })
        elapsed = time.time() - start
        # Should have died in slightly over 1 second
        assert(0.5 < elapsed < 2)
        eq_(status["alive"], False)
        ne_(status["exitcode"], 0)

        # No more
        eq_(client.get("process/list"), [])

        # Try to remove a running filter that ignored SIGTERM
        pid = self._run_testfilter(client, 0)
        start = time.time()
        status = client.post("process/remove", { "pid": pid })
        elapsed = time.time() - start
        # Should have died in slightly over 2 seconds
        assert(1.5 < elapsed < 3)
        eq_(status["alive"], False)
        ne_(status["exitcode"], 0)

    @unittest.skip("needs a running nilmdb; trainola moved to nilmtools")
    def test_client_06_trainola(self):
        # Manual check of nilmtools.trainola against an external NilmDB.
        # Skipped by default; see the decorator.
        client = HTTPClient(baseurl = testurl, post_json = True)
        data = { "url": "http://bucket.mit.edu/nilmdb",
                 "dest_stream": "/sharon/prep-a-matches",
                 "stream": "/sharon/prep-a",
                 "start": 1366111383280463,
                 "end": 1366126163457797,
                 "columns": [ { "name": "P1", "index": 0 },
                              { "name": "Q1", "index": 1 },
                              { "name": "P3", "index": 2 } ],
                 "exemplars": [
                     { "name": "Boiler Pump ON",
                       "url": "http://bucket.mit.edu/nilmdb",
                       "stream": "/sharon/prep-a",
                       "start": 1366260494269078,
                       "end": 1366260608185031,
                       "dest_column": 0,
                       "columns": [ { "name": "P1", "index": 0 },
                                    { "name": "Q1", "index": 1 }
                                  ]
                     },
                     { "name": "Boiler Pump OFF",
                       "url": "http://bucket.mit.edu/nilmdb",
                       "stream": "/sharon/prep-a",
                       "start": 1366260864215764,
                       "end": 1366260870882998,
                       "dest_column": 1,
                       "columns": [ { "name": "P1", "index": 0 },
                                    { "name": "Q1", "index": 1 }
                                  ]
                     }
                 ]
               }
        pid = client.post("run/code", { "code": "import nilmtools.trainola\n" +
                                        "nilmtools.trainola.main()",
                                        "args": [ data ] })
        # Stream the process log to stdout until the process exits
        while True:
            status = client.get("process/status", { "pid": pid, "clear": 1 })
            sys.stdout.write(status["log"])
            sys.stdout.flush()
            if not status["alive"]:
                break
            time.sleep(0.1)
        status = client.post("process/remove", { "pid": pid })
        # NOTE(review): os._exit ends the whole test run immediately,
        # without normal interpreter cleanup.  That is tolerable only
        # because this test is skipped unless run by hand.
        os._exit(int(status["exitcode"]))

    def test_client_07_run_code(self):
        # Run Python snippets through "run/code"
        client = HTTPClient(baseurl = testurl, post_json = True)
        eq_(client.get("process/list"), [])

        def do(code, args, kill):
            # Start `code`, omitting "args" from the request entirely
            # when `args` is None, then kill it or wait for it to end
            if args is not None:
                pid = client.post("run/code", { "code": code, "args": args } )
            else:
                pid = client.post("run/code", { "code": code } )
            eq_(client.get("process/list"), [pid])
            if kill:
                return self.wait_kill(client, pid)
            return self.wait_end(client, pid)

        # basic code snippet
        code = textwrap.dedent("""
            print 'hello'
            def foo(arg):
                print 'world'
        """)
        status = do(code, [], False)
        eq_("hello\n", status["log"])
        eq_(status["exitcode"], 0)

        # compile error
        code = textwrap.dedent("""
            def foo(arg:
                print 'hello'
        """)
        status = do(code, [], False)
        in_("SyntaxError", status["log"])
        eq_(status["exitcode"], 1)

        # traceback in user code should be formatted nicely
        code = textwrap.dedent("""
            def foo(arg):
                raise Exception(arg)
            foo(123)
        """)
        status = do(code, [], False)
        # Blank out the file name so the comparison doesn't depend on it
        cleaned_log = re.sub('File "[^"]*",', 'File "",', status["log"])
        # Tracebacks indent "File" lines by two spaces and source lines
        # by four.  Line numbers count the blank first line of `code`.
        eq_('Traceback (most recent call last):\n' +
            '  File "", line 4, in <module>\n' +
            '    foo(123)\n' +
            '  File "", line 3, in foo\n' +
            '    raise Exception(arg)\n' +
            'Exception: 123\n', cleaned_log)
        eq_(status["exitcode"], 1)

        # argument handling (strings come in as unicode)
        code = textwrap.dedent("""
            import sys
            print sys.argv[1].encode('ascii'), sys.argv[2]
            sys.exit(0) # also test raising SystemExit
        """)
        # A non-string argument is rejected by the server
        with assert_raises(ClientError) as e:
            do(code, ["hello", 123], False)
        in_("400 Bad Request", str(e.exception))
        status = do(code, ["hello", "123"], False)
        eq_(status["log"], "hello 123\n")
        eq_(status["exitcode"], 0)

        # try killing a long-running process
        code = textwrap.dedent("""
            import time
            print 'hello'
            time.sleep(60)
            print 'world'
        """)
        status = do(code, [], True)
        eq_(status["log"], "hello\n")
        ne_(status["exitcode"], 0)

        # default arguments are empty
        code = textwrap.dedent("""
            import sys
            print 'args:', len(sys.argv[1:])
        """)
        status = do(code, None, False)
        eq_(status["log"], "args: 0\n")
        eq_(status["exitcode"], 0)

    def test_client_08_bad_types(self):
        # "args" and "argv" must be lists, not bare strings
        client = HTTPClient(baseurl = testurl, post_json = True)
        with assert_raises(ClientError) as e:
            client.post("run/code", { "code": "asdf", "args": "qwer" })
        in_("must be a list", str(e.exception))
        with assert_raises(ClientError) as e:
            client.post("run/command", { "argv": "asdf" })
        in_("must be a list", str(e.exception))

    def test_client_09_info(self):
        # Per-process and system-wide resource info from "process/info"
        client = HTTPClient(baseurl = testurl, post_json = True)
        try:
            # start some processes
            a = client.post("run/command", { "argv": ["sleep","60"] } )
            b = client.post("run/command",
                            { "argv": ["sh","-c","sleep 2;true"] } )
            c = client.post("run/command",
                            { "argv": ["sh","-c","burnP5;true"] } )
            d = client.post("run/command", { "argv": ["burnP5" ] } )
            info = client.get("process/info")
            eq_(info["pids"][a]["procs"], 1)
            eq_(info["pids"][b]["procs"], 2)
            eq_(info["pids"][c]["procs"], 2)
            eq_(info["pids"][d]["procs"], 1)
            eq_(info["total"]["procs"], 6)
            lt_(info["pids"][a]["cpu_percent"], 50)
            lt_(20, info["pids"][c]["cpu_percent"])
            lt_(80, info["system"]["cpu_percent"])
            time.sleep(2)
            info = client.get("process/info")
            eq_(info["pids"][b]["procs"], 0)
        finally:
            # kill all processes, even if a check above failed; leftover
            # processes would break the "process/list" is empty checks
            # at the start of later tests
            for pid in client.get("process/list"):
                client.post("process/remove", { "pid": pid })

    def test_client_10_unicode(self):
        # Non-ASCII and invalid UTF-8 output in process logs
        client = HTTPClient(baseurl = testurl, post_json = True)
        eq_(client.get("process/list"), [])

        def verify(cmd, result):
            # Run `cmd` under sh and compare its complete log to `result`
            pid = client.post("run/command", { "argv": [ "sh", "-c", cmd ] })
            eq_(client.get("process/list"), [pid])
            status = self.wait_end(client, pid)
            eq_(result, status["log"])

        # Unicode should work.  U+2620 (skull and crossbones) is written
        # as an escape so this doesn't depend on the source encoding;
        # its UTF-8 encoding is the byte sequence e2 98 a0.
        verify("echo -n hello", "hello")
        verify(u"echo -n \u2620", u"\u2620")
        verify("echo -ne \\\\xe2\\\\x98\\\\xa0", u"\u2620")

        # Programs that spit out invalid UTF-8 should get replacement
        # markers
        verify("echo -ne \\\\xae", u"\ufffd")

    def test_client_11_atexit(self):
        # Leave a directory and running process behind, for the atexit
        # handler to clean up.  Here we trigger the atexit manually,
        # since it's hard to trigger it as part of the test suite.
        client = HTTPClient(baseurl = testurl, post_json = True)
        code = textwrap.dedent("""
            import time
            time.sleep(10)
        """)
        client.post("run/code", { "code": code, "args": [ "hello"] })

        # Trigger atexit function
        test_server._manager._atexit()

        # Ensure no processes exist
        eq_(client.get("process/list"), [])