435 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			435 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # -*- coding: utf-8 -*-
 | |
| 
 | |
| import nilmrun.server
 | |
| 
 | |
| from nilmdb.client.httpclient import HTTPClient, ClientError, ServerError
 | |
| 
 | |
| from nilmdb.utils.printf import *
 | |
| 
 | |
| from nose.plugins.skip import SkipTest
 | |
| from nose.tools import *
 | |
| from nose.tools import assert_raises
 | |
| 
 | |
| import itertools
 | |
| import distutils.version
 | |
| import os
 | |
| import sys
 | |
| import threading
 | |
| import io
 | |
| import json
 | |
| import unittest
 | |
| import warnings
 | |
| import time
 | |
| import re
 | |
| import urllib.request, urllib.error, urllib.parse
 | |
| from urllib.request import urlopen
 | |
| from urllib.error import HTTPError
 | |
| import requests
 | |
| import pprint
 | |
| import textwrap
 | |
| 
 | |
| from testutil.helpers import *
 | |
| 
 | |
| testurl = "http://localhost:32181/"
 | |
| #testurl = "http://bucket.mit.edu/nilmrun/"
 | |
| 
 | |
| def setup_module():
 | |
|     global test_server
 | |
| 
 | |
|     # Start web app on a custom port
 | |
|     test_server = nilmrun.server.Server(host = "127.0.0.1",
 | |
|                                         port = 32181,
 | |
|                                         force_traceback = True)
 | |
|     test_server.start(blocking = False)
 | |
| 
 | |
| def teardown_module():
 | |
|     global test_server
 | |
|     # Close web app
 | |
|     test_server.stop()
 | |
| 
 | |
| class TestClient(object):
 | |
| 
 | |
|     def wait_kill(self, client, pid, timeout = 1):
 | |
|         time.sleep(timeout)
 | |
|         status = client.get("process/status", { "pid": pid })
 | |
|         if not status["alive"]:
 | |
|             raise AssertionError("died before we could kill it")
 | |
|         status = client.post("process/remove", { "pid": pid })
 | |
|         if status["alive"]:
 | |
|             raise AssertionError("didn't get killed")
 | |
|         return status
 | |
| 
 | |
|     def wait_end(self, client, pid, timeout = 5, remove = True):
 | |
|         start = time.time()
 | |
|         status = None
 | |
|         while (time.time() - start) < timeout:
 | |
|             status = client.get("process/status", { "pid": pid })
 | |
|             if status["alive"] == False:
 | |
|                 break
 | |
|             time.sleep(0.1)
 | |
|         else:
 | |
|             raise AssertionError("process " + str(pid) + " didn't die in " +
 | |
|                                  str(timeout) + " seconds: " + repr(status))
 | |
|         if remove:
 | |
|             status = client.post("process/remove", { "pid": pid })
 | |
|         return status
 | |
| 
 | |
|     def test_client_01_basic(self):
 | |
|         client = HTTPClient(baseurl = testurl)
 | |
|         version = client.get("version")
 | |
|         eq_(distutils.version.LooseVersion(version),
 | |
|             distutils.version.LooseVersion(nilmrun.__version__))
 | |
| 
 | |
|         in_("This is NilmRun", client.get(""))
 | |
| 
 | |
|         with assert_raises(ClientError):
 | |
|             client.get("favicon.ico")
 | |
| 
 | |
|     def test_client_02_manager(self):
 | |
|         client = HTTPClient(baseurl = testurl)
 | |
| 
 | |
|         eq_(client.get("process/list"), [])
 | |
| 
 | |
|         with assert_raises(ClientError) as e:
 | |
|             client.get("process/status", { "pid": 12345 })
 | |
|         in_("No such PID", str(e.exception))
 | |
|         with assert_raises(ClientError):
 | |
|             client.get("process/remove", { "pid": 12345 })
 | |
|         in_("No such PID", str(e.exception))
 | |
| 
 | |
|     def test_client_03_run_command(self):
 | |
