# -*- coding: utf-8 -*-

"""NilmDB

Object that represents a NILM database file.

Manages both the SQL database and the table storage backend.
"""

# Need absolute_import so that "import nilmdb" won't pull in
# nilmdb.py, but will pull the parent nilmdb module instead.
from __future__ import absolute_import
import nilmdb
from nilmdb.utils.printf import *
from nilmdb.server.interval import (Interval, DBInterval,
                                    IntervalSet, IntervalError)
from nilmdb.server import bulkdata
from nilmdb.server.errors import NilmDBError, StreamError, OverlapError

import sqlite3
import os
import errno
import bisect

# Note about performance and transactions:
#
# Committing a transaction in the default sync mode (PRAGMA synchronous=FULL)
# takes about 125msec.  sqlite3 will commit transactions at 3 times:
# 1: explicit con.commit()
# 2: between a series of DML commands and non-DML commands, e.g.
#    after a series of INSERT, SELECT, but before a CREATE TABLE or PRAGMA.
# 3: at the end of an explicit transaction, e.g. "with self.con as con:"
#
# To speed up testing, or if this transaction speed becomes an issue,
# the sync=False option to NilmDB.__init__ will set PRAGMA synchronous=OFF.

# Don't touch old entries -- just add new ones.
_sql_schema_updates = {
    0: """
        -- All streams
        CREATE TABLE streams(
            id INTEGER PRIMARY KEY,       -- stream ID
            path TEXT UNIQUE NOT NULL,    -- path, e.g. '/newton/prep'
            layout TEXT NOT NULL          -- layout name, e.g. float32_8
        );

        -- Individual timestamped ranges in those streams.
        -- For a given start_time and end_time, this tells us that the
        -- data is stored between start_pos and end_pos.
        -- Times are stored as μs since Unix epoch
        -- Positions are opaque: PyTables rows, file offsets, etc.
        --
        -- Note: end_pos points to the row _after_ end_time, so end_pos-1
        -- is the last valid row.
        CREATE TABLE ranges(
            stream_id INTEGER NOT NULL,
            start_time INTEGER NOT NULL,
            end_time INTEGER NOT NULL,
            start_pos INTEGER NOT NULL,
            end_pos INTEGER NOT NULL
        );
        CREATE INDEX _ranges_index ON ranges (stream_id, start_time, end_time);
        """,

    1: """
        -- Generic dictionary-type metadata that can be associated with a stream
        CREATE TABLE metadata(
            stream_id INTEGER NOT NULL,
            key TEXT NOT NULL,
            value TEXT
        );
        """,
}

@nilmdb.utils.must_close()
class NilmDB(object):
    verbose = 0

    def __init__(self, basepath, sync=True, max_results=None,
                 bulkdata_args=None):
        if bulkdata_args is None:
            bulkdata_args = {}

        # set up path
        self.basepath = os.path.abspath(basepath)

        # Create the database path if it doesn't exist
        try:
            os.makedirs(self.basepath)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise IOError("can't create tree " + self.basepath)

        # Our data goes inside it
        self.data = bulkdata.BulkData(self.basepath, **bulkdata_args)

        # SQLite database too
        sqlfilename = os.path.join(self.basepath, "data.sql")
        # We use check_same_thread = False, assuming that the rest
        # of the code (e.g. Server) will be smart and not access this
        # database from multiple threads simultaneously.  Otherwise
        # false positives will occur when the database is only opened
        # in one thread, and only accessed in another.
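        # Note that the schema update below runs before the synchronous
        # pragma is adjusted, so it always uses SQLite's default (FULL)
        # sync behavior.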
        self.con = sqlite3.connect(sqlfilename, check_same_thread = False)
        self._sql_schema_update()

        # See big comment at top about the performance implications of this
        if sync:
            self.con.execute("PRAGMA synchronous=FULL")
        else:
            self.con.execute("PRAGMA synchronous=OFF")

        # Approximate largest number of elements that we want to send
        # in a single reply (for stream_intervals, stream_extract)
        if max_results:
            self.max_results = max_results
        else:
            self.max_results = 16384

    def get_basepath(self):
        return self.basepath

    def close(self):
        if self.con:
            self.con.commit()
            self.con.close()
        self.data.close()

    def _sql_schema_update(self):
        cur = self.con.cursor()
        version = cur.execute("PRAGMA user_version").fetchone()[0]
        oldversion = version

        while version in _sql_schema_updates:
            cur.executescript(_sql_schema_updates[version])
            version = version + 1
            if self.verbose: # pragma: no cover
                printf("Schema updated to %d\n", version)

        if version != oldversion:
            with self.con:
                cur.execute("PRAGMA user_version = {v:d}".format(v=version))

    @nilmdb.utils.lru_cache(size = 16)
    def _get_intervals(self, stream_id):
        """
        Return a mutable IntervalSet corresponding to the given stream ID.
        """
        iset = IntervalSet()
        result = self.con.execute("SELECT start_time, end_time, "
                                  "start_pos, end_pos "
                                  "FROM ranges "
                                  "WHERE stream_id=?", (stream_id,))
        try:
            for (start_time, end_time, start_pos, end_pos) in result:
                iset += DBInterval(start_time, end_time,
                                   start_time, end_time,
                                   start_pos, end_pos)
        except IntervalError: # pragma: no cover
            raise NilmDBError("unexpected overlap in ranges table!")
        return iset

    def _sql_interval_insert(self, id, start, end, start_pos, end_pos):
        """Helper that adds interval to the SQL database only"""
        self.con.execute("INSERT INTO ranges "
                         "(stream_id,start_time,end_time,start_pos,end_pos) "
                         "VALUES (?,?,?,?,?)",
                         (id, start, end, start_pos, end_pos))

    def _sql_interval_delete(self, id, start, end, start_pos, end_pos):
        """Helper that removes interval from the SQL database only"""
        self.con.execute("DELETE FROM ranges WHERE "
                         "stream_id=? AND start_time=? AND "
                         "end_time=? AND start_pos=? AND end_pos=?",
                         (id, start, end, start_pos, end_pos))

    def _add_interval(self, stream_id, interval, start_pos, end_pos):
        """
        Add interval to the internal interval cache, and to the database.
        Note: arguments must be ints (not numpy.int64, etc)
        """
        # Load this stream's intervals
        iset = self._get_intervals(stream_id)

        # Check for overlap
        if iset.intersects(interval): # pragma: no cover (gets caught earlier)
            raise NilmDBError("new interval overlaps existing data")

        # Check for adjacency.  If there's a stream in the database
        # that ends exactly when this one starts, and the database
        # rows match up, we can make one interval that covers the
        # time range [adjacent.start -> interval.end)
        # and database rows [ adjacent.start_pos -> end_pos ].
        # Only do this if the resulting interval isn't too large.
        max_merged_rows = 8000 * 60 * 60 * 1.05 # 1.05 hours at 8 KHz
        adjacent = iset.find_end(interval.start)
        if (adjacent is not None and
            start_pos == adjacent.db_endpos and
            (end_pos - adjacent.db_startpos) < max_merged_rows):
            # First delete the old one, both from our iset and the
            # database
            iset -= adjacent
            self._sql_interval_delete(stream_id,
                                      adjacent.db_start, adjacent.db_end,
                                      adjacent.db_startpos, adjacent.db_endpos)

            # Now update our interval so the fallthrough add is
            # correct.
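            # Illustrative (assumed) numbers: if the adjacent interval
            # covered rows [0, 800) and ended at t=100, and the new data
            # starts at t=100 in rows [800, 1600), the merged interval
            # covers t=[adjacent.start, interval.end) and rows [0, 1600).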
            interval.start = adjacent.start
            start_pos = adjacent.db_startpos

        # Add the new interval to the iset
        iset.iadd_nocheck(DBInterval(interval.start, interval.end,
                                     interval.start, interval.end,
                                     start_pos, end_pos))

        # Insert into the database
        self._sql_interval_insert(stream_id, interval.start, interval.end,
                                  int(start_pos), int(end_pos))

        self.con.commit()

    def _remove_interval(self, stream_id, original, remove):
        """
        Remove an interval from the internal cache and the database.

        stream_id: id of stream
        original: original DBInterval; must be already present in DB
        remove: DBInterval to remove; must be subset of 'original'
        """
        # Just return if we have nothing to remove
        if remove.start == remove.end: # pragma: no cover
            return

        # Load this stream's intervals
        iset = self._get_intervals(stream_id)

        # Remove existing interval from the cached set and the database
        iset -= original
        self._sql_interval_delete(stream_id,
                                  original.db_start, original.db_end,
                                  original.db_startpos, original.db_endpos)

        # Add back the intervals that would be left over if the
        # requested interval is removed.  There may be two of them, if
        # the removed piece was in the middle.
        def add(iset, start, end, start_pos, end_pos):
            iset += DBInterval(start, end, start, end, start_pos, end_pos)
            self._sql_interval_insert(stream_id, start, end,
                                      start_pos, end_pos)

        if original.start != remove.start:
            # Interval before the removed region
            add(iset, original.start, remove.start,
                original.db_startpos, remove.db_startpos)

        if original.end != remove.end:
            # Interval after the removed region
            add(iset, remove.end, original.end,
                remove.db_endpos, original.db_endpos)

        # Commit SQL changes
        self.con.commit()

        return

    def stream_list(self, path = None, layout = None):
        """Return list of [path, layout] lists of all streams
        in the database.

        If path is specified, include only streams with a path that
        matches the given string.

        If layout is specified, include only streams with a layout
        that matches the given string.
        """
        where = "WHERE 1=1"
        params = ()
        if layout:
            where += " AND layout=?"
            params += (layout,)
        if path:
            where += " AND path=?"
            params += (path,)
        result = self.con.execute("SELECT path, layout "
                                  "FROM streams " + where, params).fetchall()

        return sorted(list(x) for x in result)

    def stream_intervals(self, path, start = None, end = None):
        """
        Returns (intervals, restart) tuple.

        intervals is a list of [start,end] timestamps of all intervals
        that exist for path, between start and end.

        restart, if nonzero, means that there were too many results to
        return in a single request.  The data is complete from the
        starting timestamp to the point at which it was truncated,
        and a new request with a start time of 'restart' will fetch
        the next block of data.
        """
        stream_id = self._stream_id(path)
        intervals = self._get_intervals(stream_id)
        requested = Interval(start or 0, end or 1e12)
        result = []
        for n, i in enumerate(intervals.intersection(requested)):
            if n >= self.max_results:
                restart = i.start
                break
            result.append([i.start, i.end])
        else:
            restart = 0
        return (result, restart)

    def stream_create(self, path, layout_name):
        """Create a new table in the database.

        path: path to the data (e.g. '/newton/prep').
              Paths must contain at least two elements, e.g.:
                /newton/prep
                /newton/raw
                /newton/upstairs/prep
                /newton/upstairs/raw

        layout_name: string for nilmdb.layout.get_named(), e.g. 'float32_8'
        """
        # Create the bulk storage.  Raises ValueError on error, which we
        # pass along.
        self.data.create(path, layout_name)

        # Insert into SQL database once the bulk storage is happy
        with self.con as con:
            con.execute("INSERT INTO streams (path, layout) VALUES (?,?)",
                        (path, layout_name))

    def _stream_id(self, path):
        """Return unique stream ID"""
        result = self.con.execute("SELECT id FROM streams WHERE path=?",
                                  (path,)).fetchone()
        if result is None:
            raise StreamError("No stream at path " + path)
        return result[0]

    def stream_set_metadata(self, path, data):
        """Set stream metadata from a dictionary, e.g.
           { description = 'Downstairs lighting',
             v_scaling = 123.45 }
           This replaces all existing metadata.
        """
        stream_id = self._stream_id(path)
        with self.con as con:
            con.execute("DELETE FROM metadata WHERE stream_id=?", (stream_id,))
            for key in data:
                if data[key] != '':
                    con.execute("INSERT INTO metadata VALUES (?, ?, ?)",
                                (stream_id, key, data[key]))

    def stream_get_metadata(self, path):
        """Return stream metadata as a dictionary."""
        stream_id = self._stream_id(path)
        result = self.con.execute("SELECT metadata.key, metadata.value "
                                  "FROM metadata "
                                  "WHERE metadata.stream_id=?", (stream_id,))
        data = {}
        for (key, value) in result:
            data[key] = value
        return data

    def stream_update_metadata(self, path, newdata):
        """Update stream metadata from a dictionary"""
        data = self.stream_get_metadata(path)
        data.update(newdata)
        self.stream_set_metadata(path, data)

    def stream_destroy(self, path):
        """Fully remove a table and all of its data from the database.
        No way to undo it!  Metadata is removed."""
        stream_id = self._stream_id(path)

        # Delete the cached interval data (if it was cached)
        self._get_intervals.cache_remove(self, stream_id)

        # Delete the data
        self.data.destroy(path)

        # Delete metadata, stream, intervals
        with self.con as con:
            con.execute("DELETE FROM metadata WHERE stream_id=?", (stream_id,))
            con.execute("DELETE FROM ranges WHERE stream_id=?", (stream_id,))
            con.execute("DELETE FROM streams WHERE id=?", (stream_id,))

    def stream_insert(self, path, start, end, data):
        """Insert new data into the database.

        path: Path at which to add the data
        start: Starting timestamp
        end: Ending timestamp
        data: Rows of data, to be passed to bulkdata table.append
              method.  E.g. nilmdb.layout.Parser.data
        """
        # First check for basic overlap using timestamp info given.
        stream_id = self._stream_id(path)
        iset = self._get_intervals(stream_id)
        interval = Interval(start, end)
        if iset.intersects(interval):
            raise OverlapError("new data overlaps existing data at range: "
                               + str(iset & interval))

        # Insert the data
        table = self.data.getnode(path)
        row_start = table.nrows
        table.append(data)
        row_end = table.nrows

        # Insert the record into the sql database.
        self._add_interval(stream_id, interval, row_start, row_end)

        # And that's all
        return "ok"

    def _find_start(self, table, dbinterval):
        """
        Given a DBInterval, find the row in the database that
        corresponds to the start time.  Return the first database
        position with a timestamp (first element) greater than or
        equal to 'start'.
        """
        # Optimization for the common case where an interval wasn't truncated
        if dbinterval.start == dbinterval.db_start:
            return dbinterval.db_startpos
        return bisect.bisect_left(bulkdata.TimestampOnlyTable(table),
                                  dbinterval.start,
                                  dbinterval.db_startpos,
                                  dbinterval.db_endpos)

    def _find_end(self, table, dbinterval):
        """
        Given a DBInterval, find the row in the database that follows
        the end time.  Return the first database position after the
        row with timestamp (first element) greater than or equal to
        'end'.
""" # Optimization for the common case where an interval wasn't truncated if dbinterval.end == dbinterval.db_end: return dbinterval.db_endpos # Note that we still use bisect_left here, because we don't # want to include the given timestamp in the results. This is # so a queries like 1:00 -> 2:00 and 2:00 -> 3:00 return # non-overlapping data. return bisect.bisect_left(bulkdata.TimestampOnlyTable(table), dbinterval.end, dbinterval.db_startpos, dbinterval.db_endpos) def stream_extract(self, path, start = None, end = None, count = False): """ Returns (data, restart) tuple. data is a list of raw data from the database, suitable for passing to e.g. nilmdb.layout.Formatter to translate into textual form. restart, if nonzero, means that there were too many results to return in a single request. The data is complete from the starting timestamp to the point at which it was truncated, and a new request with a start time of 'restart' will fetch the next block of data. count, if true, means to not return raw data, but just the count of rows that would have been returned. This is much faster than actually fetching the data. It is not limited by max_results. """ stream_id = self._stream_id(path) table = self.data.getnode(path) intervals = self._get_intervals(stream_id) requested = Interval(start or 0, end or 1e12) result = [] matched = 0 remaining = self.max_results restart = 0 for interval in intervals.intersection(requested): # Reading single rows from the table is too slow, so # we use two bisections to find both the starting and # ending row for this particular interval, then # read the entire range as one slice. row_start = self._find_start(table, interval) row_end = self._find_end(table, interval) if count: matched += row_end - row_start continue # Shorten it if we'll hit the maximum number of results row_max = row_start + remaining if row_max < row_end: row_end = row_max restart = table[row_max][0] # Gather these results up result.extend(table[row_start:row_end]) # Count them remaining -= row_end - row_start if restart: break if count: return matched return (result, restart) def stream_remove(self, path, start = None, end = None): """ Remove data from the specified time interval within a stream. Removes all data in the interval [start, end), and intervals are truncated or split appropriately. Returns the number of data points removed. """ stream_id = self._stream_id(path) table = self.data.getnode(path) intervals = self._get_intervals(stream_id) to_remove = Interval(start or 0, end or 1e12) removed = 0 if start == end: return 0 # Can't remove intervals from within the iterator, so we need to # remember what's currently in the intersection now. all_candidates = list(intervals.intersection(to_remove, orig = True)) for (dbint, orig) in all_candidates: # Find row start and end row_start = self._find_start(table, dbint) row_end = self._find_end(table, dbint) # Adjust the DBInterval to match the newly found ends dbint.db_start = dbint.start dbint.db_end = dbint.end dbint.db_startpos = row_start dbint.db_endpos = row_end # Remove interval from the database self._remove_interval(stream_id, orig, dbint) # Remove data from the underlying table storage table.remove(row_start, row_end) # Count how many were removed removed += row_end - row_start return removed