# -*- coding: utf-8 -*-

"""NilmDB

Object that represents a NILM database file.

Manages both the SQL database and the PyTables storage backend.
"""

# Need absolute_import so that "import nilmdb" won't pull in nilmdb.py,
# but will pull in the nilmdb module instead.
from __future__ import absolute_import
import nilmdb
from nilmdb.printf import *
from nilmdb.interval import Interval, IntervalSet, IntervalError

import sqlite3
import tables
import time
import sys
import os
import errno

# Note about performance and transactions:
#
# Committing a transaction in the default sync mode (PRAGMA synchronous=FULL)
# takes about 125 msec.  sqlite3 will commit transactions in three situations:
#   1: an explicit con.commit()
#   2: between a series of DML commands and non-DML commands, e.g.
#      after a series of INSERT, SELECT, but before a CREATE TABLE or PRAGMA.
#   3: at the end of an explicit transaction, e.g. "with self.con as con:"
#
# To speed up testing, or if this transaction speed becomes an issue,
# the sync=False option to NilmDB.__init__ will set PRAGMA synchronous=OFF.

# Don't touch old entries -- just add new ones.
_sql_schema_updates = {
    0: """
        -- All streams
        CREATE TABLE streams(
            id INTEGER PRIMARY KEY,       -- stream ID
            path TEXT UNIQUE NOT NULL,    -- path, e.g. '/newton/prep'
            layout TEXT NOT NULL          -- one of nilmdb.layout.layouts
        );

        -- Individual timestamped ranges in those streams.
        -- For a given start_time and end_time, this tells us that the
        -- data is stored between start_pos and end_pos.
        -- Times are stored as μs since Unix epoch.
        -- Positions are opaque: PyTables rows, file offsets, etc.
        CREATE TABLE ranges(
            stream_id INTEGER NOT NULL,
            start_time INTEGER NOT NULL,
            end_time INTEGER NOT NULL,
            start_pos INTEGER NOT NULL,
            end_pos INTEGER NOT NULL
        );
        CREATE INDEX _ranges_index ON ranges (stream_id, start_time, end_time);
        """,

    1: """
        -- Generic dictionary-type metadata that can be associated with a stream
        CREATE TABLE metadata(
            stream_id INTEGER NOT NULL,
            key TEXT NOT NULL,
            value TEXT
        );
        """,
}
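
# Illustrative sketch only (not part of the real schema): a future migration
# would be added as a new numbered entry in _sql_schema_updates, e.g.
#
#    2: """
#        -- hypothetical example of a later schema change
#        CREATE INDEX _metadata_index ON metadata (stream_id, key);
#        """,
#
# _sql_schema_update() below applies each entry starting from the stored
# PRAGMA user_version, in order, and then records the new version, so an
# existing database only ever applies the entries it hasn't seen yet.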
class NilmDBError(Exception):
    """Base exception for NilmDB errors"""
    def __init__(self, message = "Unspecified error"):
        Exception.__init__(self, self.__class__.__name__ + ": " + message)

class StreamError(NilmDBError):
    pass

class OverlapError(NilmDBError):
    pass

class NilmDB(object):
    verbose = 0
    def __init__(self, basepath, sync=True):
        # Set up the path
        self.basepath = os.path.abspath(basepath.rstrip('/'))

        # Create the database path if it doesn't exist
        try:
            os.makedirs(self.basepath)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise IOError("can't create tree " + self.basepath)

        # Our HDF5 file goes inside it
        h5filename = os.path.abspath(self.basepath + "/data.h5")
        self.h5file = tables.openFile(h5filename, "a", "NILM Database")

        # SQLite database too
        sqlfilename = os.path.abspath(self.basepath + "/data.sql")
        # We use check_same_thread = False, assuming that the rest
        # of the code (e.g. Server) will be smart and not access this
        # database from multiple threads simultaneously.  That requirement
        # may be relaxed later.
        self.con = sqlite3.connect(sqlfilename, check_same_thread = False)
        self._sql_schema_update()

        # See the big comment at the top about the performance implications of this
        if sync:
            self.con.execute("PRAGMA synchronous=FULL")
        else:
            self.con.execute("PRAGMA synchronous=OFF")

        self.opened = True

    def __del__(self):
        if "opened" in self.__dict__: # pragma: no cover
            fprintf(sys.stderr,
                    "error: NilmDB.close() wasn't called, path %s",
                    self.basepath)

    def get_basepath(self):
        return self.basepath

    def close(self):
        if self.con:
            self.con.commit()
            self.con.close()
        self.h5file.close()
        del self.opened
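
    # Typical lifecycle, as a minimal sketch (the path here is made up for
    # illustration; sync=False trades durability for speed, per the comment
    # at the top of this file):
    #
    #    db = NilmDB("/tmp/nilmdb-example", sync = False)
    #    ...  # call the stream_* methods below
    #    db.close()   # required; __del__ only prints a warning if skipped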
    def _sql_schema_update(self):
        cur = self.con.cursor()
        version = cur.execute("PRAGMA user_version").fetchone()[0]
        oldversion = version

        while version in _sql_schema_updates:
            cur.executescript(_sql_schema_updates[version])
            version = version + 1
            if self.verbose: # pragma: no cover
                printf("Schema updated to %d\n", version)

        if version != oldversion:
            with self.con:
                cur.execute("PRAGMA user_version = {v:d}".format(v=version))

    def _get_intervals(self, stream_id):
        """
        Return an IntervalSet corresponding to the given stream ID.
        """
        # Could cache these, if it's a performance bottleneck
        iset = IntervalSet()
        result = self.con.execute("SELECT start_time, end_time "
                                  "FROM ranges "
                                  "WHERE stream_id=?", (stream_id,))
        try:
            for (start, end) in result:
                iset += Interval(start, end)
        except IntervalError as e: # pragma: no cover
            raise NilmDBError("unexpected overlap in ranges table!")
        return iset

    def _add_interval(self, stream_id, interval, start_pos, end_pos):
        # Arguments must be ints (not numpy.int64, etc)
        self.con.execute("INSERT INTO ranges "
                         "(stream_id,start_time,end_time,start_pos,end_pos) "
                         "VALUES (?,?,?,?,?)",
                         (stream_id, interval.start, interval.end,
                          int(start_pos), int(end_pos)))

    def stream_list(self, path = None, layout = None):
        """Return a list of [path, layout] lists for all streams
        in the database.

        If path is specified, include only streams with a path that
        matches the given string.

        If layout is specified, include only streams with a layout
        that matches the given string.
        """
        where = "WHERE 1=1"
        params = ()
        if layout:
            where += " AND layout=?"
            params += (layout,)
        if path:
            where += " AND path=?"
            params += (path,)
        result = self.con.execute("SELECT path, layout "
                                  "FROM streams " + where, params).fetchall()
        return sorted(list(x) for x in result)
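
    # For example (illustrative only; the stream shown is assumed to exist):
    #
    #    db.stream_list()                       # all streams, sorted by path
    #    db.stream_list(layout = "PrepData")    # only streams with that exact layout
    #    db.stream_list(path = "/newton/prep")  # at most one [path, layout] entry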
    def stream_create(self, path, layout_name, index = None):
        """Create a new table in the database.

        path: path to the data (e.g. '/newton/prep').
              Paths must contain at least two elements, e.g.:
                  /newton/prep
                  /newton/raw
                  /newton/upstairs/prep
                  /newton/upstairs/raw

        layout_name: one of the nilmdb.layout.layouts keys, e.g. 'PrepData'

        index: list of layout columns to be marked as PyTables indices.
               If index is None, the 'timestamp' column is indexed if it exists.
               Pass an empty list to prevent indexing.
        """
        if path[0] != '/':
            raise ValueError("paths must start with /")
        [ group, node ] = path.rsplit("/", 1)
        if group == '':
            raise ValueError("invalid path")

        # Make the group structure, one element at a time
        group_path = group.lstrip('/').split("/")
        for i in range(len(group_path)):
            parent = "/" + "/".join(group_path[0:i])
            child = group_path[i]
            try:
                self.h5file.createGroup(parent, child)
            except tables.NodeError:
                pass

        # Get the table description for this layout
        try:
            desc = nilmdb.layout.named[layout_name].description()
        except KeyError:
            raise ValueError("no such layout")

        # Estimated table size (for PyTables optimization purposes): assume
        # 3 months worth of data.  It's OK if this is wrong.
        exp_rows = nilmdb.layout.named[layout_name].rate_hz * 60 * 60 * 24 * 30 * 3

        table = self.h5file.createTable(group, node,
                                        description = desc,
                                        expectedrows = exp_rows)

        # Create indices
        try:
            if index is None and "timestamp" in table.colnames:
                index = [ "timestamp" ]
            for ind in index:
                table.cols._f_col(str(ind)).createIndex()
        except KeyError as e:
            # Remove this table if we got an error
            self.h5file.removeNode(group, node)
            raise e

        # Insert into the SQL database once the PyTables table is happy
        with self.con as con:
            con.execute("INSERT INTO streams (path, layout) VALUES (?,?)",
                        (path, layout_name))
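
    # Example calls, as a sketch.  'PrepData' is the layout named in the
    # docstring above; 'RawData' is a hypothetical second layout:
    #
    #    db.stream_create("/newton/prep", "PrepData")             # 'timestamp' indexed if present
    #    db.stream_create("/newton/raw", "RawData", index = [])   # no PyTables indices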
    def _stream_id(self, path):
        """Return the unique stream ID for the given path"""
        result = self.con.execute("SELECT id FROM streams WHERE path=?",
                                  (path,)).fetchone()
        if result is None:
            raise StreamError("No stream at path " + path)
        return result[0]

    def stream_set_metadata(self, path, data):
        """Set stream metadata from a dictionary, e.g.
             { 'description': 'Downstairs lighting',
               'v_scaling': 123.45 }
        This replaces all existing metadata.
        """
        stream_id = self._stream_id(path)
        with self.con as con:
            con.execute("DELETE FROM metadata "
                        "WHERE stream_id=?", (stream_id,))
            for key in data:
                if data[key] != '':
                    con.execute("INSERT INTO metadata VALUES (?, ?, ?)",
                                (stream_id, key, data[key]))

    def stream_get_metadata(self, path):
        """Return stream metadata as a dictionary."""
        stream_id = self._stream_id(path)
        result = self.con.execute("SELECT metadata.key, metadata.value "
                                  "FROM metadata "
                                  "WHERE metadata.stream_id=?", (stream_id,))
        data = {}
        for (key, value) in result:
            data[key] = value
        return data

    def stream_update_metadata(self, path, newdata):
        """Update stream metadata from a dictionary, merging the new
        keys into the existing metadata."""
        data = self.stream_get_metadata(path)
        data.update(newdata)
        self.stream_set_metadata(path, data)
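
    # Metadata round-trip, as an illustrative sketch (the keys and values are
    # the ones from the stream_set_metadata docstring; SQLite stores the
    # values in a TEXT column):
    #
    #    db.stream_set_metadata("/newton/prep",
    #                           { 'description': 'Downstairs lighting' })
    #    db.stream_update_metadata("/newton/prep", { 'v_scaling': 123.45 })
    #    db.stream_get_metadata("/newton/prep")   # dict with both keys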
    def stream_insert(self, path, parser):
        """Insert new data into the database.

        path: path at which to add the data
        parser: nilmdb.layout.Parser instance full of data to insert
        """
        if (not parser.min_timestamp or not parser.max_timestamp or
            not len(parser.data)):
            raise StreamError("no data provided")

        # First check for basic overlap using timestamp info from the parser.
        stream_id = self._stream_id(path)
        iset = self._get_intervals(stream_id)
        interval = Interval(parser.min_timestamp, parser.max_timestamp)
        if iset.intersects(interval):
            raise OverlapError("new data overlaps existing data: "
                               + str(iset & interval))

        # Insert the data into PyTables
        table = self.h5file.getNode(path)
        row_start = table.nrows
        table.append(parser.data)
        row_end = table.nrows
        table.flush()

        # Insert the record into the SQL database.
        # Casts are to convert from numpy.int64.
        self._add_interval(stream_id, interval, int(row_start), int(row_end))

        # And that's all
        return "ok"
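
# End-to-end insertion, as a rough sketch.  Building a filled-in
# nilmdb.layout.Parser is not shown in this file, so the constructor and
# fill step below are assumptions:
#
#    parser = nilmdb.layout.Parser("PrepData")   # hypothetical constructor
#    ...                                         # feed raw rows into the parser
#    db.stream_insert("/newton/prep", parser)    # raises OverlapError if the new
#                                                # interval intersects stored data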