You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

785 lines
32 KiB

  1. # -*- coding: utf-8 -*-
  2. import nilmdb.server
  3. import nilmdb.client
  4. from nilmdb.utils.printf import *
  5. from nilmdb.utils import timestamper
  6. from nilmdb.client import ClientError, ServerError, Error
  7. from nilmdb.utils.sort import sort_human
  8. import datetime_tz
  9. from nose.plugins.skip import SkipTest
  10. from nose.tools import *
  11. from nose.tools import assert_raises
  12. import itertools
  13. import distutils.version
  14. import os
  15. import sys
  16. import threading
  17. import io
  18. import json
  19. import unittest
  20. import warnings
  21. import resource
  22. import time
  23. import re
  24. import struct
  25. from testutil.helpers import *
  26. testdb = "tests/client-testdb"
  27. testurl = "http://localhost:32180/"
  28. def setup_module():
  29. global test_server, test_db
  30. # Clear out DB
  31. recursive_unlink(testdb)
  32. # Start web app on a custom port
  33. test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(testdb)
  34. test_server = nilmdb.server.Server(test_db, host = "127.0.0.1",
  35. port = 32180, stoppable = False,
  36. fast_shutdown = True,
  37. force_traceback = True)
  38. test_server.start(blocking = False)
  39. def teardown_module():
  40. global test_server, test_db
  41. # Close web app
  42. test_server.stop()
  43. test_db.close()
  44. class TestClient(object):
  45. def test_client_01_basic(self):
  46. # Test a fake host
  47. client = nilmdb.client.Client(url = "http://localhost:1/")
  48. with assert_raises(nilmdb.client.ServerError):
  49. client.version()
  50. client.close()
  51. # Then a fake URL on a real host
  52. client = nilmdb.client.Client(url = "http://localhost:32180/fake/")
  53. with assert_raises(nilmdb.client.ClientError):
  54. client.version()
  55. client.close()
  56. # Now a real URL with no http:// prefix
  57. client = nilmdb.client.Client(url = "localhost:32180")
  58. version = client.version()
  59. client.close()
  60. # Now use the real URL
  61. client = nilmdb.client.Client(url = testurl)
  62. version = client.version()
  63. eq_(distutils.version.LooseVersion(version),
  64. distutils.version.LooseVersion(test_server.version))
  65. # Bad URLs should give 404, not 500
  66. with assert_raises(ClientError):
  67. client.http.get("/stream/create")
  68. # Test error handling
  69. url = testurl
  70. args = { "url": url,
  71. "status": "400",
  72. "message": "Something went wrong",
  73. "traceback": None }
  74. with assert_raises(ClientError):
  75. client.http._handle_error(url, 400, json.dumps(args))
  76. with assert_raises(ClientError):
  77. client.http._handle_error(url, 400, "this is not JSON.. {")
  78. args["status"] = "500"
  79. with assert_raises(ServerError):
  80. client.http._handle_error(url, 500, json.dumps(args))
  81. args["message"] = None
  82. with assert_raises(ServerError):
  83. client.http._handle_error(url, 500, json.dumps(args))
  84. args["status"] = "600"
  85. with assert_raises(Error):
  86. client.http._handle_error(url, 600, json.dumps(args))
  87. # Use get_gen for an endpoint that doesn't have newlines,
  88. # for better test coverage.
  89. for line in client.http.get_gen("/version"):
  90. pass
  91. client.close()
  92. def test_client_02_createlist(self):
  93. # Basic stream tests, like those in test_nilmdb:test_stream
  94. client = nilmdb.client.Client(url = testurl)
  95. # Database starts empty
  96. eq_(client.stream_list(), [])
  97. # Bad path
  98. with assert_raises(ClientError):
  99. client.stream_create("foo/bar/baz", "float32_8")
  100. with assert_raises(ClientError):
  101. client.stream_create("/foo", "float32_8")
  102. # Bad layout type
  103. with assert_raises(ClientError):
  104. client.stream_create("/newton/prep", "NoSuchLayout")
  105. # Bad method types
  106. with assert_raises(ClientError):
  107. client.http.put("/stream/list",b"")
  108. # Try a bunch of times to make sure the request body is getting consumed
  109. for x in range(10):
  110. with assert_raises(ClientError):
  111. client.http.post("/stream/list")
  112. client = nilmdb.client.Client(url = testurl)
  113. # Create four streams
  114. client.stream_create("/newton/prep", "float32_8")
  115. client.stream_create("/newton/raw", "uint16_6")
  116. client.stream_create("/newton/zzz/rawnotch2", "uint16_9")
  117. client.stream_create("/newton/zzz/rawnotch11", "uint16_9")
  118. # Test sort_human (used by stream_list)
  119. eq_(sort_human(["/s/10", "/s/2"]), ["/s/2", "/s/10"])
  120. # Verify we got 4 streams in the right order
  121. eq_(client.stream_list(), [ ["/newton/prep", "float32_8"],
  122. ["/newton/raw", "uint16_6"],
  123. ["/newton/zzz/rawnotch2", "uint16_9"],
  124. ["/newton/zzz/rawnotch11", "uint16_9"]
  125. ])
  126. # Match just one type or one path
  127. eq_(client.stream_list(layout="uint16_6"),
  128. [ ["/newton/raw", "uint16_6"] ])
  129. eq_(client.stream_list(path="/newton/raw"),
  130. [ ["/newton/raw", "uint16_6"] ])
  131. # Try messing with resource limits to trigger errors and get
  132. # more coverage. Here, make it so we can only create files 1
  133. # byte in size, which will trigger an IOError in the server when
  134. # we create a table.
  135. limit = resource.getrlimit(resource.RLIMIT_FSIZE)
  136. resource.setrlimit(resource.RLIMIT_FSIZE, (1, limit[1]))
  137. # normal
  138. with assert_raises(ServerError) as e:
  139. client.stream_create("/newton/hello", "uint16_6")
  140. # same but with force_traceback == False, to improve coverage
  141. global test_server
  142. test_server.force_traceback = False
  143. with assert_raises(ServerError) as e:
  144. client.stream_create("/newton/world", "uint16_6")
  145. test_server.force_traceback = True
  146. # Reset resource limit
  147. resource.setrlimit(resource.RLIMIT_FSIZE, limit)
  148. client.close()
  149. def test_client_03_metadata(self):
  150. client = nilmdb.client.Client(url = testurl)
  151. # Set / get metadata
  152. eq_(client.stream_get_metadata("/newton/prep"), {})
  153. eq_(client.stream_get_metadata("/newton/raw"), {})
  154. meta1 = { "description": "The Data",
  155. "v_scale": "1.234" }
  156. meta2 = { "description": "The Data" }
  157. meta3 = { "v_scale": "1.234" }
  158. client.stream_set_metadata("/newton/prep", meta1)
  159. client.stream_update_metadata("/newton/prep", {})
  160. client.stream_update_metadata("/newton/raw", meta2)
  161. client.stream_update_metadata("/newton/raw", meta3)
  162. eq_(client.stream_get_metadata("/newton/prep"), meta1)
  163. eq_(client.stream_get_metadata("/newton/raw"), meta1)
  164. eq_(client.stream_get_metadata("/newton/raw",
  165. [ "description" ] ), meta2)
  166. eq_(client.stream_get_metadata("/newton/raw",
  167. [ "description", "v_scale" ] ), meta1)
  168. # missing key
  169. eq_(client.stream_get_metadata("/newton/raw", "descr"),
  170. { "descr": None })
  171. eq_(client.stream_get_metadata("/newton/raw", [ "descr" ]),
  172. { "descr": None })
  173. # test wrong types (list instead of dict)
  174. with assert_raises(ClientError):
  175. client.stream_set_metadata("/newton/prep", [1,2,3])
  176. with assert_raises(ClientError):
  177. client.stream_update_metadata("/newton/prep", [1,2,3])
  178. # test wrong types (dict of non-strings)
  179. # numbers are OK; they'll get converted to strings
  180. client.stream_set_metadata("/newton/prep", { "hello": 1234 })
  181. # anything else is not
  182. with assert_raises(ClientError):
  183. client.stream_set_metadata("/newton/prep", { "world": { 1: 2 } })
  184. with assert_raises(ClientError):
  185. client.stream_set_metadata("/newton/prep", { "world": [ 1, 2 ] })
  186. client.close()
  187. def test_client_04_insert(self):
  188. client = nilmdb.client.Client(url = testurl)
  189. # Limit _max_data to 1 MB, since our test file is 1.5 MB
  190. old_max_data = nilmdb.client.client.StreamInserter._max_data
  191. nilmdb.client.client.StreamInserter._max_data = 1 * 1024 * 1024
  192. datetime_tz.localtz_set("America/New_York")
  193. testfile = "tests/data/prep-20120323T1000"
  194. start = nilmdb.utils.time.parse_time("20120323T1000")
  195. rate = 120
  196. # First try a nonexistent path
  197. data = timestamper.TimestamperRate(testfile, start, 120)
  198. with assert_raises(ClientError) as e:
  199. result = client.stream_insert("/newton/no-such-path", data)
  200. in_("404 Not Found", repr(e.exception))
  201. # Now try reversed timestamps
  202. data = timestamper.TimestamperRate(testfile, start, 120)
  203. data = reversed(list(data))
  204. with assert_raises(ClientError) as e:
  205. result = client.stream_insert("/newton/prep", data)
  206. in_("400 Bad Request", str(e.exception))
  207. in2_("timestamp is not monotonically increasing",
  208. "start must precede end", str(e.exception))
  209. # Now try empty data (no server request made)
  210. empty = io.StringIO("")
  211. data = timestamper.TimestamperRate(empty, start, 120)
  212. result = client.stream_insert("/newton/prep", data)
  213. eq_(result, None)
  214. # It's OK to insert an empty interval
  215. client.http.put("stream/insert", b"", { "path": "/newton/prep",
  216. "start": 1, "end": 2 })
  217. eq_(list(client.stream_intervals("/newton/prep")), [[1, 2]])
  218. client.stream_remove("/newton/prep")
  219. eq_(list(client.stream_intervals("/newton/prep")), [])
  220. # Timestamps can be negative too
  221. client.http.put("stream/insert", b"", { "path": "/newton/prep",
  222. "start": -2, "end": -1 })
  223. eq_(list(client.stream_intervals("/newton/prep")), [[-2, -1]])
  224. client.stream_remove("/newton/prep")
  225. eq_(list(client.stream_intervals("/newton/prep")), [])
  226. # Intervals that end at zero shouldn't be any different
  227. client.http.put("stream/insert", b"", { "path": "/newton/prep",
  228. "start": -1, "end": 0 })
  229. eq_(list(client.stream_intervals("/newton/prep")), [[-1, 0]])
  230. client.stream_remove("/newton/prep")
  231. eq_(list(client.stream_intervals("/newton/prep")), [])
  232. # Try forcing a server request with equal start and end
  233. with assert_raises(ClientError) as e:
  234. client.http.put("stream/insert", b"", { "path": "/newton/prep",
  235. "start": 0, "end": 0 })
  236. in_("400 Bad Request", str(e.exception))
  237. in_("start must precede end", str(e.exception))
  238. # Invalid times in HTTP request
  239. with assert_raises(ClientError) as e:
  240. client.http.put("stream/insert", b"", { "path": "/newton/prep",
  241. "start": "asdf", "end": 0 })
  242. in_("400 Bad Request", str(e.exception))
  243. in_("invalid start", str(e.exception))
  244. with assert_raises(ClientError) as e:
  245. client.http.put("stream/insert", b"", { "path": "/newton/prep",
  246. "start": 0, "end": "asdf" })
  247. in_("400 Bad Request", str(e.exception))
  248. in_("invalid end", str(e.exception))
  249. # Good content type
  250. with assert_raises(ClientError) as e:
  251. client.http.put("stream/insert", b"",
  252. { "path": "xxxx", "start": 0, "end": 1,
  253. "binary": 1 })
  254. in_("No such stream", str(e.exception))
  255. # Bad content type
  256. with assert_raises(ClientError) as e:
  257. client.http.put("stream/insert", b"",
  258. { "path": "xxxx", "start": 0, "end": 1,
  259. "binary": 1 },
  260. content_type="text/plain; charset=utf-8")
  261. in_("Content type must be application/octet-stream", str(e.exception))
  262. # Specify start/end (starts too late)
  263. data = timestamper.TimestamperRate(testfile, start, 120)
  264. with assert_raises(ClientError) as e:
  265. result = client.stream_insert("/newton/prep", data,
  266. start + 5000000, start + 120000000)
  267. in_("400 Bad Request", str(e.exception))
  268. in_("Data timestamp 1332511200000000 < start time 1332511205000000",
  269. str(e.exception))
  270. # Specify start/end (ends too early)
  271. data = timestamper.TimestamperRate(testfile, start, 120)
  272. with assert_raises(ClientError) as e:
  273. result = client.stream_insert("/newton/prep", data,
  274. start, start + 1000000)
  275. in_("400 Bad Request", str(e.exception))
  276. # Client chunks the input, so the exact timestamp here might change
  277. # if the chunk positions change.
  278. assert(re.search("Data timestamp 13325[0-9]+ "
  279. ">= end time 1332511201000000", str(e.exception))
  280. is not None)
  281. def check_data():
  282. # Verify the intervals. Should be just one, even if the data
  283. # was inserted in chunks, due to nilmdb interval concatenation.
  284. intervals = list(client.stream_intervals("/newton/prep"))
  285. eq_(intervals, [[start, start + 119999777]])
  286. # Try some overlapping data -- just insert it again
  287. data = timestamper.TimestamperRate(testfile, start, 120)
  288. with assert_raises(ClientError) as e:
  289. result = client.stream_insert("/newton/prep", data)
  290. in_("400 Bad Request", str(e.exception))
  291. in_("verlap", str(e.exception))
  292. # Now do the real load
  293. data = timestamper.TimestamperRate(testfile, start, 120)
  294. result = client.stream_insert("/newton/prep", data,
  295. start, start + 119999777)
  296. check_data()
  297. # Try inserting directly-passed data
  298. client.stream_remove("/newton/prep", start, start + 119999777)
  299. data = timestamper.TimestamperRate(testfile, start, 120)
  300. data_bytes = b''.join(data)
  301. result = client.stream_insert("/newton/prep", data_bytes,
  302. start, start + 119999777)
  303. check_data()
  304. nilmdb.client.client.StreamInserter._max_data = old_max_data
  305. client.close()
  306. def test_client_05_extractremove(self):
  307. # Misc tests for extract and remove. Most of them are in test_cmdline.
  308. client = nilmdb.client.Client(url = testurl)
  309. for x in client.stream_extract("/newton/prep",
  310. 999123000000, 999124000000):
  311. raise AssertionError("shouldn't be any data for this request")
  312. with assert_raises(ClientError) as e:
  313. client.stream_remove("/newton/prep", 123000000, 120000000)
  314. # Test count
  315. eq_(client.stream_count("/newton/prep"), 14400)
  316. # Test binary output
  317. with assert_raises(ClientError) as e:
  318. list(client.stream_extract("/newton/prep",
  319. markup = True, binary = True))
  320. with assert_raises(ClientError) as e:
  321. list(client.stream_extract("/newton/prep",
  322. count = True, binary = True))
  323. data = b"".join(client.stream_extract("/newton/prep", binary = True))
  324. # Quick check using struct
  325. unpacker = struct.Struct("<qffffffff")
  326. out = []
  327. for i in range(14400):
  328. out.append(unpacker.unpack_from(data, i * unpacker.size))
  329. eq_(out[0], (1332511200000000, 266568.0, 224029.0, 5161.39990234375,
  330. 2525.169921875, 8350.83984375, 3724.699951171875,
  331. 1355.3399658203125, 2039.0))
  332. # Just get some coverage
  333. with assert_raises(ClientError) as e:
  334. client.http.post("/stream/remove", { "path": "/none" })
  335. client.close()
  336. def test_client_06_generators(self):
  337. # A lot of the client functionality is already tested by test_cmdline,
  338. # but this gets a bit more coverage that cmdline misses.
  339. client = nilmdb.client.Client(url = testurl)
  340. # Trigger a client error in generator
  341. start = nilmdb.utils.time.parse_time("20120323T2000")
  342. end = nilmdb.utils.time.parse_time("20120323T1000")
  343. for function in [ client.stream_intervals, client.stream_extract ]:
  344. with assert_raises(ClientError) as e:
  345. next(function("/newton/prep", start, end))
  346. in_("400 Bad Request", str(e.exception))
  347. in_("start must precede end", str(e.exception))
  348. # Trigger a curl error in generator
  349. with assert_raises(ServerError) as e:
  350. next(client.http.get_gen("http://nosuchurl.example.com./"))
  351. # Check 404 for missing streams
  352. for function in [ client.stream_intervals, client.stream_extract ]:
  353. with assert_raises(ClientError) as e:
  354. next(function("/no/such/stream"))
  355. in_("404 Not Found", str(e.exception))
  356. in_("No such stream", str(e.exception))
  357. client.close()
  358. def test_client_07_headers(self):
  359. # Make sure that /stream/intervals and /stream/extract
  360. # properly return streaming, chunked, text/plain response.
  361. # Pokes around in client.http internals a bit to look at the
  362. # response headers.
  363. client = nilmdb.client.Client(url = testurl)
  364. http = client.http
  365. # Use a warning rather than returning a test failure for the
  366. # transfer-encoding, so that we can still disable chunked
  367. # responses for debugging.
  368. def headers():
  369. h = ""
  370. for (k, v) in list(http._last_response.headers.items()):
  371. h += k + ": " + v + "\n"
  372. return h.lower()
  373. # Intervals
  374. x = http.get("stream/intervals", { "path": "/newton/prep" })
  375. if "transfer-encoding: chunked" not in headers():
  376. warnings.warn("Non-chunked HTTP response for /stream/intervals")
  377. if "content-type: application/x-json-stream" not in headers():
  378. raise AssertionError("/stream/intervals content type "
  379. "is not application/x-json-stream:\n" +
  380. headers())
  381. # Extract
  382. x = http.get("stream/extract", { "path": "/newton/prep",
  383. "start": "123", "end": "124" })
  384. if "transfer-encoding: chunked" not in headers():
  385. warnings.warn("Non-chunked HTTP response for /stream/extract")
  386. if "content-type: text/plain;charset=utf-8" not in headers():
  387. raise AssertionError("/stream/extract is not text/plain:\n" +
  388. headers())
  389. x = http.get("stream/extract", { "path": "/newton/prep",
  390. "start": "123", "end": "124",
  391. "binary": "1" })
  392. if "transfer-encoding: chunked" not in headers():
  393. warnings.warn("Non-chunked HTTP response for /stream/extract")
  394. if "content-type: application/octet-stream" not in headers():
  395. raise AssertionError("/stream/extract is not binary:\n" +
  396. headers())
  397. # Make sure a binary of "0" is really off
  398. x = http.get("stream/extract", { "path": "/newton/prep",
  399. "start": "123", "end": "124",
  400. "binary": "0" })
  401. if "content-type: application/octet-stream" in headers():
  402. raise AssertionError("/stream/extract is not text:\n" +
  403. headers())
  404. # Invalid parameters
  405. with assert_raises(ClientError) as e:
  406. x = http.get("stream/extract", { "path": "/newton/prep",
  407. "start": "123", "end": "124",
  408. "binary": "asdfasfd" })
  409. in_("can't parse parameter", str(e.exception))
  410. client.close()
  411. def test_client_08_unicode(self):
  412. # Try both with and without posting JSON
  413. for post_json in (False, True):
  414. # Basic Unicode tests
  415. client = nilmdb.client.Client(url = testurl, post_json = post_json)
  416. # Delete streams that exist
  417. for stream in client.stream_list():
  418. client.stream_remove(stream[0])
  419. client.stream_destroy(stream[0])
  420. # Database is empty
  421. eq_(client.stream_list(), [])
  422. # Create Unicode stream, match it
  423. raw = [ "/düsseldorf/raw", "uint16_6" ]
  424. prep = [ "/düsseldorf/prep", "uint16_6" ]
  425. client.stream_create(*raw)
  426. eq_(client.stream_list(), [raw])
  427. eq_(client.stream_list(layout=raw[1]), [raw])
  428. eq_(client.stream_list(path=raw[0]), [raw])
  429. client.stream_create(*prep)
  430. eq_(client.stream_list(), [prep, raw])
  431. # Set / get metadata with Unicode keys and values
  432. eq_(client.stream_get_metadata(raw[0]), {})
  433. eq_(client.stream_get_metadata(prep[0]), {})
  434. meta1 = { "alpha": "α",
  435. "β": "beta" }
  436. meta2 = { "alpha": "α" }
  437. meta3 = { "β": "beta" }
  438. client.stream_set_metadata(prep[0], meta1)
  439. client.stream_update_metadata(prep[0], {})
  440. client.stream_update_metadata(raw[0], meta2)
  441. client.stream_update_metadata(raw[0], meta3)
  442. eq_(client.stream_get_metadata(prep[0]), meta1)
  443. eq_(client.stream_get_metadata(raw[0]), meta1)
  444. eq_(client.stream_get_metadata(raw[0], [ "alpha" ]), meta2)
  445. eq_(client.stream_get_metadata(raw[0], [ "alpha", "β" ]), meta1)
  446. client.close()
  447. def test_client_09_closing(self):
  448. # Make sure we actually close sockets correctly. New
  449. # connections will block for a while if they're not, since the
  450. # server will stop accepting new connections.
  451. for test in [1, 2]:
  452. start = time.time()
  453. for i in range(50):
  454. if time.time() - start > 15:
  455. raise AssertionError("Connections seem to be blocking... "
  456. "probably not closing properly.")
  457. if test == 1:
  458. # explicit close
  459. client = nilmdb.client.Client(url = testurl)
  460. with assert_raises(ClientError) as e:
  461. client.stream_remove("/newton/prep", 123, 120)
  462. client.close() # remove this to see the failure
  463. elif test == 2:
  464. # use the context manager
  465. with nilmdb.client.Client(url = testurl) as c:
  466. with assert_raises(ClientError) as e:
  467. c.stream_remove("/newton/prep", 123, 120)
  468. def test_client_10_context(self):
  469. # Test using the client's stream insertion context manager to
  470. # insert data.
  471. client = nilmdb.client.Client(testurl)
  472. client.stream_create("/context/test", "uint16_1")
  473. with client.stream_insert_context("/context/test") as ctx:
  474. # override _max_data to trigger frequent server updates
  475. ctx._max_data = 15
  476. ctx.insert(b"1000 1\n")
  477. ctx.insert(b"1010 ")
  478. ctx.insert(b"1\n1020 1")
  479. ctx.insert(b"")
  480. ctx.insert(b"\n1030 1\n")
  481. ctx.insert(b"1040 1\n")
  482. ctx.insert(b"# hello\n")
  483. ctx.insert(b" # hello\n")
  484. ctx.insert(b" 1050 1\n")
  485. ctx.finalize()
  486. ctx.insert(b"1070 1\n")
  487. ctx.update_end(1080)
  488. ctx.finalize()
  489. ctx.update_start(1090)
  490. ctx.insert(b"1100 1\n")
  491. ctx.insert(b"1110 1\n")
  492. ctx.send()
  493. ctx.insert(b"1120 1\n")
  494. ctx.insert(b"1130 1\n")
  495. ctx.insert(b"1140 1\n")
  496. ctx.update_end(1160)
  497. ctx.insert(b"1150 1\n")
  498. ctx.update_end(1170)
  499. ctx.insert(b"1160 1\n")
  500. ctx.update_end(1180)
  501. ctx.insert(b"1170 1" +
  502. b" # this is super long" * 100 +
  503. b"\n")
  504. ctx.finalize()
  505. ctx.insert(b"# this is super long" * 100)
  506. # override _max_data_after_send to trigger ValueError on a
  507. # long nonterminated line
  508. ctx._max_data_after_send = 1000
  509. with assert_raises(ValueError):
  510. ctx.insert(b"# this is super long" * 100)
  511. with assert_raises(ClientError):
  512. with client.stream_insert_context("/context/test",
  513. 1000, 2000) as ctx:
  514. ctx.insert(b"1180 1\n")
  515. with assert_raises(ClientError):
  516. with client.stream_insert_context("/context/test",
  517. 2000, 3000) as ctx:
  518. ctx.insert(b"1180 1\n")
  519. with assert_raises(ClientError):
  520. with client.stream_insert_context("/context/test") as ctx:
  521. ctx.insert(b"bogus data\n")
  522. with client.stream_insert_context("/context/test", 2000, 3000) as ctx:
  523. # make sure our override wasn't permanent
  524. ne_(ctx._max_data, 15)
  525. ctx.insert(b"2250 1\n")
  526. ctx.finalize()
  527. with assert_raises(ClientError):
  528. with client.stream_insert_context("/context/test",
  529. 3000, 4000) as ctx:
  530. ctx.insert(b"3010 1\n")
  531. ctx.insert(b"3020 2\n")
  532. ctx.insert(b"3030 3\n")
  533. ctx.insert(b"3040 4\n")
  534. ctx.insert(b"3040 4\n") # non-monotonic after a few lines
  535. ctx.finalize()
  536. eq_(list(client.stream_intervals("/context/test")),
  537. [ [ 1000, 1051 ],
  538. [ 1070, 1080 ],
  539. [ 1090, 1180 ],
  540. [ 2000, 3000 ] ])
  541. # destroy stream (try without removing data first)
  542. with assert_raises(ClientError):
  543. client.stream_destroy("/context/test")
  544. client.stream_remove("/context/test")
  545. client.stream_destroy("/context/test")
  546. client.close()
  547. def test_client_11_emptyintervals(self):
  548. # Empty intervals are ok! If recording detection events
  549. # by inserting rows into the database, we want to be able to
  550. # have an interval where no events occurred. Test them here.
  551. client = nilmdb.client.Client(testurl)
  552. client.stream_create("/empty/test", "uint16_1")
  553. def info():
  554. result = []
  555. for interval in list(client.stream_intervals("/empty/test")):
  556. result.append((client.stream_count("/empty/test", *interval),
  557. interval))
  558. return result
  559. eq_(info(), [])
  560. # Insert a region with just a few points
  561. with client.stream_insert_context("/empty/test") as ctx:
  562. ctx.update_start(100)
  563. ctx.insert(b"140 1\n")
  564. ctx.insert(b"150 1\n")
  565. ctx.insert(b"160 1\n")
  566. ctx.update_end(200)
  567. ctx.finalize()
  568. eq_(info(), [(3, [100, 200])])
  569. # Delete chunk, which will leave one data point and two intervals
  570. client.stream_remove("/empty/test", 145, 175)
  571. eq_(info(), [(1, [100, 145]),
  572. (0, [175, 200])])
  573. # Try also creating a completely empty interval from scratch,
  574. # in a few different ways.
  575. client.stream_insert("/empty/test", b"", 300, 350)
  576. client.stream_insert("/empty/test", [], 400, 450)
  577. with client.stream_insert_context("/empty/test", 500, 550):
  578. pass
  579. # If enough timestamps aren't provided, empty streams won't be created.
  580. client.stream_insert("/empty/test", [])
  581. with client.stream_insert_context("/empty/test"):
  582. pass
  583. client.stream_insert("/empty/test", [], start = 600)
  584. with client.stream_insert_context("/empty/test", start = 700):
  585. pass
  586. client.stream_insert("/empty/test", [], end = 850)
  587. with client.stream_insert_context("/empty/test", end = 950):
  588. pass
  589. # Equal start and end is OK as long as there's no data
  590. with client.stream_insert_context("/empty/test", start=9, end=9):
  591. pass
  592. # Try various things that might cause problems
  593. with client.stream_insert_context("/empty/test", 1000, 1050) as ctx:
  594. ctx.finalize() # inserts [1000, 1050]
  595. ctx.finalize() # nothing
  596. ctx.finalize() # nothing
  597. ctx.insert(b"1100 1\n")
  598. ctx.finalize() # inserts [1100, 1101]
  599. ctx.update_start(1199)
  600. ctx.insert(b"1200 1\n")
  601. ctx.update_end(1250)
  602. ctx.finalize() # inserts [1199, 1250]
  603. ctx.update_start(1299)
  604. ctx.finalize() # nothing
  605. ctx.update_end(1350)
  606. ctx.finalize() # nothing
  607. ctx.update_start(1400)
  608. ctx.insert(b"# nothing!\n")
  609. ctx.update_end(1450)
  610. ctx.finalize()
  611. ctx.update_start(1500)
  612. ctx.insert(b"# nothing!")
  613. ctx.update_end(1550)
  614. ctx.finalize()
  615. ctx.insert(b"# nothing!\n" * 10)
  616. ctx.finalize()
  617. # implicit last finalize inserts [1400, 1450]
  618. # Check everything
  619. eq_(info(), [(1, [100, 145]),
  620. (0, [175, 200]),
  621. (0, [300, 350]),
  622. (0, [400, 450]),
  623. (0, [500, 550]),
  624. (0, [1000, 1050]),
  625. (1, [1100, 1101]),
  626. (1, [1199, 1250]),
  627. (0, [1400, 1450]),
  628. (0, [1500, 1550]),
  629. ])
  630. # Clean up
  631. client.stream_remove("/empty/test")
  632. client.stream_destroy("/empty/test")
  633. client.close()
  634. def test_client_12_persistent(self):
  635. # Check that connections are NOT persistent. Rather than trying
  636. # to verify this at the TCP level, just make sure that the response
  637. # contained a "Connection: close" header.
  638. with nilmdb.client.Client(url = testurl) as c:
  639. c.stream_create("/persist/test", "uint16_1")
  640. eq_(c.http._last_response.headers["Connection"], "close")
  641. c.stream_destroy("/persist/test")
  642. eq_(c.http._last_response.headers["Connection"], "close")
  643. def test_client_13_timestamp_rounding(self):
  644. # Test potentially bad timestamps (due to floating point
  645. # roundoff etc). The server will round floating point values
  646. # to the nearest int.
  647. client = nilmdb.client.Client(testurl)
  648. client.stream_create("/rounding/test", "uint16_1")
  649. with client.stream_insert_context("/rounding/test",
  650. 100000000, 200000000.1) as ctx:
  651. ctx.insert(b"100000000.1 1\n")
  652. ctx.insert(b"150000000.00003 1\n")
  653. ctx.insert(b"199999999.4 1\n")
  654. eq_(list(client.stream_intervals("/rounding/test")),
  655. [ [ 100000000, 200000000 ] ])
  656. with assert_raises(ClientError):
  657. with client.stream_insert_context("/rounding/test",
  658. 200000000, 300000000) as ctx:
  659. ctx.insert(b"200000000 1\n")
  660. ctx.insert(b"250000000 1\n")
  661. # Server will round this and give an error on finalize()
  662. ctx.insert(b"299999999.99 1\n")
  663. client.stream_remove("/rounding/test")
  664. client.stream_destroy("/rounding/test")
  665. client.close()