|         client = HTTPClient(baseurl = testurl, post_json = True)
 | |
|         eq_(client.get("process/list"), [])
 | |
| 
 | |
|         def do(argv, kill):
 | |
|             pid = client.post("run/command", { "argv": argv } )
 | |
|             eq_(client.get("process/list"), [pid])
 | |
|             if kill:
 | |
|                 return self.wait_kill(client, pid)
 | |
|             return self.wait_end(client, pid)
 | |
| 
 | |
|         # Simple command
 | |
|         status = do(["pwd"], False)
 | |
|         eq_(status["exitcode"], 0)
 | |
|         eq_("/tmp\n", status["log"])
 | |
| 
 | |
|         # Command with args
 | |
|         status = do(["expr", "1", "+", "2"], False)
 | |
|         eq_(status["exitcode"], 0)
 | |
|         eq_("3\n", status["log"])
 | |
| 
 | |
|         # Missing command
 | |
|         with assert_raises(ClientError) as e:
 | |
|             do(["/no-such-command-blah-blah"], False)
 | |
|         in_("No such file or directory", str(e.exception))
 | |
| 
 | |
|         # Kill a slow command
 | |
|         status = do(["sleep", "60"], True)
 | |
|         ne_(status["exitcode"], 0)
 | |
| 
 | |
|     def _run_testfilter(self, client, args):
 | |
|         code = textwrap.dedent("""
 | |
|         from nilmdb.utils.printf import *
 | |
|         import time
 | |
|         import signal
 | |
|         import json
 | |
|         import sys
 | |
|         # This is just for testing the process management.
 | |
|         def test(n):
 | |
|             n = int(n)
 | |
|             if n < 0: # raise an exception
 | |
|                 raise Exception("test exception")
 | |
|             if n == 0: # ignore SIGTERM and count to 100
 | |
|                 n = 100
 | |
|                 signal.signal(signal.SIGTERM, signal.SIG_IGN)
 | |
|             for x in range(n):
 | |
|                 s = sprintf("dummy %d\\n", x)
 | |
|                 if x & 1:
 | |
|                     sys.stdout.write(s)
 | |
|                 else:
 | |
|                     sys.stderr.write(s)
 | |
|                 time.sleep(0.1)
 | |
|         test(json.loads(sys.argv[1]))
 | |
|         """)
 | |
|         jsonargs = json.dumps(args)
 | |
|         return client.post("run/code", { "code": code, "args": [ jsonargs ] })
 | |
| 
 | |
|     def test_client_04_process_basic(self):
 | |
|         client = HTTPClient(baseurl = testurl, post_json = True)
 | |
| 
 | |
|         # start dummy filter
 | |
|         pid = self._run_testfilter(client, 30)
 | |
|         eq_(client.get("process/list"), [pid])
 | |
|         time.sleep(1)
 | |
| 
 | |
|         # Verify that status looks OK
 | |
|         status = client.get("process/status", { "pid": pid, "clear": True })
 | |
|         for x in [ "pid", "alive", "exitcode", "start_time", "log" ]:
 | |
|             in_(x, status)
 | |
|         in_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"])
 | |
|         eq_(status["alive"], True)
 | |
|         eq_(status["exitcode"], None)
 | |
| 
 | |
|         # Check that the log got cleared
 | |
|         status = client.get("process/status", { "pid": pid })
 | |
|         nin_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"])
 | |
| 
 | |
|         # See that it ended properly
 | |
|         status = self.wait_end(client, pid, remove = False)
 | |
|         in_("dummy 27\ndummy 28\ndummy 29\n", status["log"])
 | |
|         eq_(status["exitcode"], 0)
 | |
| 
 | |
|         # Remove it
 | |
|         killstatus = client.post("process/remove", { "pid": pid })
 | |
|         eq_(status, killstatus)
 | |
|         eq_(client.get("process/list"), [])
 | |
|         with assert_raises(ClientError) as e:
 | |
|             client.post("process/remove", { "pid": pid })
 | |
