You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

377 lines
14 KiB

  1. # -*- coding: utf-8 -*-
  2. import nilmdb.server
  3. import nilmdb.client
  4. import nilmdb.client.numpyclient
  5. from nilmdb.utils.printf import *
  6. from nilmdb.utils import timestamper
  7. from nilmdb.client import ClientError, ServerError
  8. import datetime_tz
  9. from nose.plugins.skip import SkipTest
  10. from nose.tools import *
  11. from nose.tools import assert_raises
  12. import itertools
  13. import distutils.version
  14. from testutil.helpers import *
  15. import numpy as np
  16. testdb = "tests/numpyclient-testdb"
  17. testurl = "http://localhost:32180/"
  18. def setup_module():
  19. global test_server, test_db
  20. # Clear out DB
  21. recursive_unlink(testdb)
  22. # Start web app on a custom port
  23. test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(
  24. testdb, bulkdata_args = { "file_size" : 16384,
  25. "files_per_dir" : 3 } )
  26. test_server = nilmdb.server.Server(test_db, host = "127.0.0.1",
  27. port = 32180, stoppable = False,
  28. fast_shutdown = True,
  29. force_traceback = True)
  30. test_server.start(blocking = False)
  31. def teardown_module():
  32. global test_server, test_db
  33. # Close web app
  34. test_server.stop()
  35. test_db.close()
  36. class TestNumpyClient(object):
  37. def test_numpyclient_01_basic(self):
  38. # Test basic connection
  39. client = nilmdb.client.numpyclient.NumpyClient(url = testurl)
  40. version = client.version()
  41. eq_(distutils.version.LooseVersion(version),
  42. distutils.version.LooseVersion(test_server.version))
  43. # Verify subclassing
  44. assert(isinstance(client, nilmdb.client.Client))
  45. # Layouts
  46. for layout in "int8_t", "something_8", "integer_1":
  47. with assert_raises(ValueError):
  48. for x in client.stream_extract_numpy("/foo", layout=layout):
  49. pass
  50. for layout in "int8_1", "uint8_30", "int16_20", "float64_100":
  51. with assert_raises(ClientError) as e:
  52. for x in client.stream_extract_numpy("/foo", layout=layout):
  53. pass
  54. in_("No such stream", str(e.exception))
  55. with assert_raises(ClientError) as e:
  56. for x in client.stream_extract_numpy("/foo"):
  57. pass
  58. in_("can't get layout for path", str(e.exception))
  59. client.close()
  60. def test_numpyclient_02_extract(self):
  61. client = nilmdb.client.numpyclient.NumpyClient(url = testurl)
  62. # Insert some data as text
  63. client.stream_create("/newton/prep", "float32_8")
  64. testfile = "tests/data/prep-20120323T1000"
  65. start = nilmdb.utils.time.parse_time("20120323T1000")
  66. rate = 120
  67. data = timestamper.TimestamperRate(testfile, start, rate)
  68. result = client.stream_insert("/newton/prep", data,
  69. start, start + 119999777)
  70. # Extract Numpy arrays
  71. array = None
  72. pieces = 0
  73. for chunk in client.stream_extract_numpy("/newton/prep", maxrows=1000):
  74. pieces += 1
  75. if array is not None:
  76. array = np.vstack((array, chunk))
  77. else:
  78. array = chunk
  79. eq_(array.shape, (14400, 9))
  80. eq_(pieces, 15)
  81. # Try structured
  82. s = list(client.stream_extract_numpy("/newton/prep", structured = True))
  83. assert(np.array_equal(np.c_[s[0]['timestamp'], s[0]['data']], array))
  84. # Compare. Will be close but not exact because the conversion
  85. # to and from ASCII was lossy.
  86. data = timestamper.TimestamperRate(testfile, start, rate)
  87. data_str = b" ".join(data).decode('utf-8', errors='backslashreplace')
  88. actual = np.fromstring(data_str, sep=' ').reshape(14400, 9)
  89. assert(np.allclose(array, actual))
  90. client.close()
  91. def test_numpyclient_03_insert(self):
  92. client = nilmdb.client.numpyclient.NumpyClient(url = testurl)
  93. # Limit _max_data just to get better coverage
  94. old_max_data = nilmdb.client.numpyclient.StreamInserterNumpy._max_data
  95. nilmdb.client.numpyclient.StreamInserterNumpy._max_data = 100000
  96. client.stream_create("/test/1", "uint16_1")
  97. client.stream_insert_numpy("/test/1",
  98. np.array([[0, 1],
  99. [1, 2],
  100. [2, 3],
  101. [3, 4]]))
  102. # Wrong number of dimensions
  103. with assert_raises(ValueError) as e:
  104. client.stream_insert_numpy("/test/1",
  105. np.array([[[0, 1],
  106. [1, 2]],
  107. [[3, 4],
  108. [4, 5]]]))
  109. in_("wrong number of dimensions", str(e.exception))
  110. # Wrong number of fields
  111. with assert_raises(ValueError) as e:
  112. client.stream_insert_numpy("/test/1",
  113. np.array([[0, 1, 2],
  114. [1, 2, 3],
  115. [3, 4, 5],
  116. [4, 5, 6]]))
  117. in_("wrong number of fields", str(e.exception))
  118. # Unstructured
  119. client.stream_create("/test/2", "float32_8")
  120. client.stream_insert_numpy(
  121. "/test/2",
  122. client.stream_extract_numpy(
  123. "/newton/prep", structured = False, maxrows = 1000))
  124. # Structured, and specifying layout.
  125. # This also tests the final branch in stream_extract_numpy by specifing
  126. # a value of maxrows that exactly matches how much data we had inserted.
  127. client.stream_create("/test/3", "float32_8")
  128. client.stream_insert_numpy(
  129. path = "/test/3", layout = "float32_8",
  130. data = client.stream_extract_numpy(
  131. "/newton/prep", structured = True, maxrows = 14400))
  132. # Structured, specifying wrong layout
  133. client.stream_create("/test/4", "float32_8")
  134. with assert_raises(ValueError) as e:
  135. client.stream_insert_numpy(
  136. "/test/4", layout = "uint16_1",
  137. data = client.stream_extract_numpy(
  138. "/newton/prep", structured = True, maxrows = 1000))
  139. in_("wrong dtype", str(e.exception))
  140. # Unstructured, and specifying wrong layout
  141. client.stream_create("/test/5", "float32_8")
  142. with assert_raises(ClientError) as e:
  143. client.stream_insert_numpy(
  144. "/test/5", layout = "uint16_8",
  145. data = client.stream_extract_numpy(
  146. "/newton/prep", structured = False, maxrows = 1000))
  147. # timestamps will be screwy here, because data will be parsed wrong
  148. in_("error parsing input data", str(e.exception))
  149. # Make sure the /newton/prep copies are identical
  150. a = np.vstack(list(client.stream_extract_numpy("/newton/prep")))
  151. b = np.vstack(list(client.stream_extract_numpy("/test/2")))
  152. c = np.vstack(list(client.stream_extract_numpy("/test/3")))
  153. assert(np.array_equal(a,b))
  154. assert(np.array_equal(a,c))
  155. # Make sure none of the files are greater than 16384 bytes as
  156. # we configured with the bulkdata_args above.
  157. datapath = os.path.join(testdb, "data")
  158. for (dirpath, dirnames, filenames) in os.walk(datapath):
  159. for f in filenames:
  160. fn = os.path.join(dirpath, f)
  161. size = os.path.getsize(fn)
  162. if size > 16384:
  163. raise AssertionError(sprintf("%s is too big: %d > %d\n",
  164. fn, size, 16384))
  165. nilmdb.client.numpyclient.StreamInserterNumpy._max_data = old_max_data
  166. client.close()
  167. def test_numpyclient_04_context(self):
  168. # Like test_client_context, but with Numpy data
  169. client = nilmdb.client.numpyclient.NumpyClient(testurl)
  170. client.stream_create("/context/test", "uint16_1")
  171. with client.stream_insert_numpy_context("/context/test") as ctx:
  172. # override _max_rows to trigger frequent server updates
  173. ctx._max_rows = 2
  174. ctx.insert([[1000, 1]])
  175. ctx.insert([[1010, 1], [1020, 1], [1030, 1]])
  176. ctx.insert([[1040, 1], [1050, 1]])
  177. ctx.finalize()
  178. ctx.insert([[1070, 1]])
  179. ctx.update_end(1080)
  180. ctx.finalize()
  181. ctx.update_start(1090)
  182. ctx.insert([[1100, 1]])
  183. ctx.insert([[1110, 1]])
  184. ctx.send()
  185. ctx.insert([[1120, 1], [1130, 1], [1140, 1]])
  186. ctx.update_end(1160)
  187. ctx.insert([[1150, 1]])
  188. ctx.update_end(1170)
  189. ctx.insert([[1160, 1]])
  190. ctx.update_end(1180)
  191. ctx.insert([[1170, 123456789.0]])
  192. ctx.finalize()
  193. ctx.insert(np.zeros((0,2)))
  194. with assert_raises(ClientError):
  195. with client.stream_insert_numpy_context("/context/test",
  196. 1000, 2000) as ctx:
  197. ctx.insert([[1180, 1]])
  198. with assert_raises(ClientError):
  199. with client.stream_insert_numpy_context("/context/test",
  200. 2000, 3000) as ctx:
  201. ctx._max_rows = 2
  202. ctx.insert([[3180, 1]])
  203. ctx.insert([[3181, 1]])
  204. with client.stream_insert_numpy_context("/context/test",
  205. 2000, 3000) as ctx:
  206. # make sure our override wasn't permanent
  207. ne_(ctx._max_rows, 2)
  208. ctx.insert([[2250, 1]])
  209. ctx.finalize()
  210. with assert_raises(ClientError):
  211. with client.stream_insert_numpy_context("/context/test",
  212. 3000, 4000) as ctx:
  213. ctx.insert([[3010, 1]])
  214. ctx.insert([[3020, 2]])
  215. ctx.insert([[3030, 3]])
  216. ctx.insert([[3040, 4]])
  217. ctx.insert([[3040, 4]]) # non-monotonic after a few lines
  218. ctx.finalize()
  219. eq_(list(client.stream_intervals("/context/test")),
  220. [ [ 1000, 1051 ],
  221. [ 1070, 1080 ],
  222. [ 1090, 1180 ],
  223. [ 2000, 3000 ] ])
  224. client.stream_remove("/context/test")
  225. client.stream_destroy("/context/test")
  226. client.close()
  227. def test_numpyclient_05_emptyintervals(self):
  228. # Like test_client_emptyintervals, with insert_numpy_context
  229. client = nilmdb.client.numpyclient.NumpyClient(testurl)
  230. client.stream_create("/empty/test", "uint16_1")
  231. def info():
  232. result = []
  233. for interval in list(client.stream_intervals("/empty/test")):
  234. result.append((client.stream_count("/empty/test", *interval),
  235. interval))
  236. return result
  237. eq_(info(), [])
  238. # Insert a region with just a few points
  239. with client.stream_insert_numpy_context("/empty/test") as ctx:
  240. ctx.update_start(100)
  241. ctx.insert([[140, 1]])
  242. ctx.insert([[150, 1]])
  243. ctx.insert([[160, 1]])
  244. ctx.update_end(200)
  245. ctx.finalize()
  246. eq_(info(), [(3, [100, 200])])
  247. # Delete chunk, which will leave one data point and two intervals
  248. client.stream_remove("/empty/test", 145, 175)
  249. eq_(info(), [(1, [100, 145]),
  250. (0, [175, 200])])
  251. # Try also creating a completely empty interval from scratch,
  252. # in a few different ways.
  253. client.stream_insert("/empty/test", b"", 300, 350)
  254. client.stream_insert("/empty/test", [], 400, 450)
  255. with client.stream_insert_numpy_context("/empty/test", 500, 550):
  256. pass
  257. # If enough timestamps aren't provided, empty streams won't be created.
  258. client.stream_insert("/empty/test", [])
  259. with client.stream_insert_numpy_context("/empty/test"):
  260. pass
  261. client.stream_insert("/empty/test", [], start = 600)
  262. with client.stream_insert_numpy_context("/empty/test", start = 700):
  263. pass
  264. client.stream_insert("/empty/test", [], end = 850)
  265. with client.stream_insert_numpy_context("/empty/test", end = 950):
  266. pass
  267. # Equal start and end is OK as long as there's no data
  268. with assert_raises(ClientError) as e:
  269. with client.stream_insert_numpy_context("/empty/test",
  270. start=9, end=9) as ctx:
  271. ctx.insert([[9, 9]])
  272. ctx.finalize()
  273. in_("have data to send, but invalid start/end times", str(e.exception))
  274. with client.stream_insert_numpy_context("/empty/test",
  275. start=9, end=9) as ctx:
  276. pass
  277. # reusing a context object is bad
  278. with assert_raises(Exception) as e:
  279. ctx.insert([[9, 9]])
  280. # Try various things that might cause problems
  281. with client.stream_insert_numpy_context("/empty/test",
  282. 1000, 1050) as ctx:
  283. ctx.finalize() # inserts [1000, 1050]
  284. ctx.finalize() # nothing
  285. ctx.finalize() # nothing
  286. ctx.insert([[1100, 1]])
  287. ctx.finalize() # inserts [1100, 1101]
  288. ctx.update_start(1199)
  289. ctx.insert([[1200, 1]])
  290. ctx.update_end(1250)
  291. ctx.finalize() # inserts [1199, 1250]
  292. ctx.update_start(1299)
  293. ctx.finalize() # nothing
  294. ctx.update_end(1350)
  295. ctx.finalize() # nothing
  296. ctx.update_start(1400)
  297. ctx.insert(np.zeros((0,2)))
  298. ctx.update_end(1450)
  299. ctx.finalize()
  300. ctx.update_start(1500)
  301. ctx.insert(np.zeros((0,2)))
  302. ctx.update_end(1550)
  303. ctx.finalize()
  304. ctx.insert(np.zeros((0,2)))
  305. ctx.insert(np.zeros((0,2)))
  306. ctx.insert(np.zeros((0,2)))
  307. ctx.finalize()
  308. # Check everything
  309. eq_(info(), [(1, [100, 145]),
  310. (0, [175, 200]),
  311. (0, [300, 350]),
  312. (0, [400, 450]),
  313. (0, [500, 550]),
  314. (0, [1000, 1050]),
  315. (1, [1100, 1101]),
  316. (1, [1199, 1250]),
  317. (0, [1400, 1450]),
  318. (0, [1500, 1550]),
  319. ])
  320. # Clean up
  321. client.stream_remove("/empty/test")
  322. client.stream_destroy("/empty/test")
  323. client.close()