|
- # -*- coding: utf-8 -*-
-
- """NilmDB
-
- Object that represents a NILM database file.
-
- Manages both the SQL database and the table storage backend.
- """
-
- # Need absolute_import so that "import nilmdb" won't pull in
- # nilmdb.py, but will pull the parent nilmdb module instead.
-
- import nilmdb.utils
- from nilmdb.utils.printf import printf
- from nilmdb.utils.time import timestamp_to_bytes
-
- from nilmdb.utils.interval import IntervalError
- from nilmdb.server.interval import Interval, DBInterval, IntervalSet
-
- from nilmdb.server import bulkdata
- from nilmdb.server.errors import NilmDBError, StreamError, OverlapError
-
- import sqlite3
- import os
- import errno
-
- # Note about performance and transactions:
- #
- # Committing a transaction in the default sync mode (PRAGMA synchronous=FULL)
- # takes about 125msec. sqlite3 will commit transactions at 3 times:
- # 1: explicit con.commit()
- # 2: between a series of DML commands and non-DML commands, e.g.
- # after a series of INSERT, SELECT, but before a CREATE TABLE or PRAGMA.
- # 3: at the end of an explicit transaction, e.g. "with self.con as con:"
- #
- # To speed things up, we can set 'PRAGMA synchronous=OFF'. Or, it
- # seems that 'PRAGMA synchronous=NORMAL' and 'PRAGMA journal_mode=WAL'
- # give an equivalent speedup more safely. That is what is used here.
# Schema migration table for the SQL database.
#
# The current schema version is stored in SQLite's "PRAGMA user_version".
# Each key below is a schema version; its entry contains either:
#   "next":  the version to upgrade to (None marks the latest version), and
#   "sql":   a script that performs that upgrade, or
#   "error": a message explaining why that on-disk version is unsupported.
# Note that version 1 upgrades directly to 3: version 2 (floating-point
# timestamps) is a dead-end format that only older nilmdb can read.
_sql_schema_updates = {
    0: {"next": 1, "sql": """
        -- All streams
        CREATE TABLE streams(
            id INTEGER PRIMARY KEY,       -- stream ID
            path TEXT UNIQUE NOT NULL,    -- path, e.g. '/newton/prep'
            layout TEXT NOT NULL          -- layout name, e.g. float32_8
        );

        -- Individual timestamped ranges in those streams.
        -- For a given start_time and end_time, this tells us that the
        -- data is stored between start_pos and end_pos.
        -- Times are stored as μs since Unix epoch
        -- Positions are opaque: PyTables rows, file offsets, etc.
        --
        -- Note: end_pos points to the row _after_ end_time, so end_pos-1
        -- is the last valid row.
        CREATE TABLE ranges(
            stream_id INTEGER NOT NULL,
            start_time INTEGER NOT NULL,
            end_time INTEGER NOT NULL,
            start_pos INTEGER NOT NULL,
            end_pos INTEGER NOT NULL
        );
        CREATE INDEX _ranges_index ON ranges (stream_id, start_time, end_time);
        """},

    1: {"next": 3, "sql": """
        -- Generic dictionary-type metadata that can be associated with a stream
        CREATE TABLE metadata(
            stream_id INTEGER NOT NULL,
            key TEXT NOT NULL,
            value TEXT
        );
        """},

    2: {"error": "old format with floating-point timestamps requires "
        "nilmdb 1.3.1 or older"},

    3: {"next": None},
}
-
-
- @nilmdb.utils.must_close()
- class NilmDB(object):
- verbose = 0
-
- def __init__(self, basepath,
- max_results=None,
- max_removals=None,
- max_int_removals=None,
- bulkdata_args=None):
- """Initialize NilmDB at the given basepath.
- Other arguments are for debugging / testing:
-
- 'max_results' is the max rows to send in a single
- stream_intervals or stream_extract response.
-
- 'max_removals' is the max rows to delete at once
- in stream_remove.
-
- 'max_int_removals' is the max intervals to delete
- at once in stream_remove.
-
- 'bulkdata_args' is kwargs for the bulkdata module.
- """
- if bulkdata_args is None:
- bulkdata_args = {}
-
- # set up path
- self.basepath = os.path.abspath(basepath)
-
- # Create the database path if it doesn't exist
- try:
- os.makedirs(self.basepath)
- except OSError as e:
- if e.errno != errno.EEXIST:
- raise IOError("can't create tree " + self.basepath)
-
- # Our data goes inside it
- self.data = bulkdata.BulkData(self.basepath, **bulkdata_args)
-
- # SQLite database too
- sqlfilename = os.path.join(self.basepath, "data.sql")
- self.con = sqlite3.connect(sqlfilename, check_same_thread=True)
- try:
- self._sql_schema_update()
- except Exception:
- self.data.close()
- raise
-
- # See big comment at top about the performance implications of this
- self.con.execute("PRAGMA synchronous=NORMAL")
- self.con.execute("PRAGMA journal_mode=WAL")
-
- # Approximate largest number of elements that we want to send
- # in a single reply (for stream_intervals, stream_extract).
- self.max_results = max_results or 16384
-
- # Remove up to this many rows per call to stream_remove.
- self.max_removals = max_removals or 1048576
-
- # Remove up to this many intervals per call to stream_remove.
- self.max_int_removals = max_int_removals or 4096
-
- def get_basepath(self):
- return self.basepath
-
- def close(self):
- if self.con:
- self.con.commit()
- self.con.close()
- self.con = None
- self.data.close()
-
- def _sql_schema_update(self):
- cur = self.con.cursor()
- version = cur.execute("PRAGMA user_version").fetchone()[0]
- oldversion = version
-
- while True:
- if version not in _sql_schema_updates:
- raise Exception(self.basepath + ": unknown database version "
- + str(version))
- update = _sql_schema_updates[version]
- if "error" in update:
- raise Exception(self.basepath + ": can't use database version "
- + str(version) + ": " + update["error"])
- if update["next"] is None:
- break
- cur.executescript(update["sql"])
- version = update["next"]
- if self.verbose:
- printf("Database schema updated to %d\n", version)
-
- if version != oldversion:
- with self.con:
- cur.execute("PRAGMA user_version = {v:d}".format(v=version))
-
- def _check_user_times(self, start, end):
- if start is None:
- start = nilmdb.utils.time.min_timestamp
- if end is None:
- end = nilmdb.utils.time.max_timestamp
- if start >= end:
- raise NilmDBError("start must precede end")
- return (start, end)
-
- @nilmdb.utils.lru_cache(size=64)
- def _get_intervals(self, stream_id):
- """
- Return a mutable IntervalSet corresponding to the given stream ID.
- """
- iset = IntervalSet()
- result = self.con.execute("SELECT start_time, end_time, "
- "start_pos, end_pos "
- "FROM ranges "
- "WHERE stream_id=?", (stream_id,))
- try:
- for (start_time, end_time, start_pos, end_pos) in result:
- iset += DBInterval(start_time, end_time,
- start_time, end_time,
- start_pos, end_pos)
- except IntervalError:
- raise NilmDBError("unexpected overlap in ranges table!")
-
- return iset
-
- def _sql_interval_insert(self, id, start, end, start_pos, end_pos):
- """Helper that adds interval to the SQL database only"""
- self.con.execute("INSERT INTO ranges "
- "(stream_id,start_time,end_time,start_pos,end_pos) "
- "VALUES (?,?,?,?,?)",
- (id, start, end, start_pos, end_pos))
-
- def _sql_interval_delete(self, id, start, end, start_pos, end_pos):
- """Helper that removes interval from the SQL database only"""
- self.con.execute("DELETE FROM ranges WHERE "
- "stream_id=? AND start_time=? AND "
- "end_time=? AND start_pos=? AND end_pos=?",
- (id, start, end, start_pos, end_pos))
-
    def _add_interval(self, stream_id, interval, start_pos, end_pos):
        """
        Add interval to the internal interval cache, and to the database.
        Note: arguments must be ints (not numpy.int64, etc)

        stream_id: id of the stream this interval belongs to
        interval: Interval covering [interval.start, interval.end);
                  may be mutated (start moved earlier) if merged with
                  an adjacent interval
        start_pos, end_pos: table row range holding the interval's data
        """
        # Load this stream's intervals (cached, mutable set)
        iset = self._get_intervals(stream_id)

        # Check for adjacency.  If there's a stream in the database
        # that ends exactly when this one starts, and the database
        # rows match up, we can make one interval that covers the
        # time range [adjacent.start -> interval.end)
        # and database rows [ adjacent.start_pos -> end_pos ].
        # Only do this if the resulting interval isn't too large.
        max_merged_rows = 8000 * 60 * 60 * 1.05  # 1.05 hours at 8 KHz
        adjacent = iset.find_end(interval.start)
        if (adjacent is not None and
            start_pos == adjacent.db_endpos and
            (end_pos - adjacent.db_startpos) < max_merged_rows):
            # First delete the old one, both from our iset and the
            # database
            iset -= adjacent
            self._sql_interval_delete(stream_id,
                                      adjacent.db_start, adjacent.db_end,
                                      adjacent.db_startpos, adjacent.db_endpos)

            # Now update our interval so the fallthrough add is
            # correct.  The merged interval starts where the adjacent
            # one did, and its rows start at the adjacent start_pos.
            interval.start = adjacent.start
            start_pos = adjacent.db_startpos

        # Add the new interval to the iset
        iset.iadd_nocheck(DBInterval(interval.start, interval.end,
                                     interval.start, interval.end,
                                     start_pos, end_pos))

        # Insert into the database
        self._sql_interval_insert(stream_id, interval.start, interval.end,
                                  int(start_pos), int(end_pos))

        self.con.commit()
