nilmrun/tests/test_nilmrun.py

435 lines
15 KiB
Python
Raw Permalink Normal View History

# -*- coding: utf-8 -*-
import nilmrun.server
from nilmdb.client.httpclient import HTTPClient, ClientError, ServerError
from nilmdb.utils.printf import *
from nose.plugins.skip import SkipTest
from nose.tools import *
from nose.tools import assert_raises
import itertools
import distutils.version
import os
import sys
import threading
2020-08-03 16:54:33 -04:00
import io
import json
import unittest
import warnings
import time
import re
2020-08-03 16:54:33 -04:00
import urllib.request, urllib.error, urllib.parse
from urllib.request import urlopen
from urllib.error import HTTPError
import requests
import pprint
2013-07-07 20:18:52 -04:00
import textwrap
from testutil.helpers import *
testurl = "http://localhost:32181/"
2013-07-11 11:46:02 -04:00
#testurl = "http://bucket.mit.edu/nilmrun/"
def setup_module():
    """Start the NilmRun web app in the background for this test module.

    The server object is kept in the module-level ``test_server`` so that
    ``teardown_module`` and the tests can reach it later.
    """
    global test_server
    # Loopback address and custom port; this is the port `testurl` names.
    server_options = {
        "host": "127.0.0.1",
        "port": 32181,
        "force_traceback": True,
    }
    test_server = nilmrun.server.Server(**server_options)
    # Non-blocking start, so the tests can run in this same process.
    test_server.start(blocking = False)
def teardown_module():
    """Stop the web app held in the module-level ``test_server``."""
    global test_server
    test_server.stop()
class TestClient(object):
2013-07-07 20:18:52 -04:00
def wait_kill(self, client, pid, timeout = 1):
    """Sleep for `timeout` seconds, then remove process `pid`.

    The process must still be alive after the sleep and must be reported
    as not alive by the removal request; otherwise AssertionError is
    raised.  Returns the status dictionary from "process/remove".
    """
    time.sleep(timeout)

    # It has to still be running, or there is nothing to kill.
    if not client.get("process/status", { "pid": pid })["alive"]:
        raise AssertionError("died before we could kill it")

    # Removing the process is what kills it.
    result = client.post("process/remove", { "pid": pid })
    if result["alive"]:
        raise AssertionError("didn't get killed")
    return result
def wait_end(self, client, pid, timeout = 5, remove = True):
    """Poll "process/status" until process `pid` is no longer alive.

    Raises AssertionError if the process is still alive once `timeout`
    seconds have passed.  When `remove` is true the process is then
    removed and the removal status is returned; otherwise the last
    polled status is returned.
    """
    began = time.time()
    status = None
    while True:
        # Give up once the time budget is used up.
        if not (time.time() - began) < timeout:
            raise AssertionError(
                "process {} didn't die in {} seconds: {!r}".format(
                    pid, timeout, status))
        status = client.get("process/status", { "pid": pid })
        if status["alive"] == False:
            break
        time.sleep(0.1)

    if remove:
        status = client.post("process/remove", { "pid": pid })
    return status
def test_client_01_basic(self):
    """Check the version endpoint, the root page and a rejected URL."""
    client = HTTPClient(baseurl = testurl)

    # The version the server reports must equal the package's version.
    parse = distutils.version.LooseVersion
    reported = parse(client.get("version"))
    expected = parse(nilmrun.__version__)
    eq_(reported, expected)

    # The root page carries the banner text.
    banner = client.get("")
    in_("This is NilmRun", banner)

    # Fetching favicon.ico is expected to fail with a ClientError.
    with assert_raises(ClientError):
        client.get("favicon.ico")
def test_client_02_manager(self):
    """Check that the manager starts empty and rejects an unknown PID.

    Every message check inspects the exception bound by its own
    ``assert_raises`` block, so no check can pass on a stale exception.
    """
    client = HTTPClient(baseurl = testurl)

    eq_(client.get("process/list"), [])

    # Status lookup for a PID that was never started.
    with assert_raises(ClientError) as e:
        client.get("process/status", { "pid": 12345 })
    in_("No such PID", str(e.exception))

    # A GET on the remove endpoint must fail as well.  No message is
    # checked here: only the failure itself is asserted for this method.
    with assert_raises(ClientError):
        client.get("process/remove", { "pid": 12345 })

    # Removal proper goes through a JSON POST, as in the other tests of
    # this module; that is the request whose error text is checked.
    json_client = HTTPClient(baseurl = testurl, post_json = True)
    with assert_raises(ClientError) as e:
        json_client.post("process/remove", { "pid": 12345 })
    in_("No such PID", str(e.exception))
