# -*- coding: utf-8 -*- import nilmrun.server from nilmdb.client.httpclient import HTTPClient, ClientError, ServerError from nilmdb.utils.printf import * from nose.plugins.skip import SkipTest from nose.tools import * from nose.tools import assert_raises import itertools import distutils.version import os import sys import threading import io import json import unittest import warnings import time import re import urllib.request, urllib.error, urllib.parse from urllib.request import urlopen from urllib.error import HTTPError import requests import pprint import textwrap from testutil.helpers import * testurl = "http://localhost:32181/" #testurl = "http://bucket.mit.edu/nilmrun/" def setup_module(): global test_server # Start web app on a custom port test_server = nilmrun.server.Server(host = "127.0.0.1", port = 32181, force_traceback = True) test_server.start(blocking = False) def teardown_module(): global test_server # Close web app test_server.stop() class TestClient(object): def wait_kill(self, client, pid, timeout = 1): time.sleep(timeout) status = client.get("process/status", { "pid": pid }) if not status["alive"]: raise AssertionError("died before we could kill it") status = client.post("process/remove", { "pid": pid }) if status["alive"]: raise AssertionError("didn't get killed") return status def wait_end(self, client, pid, timeout = 5, remove = True): start = time.time() status = None while (time.time() - start) < timeout: status = client.get("process/status", { "pid": pid }) if status["alive"] == False: break time.sleep(0.1) else: raise AssertionError("process " + str(pid) + " didn't die in " + str(timeout) + " seconds: " + repr(status)) if remove: status = client.post("process/remove", { "pid": pid }) return status def test_client_01_basic(self): client = HTTPClient(baseurl = testurl) version = client.get("version") eq_(distutils.version.LooseVersion(version), distutils.version.LooseVersion(nilmrun.__version__)) in_("This is NilmRun", client.get("")) with assert_raises(ClientError): client.get("favicon.ico") def test_client_02_manager(self): client = HTTPClient(baseurl = testurl) eq_(client.get("process/list"), []) with assert_raises(ClientError) as e: client.get("process/status", { "pid": 12345 }) in_("No such PID", str(e.exception)) with assert_raises(ClientError): client.get("process/remove", { "pid": 12345 }) in_("No such PID", str(e.exception)) def test_client_03_run_command(self): client = HTTPClient(baseurl = testurl, post_json = True) eq_(client.get("process/list"), []) def do(argv, kill): pid = client.post("run/command", { "argv": argv } ) eq_(client.get("process/list"), [pid]) if kill: return self.wait_kill(client, pid) return self.wait_end(client, pid) # Simple command status = do(["pwd"], False) eq_(status["exitcode"], 0) eq_("/tmp\n", status["log"]) # Command with args status = do(["expr", "1", "+", "2"], False) eq_(status["exitcode"], 0) eq_("3\n", status["log"]) # Missing command with assert_raises(ClientError) as e: do(["/no-such-command-blah-blah"], False) in_("No such file or directory", str(e.exception)) # Kill a slow command status = do(["sleep", "60"], True) ne_(status["exitcode"], 0) def _run_testfilter(self, client, args): code = textwrap.dedent(""" from nilmdb.utils.printf import * import time import signal import json import sys # This is just for testing the process management. def test(n): n = int(n) if n < 0: # raise an exception raise Exception("test exception") if n == 0: # ignore SIGTERM and count to 100 n = 100 signal.signal(signal.SIGTERM, signal.SIG_IGN) for x in range(n): s = sprintf("dummy %d\\n", x) if x & 1: sys.stdout.write(s) else: sys.stderr.write(s) time.sleep(0.1) test(json.loads(sys.argv[1])) """) jsonargs = json.dumps(args) return client.post("run/code", { "code": code, "args": [ jsonargs ] }) def test_client_04_process_basic(self): client = HTTPClient(baseurl = testurl, post_json = True) # start dummy filter pid = self._run_testfilter(client, 30) eq_(client.get("process/list"), [pid]) time.sleep(1) # Verify that status looks OK status = client.get("process/status", { "pid": pid, "clear": True }) for x in [ "pid", "alive", "exitcode", "start_time", "log" ]: in_(x, status) in_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"]) eq_(status["alive"], True) eq_(status["exitcode"], None) # Check that the log got cleared status = client.get("process/status", { "pid": pid }) nin_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"]) # See that it ended properly status = self.wait_end(client, pid, remove = False) in_("dummy 27\ndummy 28\ndummy 29\n", status["log"]) eq_(status["exitcode"], 0) # Remove it killstatus = client.post("process/remove", { "pid": pid }) eq_(status, killstatus) eq_(client.get("process/list"), []) with assert_raises(ClientError) as e: client.post("process/remove", { "pid": pid }) in_("No such PID", str(e.exception)) def test_client_05_process_terminate(self): client = HTTPClient(baseurl = testurl, post_json = True) # Trigger exception in filter pid = self._run_testfilter(client, -1) time.sleep(0.5) status = client.get("process/status", { "pid": pid }) eq_(status["alive"], False) eq_(status["exitcode"], 1) in_("Exception: test exception", status["log"]) client.post("process/remove", { "pid": pid }) # Kill a running filter by removing it early newpid = self._run_testfilter(client, 50) ne_(newpid, pid) time.sleep(0.5) start = time.time() status = client.post("process/remove", { "pid": newpid }) elapsed = time.time() - start # Should have died in slightly over 1 second assert(0.5 < elapsed < 2) eq_(status["alive"], False) ne_(status["exitcode"], 0) # No more eq_(client.get("process/list"), []) # Try to remove a running filter that ignored SIGTERM pid = self._run_testfilter(client, 0) start = time.time() status = client.post("process/remove", { "pid": pid }) elapsed = time.time() - start # Should have died in slightly over 2 seconds assert(1.5 < elapsed < 3) eq_(status["alive"], False) ne_(status["exitcode"], 0) @unittest.skip("needs a running nilmdb; trainola moved to nilmtools") def test_client_06_trainola(self): client = HTTPClient(baseurl = testurl, post_json = True) data = { "url": "http://bucket.mit.edu/nilmdb", "dest_stream": "/sharon/prep-a-matches", "stream": "/sharon/prep-a", "start": 1366111383280463, "end": 1366126163457797, "columns": [ { "name": "P1", "index": 0 }, { "name": "Q1", "index": 1 }, { "name": "P3", "index": 2 } ], "exemplars": [ { "name": "Boiler Pump ON", "url": "http://bucket.mit.edu/nilmdb", "stream": "/sharon/prep-a", "start": 1366260494269078, "end": 1366260608185031, "dest_column": 0, "columns": [ { "name": "P1", "index": 0 }, { "name": "Q1", "index": 1 } ] }, { "name": "Boiler Pump OFF", "url": "http://bucket.mit.edu/nilmdb", "stream": "/sharon/prep-a", "start": 1366260864215764, "end": 1366260870882998, "dest_column": 1, "columns": [ { "name": "P1", "index": 0 }, { "name": "Q1", "index": 1 } ] } ] } pid = client.post("run/code", { "code": "import nilmtools.trainola\n" + "nilmtools.trainola.main()", "args": [ data ] }) while True: status = client.get("process/status", { "pid": pid, "clear": 1 }) sys.stdout.write(status["log"]) sys.stdout.flush() if status["alive"] == False: break time.sleep(0.1) status = client.post("process/remove", { "pid": pid }) os._exit(int(status["exitcode"])) def test_client_07_run_code(self): client = HTTPClient(baseurl = testurl, post_json = True) eq_(client.get("process/list"), []) def do(code, args, kill): if args is not None: pid = client.post("run/code", { "code": code, "args": args } ) else: pid = client.post("run/code", { "code": code } ) eq_(client.get("process/list"), [pid]) if kill: return self.wait_kill(client, pid) return self.wait_end(client, pid) # basic code snippet code = textwrap.dedent(""" print('hello') def foo(arg): print('world') """) status = do(code, [], False) eq_("hello\n", status["log"]) eq_(status["exitcode"], 0) # compile error code = textwrap.dedent(""" def foo(arg: print('hello') """) status = do(code, [], False) in_("SyntaxError", status["log"]) eq_(status["exitcode"], 1) # traceback in user code should be formatted nicely code = textwrap.dedent(""" def foo(arg): raise Exception(arg) foo(123) """) status = do(code, [], False) cleaned_log = re.sub('File "[^"]*",', 'File "",', status["log"]) eq_('Traceback (most recent call last):\n' + ' File "", line 4, in \n' + ' foo(123)\n' + ' File "", line 3, in foo\n' + ' raise Exception(arg)\n' + 'Exception: 123\n', cleaned_log) eq_(status["exitcode"], 1) # argument handling (strings come in as unicode) code = textwrap.dedent(""" import sys print(sys.argv[1], sys.argv[2]) sys.exit(0) # also test raising SystemExit """) with assert_raises(ClientError) as e: do(code, ["hello", 123], False) in_("400 Bad Request", str(e.exception)) status = do(code, ["hello", "123"], False) eq_(status["log"], "hello 123\n") eq_(status["exitcode"], 0) # try killing a long-running process code = textwrap.dedent(""" import time print('hello') time.sleep(60) print('world') """) status = do(code, [], True) eq_(status["log"], "hello\n") ne_(status["exitcode"], 0) # default arguments are empty code = textwrap.dedent(""" import sys print('args:', len(sys.argv[1:])) """) status = do(code, None, False) eq_(status["log"], "args: 0\n") eq_(status["exitcode"], 0) def test_client_08_bad_types(self): client = HTTPClient(baseurl = testurl, post_json = True) with assert_raises(ClientError) as e: client.post("run/code", { "code": "asdf", "args": "qwer" }) in_("must be a list", str(e.exception)) with assert_raises(ClientError) as e: client.post("run/command", { "argv": "asdf" }) in_("must be a list", str(e.exception)) def test_client_09_info(self): client = HTTPClient(baseurl = testurl, post_json = True) # start some processes a = client.post("run/command", { "argv": ["sleep","60"] } ) b = client.post("run/command", { "argv": ["sh","-c","sleep 2;true"] } ) c = client.post("run/command", { "argv": [ "sh","-c","dd if=/dev/zero of=/dev/null;true"] } ) d = client.post("run/command", { "argv": [ "dd", "if=/dev/zero", "of=/dev/null" ] } ) info = client.get("process/info") eq_(info["pids"][a]["procs"], 1) eq_(info["pids"][b]["procs"], 2) eq_(info["pids"][c]["procs"], 2) eq_(info["pids"][d]["procs"], 1) eq_(info["total"]["procs"], 6) lt_(info["pids"][a]["cpu_percent"], 50) lt_(20, info["pids"][c]["cpu_percent"]) lt_(80, info["system"]["cpu_percent"]) for x in range(10): time.sleep(1) info = client.get("process/info") if info["pids"][b]["procs"] != 2: break else: raise Exception("process B didn't die: " + str(info["pids"][b])) # kill all processes for pid in client.get("process/list"): client.post("process/remove", { "pid": pid }) def test_client_10_unicode(self): client = HTTPClient(baseurl = testurl, post_json = True) eq_(client.get("process/list"), []) def verify(cmd, result): pid = client.post("run/command", { "argv": [ "/bin/bash", "-c", cmd ] }) eq_(client.get("process/list"), [pid]) status = self.wait_end(client, pid) eq_(result, status["log"]) # Unicode should work verify("echo -n hello", "hello") verify("echo -n ☠", "☠") verify("echo -ne \\\\xe2\\\\x98\\\\xa0", "☠") # Programs that spit out invalid UTF-8 should get replacement # markers verify("echo -ne \\\\xae", "\ufffd") def test_client_11_atexit(self): # Leave a directory and running process behind, for the atexit # handler to clean up. Here we trigger the atexit manually, # since it's hard to trigger it as part of the test suite. client = HTTPClient(baseurl = testurl, post_json = True) code = textwrap.dedent(""" import time time.sleep(10) """) client.post("run/code", { "code": code, "args": [ "hello"] }) # Trigger atexit function test_server._manager._atexit() # Ensure no processes exit eq_(client.get("process/list"), [])