You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

236 lines
7.9 KiB

  1. # -*- coding: utf-8 -*-
  2. """NilmDB
  3. Object that represents a NILM database file.
  4. Manages both the SQL database and the PyTables storage backend.
  5. """
  6. # Need absolute_import so that "import nilmdb" won't pull in nilmdb.py,
  7. # but will pull the nilmdb module instead.
  8. from __future__ import absolute_import
  9. import nilmdb
  10. from nilmdb.printf import *
  11. import sqlite3
  12. import tables
  13. import time
  14. import sys
  15. import os
  16. import errno
# Incremental SQL schema migrations, keyed by the schema version they
# upgrade *from*.  NilmDB.sql_schema_update() reads the database's
# "PRAGMA user_version", runs the script stored under that number,
# advances the version by one, and repeats until no entry matches.
#
# Don't touch old entries -- just add new ones.
sql_schema_updates = {
    # From version 0 (the user_version of a newly created SQLite file):
    # the list of streams and the time ranges stored for each of them.
    0: """
-- All streams
CREATE TABLE streams(
id INTEGER PRIMARY KEY, -- stream ID
path TEXT UNIQUE NOT NULL, -- path, e.g. '/newton/prep'
layout TEXT NOT NULL -- one of nilmdb.layout.layouts
);
-- Individual timestamped ranges in those streams.
-- For a given start_time and end_time, this tells us that the
-- data is stored between start_pos and end_pos.
-- Times are stored as μs since Unix epoch
-- Positions are opaque: PyTables rows, file offsets, etc.
CREATE TABLE ranges(
stream_id INTEGER,
start_time INTEGER NOT NULL,
end_time INTEGER NOT NULL,
start_pos INTEGER NOT NULL,
end_pos INTEGER NOT NULL
);
CREATE INDEX _ranges_index ON ranges (stream_id, start_time, end_time);
""",
    # From version 1: key/value metadata rows attached to a stream.
    1: """
-- Generic dictionary-type metadata that can be associated with a stream
CREATE TABLE metadata(
stream_id INTEGER,
key TEXT NOT NULL,
value TEXT
);
""",
}
  49. class StreamError(Exception):
  50. pass
  51. class NilmDB(object):
  52. verbose = 0
  53. def __init__(self, basepath):
  54. # set up path
  55. self.basepath = basepath.rstrip('/')
  56. # Create the database path if it doesn't exist
  57. try:
  58. os.makedirs(self.basepath)
  59. except OSError as e:
  60. if e.errno != errno.EEXIST:
  61. raise IOError("can't create tree " + self.basepath)
  62. # Our HD5 file goes inside it
  63. h5filename = os.path.abspath(self.basepath + "/data.h5")
  64. self.h5file = tables.openFile(h5filename, "a", "NILM Database")
  65. # SQLite database too
  66. sqlfilename = os.path.abspath(self.basepath + "/data.sql")
  67. # We use check_same_thread = False, assuming that the rest
  68. # of the code (e.g. Server) will be smart and not access this
  69. # database from multiple threads simultaneously. That requirement
  70. # may be relaxed later.
  71. self.con = sqlite3.connect(sqlfilename, check_same_thread = False)
  72. self.sql_schema_update()
  73. self.opened = True
  74. def __del__(self):
  75. if "opened" in self.__dict__: # pragma: no cover
  76. fprintf(sys.stderr,
  77. "error: NilmDB.close() wasn't called, path %s",
  78. self.basepath)
  79. def close(self):
  80. if self.con:
  81. self.con.commit()
  82. self.con.close()
  83. self.h5file.close()
  84. del self.opened
  85. def sql_schema_update(self):
  86. cur = self.con.cursor()
  87. version = cur.execute("PRAGMA user_version").fetchone()[0]
  88. oldversion = version
  89. while version in sql_schema_updates:
  90. cur.executescript(sql_schema_updates[version])
  91. version = version + 1
  92. if self.verbose: # pragma: no cover
  93. printf("Schema updated to %d\n", version)
  94. if version != oldversion:
  95. # this takes .1 seconds, so only do it if necessary
  96. with self.con:
  97. cur.execute("PRAGMA user_version = {v:d}".format(v=version))
  98. def stream_list(self, path = None, layout = None):
  99. """Return list of [path, layout] lists of all streams
  100. in the database.
  101. If path is specified, include only streams with a path that
  102. matches the given string.
  103. If layout is specified, include only streams with a layout
  104. that matches the given string.
  105. """
  106. where = "WHERE 1=1"
  107. params = ()
  108. if layout:
  109. where += " AND layout=?"
  110. params += (layout,)
  111. if path:
  112. where += " AND path=?"
  113. params += (path,)
  114. result = self.con.execute("SELECT path, layout " +
  115. "FROM streams " + where, params).fetchall()
  116. return sorted(list(x) for x in result)
  117. def stream_create(self, path, layout_name, index = None):
  118. """Create a new table in the database.
  119. path: path to the data (e.g. '/newton/prep')
  120. layout_name: one of the nilmdb.layout.layouts keys, e.g. 'PrepData'
  121. index: list of layout columns to be marked as PyTables indices.
  122. If index = none, the 'timestamp' column is indexed if it exists.
  123. Pass an empty list to prevent indexing.
  124. """
  125. if path[0] != '/':
  126. raise ValueError("paths must start with /")
  127. [ group, node ] = path.rsplit("/", 1)
  128. if group == '':
  129. raise ValueError("Invalid path")
  130. # Make the group structure, one element at a time
  131. group_path = group.lstrip('/').split("/")
  132. for i in range(len(group_path)):
  133. parent = "/" + "/".join(group_path[0:i])
  134. child = group_path[i]
  135. try:
  136. self.h5file.createGroup(parent, child)
  137. except tables.NodeError:
  138. pass
  139. # Get description
  140. desc = nilmdb.layout.description(layout_name)
  141. # Estimated table size (for PyTables optimization purposes): assume
  142. # 3 months worth of data. It's OK if this is wrong.
  143. exp_rows = nilmdb.layout.expected_daily_rows(layout_name) * 90
  144. table = self.h5file.createTable(group, node,
  145. description = desc,
  146. expectedrows = exp_rows)
  147. # Create indices
  148. try:
  149. if index is None and "timestamp" in table.colnames:
  150. index = [ "timestamp" ]
  151. for ind in index:
  152. table.cols._f_col(str(ind)).createIndex()
  153. except KeyError as e:
  154. # Remove this table if we got an error
  155. self.h5file.removeNode(group, node)
  156. raise e
  157. # Insert into SQL database once the PyTables is happy
  158. with self.con as con:
  159. con.execute("INSERT INTO streams (path, layout) VALUES (?,?)",
  160. (path, layout_name))
  161. def stream_id(self, path):
  162. """Return unique stream ID"""
  163. result = self.con.execute("SELECT id FROM streams WHERE path=?",
  164. (path,)).fetchone()
  165. if result is None:
  166. raise StreamError("No stream at path " + path)
  167. return result[0]
  168. def stream_set_metadata(self, path, data):
  169. """Set stream metadata from a dictionary, e.g.
  170. { description = 'Downstairs lighting',
  171. v_scaling = 123.45 }
  172. This replaces all existing metadata.
  173. """
  174. stream_id = self.stream_id(path)
  175. with self.con as con:
  176. for key in data:
  177. con.execute("DELETE FROM metadata "
  178. "WHERE stream_id=? AND key=?", (stream_id, key))
  179. if data[key] != '':
  180. con.execute("INSERT INTO metadata VALUES (?, ?, ?)",
  181. (stream_id, key, data[key]))
  182. def stream_get_metadata(self, path):
  183. """Return stream metadata as a dictionary."""
  184. stream_id = self.stream_id(path)
  185. result = self.con.execute("SELECT metadata.key, metadata.value "
  186. "FROM metadata "
  187. "WHERE metadata.stream_id=?", (stream_id,))
  188. data = {}
  189. for (key, value) in result:
  190. data[key] = value
  191. return data
  192. def stream_update_metadata(self, path, newdata):
  193. """Update stream metadata from a dictionary"""
  194. data = self.stream_get_metadata(path)
  195. data.update(newdata)
  196. self.stream_set_metadata(path, data)