def test_client_03_run_command(self):
    """Run external commands through "run/command" and check the results."""
    client = HTTPClient(baseurl = testurl, post_json = True)
    eq_(client.get("process/list"), [])

    def launch(argv, kill):
        # Start argv; the new PID must be the only managed process.  Then
        # either kill it or wait for it to finish, returning the status.
        pid = client.post("run/command", { "argv": argv } )
        eq_(client.get("process/list"), [pid])
        finish = self.wait_kill if kill else self.wait_end
        return finish(client, pid)

    # Simple command
    result = launch(["pwd"], False)
    eq_(result["exitcode"], 0)
    eq_("/tmp\n", result["log"])

    # Command with args
    result = launch(["expr", "1", "+", "2"], False)
    eq_(result["exitcode"], 0)
    eq_("3\n", result["log"])

    # Missing command
    with assert_raises(ClientError) as e:
        launch(["/no-such-command-blah-blah"], False)
    in_("No such file or directory", str(e.exception))

    # Kill a slow command
    result = launch(["sleep", "60"], True)
    ne_(result["exitcode"], 0)
def _run_testfilter(self, client, args):
code = textwrap.dedent("""
from nilmdb.utils.printf import *
import time
import signal
import json
import sys
# This is just for testing the process management.
def test(n):
n = int(n)
if n < 0: # raise an exception
raise Exception("test exception")
if n == 0: # ignore SIGTERM and count to 100
n = 100
signal.signal(signal.SIGTERM, signal.SIG_IGN)
for x in range(n):
s = sprintf("dummy %d\\n", x)
if x & 1:
sys.stdout.write(s)
else:
sys.stderr.write(s)
time.sleep(0.1)
test(json.loads(sys.argv[1]))
""")
jsonargs = json.dumps(args)
2013-07-11 11:46:02 -04:00
return client.post("run/code", { "code": code, "args": [ jsonargs ] })
def test_client_04_process_basic(self):
    """Follow one dummy filter from start to removal, checking its log."""
    client = HTTPClient(baseurl = testurl, post_json = True)
    head = "dummy 0\ndummy 1\ndummy 2\ndummy 3\n"
    tail = "dummy 27\ndummy 28\ndummy 29\n"

    # start dummy filter
    pid = self._run_testfilter(client, 30)
    eq_(client.get("process/list"), [pid])
    time.sleep(1)

    # Verify that status looks OK
    first = client.get("process/status", { "pid": pid, "clear": True })
    for key in [ "pid", "alive", "exitcode", "start_time", "log" ]:
        in_(key, first)
    in_(head, first["log"])
    eq_(first["alive"], True)
    eq_(first["exitcode"], None)

    # Check that the log got cleared
    second = client.get("process/status", { "pid": pid })
    nin_(head, second["log"])

    # See that it ended properly
    final = self.wait_end(client, pid, remove = False)
    in_(tail, final["log"])
    eq_(final["exitcode"], 0)

    # Remove it; the removal status must equal the last polled status
    removed = client.post("process/remove", { "pid": pid })
    eq_(final, removed)
    eq_(client.get("process/list"), [])

    # A second removal of the same PID must be refused
    with assert_raises(ClientError) as e:
        client.post("process/remove", { "pid": pid })
    in_("No such PID", str(e.exception))
def test_client_05_process_terminate(self):
    """Check exit reporting for a crashing filter and for early removals."""
    client = HTTPClient(baseurl = testurl, post_json = True)

    def timed_remove(target):
        # Remove `target`; return the status and how long the request took.
        began = time.time()
        outcome = client.post("process/remove", { "pid": target })
        return outcome, time.time() - began

    # Trigger exception in filter
    pid = self._run_testfilter(client, -1)
    time.sleep(0.5)
    crashed = client.get("process/status", { "pid": pid })
    eq_(crashed["alive"], False)
    eq_(crashed["exitcode"], 1)
    in_("Exception: test exception", crashed["log"])
    client.post("process/remove", { "pid": pid })

    # Kill a running filter by removing it early
    newpid = self._run_testfilter(client, 50)
    ne_(newpid, pid)
    time.sleep(0.5)
    killed, elapsed = timed_remove(newpid)
    # Should have died in slightly over 1 second
    assert 0.5 < elapsed < 2
    eq_(killed["alive"], False)
    ne_(killed["exitcode"], 0)

    # No more
    eq_(client.get("process/list"), [])

    # Try to remove a running filter that ignored SIGTERM
    pid = self._run_testfilter(client, 0)
    stubborn, elapsed = timed_remove(pid)
    # Should have died in slightly over 2 seconds
    assert 1.5 < elapsed < 3
    eq_(stubborn["alive"], False)
    ne_(stubborn["exitcode"], 0)
