|
- # -*- coding: utf-8 -*-
-
- """NilmDB
-
- Object that represents a NILM database file.
-
- Manages both the SQL database and the PyTables storage backend.
- """
-
- # Need absolute_import so that "import nilmdb" won't pull in nilmdb.py,
- # but will pull the nilmdb module instead.
- from __future__ import absolute_import
- import nilmdb
- from nilmdb.printf import *
- from nilmdb.interval import Interval, IntervalSet, IntervalError
-
- import sqlite3
- import tables
- import time
- import sys
- import os
- import errno
-
- # Note about performance and transactions:
- #
- # Committing a transaction in the default sync mode (PRAGMA synchronous=FULL)
- # takes about 125msec. sqlite3 will commit transactions at 3 times:
- # 1: explicit con.commit()
- # 2: between a series of DML commands and non-DML commands, e.g.
- # after a series of INSERT, SELECT, but before a CREATE TABLE or PRAGMA.
- # 3: at the end of an explicit transaction, e.g. "with self.con as con:"
- #
- # To speed up testing, or if this transaction speed becomes an issue,
- # the sync=False option to NilmDB.__init__ will set PRAGMA synchronous=OFF.
-
-
# Schema migration scripts, keyed by the PRAGMA user_version they upgrade
# FROM.  _sql_schema_update() applies them in sequence until no entry
# matches the current version.
# Don't touch old entries -- just add new ones.
_sql_schema_updates = {
    0: """
    -- All streams
    CREATE TABLE streams(
        id INTEGER PRIMARY KEY, -- stream ID
        path TEXT UNIQUE NOT NULL, -- path, e.g. '/newton/prep'
        layout TEXT NOT NULL -- one of nilmdb.layout.layouts
    );

    -- Individual timestamped ranges in those streams.
    -- For a given start_time and end_time, this tells us that the
    -- data is stored between start_pos and end_pos.
    -- Times are stored as μs since Unix epoch
    -- Positions are opaque: PyTables rows, file offsets, etc.
    CREATE TABLE ranges(
        stream_id INTEGER NOT NULL,
        start_time INTEGER NOT NULL,
        end_time INTEGER NOT NULL,
        start_pos INTEGER NOT NULL,
        end_pos INTEGER NOT NULL
    );
    CREATE INDEX _ranges_index ON ranges (stream_id, start_time, end_time);
    """,

    1: """
    -- Generic dictionary-type metadata that can be associated with a stream
    CREATE TABLE metadata(
        stream_id INTEGER NOT NULL,
        key TEXT NOT NULL,
        value TEXT
    );
    """,
}
-
class NilmDBError(Exception):
    """Base exception for NilmDB errors.

    The stored message is prefixed with the concrete exception class
    name, so subclasses automatically identify themselves.
    """
    def __init__(self, message = "Unspecified error"):
        full_message = "%s: %s" % (self.__class__.__name__, message)
        Exception.__init__(self, full_message)
-
class StreamError(NilmDBError):
    """Raised for errors that pertain to a particular stream."""
-
class OverlapError(NilmDBError):
    """Raised when newly inserted data overlaps existing data."""