-
    def _remove_interval(self, stream_id, original, remove):
        """
        Remove an interval from the internal cache and the database.

        stream_id: id of stream
        original: original DBInterval; must be already present in DB
        remove: DBInterval to remove; must be subset of 'original'
        """
        # Load this stream's intervals
        iset = self._get_intervals(stream_id)

        # Remove existing interval from the cached set and the database
        iset -= original
        self._sql_interval_delete(stream_id,
                                  original.db_start, original.db_end,
                                  original.db_startpos, original.db_endpos)

        # Add back the intervals that would be left over if the
        # requested interval is removed.  There may be two of them, if
        # the removed piece was in the middle.
        def add(iset, start, end, start_pos, end_pos):
            # Helper: reinsert a leftover piece into both cache and SQL.
            iset += DBInterval(start, end, start, end, start_pos, end_pos)
            self._sql_interval_insert(stream_id, start, end,
                                      start_pos, end_pos)

        if original.start != remove.start:
            # Interval before the removed region
            add(iset, original.start, remove.start,
                original.db_startpos, remove.db_startpos)

        if original.end != remove.end:
            # Interval after the removed region
            add(iset, remove.end, original.end,
                remove.db_endpos, original.db_endpos)

        # Commit SQL changes
        self.con.commit()

        return