2013-07-08 11:33:27 -04:00
@unittest.skip("needs a running nilmdb; trainola moved to nilmtools")
def test_client_06_trainola(self):
    """Run nilmtools.trainola through "run/code" and echo its log.

    Skipped by default (see the decorator).  When enabled, the log is
    streamed to stdout while the process runs, and the test then requires
    a zero exit code.  The result is reported through an assertion, not
    os._exit(), so the test runner and teardown_module keep working.
    """
    client = HTTPClient(baseurl = testurl, post_json = True)
    data = {
        "url": "http://bucket.mit.edu/nilmdb",
        "dest_stream": "/sharon/prep-a-matches",
        "stream": "/sharon/prep-a",
        "start": 1366111383280463,
        "end": 1366126163457797,
        "columns": [ { "name": "P1", "index": 0 },
                     { "name": "Q1", "index": 1 },
                     { "name": "P3", "index": 2 } ],
        "exemplars": [
            { "name": "Boiler Pump ON",
              "url": "http://bucket.mit.edu/nilmdb",
              "stream": "/sharon/prep-a",
              "start": 1366260494269078,
              "end": 1366260608185031,
              "dest_column": 0,
              "columns": [ { "name": "P1", "index": 0 },
                           { "name": "Q1", "index": 1 }
                         ]
            },
            { "name": "Boiler Pump OFF",
              "url": "http://bucket.mit.edu/nilmdb",
              "stream": "/sharon/prep-a",
              "start": 1366260864215764,
              "end": 1366260870882998,
              "dest_column": 1,
              "columns": [ { "name": "P1", "index": 0 },
                           { "name": "Q1", "index": 1 }
                         ]
            }
        ]
    }
    # NOTE(review): test_client_07_run_code expects "400 Bad Request" for
    # a non-string entry in "args"; this dict may need json.dumps() before
    # the test can be re-enabled -- confirm against the server.
    pid = client.post("run/code", { "code": "import nilmtools.trainola\n" +
                                    "nilmtools.trainola.main()",
                                    "args": [ data ] })

    # Stream the log (clearing it on each poll) until the process ends.
    while True:
        status = client.get("process/status", { "pid": pid, "clear": 1 })
        sys.stdout.write(status["log"])
        sys.stdout.flush()
        if status["alive"] == False:
            break
        time.sleep(0.1)

    status = client.post("process/remove", { "pid": pid })
    eq_(status["exitcode"], 0)
2013-07-10 11:35:13 -04:00
def test_client_07_run_code(self):
    """Run Python snippets through "run/code" and check log and exit code.

    Covers a plain snippet, a syntax error, traceback formatting,
    argument passing (non-string arguments are refused), killing a
    long-running snippet, and the default empty argument list.
    """
    client = HTTPClient(baseurl = testurl, post_json = True)
    eq_(client.get("process/list"), [])

    def do(code, args, kill):
        # Submit `code`, sending "args" only when given; then either kill
        # the process or wait for it to end, returning its final status.
        if args is not None:
            pid = client.post("run/code", { "code": code, "args": args } )
        else:
            pid = client.post("run/code", { "code": code } )
        eq_(client.get("process/list"), [pid])
        if kill:
            return self.wait_kill(client, pid)
        return self.wait_end(client, pid)

    # basic code snippet
    code = textwrap.dedent("""
        print('hello')
        def foo(arg):
            print('world')
    """)
    status = do(code, [], False)
    eq_("hello\n", status["log"])
    eq_(status["exitcode"], 0)

    # compile error
    code = textwrap.dedent("""
        def foo(arg:
            print('hello')
    """)
    status = do(code, [], False)
    in_("SyntaxError", status["log"])
    eq_(status["exitcode"], 1)

    # traceback in user code should be formatted nicely
    code = textwrap.dedent("""
        def foo(arg):
            raise Exception(arg)
        foo(123)
    """)
    status = do(code, [], False)
    # Blank out the file names so the comparison is path-independent.
    cleaned_log = re.sub('File "[^"]*",', 'File "",', status["log"])
    # Python indents "File" lines by two spaces and the quoted source
    # line by four.  The line numbers count the leading blank line of the
    # dedented snippet as line 1.
    eq_('Traceback (most recent call last):\n' +
        '  File "", line 4, in <module>\n' +
        '    foo(123)\n' +
        '  File "", line 3, in foo\n' +
        '    raise Exception(arg)\n' +
        'Exception: 123\n', cleaned_log)
    eq_(status["exitcode"], 1)

    # argument handling (strings come in as unicode)
    code = textwrap.dedent("""
        import sys
        print(sys.argv[1], sys.argv[2])
        sys.exit(0) # also test raising SystemExit
    """)
    # A non-string argument must be refused by the server.
    with assert_raises(ClientError) as e:
        do(code, ["hello", 123], False)
    in_("400 Bad Request", str(e.exception))
    status = do(code, ["hello", "123"], False)
    eq_(status["log"], "hello 123\n")
    eq_(status["exitcode"], 0)

    # try killing a long-running process
    code = textwrap.dedent("""
        import time
        print('hello')
        time.sleep(60)
        print('world')
    """)
    status = do(code, [], True)
    eq_(status["log"], "hello\n")
    ne_(status["exitcode"], 0)

    # default arguments are empty
    code = textwrap.dedent("""
        import sys
        print('args:', len(sys.argv[1:]))
    """)
    status = do(code, None, False)
    eq_(status["log"], "args: 0\n")
    eq_(status["exitcode"], 0)