-
class NilmDB(object):
    """A NILM database instance.

    Manages two storage backends under a single base directory:
      - a PyTables HDF5 file (data.h5) holding the bulk timestamped data
      - a SQLite database (data.sql) holding stream paths/layouts, the
        time ranges stored for each stream, and per-stream metadata
    """

    # Set nonzero to print schema-update progress to stdout.
    verbose = 0

    def __init__(self, basepath, sync=True):
        """Open (creating if necessary) the database rooted at basepath.

        basepath: directory that will contain data.h5 and data.sql
        sync: if False, set PRAGMA synchronous=OFF for speed; see the
              transaction-performance note at the top of this file.

        Raises IOError if the directory tree can't be created.
        """
        # Normalize: strip trailing slashes so path joins behave.
        self.basepath = os.path.abspath(basepath.rstrip('/'))

        # Create the database path if it doesn't exist
        try:
            os.makedirs(self.basepath)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise IOError("can't create tree " + self.basepath)

        # Our HDF5 file goes inside it
        h5filename = os.path.abspath(self.basepath + "/data.h5")
        self.h5file = tables.openFile(h5filename, "a", "NILM Database")

        # SQLite database too
        sqlfilename = os.path.abspath(self.basepath + "/data.sql")
        # We use check_same_thread = False, assuming that the rest
        # of the code (e.g. Server) will be smart and not access this
        # database from multiple threads simultaneously.  That requirement
        # may be relaxed later.
        self.con = sqlite3.connect(sqlfilename, check_same_thread = False)
        self._sql_schema_update()

        # See big comment at top about the performance implications of this
        if sync:
            self.con.execute("PRAGMA synchronous=FULL")
        else:
            self.con.execute("PRAGMA synchronous=OFF")

        # Flag used by __del__ to detect a missing close() call.
        self.opened = True

    def __del__(self):
        # Warn (don't raise -- exceptions in __del__ are ignored anyway)
        # if the caller forgot to call close().
        if "opened" in self.__dict__: # pragma: no cover
            fprintf(sys.stderr,
                    "error: NilmDB.close() wasn't called, path %s",
                    self.basepath)

    def get_basepath(self):
        """Return the absolute base directory of this database."""
        return self.basepath

    def close(self):
        """Commit and close both backends.  Must be called before
        the object is dropped."""
        if self.con:
            self.con.commit()
            self.con.close()
        self.h5file.close()
        del self.opened

    def _sql_schema_update(self):
        """Apply any pending scripts from _sql_schema_updates, then
        record the new schema version in PRAGMA user_version."""
        cur = self.con.cursor()
        version = cur.execute("PRAGMA user_version").fetchone()[0]
        oldversion = version

        while version in _sql_schema_updates:
            cur.executescript(_sql_schema_updates[version])
            version = version + 1
            if self.verbose: # pragma: no cover
                printf("Schema updated to %d\n", version)

        if version != oldversion:
            with self.con:
                cur.execute("PRAGMA user_version = {v:d}".format(v=version))

    def _get_intervals(self, stream_id):
        """
        Return an IntervalSet corresponding to the given stream ID.
        """
        # Could cache these, if it's a performance bottleneck
        iset = IntervalSet()
        result = self.con.execute("SELECT start_time, end_time "
                                  "FROM ranges "
                                  "WHERE stream_id=?", (stream_id,))
        try:
            for (start, end) in result:
                iset += Interval(start, end)
        except IntervalError: # pragma: no cover
            # The ranges table should never contain overlapping entries;
            # if it does, the database is corrupt.
            raise NilmDBError("unexpected overlap in ranges table!")
        return iset

    def _add_interval(self, stream_id, interval, start_pos, end_pos):
        """Record that rows [start_pos, end_pos) hold the data for
        'interval' in the given stream."""
        # Arguments must be ints (not numpy.int64, etc)
        self.con.execute("INSERT INTO ranges "
                         "(stream_id,start_time,end_time,start_pos,end_pos) "
                         "VALUES (?,?,?,?,?)",
                         (stream_id, interval.start, interval.end,
                          int(start_pos), int(end_pos)))

    def stream_list(self, path = None, layout = None):
        """Return list of [path, layout] lists of all streams
        in the database.

        If path is specified, include only streams with a path that
        matches the given string.

        If layout is specified, include only streams with a layout
        that matches the given string.
        """
        # Build the WHERE clause incrementally; "1=1" lets every extra
        # condition be appended uniformly with " AND ".
        where = "WHERE 1=1"
        params = ()
        if layout:
            where += " AND layout=?"
            params += (layout,)
        if path:
            where += " AND path=?"
            params += (path,)
        result = self.con.execute("SELECT path, layout "
                                  "FROM streams " + where, params).fetchall()

        return sorted(list(x) for x in result)

    def stream_create(self, path, layout_name, index = None):
        """Create a new table in the database.

        path: path to the data (e.g. '/newton/prep')

        layout_name: one of the nilmdb.layout.layouts keys, e.g. 'PrepData'

        index: list of layout columns to be marked as PyTables indices.
          If index = None, the 'timestamp' column is indexed if it exists.
          Pass an empty list to prevent indexing.

        Raises ValueError for a malformed path.
        """
        if path[0] != '/':
            raise ValueError("paths must start with /")
        [ group, node ] = path.rsplit("/", 1)
        if group == '':
            raise ValueError("Invalid path")

        # Make the group structure, one element at a time
        group_path = group.lstrip('/').split("/")
        for i in range(len(group_path)):
            parent = "/" + "/".join(group_path[0:i])
            child = group_path[i]
            try:
                self.h5file.createGroup(parent, child)
            except tables.NodeError:
                # Group already exists; that's fine.
                pass

        # Get description
        desc = nilmdb.layout.named[layout_name].description()

        # Estimated table size (for PyTables optimization purposes): assume
        # 3 months worth of data. It's OK if this is wrong.
        exp_rows = (nilmdb.layout.named[layout_name].rate_hz *
                    60 * 60 * 24 * 30 * 3)

        table = self.h5file.createTable(group, node,
                                        description = desc,
                                        expectedrows = exp_rows)

        # Create indices
        try:
            if index is None and "timestamp" in table.colnames:
                index = [ "timestamp" ]
            for ind in index:
                table.cols._f_col(str(ind)).createIndex()
        except KeyError:
            # Remove this table if we got an error, then re-raise with
            # the original traceback intact (bare 'raise', not 'raise e').
            self.h5file.removeNode(group, node)
            raise

        # Insert into SQL database once the PyTables is happy
        with self.con as con:
            con.execute("INSERT INTO streams (path, layout) VALUES (?,?)",
                        (path, layout_name))

    def _stream_id(self, path):
        """Return the unique stream ID for the stream at 'path'.

        Raises StreamError if no such stream exists."""
        result = self.con.execute("SELECT id FROM streams WHERE path=?",
                                  (path,)).fetchone()
        if result is None:
            raise StreamError("No stream at path " + path)
        return result[0]

    def stream_set_metadata(self, path, data):
        """Set stream metadata from a dictionary, e.g.
           { "description": "Downstairs lighting",
             "v_scaling": 123.45 }
        This replaces all existing metadata for the given keys; an
        empty-string value deletes the key.
        """
        stream_id = self._stream_id(path)
        with self.con as con:
            for key in data:
                con.execute("DELETE FROM metadata "
                            "WHERE stream_id=? AND key=?", (stream_id, key))
                if data[key] != '':
                    con.execute("INSERT INTO metadata VALUES (?, ?, ?)",
                                (stream_id, key, data[key]))

    def stream_get_metadata(self, path):
        """Return stream metadata as a dictionary."""
        stream_id = self._stream_id(path)
        result = self.con.execute("SELECT metadata.key, metadata.value "
                                  "FROM metadata "
                                  "WHERE metadata.stream_id=?", (stream_id,))
        data = {}
        for (key, value) in result:
            data[key] = value
        return data

    def stream_update_metadata(self, path, newdata):
        """Update stream metadata from a dictionary (existing keys not
        mentioned in newdata are preserved)."""
        data = self.stream_get_metadata(path)
        data.update(newdata)
        self.stream_set_metadata(path, data)

    def stream_insert(self, path, parser):
        """Insert new data into the database.
        path: Path at which to add the data
        parser: nilmdb.layout.Parser instance full of data to insert

        Raises StreamError if the parser holds no data, and
        OverlapError if the new data overlaps existing data.
        """
        # Explicit "is None" checks: a timestamp of 0 (the Unix epoch)
        # is valid data, so a plain truth test would wrongly reject it.
        if (parser.min_timestamp is None or parser.max_timestamp is None or
            not parser.data):
            raise StreamError("no data provided")

        # First check for basic overlap using timestamp info from the parser.
        stream_id = self._stream_id(path)
        iset = self._get_intervals(stream_id)
        interval = Interval(parser.min_timestamp, parser.max_timestamp)
        if iset.intersects(interval):
            raise OverlapError("new data overlaps existing data: "
                               + str(iset & interval))

        # Insert the data into pytables
        table = self.h5file.getNode(path)
        row_start = table.nrows
        table.append(parser.data)
        row_end = table.nrows
        table.flush()

        # Insert the record into the sql database.
        # Casts are to convert from numpy.int64.
        self._add_interval(stream_id, interval, int(row_start), int(row_end))

        # And that's all
        return "ok"
|