You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

435 lines
15 KiB

  1. # -*- coding: utf-8 -*-
  2. import nilmrun.server
  3. from nilmdb.client.httpclient import HTTPClient, ClientError, ServerError
  4. from nilmdb.utils.printf import *
  5. from nose.plugins.skip import SkipTest
  6. from nose.tools import *
  7. from nose.tools import assert_raises
  8. import itertools
  9. import distutils.version
  10. import os
  11. import sys
  12. import threading
  13. import io
  14. import json
  15. import unittest
  16. import warnings
  17. import time
  18. import re
  19. import urllib.request, urllib.error, urllib.parse
  20. from urllib.request import urlopen
  21. from urllib.error import HTTPError
  22. import requests
  23. import pprint
  24. import textwrap
  25. from testutil.helpers import *
# Base URL that every test client in this module connects to.  The port
# matches the one the test server is started on in setup_module().
testurl = "http://localhost:32181/"
# Alternative base URL, currently disabled.
#testurl = "http://bucket.mit.edu/nilmrun/"
  28. def setup_module():
  29. global test_server
  30. # Start web app on a custom port
  31. test_server = nilmrun.server.Server(host = "127.0.0.1",
  32. port = 32181,
  33. force_traceback = True)
  34. test_server.start(blocking = False)
  35. def teardown_module():
  36. global test_server
  37. # Close web app
  38. test_server.stop()
  39. class TestClient(object):
  40. def wait_kill(self, client, pid, timeout = 1):
  41. time.sleep(timeout)
  42. status = client.get("process/status", { "pid": pid })
  43. if not status["alive"]:
  44. raise AssertionError("died before we could kill it")
  45. status = client.post("process/remove", { "pid": pid })
  46. if status["alive"]:
  47. raise AssertionError("didn't get killed")
  48. return status
  49. def wait_end(self, client, pid, timeout = 5, remove = True):
  50. start = time.time()
  51. status = None
  52. while (time.time() - start) < timeout:
  53. status = client.get("process/status", { "pid": pid })
  54. if status["alive"] == False:
  55. break
  56. time.sleep(0.1)
  57. else:
  58. raise AssertionError("process " + str(pid) + " didn't die in " +
  59. str(timeout) + " seconds: " + repr(status))
  60. if remove:
  61. status = client.post("process/remove", { "pid": pid })
  62. return status
  63. def test_client_01_basic(self):
  64. client = HTTPClient(baseurl = testurl)
  65. version = client.get("version")
  66. eq_(distutils.version.LooseVersion(version),
  67. distutils.version.LooseVersion(nilmrun.__version__))
  68. in_("This is NilmRun", client.get(""))
  69. with assert_raises(ClientError):
  70. client.get("favicon.ico")
  71. def test_client_02_manager(self):
  72. client = HTTPClient(baseurl = testurl)
  73. eq_(client.get("process/list"), [])
  74. with assert_raises(ClientError) as e:
  75. client.get("process/status", { "pid": 12345 })
  76. in_("No such PID", str(e.exception))
  77. with assert_raises(ClientError):
  78. client.get("process/remove", { "pid": 12345 })
  79. in_("No such PID", str(e.exception))
  80. def test_client_03_run_command(self):
  81. client = HTTPClient(baseurl = testurl, post_json = True)
  82. eq_(client.get("process/list"), [])
  83. def do(argv, kill):
  84. pid = client.post("run/command", { "argv": argv } )
  85. eq_(client.get("process/list"), [pid])
  86. if kill:
  87. return self.wait_kill(client, pid)
  88. return self.wait_end(client, pid)
  89. # Simple command
  90. status = do(["pwd"], False)
  91. eq_(status["exitcode"], 0)
  92. eq_("/tmp\n", status["log"])
  93. # Command with args
  94. status = do(["expr", "1", "+", "2"], False)
  95. eq_(status["exitcode"], 0)
  96. eq_("3\n", status["log"])
  97. # Missing command
  98. with assert_raises(ClientError) as e:
  99. do(["/no-such-command-blah-blah"], False)
  100. in_("No such file or directory", str(e.exception))
  101. # Kill a slow command
  102. status = do(["sleep", "60"], True)
  103. ne_(status["exitcode"], 0)
  104. def _run_testfilter(self, client, args):
  105. code = textwrap.dedent("""
  106. from nilmdb.utils.printf import *
  107. import time
  108. import signal
  109. import json
  110. import sys
  111. # This is just for testing the process management.
  112. def test(n):
  113. n = int(n)
  114. if n < 0: # raise an exception
  115. raise Exception("test exception")
  116. if n == 0: # ignore SIGTERM and count to 100
  117. n = 100
  118. signal.signal(signal.SIGTERM, signal.SIG_IGN)
  119. for x in range(n):
  120. s = sprintf("dummy %d\\n", x)
  121. if x & 1:
  122. sys.stdout.write(s)
  123. else:
  124. sys.stderr.write(s)
  125. time.sleep(0.1)
  126. test(json.loads(sys.argv[1]))
  127. """)
  128. jsonargs = json.dumps(args)
  129. return client.post("run/code", { "code": code, "args": [ jsonargs ] })
  130. def test_client_04_process_basic(self):
  131. client = HTTPClient(baseurl = testurl, post_json = True)
  132. # start dummy filter
  133. pid = self._run_testfilter(client, 30)
  134. eq_(client.get("process/list"), [pid])
  135. time.sleep(1)
  136. # Verify that status looks OK
  137. status = client.get("process/status", { "pid": pid, "clear": True })
  138. for x in [ "pid", "alive", "exitcode", "start_time", "log" ]:
  139. in_(x, status)
  140. in_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"])
  141. eq_(status["alive"], True)
  142. eq_(status["exitcode"], None)
  143. # Check that the log got cleared
  144. status = client.get("process/status", { "pid": pid })
  145. nin_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"])
  146. # See that it ended properly
  147. status = self.wait_end(client, pid, remove = False)
  148. in_("dummy 27\ndummy 28\ndummy 29\n", status["log"])
  149. eq_(status["exitcode"], 0)
  150. # Remove it
  151. killstatus = client.post("process/remove", { "pid": pid })
  152. eq_(status, killstatus)
  153. eq_(client.get("process/list"), [])
  154. with assert_raises(ClientError) as e:
  155. client.post("process/remove", { "pid": pid })
  156. in_("No such PID", str(e.exception))
  157. def test_client_05_process_terminate(self):
  158. client = HTTPClient(baseurl = testurl, post_json = True)
  159. # Trigger exception in filter
  160. pid = self._run_testfilter(client, -1)
  161. time.sleep(0.5)
  162. status = client.get("process/status", { "pid": pid })
  163. eq_(status["alive"], False)
  164. eq_(status["exitcode"], 1)
  165. in_("Exception: test exception", status["log"])
  166. client.post("process/remove", { "pid": pid })
  167. # Kill a running filter by removing it early
  168. newpid = self._run_testfilter(client, 50)
  169. ne_(newpid, pid)
  170. time.sleep(0.5)
  171. start = time.time()
  172. status = client.post("process/remove", { "pid": newpid })
  173. elapsed = time.time() - start
  174. # Should have died in slightly over 1 second
  175. assert(0.5 < elapsed < 2)
  176. eq_(status["alive"], False)
  177. ne_(status["exitcode"], 0)
  178. # No more
  179. eq_(client.get("process/list"), [])
  180. # Try to remove a running filter that ignored SIGTERM
  181. pid = self._run_testfilter(client, 0)
  182. start = time.time()
  183. status = client.post("process/remove", { "pid": pid })
  184. elapsed = time.time() - start
  185. # Should have died in slightly over 2 seconds
  186. assert(1.5 < elapsed < 3)
  187. eq_(status["alive"], False)
  188. ne_(status["exitcode"], 0)
  189. @unittest.skip("needs a running nilmdb; trainola moved to nilmtools")
  190. def test_client_06_trainola(self):
  191. client = HTTPClient(baseurl = testurl, post_json = True)
  192. data = { "url": "http://bucket.mit.edu/nilmdb",
  193. "dest_stream": "/sharon/prep-a-matches",
  194. "stream": "/sharon/prep-a",
  195. "start": 1366111383280463,
  196. "end": 1366126163457797,
  197. "columns": [ { "name": "P1", "index": 0 },
  198. { "name": "Q1", "index": 1 },
  199. { "name": "P3", "index": 2 } ],
  200. "exemplars": [
  201. { "name": "Boiler Pump ON",
  202. "url": "http://bucket.mit.edu/nilmdb",
  203. "stream": "/sharon/prep-a",
  204. "start": 1366260494269078,
  205. "end": 1366260608185031,
  206. "dest_column": 0,
  207. "columns": [ { "name": "P1", "index": 0 },
  208. { "name": "Q1", "index": 1 }
  209. ]
  210. },
  211. { "name": "Boiler Pump OFF",
  212. "url": "http://bucket.mit.edu/nilmdb",
  213. "stream": "/sharon/prep-a",
  214. "start": 1366260864215764,
  215. "end": 1366260870882998,
  216. "dest_column": 1,
  217. "columns": [ { "name": "P1", "index": 0 },
  218. { "name": "Q1", "index": 1 }
  219. ]
  220. }
  221. ]
  222. }
  223. pid = client.post("run/code", { "code": "import nilmtools.trainola\n" +
  224. "nilmtools.trainola.main()",
  225. "args": [ data ] })
  226. while True:
  227. status = client.get("process/status", { "pid": pid, "clear": 1 })
  228. sys.stdout.write(status["log"])
  229. sys.stdout.flush()
  230. if status["alive"] == False:
  231. break
  232. time.sleep(0.1)
  233. status = client.post("process/remove", { "pid": pid })
  234. os._exit(int(status["exitcode"]))
  235. def test_client_07_run_code(self):
  236. client = HTTPClient(baseurl = testurl, post_json = True)
  237. eq_(client.get("process/list"), [])
  238. def do(code, args, kill):
  239. if args is not None:
  240. pid = client.post("run/code", { "code": code, "args": args } )
  241. else:
  242. pid = client.post("run/code", { "code": code } )
  243. eq_(client.get("process/list"), [pid])
  244. if kill:
  245. return self.wait_kill(client, pid)
  246. return self.wait_end(client, pid)
  247. # basic code snippet
  248. code = textwrap.dedent("""
  249. print('hello')
  250. def foo(arg):
  251. print('world')
  252. """)
  253. status = do(code, [], False)
  254. eq_("hello\n", status["log"])
  255. eq_(status["exitcode"], 0)
  256. # compile error
  257. code = textwrap.dedent("""
  258. def foo(arg:
  259. print('hello')
  260. """)
  261. status = do(code, [], False)
  262. in_("SyntaxError", status["log"])
  263. eq_(status["exitcode"], 1)
  264. # traceback in user code should be formatted nicely
  265. code = textwrap.dedent("""
  266. def foo(arg):
  267. raise Exception(arg)
  268. foo(123)
  269. """)
  270. status = do(code, [], False)
  271. cleaned_log = re.sub('File "[^"]*",', 'File "",', status["log"])
  272. eq_('Traceback (most recent call last):\n' +
  273. ' File "", line 4, in <module>\n' +
  274. ' foo(123)\n' +
  275. ' File "", line 3, in foo\n' +
  276. ' raise Exception(arg)\n' +
  277. 'Exception: 123\n', cleaned_log)
  278. eq_(status["exitcode"], 1)
  279. # argument handling (strings come in as unicode)
  280. code = textwrap.dedent("""
  281. import sys
  282. print(sys.argv[1], sys.argv[2])
  283. sys.exit(0) # also test raising SystemExit
  284. """)
  285. with assert_raises(ClientError) as e:
  286. do(code, ["hello", 123], False)
  287. in_("400 Bad Request", str(e.exception))
  288. status = do(code, ["hello", "123"], False)
  289. eq_(status["log"], "hello 123\n")
  290. eq_(status["exitcode"], 0)
  291. # try killing a long-running process
  292. code = textwrap.dedent("""
  293. import time
  294. print('hello')
  295. time.sleep(60)
  296. print('world')
  297. """)
  298. status = do(code, [], True)
  299. eq_(status["log"], "hello\n")
  300. ne_(status["exitcode"], 0)
  301. # default arguments are empty
  302. code = textwrap.dedent("""
  303. import sys
  304. print('args:', len(sys.argv[1:]))
  305. """)
  306. status = do(code, None, False)
  307. eq_(status["log"], "args: 0\n")
  308. eq_(status["exitcode"], 0)
  309. def test_client_08_bad_types(self):
  310. client = HTTPClient(baseurl = testurl, post_json = True)
  311. with assert_raises(ClientError) as e:
  312. client.post("run/code", { "code": "asdf", "args": "qwer" })
  313. in_("must be a list", str(e.exception))
  314. with assert_raises(ClientError) as e:
  315. client.post("run/command", { "argv": "asdf" })
  316. in_("must be a list", str(e.exception))
  317. def test_client_09_info(self):
  318. client = HTTPClient(baseurl = testurl, post_json = True)
  319. # start some processes
  320. a = client.post("run/command", { "argv": ["sleep","60"] } )
  321. b = client.post("run/command", { "argv": ["sh","-c","sleep 2;true"] } )
  322. c = client.post("run/command", { "argv": [
  323. "sh","-c","dd if=/dev/zero of=/dev/null;true"] } )
  324. d = client.post("run/command", { "argv": [
  325. "dd", "if=/dev/zero", "of=/dev/null" ] } )
  326. info = client.get("process/info")
  327. eq_(info["pids"][a]["procs"], 1)
  328. eq_(info["pids"][b]["procs"], 2)
  329. eq_(info["pids"][c]["procs"], 2)
  330. eq_(info["pids"][d]["procs"], 1)
  331. eq_(info["total"]["procs"], 6)
  332. lt_(info["pids"][a]["cpu_percent"], 50)
  333. lt_(20, info["pids"][c]["cpu_percent"])
  334. lt_(80, info["system"]["cpu_percent"])
  335. for x in range(10):
  336. time.sleep(1)
  337. info = client.get("process/info")
  338. if info["pids"][b]["procs"] != 2:
  339. break
  340. else:
  341. raise Exception("process B didn't die: " + str(info["pids"][b]))
  342. # kill all processes
  343. for pid in client.get("process/list"):
  344. client.post("process/remove", { "pid": pid })
  345. def test_client_10_unicode(self):
  346. client = HTTPClient(baseurl = testurl, post_json = True)
  347. eq_(client.get("process/list"), [])
  348. def verify(cmd, result):
  349. pid = client.post("run/command", { "argv": [
  350. "/bin/bash", "-c", cmd ] })
  351. eq_(client.get("process/list"), [pid])
  352. status = self.wait_end(client, pid)
  353. eq_(result, status["log"])
  354. # Unicode should work
  355. verify("echo -n hello", "hello")
  356. verify("echo -n ☠", "☠")
  357. verify("echo -ne \\\\xe2\\\\x98\\\\xa0", "☠")
  358. # Programs that spit out invalid UTF-8 should get replacement
  359. # markers
  360. verify("echo -ne \\\\xae", "\ufffd")
  361. def test_client_11_atexit(self):
  362. # Leave a directory and running process behind, for the atexit
  363. # handler to clean up. Here we trigger the atexit manually,
  364. # since it's hard to trigger it as part of the test suite.
  365. client = HTTPClient(baseurl = testurl, post_json = True)
  366. code = textwrap.dedent("""
  367. import time
  368. time.sleep(10)
  369. """)
  370. client.post("run/code", { "code": code, "args": [ "hello"] })
  371. # Trigger atexit function
  372. test_server._manager._atexit()
  373. # Ensure no processes exit
  374. eq_(client.get("process/list"), [])