You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

410 lines
14 KiB

  1. # -*- coding: utf-8 -*-
  2. import nilmrun.server
  3. from nilmdb.client.httpclient import HTTPClient, ClientError, ServerError
  4. from nilmdb.utils.printf import *
  5. from nose.plugins.skip import SkipTest
  6. from nose.tools import *
  7. from nose.tools import assert_raises
  8. import itertools
  9. import distutils.version
  10. import os
  11. import sys
  12. import threading
  13. import cStringIO
  14. import simplejson as json
  15. import unittest
  16. import warnings
  17. import time
  18. import re
  19. import urllib2
  20. from urllib2 import urlopen, HTTPError
  21. import requests
  22. import pprint
  23. import textwrap
  24. from testutil.helpers import *
# Base URL that every HTTPClient in this module connects to.  The port is the
# same one that setup_module() passes to nilmrun.server.Server.
testurl = "http://localhost:32181/"
#testurl = "http://bucket.mit.edu/nilmrun/"
  27. def setup_module():
  28. global test_server
  29. # Start web app on a custom port
  30. test_server = nilmrun.server.Server(host = "127.0.0.1",
  31. port = 32181,
  32. force_traceback = True)
  33. test_server.start(blocking = False)
  34. def teardown_module():
  35. global test_server
  36. # Close web app
  37. test_server.stop()
  38. class TestClient(object):
  39. def wait_kill(self, client, pid, timeout = 1):
  40. time.sleep(timeout)
  41. status = client.get("process/status", { "pid": pid })
  42. if not status["alive"]:
  43. raise AssertionError("died before we could kill it")
  44. status = client.post("process/remove", { "pid": pid })
  45. if status["alive"]:
  46. raise AssertionError("didn't get killed")
  47. return status
  48. def wait_end(self, client, pid, timeout = 5, remove = True):
  49. start = time.time()
  50. status = None
  51. while (time.time() - start) < timeout:
  52. status = client.get("process/status", { "pid": pid })
  53. if status["alive"] == False:
  54. break
  55. time.sleep(0.1)
  56. else:
  57. raise AssertionError("process " + str(pid) + " didn't die in " +
  58. str(timeout) + " seconds: " + repr(status))
  59. if remove:
  60. status = client.post("process/remove", { "pid": pid })
  61. return status
  62. def test_client_01_basic(self):
  63. client = HTTPClient(baseurl = testurl)
  64. version = client.get("version")
  65. eq_(distutils.version.LooseVersion(version),
  66. distutils.version.LooseVersion(nilmrun.__version__))
  67. in_("This is NilmRun", client.get(""))
  68. with assert_raises(ClientError):
  69. client.get("favicon.ico")
  70. def test_client_02_manager(self):
  71. client = HTTPClient(baseurl = testurl)
  72. eq_(client.get("process/list"), [])
  73. with assert_raises(ClientError) as e:
  74. client.get("process/status", { "pid": 12345 })
  75. in_("No such PID", str(e.exception))
  76. with assert_raises(ClientError):
  77. client.get("process/remove", { "pid": 12345 })
  78. in_("No such PID", str(e.exception))
  79. def test_client_03_run_command(self):
  80. client = HTTPClient(baseurl = testurl, post_json = True)
  81. eq_(client.get("process/list"), [])
  82. def do(argv, kill):
  83. pid = client.post("run/command", { "argv": argv } )
  84. eq_(client.get("process/list"), [pid])
  85. if kill:
  86. return self.wait_kill(client, pid)
  87. return self.wait_end(client, pid)
  88. # Simple command
  89. status = do(["pwd"], False)
  90. eq_(status["exitcode"], 0)
  91. eq_("/tmp\n", status["log"])
  92. # Command with args
  93. status = do(["expr", "1", "+", "2"], False)
  94. eq_(status["exitcode"], 0)
  95. eq_("3\n", status["log"])
  96. # Missing command
  97. with assert_raises(ClientError) as e:
  98. do(["/no-such-command-blah-blah"], False)
  99. in_("No such file or directory", str(e.exception))
  100. # Kill a slow command
  101. status = do(["sleep", "60"], True)
  102. ne_(status["exitcode"], 0)
  103. def _run_testfilter(self, client, args):
  104. code = textwrap.dedent("""
  105. import nilmrun.testfilter
  106. import simplejson as json
  107. import sys
  108. nilmrun.testfilter.test(json.loads(sys.argv[1]))
  109. """)
  110. jsonargs = json.dumps(args)
  111. return client.post("run/code", { "code": code, "args": [ jsonargs ] })
  112. def test_client_04_process_basic(self):
  113. client = HTTPClient(baseurl = testurl, post_json = True)
  114. # start dummy filter
  115. pid = self._run_testfilter(client, 30)
  116. eq_(client.get("process/list"), [pid])
  117. time.sleep(1)
  118. # Verify that status looks OK
  119. status = client.get("process/status", { "pid": pid, "clear": True })
  120. for x in [ "pid", "alive", "exitcode", "start_time", "log" ]:
  121. in_(x, status)
  122. in_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"])
  123. eq_(status["alive"], True)
  124. eq_(status["exitcode"], None)
  125. # Check that the log got cleared
  126. status = client.get("process/status", { "pid": pid })
  127. nin_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"])
  128. # See that it ended properly
  129. status = self.wait_end(client, pid, remove = False)
  130. in_("dummy 27\ndummy 28\ndummy 29\n", status["log"])
  131. eq_(status["exitcode"], 0)
  132. # Remove it
  133. killstatus = client.post("process/remove", { "pid": pid })
  134. eq_(status, killstatus)
  135. eq_(client.get("process/list"), [])
  136. with assert_raises(ClientError) as e:
  137. client.post("process/remove", { "pid": pid })
  138. in_("No such PID", str(e.exception))
  139. def test_client_05_process_terminate(self):
  140. client = HTTPClient(baseurl = testurl, post_json = True)
  141. # Trigger exception in filter
  142. pid = self._run_testfilter(client, -1)
  143. time.sleep(0.5)
  144. status = client.get("process/status", { "pid": pid })
  145. eq_(status["alive"], False)
  146. eq_(status["exitcode"], 1)
  147. in_("Exception: test exception", status["log"])
  148. client.post("process/remove", { "pid": pid })
  149. # Kill a running filter by removing it early
  150. newpid = self._run_testfilter(client, 50)
  151. ne_(newpid, pid)
  152. time.sleep(0.5)
  153. start = time.time()
  154. status = client.post("process/remove", { "pid": newpid })
  155. elapsed = time.time() - start
  156. # Should have died in slightly over 1 second
  157. assert(0.5 < elapsed < 2)
  158. eq_(status["alive"], False)
  159. ne_(status["exitcode"], 0)
  160. # No more
  161. eq_(client.get("process/list"), [])
  162. # Try to remove a running filter that ignored SIGTERM
  163. pid = self._run_testfilter(client, 0)
  164. start = time.time()
  165. status = client.post("process/remove", { "pid": pid })
  166. elapsed = time.time() - start
  167. # Should have died in slightly over 2 seconds
  168. assert(1.5 < elapsed < 3)
  169. eq_(status["alive"], False)
  170. ne_(status["exitcode"], 0)
  171. @unittest.skip("needs a running nilmdb; trainola moved to nilmtools")
  172. def test_client_06_trainola(self):
  173. client = HTTPClient(baseurl = testurl, post_json = True)
  174. data = { "url": "http://bucket.mit.edu/nilmdb",
  175. "dest_stream": "/sharon/prep-a-matches",
  176. "stream": "/sharon/prep-a",
  177. "start": 1366111383280463,
  178. "end": 1366126163457797,
  179. "columns": [ { "name": "P1", "index": 0 },
  180. { "name": "Q1", "index": 1 },
  181. { "name": "P3", "index": 2 } ],
  182. "exemplars": [
  183. { "name": "Boiler Pump ON",
  184. "url": "http://bucket.mit.edu/nilmdb",
  185. "stream": "/sharon/prep-a",
  186. "start": 1366260494269078,
  187. "end": 1366260608185031,
  188. "dest_column": 0,
  189. "columns": [ { "name": "P1", "index": 0 },
  190. { "name": "Q1", "index": 1 }
  191. ]
  192. },
  193. { "name": "Boiler Pump OFF",
  194. "url": "http://bucket.mit.edu/nilmdb",
  195. "stream": "/sharon/prep-a",
  196. "start": 1366260864215764,
  197. "end": 1366260870882998,
  198. "dest_column": 1,
  199. "columns": [ { "name": "P1", "index": 0 },
  200. { "name": "Q1", "index": 1 }
  201. ]
  202. }
  203. ]
  204. }
  205. pid = client.post("run/code", { "code": "import nilmtools.trainola\n" +
  206. "nilmtools.trainola.main()",
  207. "args": [ data ] })
  208. while True:
  209. status = client.get("process/status", { "pid": pid, "clear": 1 })
  210. sys.stdout.write(status["log"])
  211. sys.stdout.flush()
  212. if status["alive"] == False:
  213. break
  214. time.sleep(0.1)
  215. status = client.post("process/remove", { "pid": pid })
  216. os._exit(int(status["exitcode"]))
  217. def test_client_07_run_code(self):
  218. client = HTTPClient(baseurl = testurl, post_json = True)
  219. eq_(client.get("process/list"), [])
  220. def do(code, args, kill):
  221. if args is not None:
  222. pid = client.post("run/code", { "code": code, "args": args } )
  223. else:
  224. pid = client.post("run/code", { "code": code } )
  225. eq_(client.get("process/list"), [pid])
  226. if kill:
  227. return self.wait_kill(client, pid)
  228. return self.wait_end(client, pid)
  229. # basic code snippet
  230. code = textwrap.dedent("""
  231. print 'hello'
  232. def foo(arg):
  233. print 'world'
  234. """)
  235. status = do(code, [], False)
  236. eq_("hello\n", status["log"])
  237. eq_(status["exitcode"], 0)
  238. # compile error
  239. code = textwrap.dedent("""
  240. def foo(arg:
  241. print 'hello'
  242. """)
  243. status = do(code, [], False)
  244. in_("SyntaxError", status["log"])
  245. eq_(status["exitcode"], 1)
  246. # traceback in user code should be formatted nicely
  247. code = textwrap.dedent("""
  248. def foo(arg):
  249. raise Exception(arg)
  250. foo(123)
  251. """)
  252. status = do(code, [], False)
  253. cleaned_log = re.sub('File "[^"]*",', 'File "",', status["log"])
  254. eq_('Traceback (most recent call last):\n' +
  255. ' File "", line 4, in <module>\n' +
  256. ' foo(123)\n' +
  257. ' File "", line 3, in foo\n' +
  258. ' raise Exception(arg)\n' +
  259. 'Exception: 123\n', cleaned_log)
  260. eq_(status["exitcode"], 1)
  261. # argument handling (strings come in as unicode)
  262. code = textwrap.dedent("""
  263. import sys
  264. print sys.argv[1].encode('ascii'), sys.argv[2]
  265. sys.exit(0) # also test raising SystemExit
  266. """)
  267. with assert_raises(ClientError) as e:
  268. do(code, ["hello", 123], False)
  269. in_("400 Bad Request", str(e.exception))
  270. status = do(code, ["hello", "123"], False)
  271. eq_(status["log"], "hello 123\n")
  272. eq_(status["exitcode"], 0)
  273. # try killing a long-running process
  274. code = textwrap.dedent("""
  275. import time
  276. print 'hello'
  277. time.sleep(60)
  278. print 'world'
  279. """)
  280. status = do(code, [], True)
  281. eq_(status["log"], "hello\n")
  282. ne_(status["exitcode"], 0)
  283. # default arguments are empty
  284. code = textwrap.dedent("""
  285. import sys
  286. print 'args:', len(sys.argv[1:])
  287. """)
  288. status = do(code, None, False)
  289. eq_(status["log"], "args: 0\n")
  290. eq_(status["exitcode"], 0)
  291. def test_client_08_bad_types(self):
  292. client = HTTPClient(baseurl = testurl, post_json = True)
  293. with assert_raises(ClientError) as e:
  294. client.post("run/code", { "code": "asdf", "args": "qwer" })
  295. in_("must be a list", str(e.exception))
  296. with assert_raises(ClientError) as e:
  297. client.post("run/command", { "argv": "asdf" })
  298. in_("must be a list", str(e.exception))
  299. def test_client_09_info(self):
  300. client = HTTPClient(baseurl = testurl, post_json = True)
  301. # start some processes
  302. a = client.post("run/command", { "argv": ["sleep","60"] } )
  303. b = client.post("run/command", { "argv": ["sh","-c","sleep 2;true"] } )
  304. c = client.post("run/command", { "argv": ["sh","-c","burnP5;true"] } )
  305. d = client.post("run/command", { "argv": ["burnP5" ] } )
  306. info = client.get("process/info")
  307. eq_(info["pids"][a]["procs"], 1)
  308. eq_(info["pids"][b]["procs"], 2)
  309. eq_(info["pids"][c]["procs"], 2)
  310. eq_(info["pids"][d]["procs"], 1)
  311. eq_(info["total"]["procs"], 6)
  312. lt_(info["pids"][a]["cpu_percent"], 50)
  313. lt_(20, info["pids"][c]["cpu_percent"])
  314. lt_(80, info["system"]["cpu_percent"])
  315. time.sleep(2)
  316. info = client.get("process/info")
  317. eq_(info["pids"][b]["procs"], 0)
  318. # kill all processes
  319. for pid in client.get("process/list"):
  320. client.post("process/remove", { "pid": pid })
  321. def test_client_10_unicode(self):
  322. client = HTTPClient(baseurl = testurl, post_json = True)
  323. eq_(client.get("process/list"), [])
  324. def verify(cmd, result):
  325. pid = client.post("run/command", { "argv": [ "sh", "-c", cmd ] })
  326. eq_(client.get("process/list"), [pid])
  327. status = self.wait_end(client, pid)
  328. eq_(result, status["log"])
  329. # Unicode should work
  330. verify("echo -n hello", "hello")
  331. verify(u"echo -n ☠", u"☠")
  332. verify("echo -ne \\\\xe2\\\\x98\\\\xa0", u"☠")
  333. # Programs that spit out invalid UTF-8 should get replacement
  334. # markers
  335. verify("echo -ne \\\\xae", u"\ufffd")
  336. def test_client_11_atexit(self):
  337. # Leave a directory and running process behind, for the atexit
  338. # handler to clean up. Here we trigger the atexit manually,
  339. # since it's hard to trigger it as part of the test suite.
  340. client = HTTPClient(baseurl = testurl, post_json = True)
  341. code = textwrap.dedent("""
  342. import time
  343. time.sleep(10)
  344. """)
  345. client.post("run/code", { "code": code, "args": [ "hello"] })
  346. # Trigger atexit function
  347. test_server._manager._atexit()
  348. # Ensure no processes exit
  349. eq_(client.get("process/list"), [])