# -*- coding: utf-8 -*-

"""NilmDB

Object that represents a NILM database file.

Manages both the SQL database and the table storage backend.
"""

# Need absolute_import so that "import nilmdb" won't pull in
# nilmdb.py, but will pull the parent nilmdb module instead.
from __future__ import absolute_import
import nilmdb.utils
from nilmdb.utils.printf import *
from nilmdb.utils.interval import IntervalError
from nilmdb.server.interval import Interval, DBInterval, IntervalSet
from nilmdb.server import bulkdata
from nilmdb.server.errors import NilmDBError, StreamError, OverlapError

import sqlite3
import os
import errno
import bisect

# Note about performance and transactions:
#
# Committing a transaction in the default sync mode (PRAGMA synchronous=FULL)
# takes about 125msec.  sqlite3 will commit transactions at 3 times:
#    1: explicit con.commit()
#    2: between a series of DML commands and non-DML commands, e.g.
#       after a series of INSERT, SELECT, but before a CREATE TABLE or PRAGMA.
#    3: at the end of an explicit transaction, e.g. "with self.con as con:"
#
# To speed things up, we can set 'PRAGMA synchronous=OFF'.  Or, it
# seems that 'PRAGMA synchronous=NORMAL' and 'PRAGMA journal_mode=WAL'
# give an equivalent speedup more safely.  That is what is used here.
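#
# For reference, the same journal setup done in __init__ below can be
# applied to any SQLite connection (a minimal standalone sketch; the
# filename here is hypothetical):
#
#   import sqlite3
#   con = sqlite3.connect("example.sql")
#   con.execute("PRAGMA synchronous=NORMAL")
#   con.execute("PRAGMA journal_mode=WAL")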
_sql_schema_updates = {
    0: { "next": 1, "sql": """
        -- All streams
        CREATE TABLE streams(
          id INTEGER PRIMARY KEY,       -- stream ID
          path TEXT UNIQUE NOT NULL,    -- path, e.g. '/newton/prep'
          layout TEXT NOT NULL          -- layout name, e.g. float32_8
        );

        -- Individual timestamped ranges in those streams.
        -- For a given start_time and end_time, this tells us that the
        -- data is stored between start_pos and end_pos.
        -- Times are stored as μs since Unix epoch
        -- Positions are opaque: PyTables rows, file offsets, etc.
        --
        -- Note: end_pos points to the row _after_ end_time, so end_pos-1
        -- is the last valid row.
        CREATE TABLE ranges(
          stream_id INTEGER NOT NULL,
          start_time INTEGER NOT NULL,
          end_time INTEGER NOT NULL,
          start_pos INTEGER NOT NULL,
          end_pos INTEGER NOT NULL
        );
        CREATE INDEX _ranges_index ON ranges (stream_id, start_time, end_time);
        """ },

    1: { "next": 3, "sql": """
        -- Generic dictionary-type metadata that can be associated with a stream
        CREATE TABLE metadata(
          stream_id INTEGER NOT NULL,
          key TEXT NOT NULL,
          value TEXT
        );
        """ },

    2: { "error": "old format with floating-point timestamps requires "
                  "nilmdb 1.3.1 or older" },

    3: { "next": None },
}
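
# Example: a database at user_version 0 is walked 0 -> 1 -> 3 by
# _sql_schema_update() below, executing each "sql" block in turn;
# version 2 (the old floating-point timestamp format) maps straight
# to an error instead of an upgrade path.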

@nilmdb.utils.must_close()
class NilmDB(object):
    verbose = 0

    def __init__(self, basepath, max_results=None,
                 max_removals=None, bulkdata_args=None):
        """Initialize NilmDB at the given basepath.
        Other arguments are for debugging / testing:

        'max_results' is the max rows to send in a single
        stream_intervals or stream_extract response.

        'max_removals' is the max rows to delete at once
        in stream_remove.

        'bulkdata_args' is kwargs for the bulkdata module.
        """
        if bulkdata_args is None:
            bulkdata_args = {}

        # set up path
        self.basepath = os.path.abspath(basepath)

        # Create the database path if it doesn't exist
        try:
            os.makedirs(self.basepath)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise IOError("can't create tree " + self.basepath)

        # Our data goes inside it
        self.data = bulkdata.BulkData(self.basepath, **bulkdata_args)

        # SQLite database too
        sqlfilename = os.path.join(self.basepath, "data.sql")
        self.con = sqlite3.connect(sqlfilename, check_same_thread = True)
        try:
            self._sql_schema_update()
        except Exception: # pragma: no cover
            # Only close the bulk storage on failure; on success it
            # must stay open for the lifetime of this object.
            self.data.close()
            raise

        # See big comment at top about the performance implications of this
        self.con.execute("PRAGMA synchronous=NORMAL")
        self.con.execute("PRAGMA journal_mode=WAL")

        # Approximate largest number of elements that we want to send
        # in a single reply (for stream_intervals, stream_extract).
        self.max_results = max_results or 16384

        # Remove up to this many rows per call to stream_remove.
        self.max_removals = max_removals or 1048576

    def get_basepath(self):
        return self.basepath

    def close(self):
        if self.con:
            self.con.commit()
            self.con.close()
        self.data.close()

    def _sql_schema_update(self):
        cur = self.con.cursor()
        version = cur.execute("PRAGMA user_version").fetchone()[0]
        oldversion = version

        while True:
            if version not in _sql_schema_updates: # pragma: no cover
                raise Exception(self.basepath + ": unknown database version "
                                + str(version))
            update = _sql_schema_updates[version]
            if "error" in update: # pragma: no cover
                raise Exception(self.basepath + ": can't use database version "
                                + str(version) + ": " + update["error"])
            if update["next"] is None:
                break
            cur.executescript(update["sql"])
            version = update["next"]
            if self.verbose: # pragma: no cover
                printf("Database schema updated to %d\n", version)

        if version != oldversion:
            with self.con:
                cur.execute("PRAGMA user_version = {v:d}".format(v=version))

    def _check_user_times(self, start, end):
        if start is None:
            start = nilmdb.utils.time.min_timestamp
        if end is None:
            end = nilmdb.utils.time.max_timestamp
        if start >= end:
            raise NilmDBError("start must precede end")
        return (start, end)
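
    # Example (hypothetical values): _check_user_times(None, 1000)
    # fills in the minimum timestamp and returns (min_timestamp, 1000),
    # while _check_user_times(1000, 1000) raises NilmDBError, since
    # start must be strictly less than end.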

    @nilmdb.utils.lru_cache(size = 16)
    def _get_intervals(self, stream_id):
        """
        Return a mutable IntervalSet corresponding to the given stream ID.
        """
        iset = IntervalSet()
        result = self.con.execute("SELECT start_time, end_time, "
                                  "start_pos, end_pos "
                                  "FROM ranges "
                                  "WHERE stream_id=?", (stream_id,))
        try:
            for (start_time, end_time, start_pos, end_pos) in result:
                iset += DBInterval(start_time, end_time,
                                   start_time, end_time,
                                   start_pos, end_pos)
        except IntervalError: # pragma: no cover
            raise NilmDBError("unexpected overlap in ranges table!")
        return iset

    def _sql_interval_insert(self, id, start, end, start_pos, end_pos):
        """Helper that adds interval to the SQL database only"""
        self.con.execute("INSERT INTO ranges "
                         "(stream_id,start_time,end_time,start_pos,end_pos) "
                         "VALUES (?,?,?,?,?)",
                         (id, start, end, start_pos, end_pos))

    def _sql_interval_delete(self, id, start, end, start_pos, end_pos):
        """Helper that removes interval from the SQL database only"""
        self.con.execute("DELETE FROM ranges WHERE "
                         "stream_id=? AND start_time=? AND "
                         "end_time=? AND start_pos=? AND end_pos=?",
                         (id, start, end, start_pos, end_pos))

    def _add_interval(self, stream_id, interval, start_pos, end_pos):
        """
        Add interval to the internal interval cache, and to the database.
        Note: arguments must be ints (not numpy.int64, etc)
        """
        # Load this stream's intervals
        iset = self._get_intervals(stream_id)

        # Check for overlap
        if iset.intersects(interval): # pragma: no cover (gets caught earlier)
            raise NilmDBError("new interval overlaps existing data")

        # Check for adjacency.  If there's a stream in the database
        # that ends exactly when this one starts, and the database
        # rows match up, we can make one interval that covers the
        # time range [adjacent.start -> interval.end)
        # and database rows [ adjacent.start_pos -> end_pos ].
        # Only do this if the resulting interval isn't too large.
        max_merged_rows = 8000 * 60 * 60 * 1.05 # 1.05 hours at 8 KHz
        adjacent = iset.find_end(interval.start)
        if (adjacent is not None and
            start_pos == adjacent.db_endpos and
            (end_pos - adjacent.db_startpos) < max_merged_rows):
            # First delete the old one, both from our iset and the
            # database
            iset -= adjacent
            self._sql_interval_delete(stream_id,
                                      adjacent.db_start, adjacent.db_end,
                                      adjacent.db_startpos, adjacent.db_endpos)

            # Now update our interval so the fallthrough add is
            # correct.
            interval.start = adjacent.start
            start_pos = adjacent.db_startpos

        # Add the new interval to the iset
        iset.iadd_nocheck(DBInterval(interval.start, interval.end,
                                     interval.start, interval.end,
                                     start_pos, end_pos))

        # Insert into the database
        self._sql_interval_insert(stream_id, interval.start, interval.end,
                                  int(start_pos), int(end_pos))

        self.con.commit()
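
    # Worked example (hypothetical numbers): if the stream already has
    # an interval covering times [0, 100) in rows [0, 800), and new
    # data arrives for times [100, 200) in rows [800, 1600), the two
    # are merged into a single interval [0, 200) over rows [0, 1600),
    # as long as the merged row count stays below max_merged_rows.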

    def _remove_interval(self, stream_id, original, remove):
        """
        Remove an interval from the internal cache and the database.

        stream_id: id of stream

        original: original DBInterval; must be already present in DB

        remove: DBInterval to remove; must be a subset of 'original'
        """
        # Just return if we have nothing to remove
        if remove.start == remove.end: # pragma: no cover
            return

        # Load this stream's intervals
        iset = self._get_intervals(stream_id)

        # Remove existing interval from the cached set and the database
        iset -= original
        self._sql_interval_delete(stream_id,
                                  original.db_start, original.db_end,
                                  original.db_startpos, original.db_endpos)

        # Add back the intervals that would be left over if the
        # requested interval is removed.  There may be two of them, if
        # the removed piece was in the middle.
        def add(iset, start, end, start_pos, end_pos):
            iset += DBInterval(start, end, start, end, start_pos, end_pos)
            self._sql_interval_insert(stream_id, start, end,
                                      start_pos, end_pos)
        if original.start != remove.start:
            # Interval before the removed region
            add(iset, original.start, remove.start,
                original.db_startpos, remove.db_startpos)
        if original.end != remove.end:
            # Interval after the removed region
            add(iset, remove.end, original.end,
                remove.db_endpos, original.db_endpos)

        # Commit SQL changes
        self.con.commit()
        return
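
    # Example: removing [40, 60) from an original interval [0, 100)
    # leaves two intervals, [0, 40) and [60, 100), which are
    # re-inserted by the add() helper above.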

    def stream_list(self, path = None, layout = None, extended = False):
        """Return list of lists of all streams in the database.

        If path is specified, include only streams with a path that
        matches the given string.

        If layout is specified, include only streams with a layout
        that matches the given string.

        If extended = False, returns a list of lists containing
        the path and layout: [ path, layout ]

        If extended = True, returns a list of lists containing
        more information:
           path
           layout
           interval_min (earliest interval start)
           interval_max (latest interval end)
           rows (total number of rows of data)
           time (total time covered by this stream, in timestamp units)
        """
        params = ()
        query = "SELECT streams.path, streams.layout"
        if extended:
            query += ", min(ranges.start_time), max(ranges.end_time) "
            query += ", coalesce(sum(ranges.end_pos - ranges.start_pos), 0) "
            query += ", coalesce(sum(ranges.end_time - ranges.start_time), 0) "
        query += " FROM streams"
        if extended:
            query += " LEFT JOIN ranges ON streams.id = ranges.stream_id"
        query += " WHERE 1=1"
        if layout is not None:
            query += " AND streams.layout=?"
            params += (layout,)
        if path is not None:
            query += " AND streams.path=?"
            params += (path,)
        query += " GROUP BY streams.id ORDER BY streams.path"
        result = self.con.execute(query, params).fetchall()
        return [ list(x) for x in result ]
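
    # Example (hypothetical stream): stream_list(path="/newton/prep")
    # returns [["/newton/prep", "float32_8"]]; with extended=True each
    # entry also carries interval_min, interval_max, rows, and time.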

    def stream_intervals(self, path, start = None, end = None, diffpath = None):
        """
        List all intervals in 'path' between 'start' and 'end'.  If
        'diffpath' is not None, list instead the set-difference
        between the intervals in the two streams; i.e. all interval
        ranges that are present in 'path' but not 'diffpath'.

        Returns (intervals, restart) tuple.

        'intervals' is a list of [start,end] timestamps of all intervals
        that exist for path, between start and end.

        'restart', if not None, means that there were too many results
        to return in a single request.  The data is complete from the
        starting timestamp to the point at which it was truncated, and
        a new request with a start time of 'restart' will fetch the
        next block of data.
        """
        stream_id = self._stream_id(path)
        intervals = self._get_intervals(stream_id)
        if diffpath:
            diffstream_id = self._stream_id(diffpath)
            diffintervals = self._get_intervals(diffstream_id)
        (start, end) = self._check_user_times(start, end)
        requested = Interval(start, end)
        result = []
        if diffpath:
            getter = nilmdb.utils.interval.set_difference(
                intervals.intersection(requested),
                diffintervals.intersection(requested))
        else:
            getter = intervals.intersection(requested)
        for n, i in enumerate(getter):
            if n >= self.max_results:
                restart = i.start
                break
            result.append([i.start, i.end])
        else:
            restart = None
        return (result, restart)
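
    # A caller can page through a long interval list using 'restart'
    # (a sketch; 'db' is an open NilmDB and the path is hypothetical):
    #
    #   start = None
    #   while True:
    #       (ivals, restart) = db.stream_intervals("/newton/prep", start)
    #       process(ivals)           # hypothetical consumer
    #       if restart is None:
    #           break
    #       start = restart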

    def stream_create(self, path, layout_name):
        """Create a new table in the database.

        path: path to the data (e.g. '/newton/prep').
        Paths must contain at least two elements, e.g.:
           /newton/prep
           /newton/raw
           /newton/upstairs/prep
           /newton/upstairs/raw

        layout_name: string for nilmdb.layout.get_named(), e.g. 'float32_8'
        """
        # Create the bulk storage.  Raises ValueError on error, which we
        # pass along.
        self.data.create(path, layout_name)

        # Insert into SQL database once the bulk storage is happy
        with self.con as con:
            con.execute("INSERT INTO streams (path, layout) VALUES (?,?)",
                        (path, layout_name))
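
    # Example: db.stream_create("/newton/prep", "float32_8") creates
    # the bulk storage and registers the stream; creating the same
    # path twice fails, since the schema declares path UNIQUE.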

    def _stream_id(self, path):
        """Return unique stream ID"""
        result = self.con.execute("SELECT id FROM streams WHERE path=?",
                                  (path,)).fetchone()
        if result is None:
            raise StreamError("No stream at path " + path)
        return result[0]

    def stream_set_metadata(self, path, data):
        """Set stream metadata from a dictionary, e.g.

        { 'description': 'Downstairs lighting',
          'v_scaling': 123.45 }

        This replaces all existing metadata.
        """
        stream_id = self._stream_id(path)
        with self.con as con:
            con.execute("DELETE FROM metadata WHERE stream_id=?", (stream_id,))
            for key in data:
                if data[key] != '':
                    con.execute("INSERT INTO metadata VALUES (?, ?, ?)",
                                (stream_id, key, data[key]))

    def stream_get_metadata(self, path):
        """Return stream metadata as a dictionary."""
        stream_id = self._stream_id(path)
        result = self.con.execute("SELECT metadata.key, metadata.value "
                                  "FROM metadata "
                                  "WHERE metadata.stream_id=?", (stream_id,))
        data = {}
        for (key, value) in result:
            data[key] = value
        return data

    def stream_update_metadata(self, path, newdata):
        """Update stream metadata from a dictionary"""
        data = self.stream_get_metadata(path)
        data.update(newdata)
        self.stream_set_metadata(path, data)
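
    # Example (hypothetical stream and key):
    #   db.stream_update_metadata("/newton/prep", { 'v_scaling': '123.45' })
    # merges the new keys into the existing metadata, whereas
    # stream_set_metadata() replaces the metadata wholesale.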

    def stream_rename(self, oldpath, newpath):
        """Rename a stream."""
        stream_id = self._stream_id(oldpath)

        # Rename the data
        self.data.rename(oldpath, newpath)

        # Rename the stream in the database
        with self.con as con:
            con.execute("UPDATE streams SET path=? WHERE id=?",
                        (newpath, stream_id))

    def stream_destroy(self, path):
        """Fully remove a table from the database.  Fails if there is
        any interval data present; remove it first.  Metadata is
        also removed."""
        stream_id = self._stream_id(path)

        # Verify that no intervals are present, and clear the cache
        iset = self._get_intervals(stream_id)
        if len(iset):
            raise NilmDBError("all intervals must be removed before "
                              "destroying a stream")
        self._get_intervals.cache_remove(self, stream_id)

        # Delete the bulkdata storage
        self.data.destroy(path)

        # Delete metadata, stream, intervals (should be none)
        with self.con as con:
            con.execute("DELETE FROM metadata WHERE stream_id=?", (stream_id,))
            con.execute("DELETE FROM ranges WHERE stream_id=?", (stream_id,))
            con.execute("DELETE FROM streams WHERE id=?", (stream_id,))

    def stream_insert(self, path, start, end, data):
        """Insert new data into the database.

        path: Path at which to add the data
        start: Starting timestamp
        end: Ending timestamp
        data: Textual data, formatted according to the layout of path
        """
        # First check for basic overlap using timestamp info given.
        stream_id = self._stream_id(path)
        iset = self._get_intervals(stream_id)
        interval = Interval(start, end)
        if iset.intersects(interval):
            raise OverlapError("new data overlaps existing data at range: "
                               + str(iset & interval))

        # Tentatively append the data.  This will raise a ValueError if
        # there are any parse errors.
        table = self.data.getnode(path)
        row_start = table.nrows
        table.append_string(data, start, end)
        row_end = table.nrows

        # Insert the record into the sql database.
        self._add_interval(stream_id, interval, row_start, row_end)

        # And that's all
        return
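
    # Example (hypothetical values): inserting rows covering times
    # [100, 200):
    #   db.stream_insert("/newton/prep", 100, 200, ascii_data)
    # where ascii_data is text formatted for the stream's layout; a
    # second insert overlapping [100, 200) raises OverlapError.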

    def _find_start(self, table, dbinterval):
        """
        Given a DBInterval, find the row in the database that
        corresponds to the start time.  Return the first database
        position with a timestamp (first element) greater than or
        equal to 'start'.
        """
        # Optimization for the common case where an interval wasn't truncated
        if dbinterval.start == dbinterval.db_start:
            return dbinterval.db_startpos
        return bisect.bisect_left(table,
                                  dbinterval.start,
                                  dbinterval.db_startpos,
                                  dbinterval.db_endpos)

    def _find_end(self, table, dbinterval):
        """
        Given a DBInterval, find the row in the database that follows
        the end time.  Return the first database position after the
        row with timestamp (first element) greater than or equal
        to 'end'.
        """
        # Optimization for the common case where an interval wasn't truncated
        if dbinterval.end == dbinterval.db_end:
            return dbinterval.db_endpos
        # Note that we still use bisect_left here, because we don't
        # want to include the given timestamp in the results.  This is
        # so queries like 1:00 -> 2:00 and 2:00 -> 3:00 return
        # non-overlapping data.
        return bisect.bisect_left(table,
                                  dbinterval.end,
                                  dbinterval.db_startpos,
                                  dbinterval.db_endpos)
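
    # Worked example (hypothetical rows): with row timestamps
    # [10, 20, 20, 30] and an interval ending at 20, bisect_left lands
    # on the first 20, so an extract ending at 20 excludes those rows
    # while one starting at 20 picks them up: no overlap, no gap.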

    def stream_extract(self, path, start = None, end = None, count = False):
        """
        Returns (data, restart) tuple.

        'data' is ASCII-formatted data from the database, formatted
        according to the layout of the stream.

        'restart', if not None, means that there were too many results to
        return in a single request.  The data is complete from the
        starting timestamp to the point at which it was truncated,
        and a new request with a start time of 'restart' will fetch
        the next block of data.

        'count', if true, means to not return raw data, but just the count
        of rows that would have been returned.  This is much faster
        than actually fetching the data.  It is not limited by
        max_results.
        """
        stream_id = self._stream_id(path)
        table = self.data.getnode(path)
        intervals = self._get_intervals(stream_id)
        (start, end) = self._check_user_times(start, end)
        requested = Interval(start, end)
        result = []
        matched = 0
        remaining = self.max_results
        restart = None
        for interval in intervals.intersection(requested):
            # Reading single rows from the table is too slow, so
            # we use two bisections to find both the starting and
            # ending row for this particular interval, then
            # read the entire range as one slice.
            row_start = self._find_start(table, interval)
            row_end = self._find_end(table, interval)

            if count:
                matched += row_end - row_start
                continue

            # Shorten it if we'll hit the maximum number of results
            row_max = row_start + remaining
            if row_max < row_end:
                row_end = row_max
                restart = table[row_max]

            # Gather these results up
            result.append(table.get_data(row_start, row_end))

            # Count them
            remaining -= row_end - row_start
            if restart is not None:
                break

        if count:
            return matched
        return ("".join(result), restart)
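
    # Example (hypothetical range): stream_extract("/newton/prep",
    # 100, 200, count = True) returns just the number of rows in
    # [100, 200), without fetching or formatting any data.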

    def stream_remove(self, path, start = None, end = None):
        """
        Remove data from the specified time interval within a stream.

        Removes data in the interval [start, end), and intervals are
        truncated or split appropriately.

        Returns a (removed, restart) tuple.

        'removed' is the number of data points that were removed.

        'restart', if not None, means there were too many rows to
        remove in a single request.  This function should be called
        again with a start time of 'restart' to complete the removal.
        """
        stream_id = self._stream_id(path)
        table = self.data.getnode(path)
        intervals = self._get_intervals(stream_id)
        (start, end) = self._check_user_times(start, end)
        to_remove = Interval(start, end)
        removed = 0
        remaining = self.max_removals
        restart = None

        # Can't remove intervals from within the iterator, so we need to
        # remember what's currently in the intersection now.
        all_candidates = list(intervals.intersection(to_remove, orig = True))

        for (dbint, orig) in all_candidates:
            # Find row start and end
            row_start = self._find_start(table, dbint)
            row_end = self._find_end(table, dbint)

            # Shorten it if we'll hit the maximum number of removals
            row_max = row_start + remaining
            if row_max < row_end:
                row_end = row_max
                dbint.end = table[row_max]
                restart = dbint.end

            # Adjust the DBInterval to match the newly found ends
            dbint.db_start = dbint.start
            dbint.db_end = dbint.end
            dbint.db_startpos = row_start
            dbint.db_endpos = row_end

            # Remove interval from the database
            self._remove_interval(stream_id, orig, dbint)

            # Remove data from the underlying table storage
            table.remove(row_start, row_end)

            # Count how many were removed
            removed += row_end - row_start

            if restart is not None:
                break

        return (removed, restart)
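
    # As with extraction, a caller can loop until the requested range
    # is fully removed (a sketch; 'db', the path, and 'end' are
    # hypothetical):
    #
    #   total = 0
    #   start = None
    #   while True:
    #       (removed, restart) = db.stream_remove("/newton/prep",
    #                                             start, end)
    #       total += removed
    #       if restart is None:
    #           break
    #       start = restart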