-
- def stream_list(self, path=None, layout=None, extended=False):
- """Return list of lists of all streams in the database.
-
- If path is specified, include only streams with a path that
- matches the given string.
-
- If layout is specified, include only streams with a layout
- that matches the given string.
-
- If extended=False, returns a list of lists containing
- the path and layout: [ path, layout ]
-
- If extended=True, returns a list of lists containing
- more information:
- path
- layout
- interval_min (earliest interval start)
- interval_max (latest interval end)
- rows (total number of rows of data)
- time (total time covered by this stream, in timestamp units)
- """
- params = ()
- query = "SELECT streams.path, streams.layout"
- if extended:
- query += ", min(ranges.start_time), max(ranges.end_time) "
- query += ", coalesce(sum(ranges.end_pos - ranges.start_pos), 0) "
- query += ", coalesce(sum(ranges.end_time - ranges.start_time), 0) "
- query += " FROM streams"
- if extended:
- query += " LEFT JOIN ranges ON streams.id = ranges.stream_id"
- query += " WHERE 1=1"
- if layout is not None:
- query += " AND streams.layout=?"
- params += (layout,)
- if path is not None:
- query += " AND streams.path=?"
- params += (path,)
- query += " GROUP BY streams.id ORDER BY streams.path"
- result = self.con.execute(query, params).fetchall()
- return [list(x) for x in result]
-
    def stream_intervals(self, path, start=None, end=None, diffpath=None):
        """
        List all intervals in 'path' between 'start' and 'end'.  If
        'diffpath' is not None, list instead the set-difference
        between the intervals in the two streams; i.e. all interval
        ranges that are present in 'path' but not 'diffpath'.

        Returns (intervals, restart) tuple.

        'intervals' is a list of [start,end] timestamps of all intervals
        that exist for path, between start and end.

        'restart', if not None, means that there were too many results
        to return in a single request.  The data is complete from the
        starting timestamp to the point at which it was truncated, and
        a new request with a start time of 'restart' will fetch the
        next block of data.
        """
        stream_id = self._stream_id(path)
        intervals = self._get_intervals(stream_id)
        if diffpath:
            diffstream_id = self._stream_id(diffpath)
            diffintervals = self._get_intervals(diffstream_id)
        # Fill in defaults and sanity-check the requested time range
        (start, end) = self._check_user_times(start, end)
        requested = Interval(start, end)
        result = []
        if diffpath:
            # Lazily compute ranges present in 'path' but not 'diffpath'
            getter = nilmdb.utils.interval.set_difference(
                intervals.intersection(requested),
                diffintervals.intersection(requested))
        else:
            getter = intervals.intersection(requested)
        for n, i in enumerate(getter):
            if n >= self.max_results:
                # Hit the reply-size cap; caller should restart here.
                restart = i.start
                break
            result.append([i.start, i.end])
        else:
            # Loop completed without truncation ('for...else')
            restart = None
        return (result, restart)