|         in_("No such PID", str(e.exception))
 | |
| 
 | |
|     def test_client_05_process_terminate(self):
 | |
|         client = HTTPClient(baseurl = testurl, post_json = True)
 | |
| 
 | |
|         # Trigger exception in filter
 | |
|         pid = self._run_testfilter(client, -1)
 | |
|         time.sleep(0.5)
 | |
|         status = client.get("process/status", { "pid": pid })
 | |
|         eq_(status["alive"], False)
 | |
|         eq_(status["exitcode"], 1)
 | |
|         in_("Exception: test exception", status["log"])
 | |
|         client.post("process/remove", { "pid": pid })
 | |
| 
 | |
|         # Kill a running filter by removing it early
 | |
|         newpid = self._run_testfilter(client, 50)
 | |
|         ne_(newpid, pid)
 | |
|         time.sleep(0.5)
 | |
|         start = time.time()
 | |
|         status = client.post("process/remove", { "pid": newpid })
 | |
|         elapsed = time.time() - start
 | |
|         # Should have died in slightly over 1 second
 | |
|         assert(0.5 < elapsed < 2)
 | |
|         eq_(status["alive"], False)
 | |
|         ne_(status["exitcode"], 0)
 | |
| 
 | |
|         # No more
 | |
|         eq_(client.get("process/list"), [])
 | |
| 
 | |
|         # Try to remove a running filter that ignored SIGTERM
 | |
|         pid = self._run_testfilter(client, 0)
 | |
|         start = time.time()
 | |
|         status = client.post("process/remove", { "pid": pid })
 | |
|         elapsed = time.time() - start
 | |
|         # Should have died in slightly over 2 seconds
 | |
|         assert(1.5 < elapsed < 3)
 | |
|         eq_(status["alive"], False)
 | |
|         ne_(status["exitcode"], 0)
 | |
| 
 | |
|     @unittest.skip("needs a running nilmdb; trainola moved to nilmtools")
 | |
|     def test_client_06_trainola(self):
 | |
|         client = HTTPClient(baseurl = testurl, post_json = True)
 | |
|         data = { "url": "http://bucket.mit.edu/nilmdb",
 | |
|                  "dest_stream": "/sharon/prep-a-matches",
 | |
|                  "stream": "/sharon/prep-a",
 | |
|                  "start": 1366111383280463,
 | |
|                  "end": 1366126163457797,
 | |
|                  "columns": [ { "name": "P1", "index": 0 },
 | |
|                               { "name": "Q1", "index": 1 },
 | |
|                               { "name": "P3", "index": 2 } ],
 | |
|                  "exemplars": [
 | |
|                      { "name": "Boiler Pump ON",
 | |
|                        "url": "http://bucket.mit.edu/nilmdb",
 | |
|                        "stream": "/sharon/prep-a",
 | |
|                        "start": 1366260494269078,
 | |
|                        "end": 1366260608185031,
 | |
|                        "dest_column": 0,
 | |
|                        "columns": [ { "name": "P1", "index": 0 },
 | |
|                                     { "name": "Q1", "index": 1 }
 | |
|                                     ]
 | |
|                        },
 | |
|                      { "name": "Boiler Pump OFF",
 | |
|                        "url": "http://bucket.mit.edu/nilmdb",
 | |
|                        "stream": "/sharon/prep-a",
 | |
|                        "start": 1366260864215764,
 | |
|                        "end": 1366260870882998,
 | |
|                        "dest_column": 1,
 | |
|                        "columns": [ { "name": "P1", "index": 0 },
 | |
|                                     { "name": "Q1", "index": 1 }
 | |
|                                     ]
 | |
|                        }
 | |
|                      ]
 | |
|                  }
 | |
|         pid = client.post("run/code", { "code": "import nilmtools.trainola\n" +
 | |
|                                          "nilmtools.trainola.main()",
 | |
|                                          "args": [ data ] })
 | |
|         while True:
 | |
|             status = client.get("process/status", { "pid": pid, "clear": 1 })
 | |