def test_client_08_bad_types(self):
    """String values for "args" / "argv" must be refused as non-lists."""
    client = HTTPClient(baseurl = testurl, post_json = True)

    # Each request passes a plain string where a list is required.
    bad_requests = (
        ("run/code", { "code": "asdf", "args": "qwer" }),
        ("run/command", { "argv": "asdf" }),
    )
    for endpoint, payload in bad_requests:
        with assert_raises(ClientError) as e:
            client.post(endpoint, payload)
        in_("must be a list", str(e.exception))
2013-07-17 18:12:44 -04:00
2013-07-18 11:01:27 -04:00
def test_client_09_info(self):
    """Check the process counts and CPU figures from "process/info".

    Four commands are started: a plain sleep (a), a shell that sleeps
    briefly (b), a shell running dd (c) and dd on its own (d).  All
    managed processes are removed in a ``finally`` clause, so a failing
    assertion cannot leave the dd processes running behind the test.
    """
    client = HTTPClient(baseurl = testurl, post_json = True)
    try:
        # start some processes
        a = client.post("run/command", { "argv": ["sleep","60"] } )
        b = client.post("run/command", { "argv": ["sh","-c","sleep 2;true"] } )
        c = client.post("run/command", { "argv": [
            "sh","-c","dd if=/dev/zero of=/dev/null;true"] } )
        d = client.post("run/command", { "argv": [
            "dd", "if=/dev/zero", "of=/dev/null" ] } )

        info = client.get("process/info")
        # The two "sh -c" commands count as two processes each.
        eq_(info["pids"][a]["procs"], 1)
        eq_(info["pids"][b]["procs"], 2)
        eq_(info["pids"][c]["procs"], 2)
        eq_(info["pids"][d]["procs"], 1)
        eq_(info["total"]["procs"], 6)
        # sleep should be nearly idle; the dd commands keep the CPU busy.
        lt_(info["pids"][a]["cpu_percent"], 50)
        lt_(20, info["pids"][c]["cpu_percent"])
        lt_(80, info["system"]["cpu_percent"])

        # Poll for up to 10 seconds until B's process count changes.
        for _ in range(10):
            time.sleep(1)
            info = client.get("process/info")
            if info["pids"][b]["procs"] != 2:
                break
        else:
            raise Exception("process B didn't die: " + str(info["pids"][b]))
    finally:
        # kill all processes
        for pid in client.get("process/list"):
            client.post("process/remove", { "pid": pid })
def test_client_10_unicode(self):
    """Check that command output comes back in the log as proper text.

    Valid UTF-8 output must be returned unchanged; an invalid byte must
    be returned as the U+FFFD replacement character.
    """
    client = HTTPClient(baseurl = testurl, post_json = True)
    eq_(client.get("process/list"), [])

    def verify(cmd, result):
        # Run `cmd` under bash and compare its complete log to `result`.
        pid = client.post("run/command", { "argv": [
            "/bin/bash", "-c", cmd ] })
        eq_(client.get("process/list"), [pid])
        status = self.wait_end(client, pid)
        eq_(result, status["log"])

    # Unicode should work
    verify("echo -n hello", "hello")
    # Both commands emit U+2620: once typed literally, once as its UTF-8
    # bytes e2 98 a0 through echo -e escapes.
    verify("echo -n ☠", "\u2620")
    verify("echo -ne \\\\xe2\\\\x98\\\\xa0", "\u2620")

    # Programs that spit out invalid UTF-8 should get replacement
    # markers
    verify("echo -ne \\\\xae", "\ufffd")
def test_client_11_atexit(self):
    """Check that the manager's atexit handler removes a running process.

    A process is deliberately left running for the handler to clean up.
    The handler is invoked by hand, since it is hard to trigger as part
    of the test suite.
    """
    client = HTTPClient(baseurl = testurl, post_json = True)
    sleeper = textwrap.dedent("""
        import time
        time.sleep(10)
    """)
    client.post("run/code", { "code": sleeper, "args": [ "hello"] })

    # Trigger atexit function
    test_server._manager._atexit()

    # Ensure no processes exist
    eq_(client.get("process/list"), [])