|
- # -*- coding: utf-8 -*-
-
- """NilmDB
-
- Object that represents a NILM database file.
-
- Manages both the SQL database and the PyTables storage backend.
- """
-
- # Need absolute_import so that "import nilmdb" won't pull in nilmdb.py,
- # but will pull the nilmdb module instead.
- from __future__ import absolute_import
- import nilmdb
- from nilmdb.printf import *
-
- import sqlite3
- import tables
- import time
- import sys
- import os
- import errno
-
- # Incremental SQL schema migrations, keyed by schema version number.
- # NilmDB.sql_schema_update() reads the database's "PRAGMA user_version",
- # runs the script stored under that number, adds one, and repeats until
- # it reaches a number that has no entry here; the final number is then
- # written back to user_version.  Entry N therefore upgrades a database
- # from version N to version N+1.
- # Don't touch old entries -- just add new ones.
- sql_schema_updates = {
- 0: """
- -- All streams
- CREATE TABLE streams(
- id INTEGER PRIMARY KEY, -- stream ID
- path TEXT UNIQUE NOT NULL, -- path, e.g. '/newton/prep'
- layout TEXT NOT NULL -- one of nilmdb.layout.layouts
- );
-
- -- Individual timestamped ranges in those streams.
- -- For a given start_time and end_time, this tells us that the
- -- data is stored between start_pos and end_pos.
- -- Times are stored as μs since Unix epoch
- -- Positions are opaque: PyTables rows, file offsets, etc.
- CREATE TABLE ranges(
- stream_id INTEGER,
- start_time INTEGER NOT NULL,
- end_time INTEGER NOT NULL,
- start_pos INTEGER NOT NULL,
- end_pos INTEGER NOT NULL
- );
- CREATE INDEX _ranges_index ON ranges (stream_id, start_time, end_time);
- """,
-
- 1: """
- -- Generic dictionary-type metadata that can be associated with a stream
- CREATE TABLE metadata(
- stream_id INTEGER,
- key TEXT NOT NULL,
- value TEXT
- );
- """,
- }
-
class StreamError(Exception):
    """Error raised by NilmDB.stream_id() when no stream has the given path."""