|             sys.stdout.write(status["log"])
 | |
|             sys.stdout.flush()
 | |
|             if status["alive"] == False:
 | |
|                 break
 | |
|             time.sleep(0.1)
 | |
|         status = client.post("process/remove", { "pid": pid })
 | |
|         os._exit(int(status["exitcode"]))
 | |
| 
 | |
|     def test_client_07_run_code(self):
 | |
|         client = HTTPClient(baseurl = testurl, post_json = True)
 | |
|         eq_(client.get("process/list"), [])
 | |
| 
 | |
|         def do(code, args, kill):
 | |
|             if args is not None:
 | |
|                 pid = client.post("run/code", { "code": code, "args": args } )
 | |
|             else:
 | |
|                 pid = client.post("run/code", { "code": code } )
 | |
|             eq_(client.get("process/list"), [pid])
 | |
|             if kill:
 | |
|                 return self.wait_kill(client, pid)
 | |
|             return self.wait_end(client, pid)
 | |
| 
 | |
|         # basic code snippet
 | |
|         code = textwrap.dedent("""
 | |
|         print('hello')
 | |
|         def foo(arg):
 | |
|             print('world')
 | |
|         """)
 | |
|         status = do(code, [], False)
 | |
|         eq_("hello\n", status["log"])
 | |
|         eq_(status["exitcode"], 0)
 | |
| 
 | |
|         # compile error
 | |
|         code = textwrap.dedent("""
 | |
|         def foo(arg:
 | |
|             print('hello')
 | |
|         """)
 | |
|         status = do(code, [], False)
 | |
|         in_("SyntaxError", status["log"])
 | |
|         eq_(status["exitcode"], 1)
 | |
| 
 | |
|         # traceback in user code should be formatted nicely
 | |
|         code = textwrap.dedent("""
 | |
|         def foo(arg):
 | |
|             raise Exception(arg)
 | |
|         foo(123)
 | |
|         """)
 | |
|         status = do(code, [], False)
 | |
|         cleaned_log = re.sub('File "[^"]*",', 'File "",', status["log"])
 | |
|         eq_('Traceback (most recent call last):\n' +
 | |
|         '  File "", line 4, in <module>\n' +
 | |
|         '    foo(123)\n' +
 | |
|         '  File "", line 3, in foo\n' +
 | |
|         '    raise Exception(arg)\n' +
 | |
|         'Exception: 123\n', cleaned_log)
 | |
|         eq_(status["exitcode"], 1)
 | |
| 
 | |
|         # argument handling (strings come in as unicode)
 | |
|         code = textwrap.dedent("""
 | |
|         import sys
 | |
|         print(sys.argv[1], sys.argv[2])
 | |
|         sys.exit(0)  # also test raising SystemExit
 | |
|         """)
 | |
|         with assert_raises(ClientError) as e:
 | |
|             do(code, ["hello", 123], False)
 | |
|         in_("400 Bad Request", str(e.exception))
 | |
|         status = do(code, ["hello", "123"], False)
 | |
|         eq_(status["log"], "hello 123\n")
 | |
|         eq_(status["exitcode"], 0)
 | |
| 
 | |
|         # try killing a long-running process
 | |
|         code = textwrap.dedent("""
 | |
|         import time
 | |
|         print('hello')
 | |
|         time.sleep(60)
 | |
|         print('world')
 | |
|         """)
 | |
|         status = do(code, [], True)
 | |
|         eq_(status["log"], "hello\n")
 | |
|         ne_(status["exitcode"], 0)
 | |
| 
 | |
|         # default arguments are empty
 | |
|         code = textwrap.dedent("""
 | |
|         import sys
 | |
|         print('args:', len(sys.argv[1:]))
 | |
|         """)
 | |
|         status = do(code, None, False)
 | |
|         eq_(status["log"], "args: 0\n")
 | |
|         eq_(status["exitcode"], 0)
 | |
| 
 | |
|     def test_client_08_bad_types(self):
 | |
