# -*- coding: utf-8 -*-

"""NilmDB

Object that represents a NILM database file.

Manages both the SQL database and the table storage backend.
"""

# Need absolute_import so that "import nilmdb" won't pull in nilmdb.py,
# but will pull the nilmdb module instead.
from __future__ import absolute_import
import nilmdb
from nilmdb.utils.printf import *

import sqlite3
import time
import sys
import os
import errno
import bisect

import pyximport
pyximport.install()
from nilmdb.interval import Interval, DBInterval, IntervalSet, IntervalError
from . import bulkdata

# Note about performance and transactions:
#
# Committing a transaction in the default sync mode (PRAGMA synchronous=FULL)
# takes about 125msec.  sqlite3 will commit transactions at 3 times:
#   1: explicit con.commit()
#   2: between a series of DML commands and non-DML commands, e.g.
#      after a series of INSERT, SELECT, but before a CREATE TABLE or PRAGMA.
#   3: at the end of an explicit transaction, e.g. "with self.con as con:"
#
# To speed up testing, or if this transaction speed becomes an issue,
# the sync=False option to NilmDB.__init__ will set PRAGMA synchronous=OFF.
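#
# A minimal construction sketch (hypothetical usage; the path is
# illustrative, not part of this module):
#
#   db = NilmDB("/tmp/nilmdb-test", sync=False)   # faster, not crash-safe
#   db.close()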

# Don't touch old entries -- just add new ones.
_sql_schema_updates = {
    0: """
        -- All streams
        CREATE TABLE streams(
            id INTEGER PRIMARY KEY,     -- stream ID
            path TEXT UNIQUE NOT NULL,  -- path, e.g. '/newton/prep'
            layout TEXT NOT NULL        -- layout name, e.g. float32_8
        );

        -- Individual timestamped ranges in those streams.
        -- For a given start_time and end_time, this tells us that the
        -- data is stored between start_pos and end_pos.
        -- Times are stored as μs since Unix epoch.
        -- Positions are opaque: PyTables rows, file offsets, etc.
        --
        -- Note: end_pos points to the row _after_ end_time, so end_pos-1
        -- is the last valid row.
        CREATE TABLE ranges(
            stream_id INTEGER NOT NULL,
            start_time INTEGER NOT NULL,
            end_time INTEGER NOT NULL,
            start_pos INTEGER NOT NULL,
            end_pos INTEGER NOT NULL
        );
        CREATE INDEX _ranges_index ON ranges (stream_id, start_time, end_time);
    """,

    1: """
        -- Generic dictionary-type metadata that can be associated with a stream
        CREATE TABLE metadata(
            stream_id INTEGER NOT NULL,
            key TEXT NOT NULL,
            value TEXT
        );
    """,
}


class NilmDBError(Exception):
    """Base exception for NilmDB errors"""
    def __init__(self, message = "Unspecified error"):
        Exception.__init__(self, self.__class__.__name__ + ": " + message)

class StreamError(NilmDBError):
    pass

class OverlapError(NilmDBError):
    pass


@nilmdb.utils.must_close()
class NilmDB(object):
    verbose = 0

    def __init__(self, basepath, sync=True, max_results=None,
                 bulkdata_args={}):
        # Set up the path
        self.basepath = os.path.abspath(basepath)

        # Create the database path if it doesn't exist
        try:
            os.makedirs(self.basepath)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise IOError("can't create tree " + self.basepath)

        # Our data goes inside it
        self.data = bulkdata.BulkData(self.basepath, **bulkdata_args)

        # SQLite database too
        sqlfilename = os.path.join(self.basepath, "data.sql")
        # We use check_same_thread = False, assuming that the rest
        # of the code (e.g. Server) will be smart and not access this
        # database from multiple threads simultaneously.  Otherwise,
        # false positives will occur when the database is only opened
        # in one thread, and only accessed in another.
        self.con = sqlite3.connect(sqlfilename, check_same_thread = False)
        self._sql_schema_update()

        # See the big comment at top about the performance implications of this
        if sync:
            self.con.execute("PRAGMA synchronous=FULL")
        else:
            self.con.execute("PRAGMA synchronous=OFF")

        # Approximate largest number of elements that we want to send
        # in a single reply (for stream_intervals, stream_extract)
        if max_results:
            self.max_results = max_results
        else:
            self.max_results = 16384

    def get_basepath(self):
        return self.basepath

    def close(self):
        if self.con:
            self.con.commit()
            self.con.close()
        self.data.close()

    def _sql_schema_update(self):
        cur = self.con.cursor()
        version = cur.execute("PRAGMA user_version").fetchone()[0]
        oldversion = version

        while version in _sql_schema_updates:
            cur.executescript(_sql_schema_updates[version])
            version = version + 1
            if self.verbose: # pragma: no cover
                printf("Schema updated to %d\n", version)

        if version != oldversion:
            with self.con:
                cur.execute("PRAGMA user_version = {v:d}".format(v=version))
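
    # Future schema changes go in as new numbered entries in
    # _sql_schema_updates; per the note above, existing entries must not
    # be modified.  A hypothetical next step might look like:
    #
    #   2: """
    #       CREATE INDEX _metadata_index ON metadata (stream_id, key);
    #   """,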

    @nilmdb.utils.lru_cache(size = 16)
    def _get_intervals(self, stream_id):
        """
        Return a mutable IntervalSet corresponding to the given stream ID.
        """
        iset = IntervalSet()
        result = self.con.execute("SELECT start_time, end_time, "
                                  "start_pos, end_pos "
                                  "FROM ranges "
                                  "WHERE stream_id=?", (stream_id,))
        try:
            for (start_time, end_time, start_pos, end_pos) in result:
                iset += DBInterval(start_time, end_time,
                                   start_time, end_time,
                                   start_pos, end_pos)
        except IntervalError as e: # pragma: no cover
            raise NilmDBError("unexpected overlap in ranges table!")
        return iset

    def _sql_interval_insert(self, id, start, end, start_pos, end_pos):
        """Helper that adds an interval to the SQL database only"""
        self.con.execute("INSERT INTO ranges "
                         "(stream_id,start_time,end_time,start_pos,end_pos) "
                         "VALUES (?,?,?,?,?)",
                         (id, start, end, start_pos, end_pos))

    def _sql_interval_delete(self, id, start, end, start_pos, end_pos):
        """Helper that removes an interval from the SQL database only"""
        self.con.execute("DELETE FROM ranges WHERE "
                         "stream_id=? AND start_time=? AND "
                         "end_time=? AND start_pos=? AND end_pos=?",
                         (id, start, end, start_pos, end_pos))

    def _add_interval(self, stream_id, interval, start_pos, end_pos):
        """
        Add an interval to the internal interval cache, and to the database.
        Note: arguments must be ints (not numpy.int64, etc.)
        """
        # Load this stream's intervals
        iset = self._get_intervals(stream_id)

        # Check for overlap
        if iset.intersects(interval): # pragma: no cover (gets caught earlier)
            raise NilmDBError("new interval overlaps existing data")

        # Check for adjacency.  If there's a stream in the database
        # that ends exactly when this one starts, and the database
        # rows match up, we can make one interval that covers the
        # time range [adjacent.start -> interval.end)
        # and database rows [ adjacent.start_pos -> end_pos ].
        # Only do this if the resulting interval isn't too large.
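        # Worked example (hypothetical numbers): if rows [0, 100) already
        # cover times [10, 20) and we insert times [20, 30) at rows
        # [100, 200), the two merge into a single interval over times
        # [10, 30) and rows [0, 200), as long as 200 < max_merged_rows.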
        max_merged_rows = 8000 * 60 * 60 * 1.05 # 1.05 hours at 8 KHz
        adjacent = iset.find_end(interval.start)
        if (adjacent is not None and
            start_pos == adjacent.db_endpos and
            (end_pos - adjacent.db_startpos) < max_merged_rows):

            # First delete the old one, both from our iset and the
            # database
            iset -= adjacent
            self._sql_interval_delete(stream_id,
                                      adjacent.db_start, adjacent.db_end,
                                      adjacent.db_startpos,
                                      adjacent.db_endpos)

            # Now update our interval so the fallthrough add is
            # correct.
            interval.start = adjacent.start
            start_pos = adjacent.db_startpos

        # Add the new interval to the iset
        iset.iadd_nocheck(DBInterval(interval.start, interval.end,
                                     interval.start, interval.end,
                                     start_pos, end_pos))

        # Insert into the database
        self._sql_interval_insert(stream_id, interval.start, interval.end,
                                  int(start_pos), int(end_pos))

        self.con.commit()

    def _remove_interval(self, stream_id, original, remove):
        """
        Remove an interval from the internal cache and the database.

        stream_id: id of stream
        original: original DBInterval; must be already present in DB
        remove: DBInterval to remove; must be a subset of 'original'
        """
        # Just return if we have nothing to remove
        if remove.start == remove.end: # pragma: no cover
            return

        # Load this stream's intervals
        iset = self._get_intervals(stream_id)

        # Remove the existing interval from the cached set and the database
        iset -= original
        self._sql_interval_delete(stream_id,
                                  original.db_start, original.db_end,
                                  original.db_startpos, original.db_endpos)

        # Add back the intervals that would be left over if the
        # requested interval is removed.  There may be two of them, if
        # the removed piece was in the middle.
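        # For instance (hypothetical times): removing [12, 15) from an
        # original interval [10, 20) leaves [10, 12) and [15, 20).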
        def add(iset, start, end, start_pos, end_pos):
            iset += DBInterval(start, end, start, end, start_pos, end_pos)
            self._sql_interval_insert(stream_id, start, end,
                                      start_pos, end_pos)
        if original.start != remove.start:
            # Interval before the removed region
            add(iset, original.start, remove.start,
                original.db_startpos, remove.db_startpos)
        if original.end != remove.end:
            # Interval after the removed region
            add(iset, remove.end, original.end,
                remove.db_endpos, original.db_endpos)

        # Commit SQL changes
        self.con.commit()
        return

    def stream_list(self, path = None, layout = None):
        """Return a list of [path, layout] lists for all streams
        in the database.

        If path is specified, include only streams with a path that
        matches the given string.

        If layout is specified, include only streams with a layout
        that matches the given string.
        """
        where = "WHERE 1=1"
        params = ()
        if layout:
            where += " AND layout=?"
            params += (layout,)
        if path:
            where += " AND path=?"
            params += (path,)
        result = self.con.execute("SELECT path, layout "
                                  "FROM streams " + where, params).fetchall()
        return sorted(list(x) for x in result)
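
    # Example (hypothetical database contents):
    #   db.stream_list(layout="float32_8") -> [["/newton/prep", "float32_8"]]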

    def stream_intervals(self, path, start = None, end = None):
        """
        Returns an (intervals, restart) tuple.

        intervals is a list of [start, end] timestamps of all intervals
        that exist for path, between start and end.

        restart, if nonzero, means that there were too many results to
        return in a single request.  The data is complete from the
        starting timestamp to the point at which it was truncated,
        and a new request with a start time of 'restart' will fetch
        the next block of data.
        """
        stream_id = self._stream_id(path)
        intervals = self._get_intervals(stream_id)
        requested = Interval(start or 0, end or 1e12)
        result = []
        for n, i in enumerate(intervals.intersection(requested)):
            if n >= self.max_results:
                restart = i.start
                break
            result.append([i.start, i.end])
        else:
            restart = 0
        return (result, restart)
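
    # Paging sketch (hypothetical caller code, not part of this class):
    # keep re-requesting with start=restart until restart comes back 0.
    #
    #   start = None
    #   while True:
    #       (ivals, restart) = db.stream_intervals("/newton/prep", start)
    #       handle(ivals)                   # handle() is hypothetical
    #       if not restart:
    #           break
    #       start = restart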

    def stream_create(self, path, layout_name):
        """Create a new table in the database.

        path: path to the data (e.g. '/newton/prep').
              Paths must contain at least two elements, e.g.:
                /newton/prep
                /newton/raw
                /newton/upstairs/prep
                /newton/upstairs/raw

        layout_name: string for nilmdb.layout.get_named(), e.g. 'float32_8'
        """
        # Create the bulk storage.  Raises ValueError on error, which we
        # pass along.
        self.data.create(path, layout_name)

        # Insert into the SQL database once the bulk storage is happy
        with self.con as con:
            con.execute("INSERT INTO streams (path, layout) VALUES (?,?)",
                        (path, layout_name))

    def _stream_id(self, path):
        """Return the unique stream ID for the given path"""
        result = self.con.execute("SELECT id FROM streams WHERE path=?",
                                  (path,)).fetchone()
        if result is None:
            raise StreamError("No stream at path " + path)
        return result[0]

    def stream_set_metadata(self, path, data):
        """Set stream metadata from a dictionary, e.g.
           { "description": "Downstairs lighting",
             "v_scaling": 123.45 }
        This replaces all existing metadata.
        """
        stream_id = self._stream_id(path)
        with self.con as con:
            con.execute("DELETE FROM metadata WHERE stream_id=?", (stream_id,))
            for key in data:
                # An empty string value acts as a deletion: the key is
                # simply not re-inserted.
                if data[key] != '':
                    con.execute("INSERT INTO metadata VALUES (?, ?, ?)",
                                (stream_id, key, data[key]))

    def stream_get_metadata(self, path):
        """Return stream metadata as a dictionary."""
        stream_id = self._stream_id(path)
        result = self.con.execute("SELECT metadata.key, metadata.value "
                                  "FROM metadata "
                                  "WHERE metadata.stream_id=?", (stream_id,))
        data = {}
        for (key, value) in result:
            data[key] = value
        return data

    def stream_update_metadata(self, path, newdata):
        """Update stream metadata from a dictionary, merging new keys
        into the existing metadata rather than replacing it all."""
        data = self.stream_get_metadata(path)
        data.update(newdata)
        self.stream_set_metadata(path, data)
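
    # Example of the set/update distinction (hypothetical values):
    #   db.stream_set_metadata("/newton/prep", { "v_scaling": 123.45 })
    #   db.stream_update_metadata("/newton/prep", { "description": "x" })
    #   # stream_get_metadata now returns both keys, since update merges
    #   # while set replaces.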

    def stream_destroy(self, path):
        """Fully remove a table and all of its data from the database.
        No way to undo it!  Metadata is removed."""
        stream_id = self._stream_id(path)

        # Delete the cached interval data (if it was cached)
        self._get_intervals.cache_remove(self, stream_id)

        # Delete the data
        self.data.destroy(path)

        # Delete metadata, stream, intervals
        with self.con as con:
            con.execute("DELETE FROM metadata WHERE stream_id=?", (stream_id,))
            con.execute("DELETE FROM ranges WHERE stream_id=?", (stream_id,))
            con.execute("DELETE FROM streams WHERE id=?", (stream_id,))

    def stream_insert(self, path, start, end, data):
        """Insert new data into the database.

        path: Path at which to add the data
        start: Starting timestamp
        end: Ending timestamp
        data: Rows of data, to be passed to the PyTables table.append
              method.  E.g. nilmdb.layout.Parser.data
        """
        # First check for basic overlap using the timestamp info given.
        stream_id = self._stream_id(path)
        iset = self._get_intervals(stream_id)
        interval = Interval(start, end)
        if iset.intersects(interval):
            raise OverlapError("new data overlaps existing data at range: "
                               + str(iset & interval))

        # Insert the data
        table = self.data.getnode(path)
        row_start = table.nrows
        table.append(data)
        row_end = table.nrows

        # Insert the record into the sql database.
        self._add_interval(stream_id, interval, row_start, row_end)

        # And that's all
        return "ok"
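
    # Example (hypothetical timestamps; 'rows' stands for pre-parsed data
    # of the kind described in the docstring above):
    #   db.stream_insert("/newton/prep", 1000, 2000, rows)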

    def _find_start(self, table, dbinterval):
        """
        Given a DBInterval, find the row in the database that
        corresponds to the start time.  Returns the first database
        position with a timestamp (first element) greater than or
        equal to 'start'.
        """
        # Optimization for the common case where an interval wasn't truncated
        if dbinterval.start == dbinterval.db_start:
            return dbinterval.db_startpos
        return bisect.bisect_left(bulkdata.TimestampOnlyTable(table),
                                  dbinterval.start,
                                  dbinterval.db_startpos,
                                  dbinterval.db_endpos)

    def _find_end(self, table, dbinterval):
        """
        Given a DBInterval, find the row in the database that follows
        the end time.  Returns the first database position with a
        timestamp (first element) greater than or equal to 'end';
        since end positions are exclusive, the row stamped 'end'
        itself is not included.
        """
        # Optimization for the common case where an interval wasn't truncated
        if dbinterval.end == dbinterval.db_end:
            return dbinterval.db_endpos

        # Note that we still use bisect_left here, because we don't
        # want to include the given timestamp in the results.  This is
        # so queries like 1:00 -> 2:00 and 2:00 -> 3:00 return
        # non-overlapping data.
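        #
        # For instance (hypothetical data): with timestamps [10, 20, 30]
        # and dbinterval.end == 20, bisect_left returns position 1, so
        # the row stamped 20 is excluded here and will be picked up by
        # the next query instead.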
        return bisect.bisect_left(bulkdata.TimestampOnlyTable(table),
                                  dbinterval.end,
                                  dbinterval.db_startpos,
                                  dbinterval.db_endpos)

    def stream_extract(self, path, start = None, end = None, count = False):
        """
        Returns a (data, restart) tuple.

        data is a list of raw data from the database, suitable for
        passing to e.g. nilmdb.layout.Formatter to translate into
        textual form.

        restart, if nonzero, means that there were too many results to
        return in a single request.  The data is complete from the
        starting timestamp to the point at which it was truncated,
        and a new request with a start time of 'restart' will fetch
        the next block of data.

        count, if true, means to return only the count of rows that
        would have been returned, rather than the raw data.  This is
        much faster than actually fetching the data, and is not
        limited by max_results.
        """
        table = self.data.getnode(path)
        stream_id = self._stream_id(path)
        intervals = self._get_intervals(stream_id)
        requested = Interval(start or 0, end or 1e12)
        result = []
        matched = 0
        remaining = self.max_results
        restart = 0
        for interval in intervals.intersection(requested):
            # Reading single rows from the table is too slow, so
            # we use two bisections to find both the starting and
            # ending row for this particular interval, then
            # read the entire range as one slice.
            row_start = self._find_start(table, interval)
            row_end = self._find_end(table, interval)

            if count:
                matched += row_end - row_start
                continue

            # Shorten it if we'll hit the maximum number of results
            row_max = row_start + remaining
            if row_max < row_end:
                row_end = row_max
                restart = table[row_max][0]

            # Gather these results up
            result.extend(table[row_start:row_end])

            # Count them
            remaining -= row_end - row_start
            if restart:
                break

        if count:
            return matched
        return (result, restart)
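
    # Example (hypothetical range): counting is cheap because only the
    # two bisections run, never the slice fetch:
    #   db.stream_extract("/newton/prep", start=1000, end=2000, count=True)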

    def stream_remove(self, path, start = None, end = None):
        """
        Remove data from the specified time interval within a stream.
        Removes all data in the interval [start, end); intervals
        are truncated or split appropriately.  Returns the number of
        data points removed.
        """
        table = self.data.getnode(path)
        stream_id = self._stream_id(path)
        intervals = self._get_intervals(stream_id)
        to_remove = Interval(start or 0, end or 1e12)
        removed = 0
        if start == end:
            return 0

        # Can't remove intervals from within the iterator, so we need to
        # remember what's currently in the intersection.
        all_candidates = list(intervals.intersection(to_remove, orig = True))

        for (dbint, orig) in all_candidates:
            # Find row start and end
            row_start = self._find_start(table, dbint)
            row_end = self._find_end(table, dbint)

            # Adjust the DBInterval to match the newly found ends
            dbint.db_start = dbint.start
            dbint.db_end = dbint.end
            dbint.db_startpos = row_start
            dbint.db_endpos = row_end

            # Remove the interval from the database
            self._remove_interval(stream_id, orig, dbint)

            # Remove data from the underlying table storage
            table.remove(row_start, row_end)

            # Count how many were removed
            removed += row_end - row_start

        return removed
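
    # Example (hypothetical range): remove a slice of data and get back
    # the number of rows dropped:
    #   removed = db.stream_remove("/newton/prep", 1000, 2000)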