You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

607 lines
24 KiB

  1. # -*- coding: utf-8 -*-
  2. import nilmdb
  3. from nilmdb.utils.printf import *
  4. from nilmdb.utils import timestamper
  5. from nilmdb.client import ClientError, ServerError
  6. from nilmdb.utils import datetime_tz
  7. from nose.plugins.skip import SkipTest
  8. from nose.tools import *
  9. from nose.tools import assert_raises
  10. import itertools
  11. import distutils.version
  12. import os
  13. import sys
  14. import threading
  15. import cStringIO
  16. import simplejson as json
  17. import unittest
  18. import warnings
  19. import resource
  20. import time
  21. import re
  22. from testutil.helpers import *
# Filesystem path of the on-disk test database; wiped and recreated by
# setup_module() for each run.
testdb = "tests/client-testdb"
# Base URL of the local test server that setup_module() starts.
testurl = "http://localhost:32180/"
  25. def setup_module():
  26. global test_server, test_db
  27. # Clear out DB
  28. recursive_unlink(testdb)
  29. # Start web app on a custom port
  30. test_db = nilmdb.utils.serializer_proxy(nilmdb.NilmDB)(testdb, sync = False)
  31. test_server = nilmdb.Server(test_db, host = "127.0.0.1",
  32. port = 32180, stoppable = False,
  33. fast_shutdown = True,
  34. force_traceback = False)
  35. test_server.start(blocking = False)
  36. def teardown_module():
  37. global test_server, test_db
  38. # Close web app
  39. test_server.stop()
  40. test_db.close()
  41. class TestClient(object):
  42. def test_client_01_basic(self):
  43. # Test a fake host
  44. client = nilmdb.Client(url = "http://localhost:1/")
  45. with assert_raises(nilmdb.client.ServerError):
  46. client.version()
  47. client.close()
  48. # Then a fake URL on a real host
  49. client = nilmdb.Client(url = "http://localhost:32180/fake/")
  50. with assert_raises(nilmdb.client.ClientError):
  51. client.version()
  52. client.close()
  53. # Now a real URL with no http:// prefix
  54. client = nilmdb.Client(url = "localhost:32180")
  55. version = client.version()
  56. client.close()
  57. # Now use the real URL
  58. client = nilmdb.Client(url = testurl)
  59. version = client.version()
  60. eq_(distutils.version.LooseVersion(version),
  61. distutils.version.LooseVersion(test_server.version))
  62. # Bad URLs should give 404, not 500
  63. with assert_raises(ClientError):
  64. client.http.get("/stream/create")
  65. client.close()
  66. def test_client_02_createlist(self):
  67. # Basic stream tests, like those in test_nilmdb:test_stream
  68. client = nilmdb.Client(url = testurl)
  69. # Database starts empty
  70. eq_(client.stream_list(), [])
  71. # Bad path
  72. with assert_raises(ClientError):
  73. client.stream_create("foo/bar/baz", "PrepData")
  74. with assert_raises(ClientError):
  75. client.stream_create("/foo", "PrepData")
  76. # Bad layout type
  77. with assert_raises(ClientError):
  78. client.stream_create("/newton/prep", "NoSuchLayout")
  79. # Bad method types
  80. with assert_raises(ClientError):
  81. client.http.put("/stream/list","")
  82. # Try a bunch of times to make sure the request body is getting consumed
  83. for x in range(10):
  84. with assert_raises(ClientError):
  85. client.http.post("/stream/list")
  86. client = nilmdb.Client(url = testurl)
  87. # Create three streams
  88. client.stream_create("/newton/prep", "PrepData")
  89. client.stream_create("/newton/raw", "RawData")
  90. client.stream_create("/newton/zzz/rawnotch", "RawNotchedData")
  91. # Verify we got 3 streams
  92. eq_(client.stream_list(), [ ["/newton/prep", "PrepData"],
  93. ["/newton/raw", "RawData"],
  94. ["/newton/zzz/rawnotch", "RawNotchedData"]
  95. ])
  96. # Match just one type or one path
  97. eq_(client.stream_list(layout="RawData"),
  98. [ ["/newton/raw", "RawData"] ])
  99. eq_(client.stream_list(path="/newton/raw"),
  100. [ ["/newton/raw", "RawData"] ])
  101. # Try messing with resource limits to trigger errors and get
  102. # more coverage. Here, make it so we can only create files 1
  103. # byte in size, which will trigger an IOError in the server when
  104. # we create a table.
  105. limit = resource.getrlimit(resource.RLIMIT_FSIZE)
  106. resource.setrlimit(resource.RLIMIT_FSIZE, (1, limit[1]))
  107. with assert_raises(ServerError) as e:
  108. client.stream_create("/newton/hello", "RawData")
  109. resource.setrlimit(resource.RLIMIT_FSIZE, limit)
  110. client.close()
  111. def test_client_03_metadata(self):
  112. client = nilmdb.Client(url = testurl)
  113. # Set / get metadata
  114. eq_(client.stream_get_metadata("/newton/prep"), {})
  115. eq_(client.stream_get_metadata("/newton/raw"), {})
  116. meta1 = { "description": "The Data",
  117. "v_scale": "1.234" }
  118. meta2 = { "description": "The Data" }
  119. meta3 = { "v_scale": "1.234" }
  120. client.stream_set_metadata("/newton/prep", meta1)
  121. client.stream_update_metadata("/newton/prep", {})
  122. client.stream_update_metadata("/newton/raw", meta2)
  123. client.stream_update_metadata("/newton/raw", meta3)
  124. eq_(client.stream_get_metadata("/newton/prep"), meta1)
  125. eq_(client.stream_get_metadata("/newton/raw"), meta1)
  126. eq_(client.stream_get_metadata("/newton/raw",
  127. [ "description" ] ), meta2)
  128. eq_(client.stream_get_metadata("/newton/raw",
  129. [ "description", "v_scale" ] ), meta1)
  130. # missing key
  131. eq_(client.stream_get_metadata("/newton/raw", "descr"),
  132. { "descr": None })
  133. eq_(client.stream_get_metadata("/newton/raw", [ "descr" ]),
  134. { "descr": None })
  135. # test wrong types (list instead of dict)
  136. with assert_raises(ClientError):
  137. client.stream_set_metadata("/newton/prep", [1,2,3])
  138. with assert_raises(ClientError):
  139. client.stream_update_metadata("/newton/prep", [1,2,3])
  140. client.close()
  141. def test_client_04_insert(self):
  142. client = nilmdb.Client(url = testurl)
  143. # Limit _max_data to 1 MB, since our test file is 1.5 MB
  144. old_max_data = nilmdb.client.client.StreamInserter._max_data
  145. nilmdb.client.client.StreamInserter._max_data = 1 * 1024 * 1024
  146. datetime_tz.localtz_set("America/New_York")
  147. testfile = "tests/data/prep-20120323T1000"
  148. start = datetime_tz.datetime_tz.smartparse("20120323T1000")
  149. start = start.totimestamp()
  150. rate = 120
  151. # First try a nonexistent path
  152. data = timestamper.TimestamperRate(testfile, start, 120)
  153. with assert_raises(ClientError) as e:
  154. result = client.stream_insert("/newton/no-such-path", data)
  155. in_("404 Not Found", str(e.exception))
  156. # Now try reversed timestamps
  157. data = timestamper.TimestamperRate(testfile, start, 120)
  158. data = reversed(list(data))
  159. with assert_raises(ClientError) as e:
  160. result = client.stream_insert("/newton/prep", data)
  161. in_("400 Bad Request", str(e.exception))
  162. in_("timestamp is not monotonically increasing", str(e.exception))
  163. # Now try empty data (no server request made)
  164. empty = cStringIO.StringIO("")
  165. data = timestamper.TimestamperRate(empty, start, 120)
  166. result = client.stream_insert("/newton/prep", data)
  167. eq_(result, None)
  168. # It's OK to insert an empty interval
  169. client.http.put("stream/insert", "", { "path": "/newton/prep",
  170. "start": 1, "end": 2 })
  171. eq_(list(client.stream_intervals("/newton/prep")), [[1, 2]])
  172. client.stream_remove("/newton/prep")
  173. eq_(list(client.stream_intervals("/newton/prep")), [])
  174. # Timestamps can be negative too
  175. client.http.put("stream/insert", "", { "path": "/newton/prep",
  176. "start": -2, "end": -1 })
  177. eq_(list(client.stream_intervals("/newton/prep")), [[-2, -1]])
  178. client.stream_remove("/newton/prep")
  179. eq_(list(client.stream_intervals("/newton/prep")), [])
  180. # Intervals that end at zero shouldn't be any different
  181. client.http.put("stream/insert", "", { "path": "/newton/prep",
  182. "start": -1, "end": 0 })
  183. eq_(list(client.stream_intervals("/newton/prep")), [[-1, 0]])
  184. client.stream_remove("/newton/prep")
  185. eq_(list(client.stream_intervals("/newton/prep")), [])
  186. # Try forcing a server request with equal start and end
  187. with assert_raises(ClientError) as e:
  188. client.http.put("stream/insert", "", { "path": "/newton/prep",
  189. "start": 0, "end": 0 })
  190. in_("400 Bad Request", str(e.exception))
  191. in_("start must precede end", str(e.exception))
  192. # Specify start/end (starts too late)
  193. data = timestamper.TimestamperRate(testfile, start, 120)
  194. with assert_raises(ClientError) as e:
  195. result = client.stream_insert("/newton/prep", data,
  196. start + 5, start + 120)
  197. in_("400 Bad Request", str(e.exception))
  198. in_("Data timestamp 1332511200.0 < start time 1332511205.0",
  199. str(e.exception))
  200. # Specify start/end (ends too early)
  201. data = timestamper.TimestamperRate(testfile, start, 120)
  202. with assert_raises(ClientError) as e:
  203. result = client.stream_insert("/newton/prep", data,
  204. start, start + 1)
  205. in_("400 Bad Request", str(e.exception))
  206. # Client chunks the input, so the exact timestamp here might change
  207. # if the chunk positions change.
  208. assert(re.search("Data timestamp 13325[0-9]+\.[0-9]+ "
  209. ">= end time 1332511201.0", str(e.exception))
  210. is not None)
  211. # Now do the real load
  212. data = timestamper.TimestamperRate(testfile, start, 120)
  213. result = client.stream_insert("/newton/prep", data,
  214. start, start + 119.999777)
  215. # Verify the intervals. Should be just one, even if the data
  216. # was inserted in chunks, due to nilmdb interval concatenation.
  217. intervals = list(client.stream_intervals("/newton/prep"))
  218. eq_(intervals, [[start, start + 119.999777]])
  219. # Try some overlapping data -- just insert it again
  220. data = timestamper.TimestamperRate(testfile, start, 120)
  221. with assert_raises(ClientError) as e:
  222. result = client.stream_insert("/newton/prep", data)
  223. in_("400 Bad Request", str(e.exception))
  224. in_("verlap", str(e.exception))
  225. nilmdb.client.client.StreamInserter._max_data = old_max_data
  226. client.close()
  227. def test_client_05_extractremove(self):
  228. # Misc tests for extract and remove. Most of them are in test_cmdline.
  229. client = nilmdb.Client(url = testurl)
  230. for x in client.stream_extract("/newton/prep", 999123, 999124):
  231. raise AssertionError("shouldn't be any data for this request")
  232. with assert_raises(ClientError) as e:
  233. client.stream_remove("/newton/prep", 123, 120)
  234. # Test count
  235. eq_(client.stream_count("/newton/prep"), 14400)
  236. client.close()
  237. def test_client_06_generators(self):
  238. # A lot of the client functionality is already tested by test_cmdline,
  239. # but this gets a bit more coverage that cmdline misses.
  240. client = nilmdb.Client(url = testurl)
  241. # Trigger a client error in generator
  242. start = datetime_tz.datetime_tz.smartparse("20120323T2000")
  243. end = datetime_tz.datetime_tz.smartparse("20120323T1000")
  244. for function in [ client.stream_intervals, client.stream_extract ]:
  245. with assert_raises(ClientError) as e:
  246. function("/newton/prep",
  247. start.totimestamp(),
  248. end.totimestamp()).next()
  249. in_("400 Bad Request", str(e.exception))
  250. in_("start must precede end", str(e.exception))
  251. # Trigger a curl error in generator
  252. with assert_raises(ServerError) as e:
  253. client.http.get_gen("http://nosuchurl/").next()
  254. # Trigger a curl error in generator
  255. with assert_raises(ServerError) as e:
  256. client.http.get_gen("http://nosuchurl/").next()
  257. # Check 404 for missing streams
  258. for function in [ client.stream_intervals, client.stream_extract ]:
  259. with assert_raises(ClientError) as e:
  260. function("/no/such/stream").next()
  261. in_("404 Not Found", str(e.exception))
  262. in_("No such stream", str(e.exception))
  263. client.close()
  264. def test_client_07_headers(self):
  265. # Make sure that /stream/intervals and /stream/extract
  266. # properly return streaming, chunked, text/plain response.
  267. # Pokes around in client.http internals a bit to look at the
  268. # response headers.
  269. client = nilmdb.Client(url = testurl)
  270. http = client.http
  271. # Use a warning rather than returning a test failure for the
  272. # transfer-encoding, so that we can still disable chunked
  273. # responses for debugging.
  274. def headers():
  275. h = ""
  276. for (k, v) in http._last_response.headers.items():
  277. h += k + ": " + v + "\n"
  278. return h.lower()
  279. # Intervals
  280. x = http.get("stream/intervals", { "path": "/newton/prep" })
  281. if "transfer-encoding: chunked" not in headers():
  282. warnings.warn("Non-chunked HTTP response for /stream/intervals")
  283. if "content-type: application/x-json-stream" not in headers():
  284. raise AssertionError("/stream/intervals content type "
  285. "is not application/x-json-stream:\n" +
  286. headers())
  287. # Extract
  288. x = http.get("stream/extract",
  289. { "path": "/newton/prep",
  290. "start": "123",
  291. "end": "124" })
  292. if "transfer-encoding: chunked" not in headers():
  293. warnings.warn("Non-chunked HTTP response for /stream/extract")
  294. if "content-type: text/plain;charset=utf-8" not in headers():
  295. raise AssertionError("/stream/extract is not text/plain:\n" +
  296. headers())
  297. # Make sure Access-Control-Allow-Origin gets set
  298. if "access-control-allow-origin: " not in headers():
  299. raise AssertionError("No Access-Control-Allow-Origin (CORS) "
  300. "header in /stream/extract response:\n" +
  301. headers())
  302. client.close()
  303. def test_client_08_unicode(self):
  304. # Basic Unicode tests
  305. client = nilmdb.Client(url = testurl)
  306. # Delete streams that exist
  307. for stream in client.stream_list():
  308. client.stream_destroy(stream[0])
  309. # Database is empty
  310. eq_(client.stream_list(), [])
  311. # Create Unicode stream, match it
  312. raw = [ u"/düsseldorf/raw", u"uint16_6" ]
  313. prep = [ u"/düsseldorf/prep", u"uint16_6" ]
  314. client.stream_create(*raw)
  315. eq_(client.stream_list(), [raw])
  316. eq_(client.stream_list(layout=raw[1]), [raw])
  317. eq_(client.stream_list(path=raw[0]), [raw])
  318. client.stream_create(*prep)
  319. eq_(client.stream_list(), [prep, raw])
  320. # Set / get metadata with Unicode keys and values
  321. eq_(client.stream_get_metadata(raw[0]), {})
  322. eq_(client.stream_get_metadata(prep[0]), {})
  323. meta1 = { u"alpha": u"α",
  324. u"β": u"beta" }
  325. meta2 = { u"alpha": u"α" }
  326. meta3 = { u"β": u"beta" }
  327. client.stream_set_metadata(prep[0], meta1)
  328. client.stream_update_metadata(prep[0], {})
  329. client.stream_update_metadata(raw[0], meta2)
  330. client.stream_update_metadata(raw[0], meta3)
  331. eq_(client.stream_get_metadata(prep[0]), meta1)
  332. eq_(client.stream_get_metadata(raw[0]), meta1)
  333. eq_(client.stream_get_metadata(raw[0], [ "alpha" ]), meta2)
  334. eq_(client.stream_get_metadata(raw[0], [ "alpha", "β" ]), meta1)
  335. client.close()
  336. def test_client_09_closing(self):
  337. # Make sure we actually close sockets correctly. New
  338. # connections will block for a while if they're not, since the
  339. # server will stop accepting new connections.
  340. for test in [1, 2]:
  341. start = time.time()
  342. for i in range(50):
  343. if time.time() - start > 15:
  344. raise AssertionError("Connections seem to be blocking... "
  345. "probably not closing properly.")
  346. if test == 1:
  347. # explicit close
  348. client = nilmdb.Client(url = testurl)
  349. with assert_raises(ClientError) as e:
  350. client.stream_remove("/newton/prep", 123, 120)
  351. client.close() # remove this to see the failure
  352. elif test == 2:
  353. # use the context manager
  354. with nilmdb.Client(url = testurl) as c:
  355. with assert_raises(ClientError) as e:
  356. c.stream_remove("/newton/prep", 123, 120)
  357. def test_client_10_context(self):
  358. # Test using the client's stream insertion context manager to
  359. # insert data.
  360. client = nilmdb.Client(testurl)
  361. client.stream_create("/context/test", "uint16_1")
  362. with client.stream_insert_context("/context/test") as ctx:
  363. # override _max_data to trigger frequent server updates
  364. ctx._max_data = 15
  365. with assert_raises(ValueError):
  366. ctx.insert_line("100 1")
  367. ctx.insert_line("100 1\n")
  368. ctx.insert_iter([ "101 1\n",
  369. "102 1\n",
  370. "103 1\n" ])
  371. ctx.insert_line("104 1\n")
  372. ctx.insert_line("105 1\n")
  373. ctx.finalize()
  374. ctx.insert_line("106 1\n")
  375. ctx.update_end(106.5)
  376. ctx.finalize()
  377. ctx.update_start(106.8)
  378. ctx.insert_line("107 1\n")
  379. ctx.insert_line("108 1\n")
  380. ctx.insert_line("109 1\n")
  381. ctx.insert_line("110 1\n")
  382. ctx.insert_line("111 1\n")
  383. ctx.update_end(113)
  384. ctx.insert_line("112 1\n")
  385. ctx.update_end(114)
  386. ctx.insert_line("113 1\n")
  387. ctx.update_end(115)
  388. ctx.insert_line("114 1\n")
  389. ctx.finalize()
  390. with assert_raises(ClientError):
  391. with client.stream_insert_context("/context/test", 100, 200) as ctx:
  392. ctx.insert_line("115 1\n")
  393. with assert_raises(ClientError):
  394. with client.stream_insert_context("/context/test", 200, 300) as ctx:
  395. ctx.insert_line("115 1\n")
  396. with client.stream_insert_context("/context/test", 200, 300) as ctx:
  397. # make sure our override wasn't permanent
  398. ne_(ctx._max_data, 15)
  399. ctx.insert_line("225 1\n")
  400. ctx.finalize()
  401. eq_(list(client.stream_intervals("/context/test")),
  402. [ [ 100, 105.000001 ],
  403. [ 106, 106.5 ],
  404. [ 106.8, 115 ],
  405. [ 200, 300 ] ])
  406. client.stream_destroy("/context/test")
  407. client.close()
  408. def test_client_11_emptyintervals(self):
  409. # Empty intervals are ok! If recording detection events
  410. # by inserting rows into the database, we want to be able to
  411. # have an interval where no events occurred. Test them here.
  412. client = nilmdb.Client(testurl)
  413. client.stream_create("/empty/test", "uint16_1")
  414. def info():
  415. result = []
  416. for interval in list(client.stream_intervals("/empty/test")):
  417. result.append((client.stream_count("/empty/test", *interval),
  418. interval))
  419. return result
  420. eq_(info(), [])
  421. # Insert a region with just a few points
  422. with client.stream_insert_context("/empty/test") as ctx:
  423. ctx.update_start(100)
  424. ctx.insert_line("140 1\n")
  425. ctx.insert_line("150 1\n")
  426. ctx.insert_line("160 1\n")
  427. ctx.update_end(200)
  428. ctx.finalize()
  429. eq_(info(), [(3, [100, 200])])
  430. # Delete chunk, which will leave one data point and two intervals
  431. client.stream_remove("/empty/test", 145, 175)
  432. eq_(info(), [(1, [100, 145]),
  433. (0, [175, 200])])
  434. # Try also creating a completely empty interval from scratch,
  435. # in a few different ways.
  436. client.stream_insert_block("/empty/test", "", 300, 350)
  437. client.stream_insert("/empty/test", [], 400, 450)
  438. with client.stream_insert_context("/empty/test", 500, 550):
  439. pass
  440. # If enough timestamps aren't provided, empty streams won't be created.
  441. client.stream_insert("/empty/test", [])
  442. with client.stream_insert_context("/empty/test"):
  443. pass
  444. client.stream_insert("/empty/test", [], start = 600)
  445. with client.stream_insert_context("/empty/test", start = 700):
  446. pass
  447. client.stream_insert("/empty/test", [], end = 850)
  448. with client.stream_insert_context("/empty/test", end = 950):
  449. pass
  450. # Try various things that might cause problems
  451. with client.stream_insert_context("/empty/test", 1000, 1050):
  452. ctx.finalize() # inserts [1000, 1050]
  453. ctx.finalize() # nothing
  454. ctx.finalize() # nothing
  455. ctx.insert_line("1100 1\n")
  456. ctx.finalize() # inserts [1100, 1100.000001]
  457. ctx.update_start(1199)
  458. ctx.insert_line("1200 1\n")
  459. ctx.update_end(1250)
  460. ctx.finalize() # inserts [1199, 1250]
  461. ctx.update_start(1299)
  462. ctx.finalize() # nothing
  463. ctx.update_end(1350)
  464. ctx.finalize() # nothing
  465. ctx.update_start(1400)
  466. ctx.update_end(1450)
  467. ctx.finalize()
  468. # implicit last finalize inserts [1400, 1450]
  469. # Check everything
  470. eq_(info(), [(1, [100, 145]),
  471. (0, [175, 200]),
  472. (0, [300, 350]),
  473. (0, [400, 450]),
  474. (0, [500, 550]),
  475. (0, [1000, 1050]),
  476. (1, [1100, 1100.000001]),
  477. (1, [1199, 1250]),
  478. (0, [1400, 1450]),
  479. ])
  480. # Clean up
  481. client.stream_destroy("/empty/test")
  482. client.close()
  483. def test_client_12_persistent(self):
  484. # Check that connections are persistent when they should be.
  485. # This is pretty hard to test; we have to poke deep into
  486. # the Requests library.
  487. with nilmdb.Client(url = testurl) as c:
  488. def connections():
  489. try:
  490. poolmanager = c.http._last_response.connection.poolmanager
  491. pool = poolmanager.pools[('http','localhost',32180)]
  492. return (pool.num_connections, pool.num_requests)
  493. except:
  494. raise SkipTest("can't get connection info")
  495. # First request makes a connection
  496. c.stream_create("/persist/test", "uint16_1")
  497. eq_(connections(), (1, 1))
  498. # Non-generator
  499. c.stream_list("/persist/test")
  500. eq_(connections(), (1, 2))
  501. c.stream_list("/persist/test")
  502. eq_(connections(), (1, 3))
  503. # Generators
  504. for x in c.stream_intervals("/persist/test"):
  505. pass
  506. eq_(connections(), (1, 4))
  507. for x in c.stream_intervals("/persist/test"):
  508. pass
  509. eq_(connections(), (1, 5))
  510. # Clean up
  511. c.stream_destroy("/persist/test")
  512. eq_(connections(), (1, 6))