|         client = HTTPClient(baseurl = testurl, post_json = True)
 | |
| 
 | |
|         with assert_raises(ClientError) as e:
 | |
|             client.post("run/code", { "code": "asdf", "args": "qwer" })
 | |
|         in_("must be a list", str(e.exception))
 | |
| 
 | |
|         with assert_raises(ClientError) as e:
 | |
|             client.post("run/command", { "argv": "asdf" })
 | |
|         in_("must be a list", str(e.exception))
 | |
| 
 | |
|     def test_client_09_info(self):
 | |
|         client = HTTPClient(baseurl = testurl, post_json = True)
 | |
| 
 | |
|         # start some processes
 | |
|         a = client.post("run/command", { "argv": ["sleep","60"] } )
 | |
|         b = client.post("run/command", { "argv": ["sh","-c","sleep 2;true"] } )
 | |
|         c = client.post("run/command", { "argv": [
 | |
|             "sh","-c","dd if=/dev/zero of=/dev/null;true"] } )
 | |
|         d = client.post("run/command", { "argv": [
 | |
|             "dd", "if=/dev/zero", "of=/dev/null" ] } )
 | |
| 
 | |
|         info = client.get("process/info")
 | |
|         eq_(info["pids"][a]["procs"], 1)
 | |
|         eq_(info["pids"][b]["procs"], 2)
 | |
|         eq_(info["pids"][c]["procs"], 2)
 | |
|         eq_(info["pids"][d]["procs"], 1)
 | |
|         eq_(info["total"]["procs"], 6)
 | |
|         lt_(info["pids"][a]["cpu_percent"], 50)
 | |
|         lt_(20, info["pids"][c]["cpu_percent"])
 | |
|         lt_(80, info["system"]["cpu_percent"])
 | |
| 
 | |
|         for x in range(10):
 | |
|             time.sleep(1)
 | |
|             info = client.get("process/info")
 | |
|             if info["pids"][b]["procs"] != 2:
 | |
|                 break
 | |
|         else:
 | |
|             raise Exception("process B didn't die: " + str(info["pids"][b]))
 | |
| 
 | |
|         # kill all processes
 | |
|         for pid in client.get("process/list"):
 | |
|             client.post("process/remove", { "pid": pid })
 | |
| 
 | |
|     def test_client_10_unicode(self):
 | |
|         client = HTTPClient(baseurl = testurl, post_json = True)
 | |
|         eq_(client.get("process/list"), [])
 | |
|         def verify(cmd, result):
 | |
|             pid = client.post("run/command", { "argv": [
 | |
|                 "/bin/bash", "-c", cmd ] })
 | |
|             eq_(client.get("process/list"), [pid])
 | |
|             status = self.wait_end(client, pid)
 | |
|             eq_(result, status["log"])
 | |
| 
 | |
|         # Unicode should work
 | |
|         verify("echo -n hello", "hello")
 | |
|         verify("echo -n ☠", "☠")
 | |
|         verify("echo -ne \\\\xe2\\\\x98\\\\xa0", "☠")
 | |
| 
 | |
|         # Programs that spit out invalid UTF-8 should get replacement
 | |
|         # markers
 | |
|         verify("echo -ne \\\\xae", "\ufffd")
 | |
| 
 | |
|     def test_client_11_atexit(self):
 | |
|         # Leave a directory and running process behind, for the atexit
 | |
|         # handler to clean up.  Here we trigger the atexit manually,
 | |
|         # since it's hard to trigger it as part of the test suite.
 | |
|         client = HTTPClient(baseurl = testurl, post_json = True)
 | |
|         code = textwrap.dedent("""
 | |
|         import time
 | |
|         time.sleep(10)
 | |
|         """)
 | |
|         client.post("run/code", { "code": code, "args": [ "hello"] })
 | |
| 
 | |
|         # Trigger atexit function
 | |
|         test_server._manager._atexit()
 | |
| 
 | |
|         # Ensure no processes exit
 | |
|         eq_(client.get("process/list"), [])
 |