-
class NilmDB(object):
    """Handle to one NILM database directory.

    The directory at `basepath` holds two files: "data.h5", opened
    through PyTables, and "data.sql", an SQLite database whose schema
    is brought up to date from sql_schema_updates when the object is
    created.

    Call close() when finished; __del__ prints an error to stderr if
    the object is destroyed while it is still marked as opened.
    """
    # Non-zero makes sql_schema_update() print each schema upgrade.
    verbose = 0

    def __init__(self, basepath):
        """Open the database under `basepath`, creating it if needed.

        basepath: directory that holds (or will hold) the data files;
                  trailing slashes are stripped.

        Raises IOError if the directory can't be created.  Errors from
        opening either file, or from the schema update, propagate
        after any file that was already opened has been closed again.
        """
        # set up path
        self.basepath = basepath.rstrip('/')

        # Create the database path if it doesn't exist
        try:
            os.makedirs(self.basepath)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise IOError("can't create tree " + self.basepath)

        # Our HDF5 file goes inside it
        h5filename = os.path.abspath(self.basepath + "/data.h5")
        self.h5file = tables.openFile(h5filename, "a", "NILM Database")

        # SQLite database too
        sqlfilename = os.path.abspath(self.basepath + "/data.sql")
        # We use check_same_thread = False, assuming that the rest
        # of the code (e.g. Server) will be smart and not access this
        # database from multiple threads simultaneously.  That requirement
        # may be relaxed later.
        self.con = None
        try:
            self.con = sqlite3.connect(sqlfilename,
                                       check_same_thread = False)
            self.sql_schema_update()
        except Exception:
            # Don't leave the HDF5 file (or a half-initialized SQL
            # connection) open when construction fails.
            if self.con is not None:
                self.con.close()
            self.h5file.close()
            raise

        # Marker checked by __del__ and removed by close().
        self.opened = True

    def __del__(self):
        """Complain on stderr if close() was never called."""
        if "opened" in self.__dict__: # pragma: no cover
            fprintf(sys.stderr,
                    "error: NilmDB.close() wasn't called, path %s",
                    self.basepath)

    def close(self):
        """Commit and close the SQL connection, then close the HDF5 file.

        The HDF5 file is closed and the "opened" marker is removed even
        if the SQL commit or close raises.
        """
        try:
            if self.con:
                self.con.commit()
                self.con.close()
                # Forget the closed connection so that a repeated
                # close() doesn't try to commit on it again.
                self.con = None
        finally:
            self.h5file.close()
            # pop() instead of del: no AttributeError when the marker
            # is already gone.
            self.__dict__.pop("opened", None)

    def sql_schema_update(self):
        """Apply every pending script from sql_schema_updates.

        Starting at the database's current "PRAGMA user_version", run
        the script stored under that number and move on to the next
        number until one has no entry, then store the final number
        back into user_version.
        """
        cur = self.con.cursor()
        version = cur.execute("PRAGMA user_version").fetchone()[0]
        oldversion = version

        while version in sql_schema_updates:
            cur.executescript(sql_schema_updates[version])
            version = version + 1
            if self.verbose: # pragma: no cover
                printf("Schema updated to %d\n", version)

        if version != oldversion:
            # this takes .1 seconds, so only do it if necessary
            with self.con:
                cur.execute("PRAGMA user_version = {v:d}".format(v=version))

    def stream_list(self, path = None, layout = None):
        """Return list of [path, layout] lists of all streams
        in the database, sorted.

        If path is specified, include only streams with a path that
        matches the given string.

        If layout is specified, include only streams with a layout
        that matches the given string.
        """
        # "1=1" lets each optional filter be appended as " AND ...".
        where = "WHERE 1=1"
        params = ()
        if layout:
            where += " AND layout=?"
            params += (layout,)
        if path:
            where += " AND path=?"
            params += (path,)
        result = self.con.execute("SELECT path, layout " +
                                  "FROM streams " + where, params).fetchall()

        return sorted(list(row) for row in result)

    def stream_create(self, path, layout_name, index = None):
        """Create a new table in the database.

        path: path to the data (e.g. '/newton/prep')

        layout_name: one of the nilmdb.layout.layouts keys, e.g. 'PrepData'

        index: layout columns listed here are marked as PyTables indices.
        If index is None, the 'timestamp' column is indexed if it exists.
        Pass an empty list to prevent indexing.

        Raises ValueError if path doesn't start with '/' or has no
        group component (e.g. '/prep').  If indexing or the SQL insert
        fails, the newly created table is removed again and the error
        is re-raised.
        """
        # startswith() also rejects an empty path with ValueError
        # instead of failing with IndexError on path[0].
        if not path.startswith('/'):
            raise ValueError("paths must start with /")
        [ group, node ] = path.rsplit("/", 1)
        if group == '':
            raise ValueError("Invalid path")

        # Make the group structure, one element at a time
        group_path = group.lstrip('/').split("/")
        for depth, child in enumerate(group_path):
            parent = "/" + "/".join(group_path[:depth])
            try:
                self.h5file.createGroup(parent, child)
            except tables.NodeError:
                # Deliberately ignored -- presumably the group already
                # exists from an earlier stream.
                pass

        # Get description
        desc = nilmdb.layout.description(layout_name)

        # Estimated table size (for PyTables optimization purposes): assume
        # 3 months worth of data.  It's OK if this is wrong.
        exp_rows = nilmdb.layout.expected_daily_rows(layout_name) * 90

        table = self.h5file.createTable(group, node,
                                        description = desc,
                                        expectedrows = exp_rows)

        try:
            # Create indices
            if index is None:
                if "timestamp" in table.colnames:
                    index = [ "timestamp" ]
                else:
                    # Nothing to index by default; iterating None
                    # would raise TypeError.
                    index = []
            for ind in index:
                table.cols._f_col(str(ind)).createIndex()

            # Insert into SQL database once the PyTables is happy
            with self.con as con:
                con.execute("INSERT INTO streams (path, layout) VALUES (?,?)",
                            (path, layout_name))
        except Exception:
            # Remove this table if we got an error, so the HDF5 file
            # doesn't keep a table that has no row in "streams".
            self.h5file.removeNode(group, node)
            raise

    def stream_id(self, path):
        """Return unique stream ID

        Raises StreamError if there is no stream at `path`.
        """
        result = self.con.execute("SELECT id FROM streams WHERE path=?",
                                  (path,)).fetchone()
        if result is None:
            raise StreamError("No stream at path " + path)
        return result[0]

    def stream_set_metadata(self, path, data):
        """Set stream metadata from a dictionary, e.g.
           { 'description': 'Downstairs lighting',
             'v_scaling': 123.45 }
        This replaces all existing metadata.

        Keys whose value is the empty string are not stored.
        Raises StreamError if there is no stream at `path`.
        """
        stream_id = self.stream_id(path)
        with self.con as con:
            # Drop everything first, so keys that are absent from
            # `data` don't survive the replacement.
            con.execute("DELETE FROM metadata WHERE stream_id=?",
                        (stream_id,))
            for key in data:
                if data[key] != '':
                    con.execute("INSERT INTO metadata VALUES (?, ?, ?)",
                                (stream_id, key, data[key]))

    def stream_get_metadata(self, path):
        """Return stream metadata as a dictionary.

        Raises StreamError if there is no stream at `path`.
        """
        stream_id = self.stream_id(path)
        result = self.con.execute("SELECT metadata.key, metadata.value "
                                  "FROM metadata "
                                  "WHERE metadata.stream_id=?", (stream_id,))
        # Each row is a (key, value) pair.
        return dict(result)

    def stream_update_metadata(self, path, newdata):
        """Update stream metadata from a dictionary

        Existing keys that are not in `newdata` are kept; keys in
        `newdata` are added or overwritten, and a key set to the empty
        string is removed.
        """
        data = self.stream_get_metadata(path)
        data.update(newdata)
        self.stream_set_metadata(path, data)
|