|
- # -*- coding: utf-8 -*-
-
- import nilmrun.server
-
- from nilmdb.client.httpclient import HTTPClient, ClientError, ServerError
-
- from nilmdb.utils.printf import *
-
- from nose.plugins.skip import SkipTest
- from nose.tools import *
- from nose.tools import assert_raises
-
- import itertools
- import distutils.version
- import os
- import sys
- import threading
- import io
- import json
- import unittest
- import warnings
- import time
- import re
- import urllib.request, urllib.error, urllib.parse
- from urllib.request import urlopen
- from urllib.error import HTTPError
- import requests
- import pprint
- import textwrap
-
- from testutil.helpers import *
-
# Base URL the tests use to reach the server; host and port match the
# values passed to nilmrun.server.Server in setup_module().
testurl = "http://localhost:32181/"
#testurl = "http://bucket.mit.edu/nilmrun/"
-
def setup_module():
    """Module-level fixture: start a NilmRun server in the background.

    The server object is stored in the module global ``test_server`` so
    that it can be stopped again later.
    """
    global test_server

    # Start web app on a custom port
    test_server = nilmrun.server.Server(host="127.0.0.1",
                                        port=32181,
                                        force_traceback=True)
    # Non-blocking start, so control returns here and the tests can run
    test_server.start(blocking=False)
-
def teardown_module():
    """Module-level fixture: stop the server held in ``test_server``."""
    global test_server
    # Close web app
    test_server.stop()