-
- def stream_create(self, path, layout_name):
- """Create a new table in the database.
-
- path: path to the data (e.g. '/newton/prep').
- Paths must contain at least two elements, e.g.:
- /newton/prep
- /newton/raw
- /newton/upstairs/prep
- /newton/upstairs/raw
-
- layout_name: string for nilmdb.layout.get_named(), e.g. 'float32_8'
- """
- # Create the bulk storage. Raises ValueError on error, which we
- # pass along.
- self.data.create(path, layout_name)
-
- # Insert into SQL database once the bulk storage is happy
- with self.con as con:
- con.execute("INSERT INTO streams (path, layout) VALUES (?,?)",
- (path, layout_name))
-
- def _stream_id(self, path):
- """Return unique stream ID"""
- result = self.con.execute("SELECT id FROM streams WHERE path=?",
- (path,)).fetchone()
- if result is None:
- raise StreamError("No stream at path " + path)
- return result[0]
-
- def stream_set_metadata(self, path, data):
- """Set stream metadata from a dictionary, e.g.
- { description: 'Downstairs lighting',
- v_scaling: 123.45 }
- This replaces all existing metadata.
- """
- stream_id = self._stream_id(path)
- with self.con as con:
- con.execute("DELETE FROM metadata WHERE stream_id=?", (stream_id,))
- for key in data:
- if data[key] != '':
- con.execute("INSERT INTO metadata VALUES (?, ?, ?)",
- (stream_id, key, data[key]))
-
- def stream_get_metadata(self, path):
- """Return stream metadata as a dictionary."""
- stream_id = self._stream_id(path)
- result = self.con.execute("SELECT metadata.key, metadata.value "
- "FROM metadata "
- "WHERE metadata.stream_id=?", (stream_id,))
- data = {}
- for (key, value) in result:
- data[key] = value
- return data
-
- def stream_update_metadata(self, path, newdata):
- """Update stream metadata from a dictionary"""
- data = self.stream_get_metadata(path)
- data.update(newdata)
- self.stream_set_metadata(path, data)
-
- def stream_rename(self, oldpath, newpath):
- """Rename a stream."""
- stream_id = self._stream_id(oldpath)
-
- # Rename the data
- self.data.rename(oldpath, newpath)
-
- # Rename the stream in the database
- with self.con as con:
- con.execute("UPDATE streams SET path=? WHERE id=?",
- (newpath, stream_id))
-
- def stream_destroy(self, path):
- """Fully remove a table from the database. Fails if there are
- any intervals data present; remove them first. Metadata is
- also removed."""
- stream_id = self._stream_id(path)
-
- # Verify that no intervals are present, and clear the cache
- iset = self._get_intervals(stream_id)
- if len(iset):
- raise NilmDBError("all intervals must be removed before "
- "destroying a stream")
- self._get_intervals.cache_remove(self, stream_id)
-
- # Delete the bulkdata storage
- self.data.destroy(path)
-
- # Delete metadata, stream, intervals (should be none)
- with self.con as con:
- con.execute("DELETE FROM metadata WHERE stream_id=?", (stream_id,))
- con.execute("DELETE FROM ranges WHERE stream_id=?", (stream_id,))
- con.execute("DELETE FROM streams WHERE id=?", (stream_id,))
-
- def stream_insert(self, path, start, end, data, binary=False):
- """Insert new data into the database.
- path: Path at which to add the data
- start: Starting timestamp
- end: Ending timestamp
- data: Textual data, formatted according to the layout of path
-
- 'binary', if True, means that 'data' is raw binary:
- little-endian, matching the current table's layout,
- including the int64 timestamp.
- """
- # First check for basic overlap using timestamp info given.
- stream_id = self._stream_id(path)
- iset = self._get_intervals(stream_id)
- interval = Interval(start, end)
- if iset.intersects(interval):
- raise OverlapError("new data overlaps existing data at range: "
- + str(iset & interval))
-
- # Tenatively append the data. This will raise a ValueError if
- # there are any parse errors.
- table = self.data.getnode(path)
- row_start = table.nrows
- table.append_data(data, start, end, binary)
- row_end = table.nrows
-
- # Insert the record into the sql database.
- self._add_interval(stream_id, interval, row_start, row_end)
-
- # And that's all
- return
-
- def _bisect_left(self, a, x, lo, hi):
- # Like bisect.bisect_left, but doesn't choke on large indices on
- # 32-bit systems, like bisect's fast C implementation does.
- while lo < hi:
- mid = (lo + hi) // 2
- if a[mid] < x:
- lo = mid + 1
- else:
- hi = mid
- return lo
-
- def _find_start(self, table, dbinterval):
- """
- Given a DBInterval, find the row in the database that
- corresponds to the start time. Return the first database
- position with a timestamp (first element) greater than or
- equal to 'start'.
- """
- # Optimization for the common case where an interval wasn't truncated
- if dbinterval.start == dbinterval.db_start:
- return dbinterval.db_startpos
- return self._bisect_left(table,
- dbinterval.start,
- dbinterval.db_startpos,
- dbinterval.db_endpos)
-
- def _find_end(self, table, dbinterval):
- """
- Given a DBInterval, find the row in the database that follows
- the end time. Return the first database position after the
- row with timestamp (first element) greater than or equal
- to 'end'.
- """
- # Optimization for the common case where an interval wasn't truncated
- if dbinterval.end == dbinterval.db_end:
- return dbinterval.db_endpos
- # Note that we still use bisect_left here, because we don't
- # want to include the given timestamp in the results. This is
- # so a queries like 1:00 -> 2:00 and 2:00 -> 3:00 return
- # non-overlapping data.
- return self._bisect_left(table,
- dbinterval.end,
- dbinterval.db_startpos,
- dbinterval.db_endpos)
-
    def stream_extract(self, path, start=None, end=None,
                       count=False, markup=False, binary=False):
        """
        Returns (data, restart) tuple.

        'data' is ASCII-formatted data from the database, formatted
        according to the layout of the stream.

        'restart', if not None, means that there were too many results to
        return in a single request.  The data is complete from the
        starting timestamp to the point at which it was truncated,
        and a new request with a start time of 'restart' will fetch
        the next block of data.

        'count', if true, means to not return raw data, but just the count
        of rows that would have been returned.  This is much faster
        than actually fetching the data.  It is not limited by
        max_results.  In this mode, the return value is just the
        count (an int), not a tuple.

        'markup', if true, indicates that returned data should be
        marked with a comment denoting when a particular interval
        starts, and another comment when an interval ends.

        'binary', if true, means to return raw binary rather than
        ASCII-formatted data.
        """
        stream_id = self._stream_id(path)
        table = self.data.getnode(path)
        intervals = self._get_intervals(stream_id)
        (start, end) = self._check_user_times(start, end)
        requested = Interval(start, end)
        result = []
        matched = 0
        remaining = self.max_results
        restart = None
        if binary and (markup or count):
            raise NilmDBError("binary mode can't be used with markup or count")
        for interval in intervals.intersection(requested):
            # Reading single rows from the table is too slow, so
            # we use two bisections to find both the starting and
            # ending row for this particular interval, then
            # read the entire range as one slice.
            row_start = self._find_start(table, interval)
            row_end = self._find_end(table, interval)

            if count:
                # Counting mode: just accumulate row counts, fetch nothing.
                matched += row_end - row_start
                continue

            # Shorten it if we'll hit the maximum number of results
            row_max = row_start + remaining
            if row_max < row_end:
                row_end = row_max
                # restart is the timestamp of the first row NOT returned
                restart = table[row_max]

            # Add markup
            if markup:
                result.append(b"# interval-start " +
                              timestamp_to_bytes(interval.start) + b"\n")

            # Gather these results up
            result.append(table.get_data(row_start, row_end, binary))

            # Count them
            remaining -= row_end - row_start

            # Add markup, and exit if restart is set.
            if restart is not None:
                if markup:
                    # Truncated: the interval effectively ends at 'restart'
                    result.append(b"# interval-end " +
                                  timestamp_to_bytes(restart) + b"\n")
                break
            if markup:
                result.append(b"# interval-end " +
                              timestamp_to_bytes(interval.end) + b"\n")

        if count:
            return matched
        full_result = b"".join(result)
        return (full_result, restart)