-
class TestClient:
    """End-to-end tests that drive a NilmRun server through HTTPClient.

    Every test talks to the server at ``testurl``.  ``wait_kill`` and
    ``wait_end`` are shared helpers for finishing off a spawned process.
    """

    def wait_kill(self, client, pid, timeout=1):
        """Sleep for `timeout` seconds, then remove a still-running process.

        Raises AssertionError if the process is already dead after the
        sleep, or if it is still reported alive after "process/remove".
        Returns the status returned by the remove request.
        """
        time.sleep(timeout)
        status = client.get("process/status", {"pid": pid})
        if not status["alive"]:
            raise AssertionError("died before we could kill it")
        status = client.post("process/remove", {"pid": pid})
        if status["alive"]:
            raise AssertionError("didn't get killed")
        return status

    def wait_end(self, client, pid, timeout=5, remove=True):
        """Poll "process/status" every 0.1 s until the process is not alive.

        Raises AssertionError if the process is still alive once `timeout`
        seconds have elapsed.  If `remove` is true, the process is then
        removed and the remove response is returned; otherwise the last
        polled status is returned.
        """
        # Monotonic clock: wall-clock adjustments must not stretch or
        # shorten the timeout.
        start = time.monotonic()
        status = None
        while (time.monotonic() - start) < timeout:
            status = client.get("process/status", {"pid": pid})
            if not status["alive"]:
                break
            time.sleep(0.1)
        else:
            # Loop ran out of time without hitting the break above
            raise AssertionError(
                "process {} didn't die in {} seconds: {!r}".format(
                    pid, timeout, status))
        if remove:
            status = client.post("process/remove", {"pid": pid})
        return status

    def test_client_01_basic(self):
        """Version and index page are served; an unknown path is an error."""
        client = HTTPClient(baseurl=testurl)
        version = client.get("version")
        # NOTE(review): distutils was removed from the standard library in
        # Python 3.12 (PEP 632); this comparison needs a replacement
        # before running on newer interpreters.
        eq_(distutils.version.LooseVersion(version),
            distutils.version.LooseVersion(nilmrun.__version__))

        in_("This is NilmRun", client.get(""))

        with assert_raises(ClientError):
            client.get("favicon.ico")

    def test_client_02_manager(self):
        """An empty manager lists no processes and rejects unknown PIDs."""
        client = HTTPClient(baseurl=testurl)

        eq_(client.get("process/list"), [])

        with assert_raises(ClientError) as e:
            client.get("process/status", {"pid": 12345})
        in_("No such PID", str(e.exception))
        with assert_raises(ClientError):
            client.get("process/remove", {"pid": 12345})
        # NOTE(review): the context manager above does not bind `e`, so this
        # re-checks the exception from the "process/status" request, not
        # the one from "process/remove".  Left as-is because the remove
        # endpoint may answer a GET with a different error -- confirm the
        # intended request method before binding `as e` here.
        in_("No such PID", str(e.exception))

    def test_client_03_run_command(self):
        """Run external commands via "run/command" and check their status."""
        client = HTTPClient(baseurl=testurl, post_json=True)
        eq_(client.get("process/list"), [])

        def do(argv, kill):
            # Start the command, verify it is the only managed process, then
            # either kill it early or wait for it to finish on its own.
            pid = client.post("run/command", {"argv": argv})
            eq_(client.get("process/list"), [pid])
            if kill:
                return self.wait_kill(client, pid)
            return self.wait_end(client, pid)

        # Simple command
        status = do(["pwd"], False)
        eq_(status["exitcode"], 0)
        eq_("/tmp\n", status["log"])

        # Command with args
        status = do(["expr", "1", "+", "2"], False)
        eq_(status["exitcode"], 0)
        eq_("3\n", status["log"])

        # Missing command
        with assert_raises(ClientError) as e:
            do(["/no-such-command-blah-blah"], False)
        in_("No such file or directory", str(e.exception))

        # Kill a slow command
        status = do(["sleep", "60"], True)
        ne_(status["exitcode"], 0)

    def _run_testfilter(self, client, args):
        """Start the dummy filter script via "run/code"; return the response.

        `args` is JSON-encoded and passed as the script's single argument.
        The script decodes it as an integer n: a negative n raises an
        exception, n == 0 ignores SIGTERM and counts to 100, and otherwise
        it counts to n.  Each count writes one "dummy <x>" line, alternating
        between stderr and stdout, then sleeps 0.1 s.
        """
        code = textwrap.dedent("""
            from nilmdb.utils.printf import *
            import time
            import signal
            import json
            import sys
            # This is just for testing the process management.
            def test(n):
                n = int(n)
                if n < 0: # raise an exception
                    raise Exception("test exception")
                if n == 0: # ignore SIGTERM and count to 100
                    n = 100
                    signal.signal(signal.SIGTERM, signal.SIG_IGN)
                for x in range(n):
                    s = sprintf("dummy %d\\n", x)
                    if x & 1:
                        sys.stdout.write(s)
                    else:
                        sys.stderr.write(s)
                    time.sleep(0.1)
            test(json.loads(sys.argv[1]))
        """)
        jsonargs = json.dumps(args)
        return client.post("run/code", {"code": code, "args": [jsonargs]})

    def test_client_04_process_basic(self):
        """Status, log clearing, normal exit and removal of a process."""
        client = HTTPClient(baseurl=testurl, post_json=True)

        # start dummy filter
        pid = self._run_testfilter(client, 30)
        eq_(client.get("process/list"), [pid])
        time.sleep(1)

        # Verify that status looks OK
        status = client.get("process/status", {"pid": pid, "clear": True})
        for key in ["pid", "alive", "exitcode", "start_time", "log"]:
            in_(key, status)
        in_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"])
        eq_(status["alive"], True)
        eq_(status["exitcode"], None)

        # Check that the log got cleared
        status = client.get("process/status", {"pid": pid})
        nin_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"])

        # See that it ended properly
        status = self.wait_end(client, pid, remove=False)
        in_("dummy 27\ndummy 28\ndummy 29\n", status["log"])
        eq_(status["exitcode"], 0)

        # Remove it
        killstatus = client.post("process/remove", {"pid": pid})
        eq_(status, killstatus)
        eq_(client.get("process/list"), [])
        with assert_raises(ClientError) as e:
            client.post("process/remove", {"pid": pid})
        in_("No such PID", str(e.exception))

    def test_client_05_process_terminate(self):
        """Processes that crash, get removed early, or ignore SIGTERM."""
        client = HTTPClient(baseurl=testurl, post_json=True)

        # Trigger exception in filter
        pid = self._run_testfilter(client, -1)
        time.sleep(0.5)
        status = client.get("process/status", {"pid": pid})
        eq_(status["alive"], False)
        eq_(status["exitcode"], 1)
        in_("Exception: test exception", status["log"])
        client.post("process/remove", {"pid": pid})

        # Kill a running filter by removing it early
        newpid = self._run_testfilter(client, 50)
        ne_(newpid, pid)
        time.sleep(0.5)
        start = time.monotonic()
        status = client.post("process/remove", {"pid": newpid})
        elapsed = time.monotonic() - start
        # Should have died in slightly over 1 second
        assert 0.5 < elapsed < 2
        eq_(status["alive"], False)
        ne_(status["exitcode"], 0)

        # No more
        eq_(client.get("process/list"), [])

        # Try to remove a running filter that ignored SIGTERM
        pid = self._run_testfilter(client, 0)
        start = time.monotonic()
        status = client.post("process/remove", {"pid": pid})
        elapsed = time.monotonic() - start
        # Should have died in slightly over 2 seconds
        assert 1.5 < elapsed < 3
        eq_(status["alive"], False)
        ne_(status["exitcode"], 0)

    @unittest.skip("needs a running nilmdb; trainola moved to nilmtools")
    def test_client_06_trainola(self):
        """Manual driver that runs nilmtools.trainola (always skipped).

        Streams the process log to stdout until the process ends, then
        terminates the interpreter with the process's exit code.
        """
        client = HTTPClient(baseurl=testurl, post_json=True)
        data = {
            "url": "http://bucket.mit.edu/nilmdb",
            "dest_stream": "/sharon/prep-a-matches",
            "stream": "/sharon/prep-a",
            "start": 1366111383280463,
            "end": 1366126163457797,
            "columns": [{"name": "P1", "index": 0},
                        {"name": "Q1", "index": 1},
                        {"name": "P3", "index": 2}],
            "exemplars": [
                {"name": "Boiler Pump ON",
                 "url": "http://bucket.mit.edu/nilmdb",
                 "stream": "/sharon/prep-a",
                 "start": 1366260494269078,
                 "end": 1366260608185031,
                 "dest_column": 0,
                 "columns": [{"name": "P1", "index": 0},
                             {"name": "Q1", "index": 1}]},
                {"name": "Boiler Pump OFF",
                 "url": "http://bucket.mit.edu/nilmdb",
                 "stream": "/sharon/prep-a",
                 "start": 1366260864215764,
                 "end": 1366260870882998,
                 "dest_column": 1,
                 "columns": [{"name": "P1", "index": 0},
                             {"name": "Q1", "index": 1}]},
            ],
        }
        pid = client.post("run/code", {"code": "import nilmtools.trainola\n" +
                                               "nilmtools.trainola.main()",
                                       "args": [data]})
        while True:
            # "clear" empties the log on each poll, so every chunk is
            # printed exactly once
            status = client.get("process/status", {"pid": pid, "clear": 1})
            sys.stdout.write(status["log"])
            sys.stdout.flush()
            if not status["alive"]:
                break
            time.sleep(0.1)
        status = client.post("process/remove", {"pid": pid})
        # Hard exit: this bypasses the test runner entirely
        os._exit(int(status["exitcode"]))

    def test_client_07_run_code(self):
        """Run Python snippets via "run/code": output, errors, args, kill."""
        client = HTTPClient(baseurl=testurl, post_json=True)
        eq_(client.get("process/list"), [])

        def do(code, args, kill):
            # Leave "args" out of the request entirely when it is None
            request = {"code": code}
            if args is not None:
                request["args"] = args
            pid = client.post("run/code", request)
            eq_(client.get("process/list"), [pid])
            if kill:
                return self.wait_kill(client, pid)
            return self.wait_end(client, pid)

        # basic code snippet
        code = textwrap.dedent("""
            print('hello')
            def foo(arg):
                print('world')
        """)
        status = do(code, [], False)
        eq_("hello\n", status["log"])
        eq_(status["exitcode"], 0)

        # compile error
        code = textwrap.dedent("""
            def foo(arg:
                print('hello')
        """)
        status = do(code, [], False)
        in_("SyntaxError", status["log"])
        eq_(status["exitcode"], 1)

        # traceback in user code should be formatted nicely
        # (the leading newline of the literal makes "def foo" line 2, which
        # is what the expected line numbers below rely on)
        code = textwrap.dedent("""
            def foo(arg):
                raise Exception(arg)
            foo(123)
        """)
        status = do(code, [], False)
        # Blank out the file name so the comparison does not depend on it
        cleaned_log = re.sub('File "[^"]*",', 'File "",', status["log"])
        eq_('Traceback (most recent call last):\n' +
            '  File "", line 4, in <module>\n' +
            '    foo(123)\n' +
            '  File "", line 3, in foo\n' +
            '    raise Exception(arg)\n' +
            'Exception: 123\n', cleaned_log)
        eq_(status["exitcode"], 1)

        # argument handling (strings come in as unicode)
        code = textwrap.dedent("""
            import sys
            print(sys.argv[1], sys.argv[2])
            sys.exit(0) # also test raising SystemExit
        """)
        with assert_raises(ClientError) as e:
            do(code, ["hello", 123], False)
        in_("400 Bad Request", str(e.exception))
        status = do(code, ["hello", "123"], False)
        eq_(status["log"], "hello 123\n")
        eq_(status["exitcode"], 0)

        # try killing a long-running process
        code = textwrap.dedent("""
            import time
            print('hello')
            time.sleep(60)
            print('world')
        """)
        status = do(code, [], True)
        eq_(status["log"], "hello\n")
        ne_(status["exitcode"], 0)

        # default arguments are empty
        code = textwrap.dedent("""
            import sys
            print('args:', len(sys.argv[1:]))
        """)
        status = do(code, None, False)
        eq_(status["log"], "args: 0\n")
        eq_(status["exitcode"], 0)

    def test_client_08_bad_types(self):
        """Non-list "args" / "argv" values are rejected with a ClientError."""
        client = HTTPClient(baseurl=testurl, post_json=True)

        with assert_raises(ClientError) as e:
            client.post("run/code", {"code": "asdf", "args": "qwer"})
        in_("must be a list", str(e.exception))

        with assert_raises(ClientError) as e:
            client.post("run/command", {"argv": "asdf"})
        in_("must be a list", str(e.exception))

    def test_client_09_info(self):
        """Check per-process and total counters from "process/info"."""
        client = HTTPClient(baseurl=testurl, post_json=True)

        try:
            # start some processes
            a = client.post("run/command", {"argv": ["sleep", "60"]})
            b = client.post("run/command",
                            {"argv": ["sh", "-c", "sleep 2;true"]})
            c = client.post("run/command", {"argv": [
                "sh", "-c", "dd if=/dev/zero of=/dev/null;true"]})
            d = client.post("run/command", {"argv": [
                "dd", "if=/dev/zero", "of=/dev/null"]})

            info = client.get("process/info")
            eq_(info["pids"][a]["procs"], 1)
            eq_(info["pids"][b]["procs"], 2)
            eq_(info["pids"][c]["procs"], 2)
            eq_(info["pids"][d]["procs"], 1)
            eq_(info["total"]["procs"], 6)
            lt_(info["pids"][a]["cpu_percent"], 50)
            lt_(20, info["pids"][c]["cpu_percent"])
            lt_(80, info["system"]["cpu_percent"])

            # Poll once a second, for up to 10 seconds, until B's process
            # count changes from 2
            for _ in range(10):
                time.sleep(1)
                info = client.get("process/info")
                if info["pids"][b]["procs"] != 2:
                    break
            else:
                raise Exception("process B didn't die: " +
                                str(info["pids"][b]))
        finally:
            # kill all processes -- also when a check above failed, so the
            # spawned commands cannot leak into later tests
            for pid in client.get("process/list"):
                client.post("process/remove", {"pid": pid})

    def test_client_10_unicode(self):
        """Process logs carry UTF-8 text; invalid bytes become U+FFFD."""
        client = HTTPClient(baseurl=testurl, post_json=True)
        eq_(client.get("process/list"), [])

        def verify(cmd, result):
            # Run `cmd` through bash and compare the complete log
            pid = client.post("run/command", {"argv": [
                "/bin/bash", "-c", cmd]})
            eq_(client.get("process/list"), [pid])
            status = self.wait_end(client, pid)
            eq_(result, status["log"])

        # Unicode should work
        verify("echo -n hello", "hello")
        verify("echo -n ☠", "☠")
        verify("echo -ne \\\\xe2\\\\x98\\\\xa0", "☠")

        # Programs that spit out invalid UTF-8 should get replacement
        # markers
        verify("echo -ne \\\\xae", "\ufffd")

    def test_client_11_atexit(self):
        """The manager's atexit handler cleans up a still-running process."""
        # Leave a directory and running process behind, for the atexit
        # handler to clean up.  Here we trigger the atexit manually,
        # since it's hard to trigger it as part of the test suite.
        client = HTTPClient(baseurl=testurl, post_json=True)
        code = textwrap.dedent("""
            import time
            time.sleep(10)
        """)
        client.post("run/code", {"code": code, "args": ["hello"]})

        # Trigger atexit function
        test_server._manager._atexit()

        # Ensure no processes exist
        eq_(client.get("process/list"), [])
|