-
    def stream_remove(self, path, start=None, end=None):
        """
        Remove data from the specified time interval within a stream.

        Removes data in the interval [start, end), and intervals are
        truncated or split appropriately.

        Returns a (removed, restart) tuple.

        'removed' is the number of data points that were removed.

        'restart', if not None, means there were too many rows to
        remove in a single request.  This function should be called
        again with a start time of 'restart' to complete the removal.
        """
        stream_id = self._stream_id(path)
        table = self.data.getnode(path)
        intervals = self._get_intervals(stream_id)
        (start, end) = self._check_user_times(start, end)
        to_remove = Interval(start, end)
        removed = 0
        # Caps on work per call; hitting either one sets 'restart'.
        remaining = self.max_removals
        int_remaining = self.max_int_removals
        restart = None

        # Can't remove intervals from within the iterator, so we need to
        # remember what's currently in the intersection now.
        all_candidates = list(intervals.intersection(to_remove, orig=True))

        # Coalesced row range pending removal from table storage.
        remove_start = None
        remove_end = None

        for (dbint, orig) in all_candidates:
            # Stop if we've hit the max number of interval removals
            if int_remaining <= 0:
                restart = dbint.start
                break

            # Find row start and end
            row_start = self._find_start(table, dbint)
            row_end = self._find_end(table, dbint)

            # Shorten it if we'll hit the maximum number of removals
            row_max = row_start + remaining
            if row_max < row_end:
                row_end = row_max
                # Truncate this interval at the timestamp of the first
                # row we won't remove, and restart there next call.
                dbint.end = table[row_max]
                restart = dbint.end

            # Adjust the DBInterval to match the newly found ends
            dbint.db_start = dbint.start
            dbint.db_end = dbint.end
            dbint.db_startpos = row_start
            dbint.db_endpos = row_end

            # Remove interval from the database
            self._remove_interval(stream_id, orig, dbint)

            # Remove data from the underlying table storage,
            # coalescing adjacent removals to reduce the number of calls
            # to table.remove.
            if remove_end == row_start:
                # Extend our coalesced region
                remove_end = row_end
            else:
                # Perform previous removal, then save this one
                if remove_end is not None:
                    table.remove(remove_start, remove_end)
                remove_start = row_start
                remove_end = row_end

            # Count how many were removed
            removed += row_end - row_start
            remaining -= row_end - row_start
            int_remaining -= 1

            if restart is not None:
                break

        # Perform any final coalesced removal
        if remove_end is not None:
            table.remove(remove_start, remove_end)

        return (removed, restart)
|