This will make migrating to my own data storage engine easier. (tag: replace-pytables)
@@ -0,0 +1,127 @@ | |||
# Fixed record size bulk data storage, currently using PyTables. | |||
from __future__ import absolute_import | |||
import nilmdb | |||
from nilmdb.printf import * | |||
import tables | |||
import os | |||
import sys | |||
class BulkData(object):
    """Fixed-record-size bulk data storage, currently backed by a
    single PyTables HDF5 file at <basepath>/data.h5."""

    def __init__(self, basepath):
        # Root directory that holds the data file.
        self.basepath = basepath
        h5filename = os.path.abspath(self.basepath + "/data.h5")
        # "a" mode: create the file if missing, otherwise open for append.
        self.h5file = tables.openFile(h5filename, "a", "NILM Database")
        # Flag used by __del__ to detect a missing close() call.
        self.opened = True

    def __del__(self):
        if "opened" in self.__dict__: # pragma: no cover
            fprintf(sys.stderr,
                    "error: BulkData.close() wasn't called, path %s",
                    self.basepath)

    def close(self):
        """Flush and close the underlying HDF5 file."""
        self.h5file.close()
        del self.opened

    def create(self, path, layout_name):
        """
        Create a new table for fixed-size records.

        path: path to the data (e.g. '/newton/prep').
              Paths must contain at least two elements, e.g.:
                /newton/prep
                /newton/raw
                /newton/upstairs/prep
                /newton/upstairs/raw

        layout_name: string for nilmdb.layout.get_named(), e.g. 'float32_8'

        Raises ValueError for a malformed path, an unknown layout, or
        when a table already exists where a group is needed.
        """
        if path[0] != '/':
            raise ValueError("paths must start with /")
        [ group, node ] = path.rsplit("/", 1)
        if group == '':
            raise ValueError("invalid path")

        # Look up the layout.  Keep the try body minimal: only this
        # call is expected to raise KeyError; the PyTables calls below
        # must not have their own KeyErrors silently converted.
        try:
            layout = nilmdb.layout.get_named(layout_name)
        except KeyError:
            raise ValueError("no such layout")

        # Build the PyTables description: a float64 timestamp column at
        # position 0, followed by 'count' data columns c1..cN of the
        # layout's datatype.
        desc = {}
        desc['timestamp'] = tables.Col.from_type('float64', pos=0)
        for n in range(layout.count):
            desc['c' + str(n+1)] = tables.Col.from_type(layout.datatype,
                                                        pos=n+1)
        desc = tables.Description(desc)

        # Estimated table size (for PyTables optimization purposes): assume
        # 3 months worth of data at 8 KHz.  It's OK if this is wrong.
        exp_rows = 8000 * 60*60*24*30*3

        # Create the table (result intentionally discarded; callers
        # fetch it later via getnode()).
        try:
            self.h5file.createTable(group,
                                    node,
                                    description = desc,
                                    expectedrows = exp_rows,
                                    createparents = True)
        except AttributeError:
            # Trying to create e.g. /foo/bar/baz when /foo/bar is already
            # a table raises this error.
            raise ValueError("error creating table at that path")

    def destroy(self, path):
        """Fully remove all data at a particular path.  No way to undo
        it!  The group structure is removed, too."""
        # Delete the data node, and all parent nodes (if they have no
        # remaining children)
        split_path = path.lstrip('/').split("/")
        while split_path:
            name = split_path.pop()
            where = "/" + "/".join(split_path)
            try:
                self.h5file.removeNode(where, name, recursive = False)
            except tables.NodeError:
                # Parent still has other children (or node is gone);
                # stop pruning here.
                break

    def getnode(self, path):
        """Return a Table wrapper for the node at the given path."""
        return Table(self.h5file.getNode(path))
class Table(object):
    """Convenience wrapper for working with the data stored at one
    specific path (a single underlying table object)."""

    def __init__(self, table):
        self.table = table

    @property
    def nrows(self):
        """Total number of rows currently stored."""
        return int(self.table.nrows)

    def append(self, data):
        """Append the data and flush it to disk"""
        self.table.append(data)
        self.table.flush()

    def __getitem__(self, val):
        """Needs to support simple indexing (table[n]) and
        range slices (table[n:m])"""
        # Delegate both cases straight to the wrapped table.
        return self.table[val]
class TimestampOnlyTable(object):
    """Adapter that exposes only column 0 (the timestamp) of each row,
    so a table object can be handed directly to bisect."""

    def __init__(self, table):
        self.table = table

    def __getitem__(self, index):
        # Fetch the full row, then return just its first field.
        row = self.table[index]
        return row[0]
# Just a helper to so we can call bulkdata.open() instead of
# bulkdata.BulkData():
def open(path):
    """Open (or create) the bulk data storage rooted at 'path'."""
    return BulkData(path)
@@ -1,6 +1,5 @@ | |||
# cython: profile=False | |||
import tables | |||
import time | |||
import sys | |||
import inspect | |||
@@ -122,15 +121,6 @@ class Layout: | |||
s += " %d" % d[i+1] | |||
return s + "\n" | |||
# PyTables description | |||
def description(self): | |||
"""Return the PyTables description of this layout""" | |||
desc = {} | |||
desc['timestamp'] = tables.Col.from_type('float64', pos=0) | |||
for n in range(self.count): | |||
desc['c' + str(n+1)] = tables.Col.from_type(self.datatype, pos=n+1) | |||
return tables.Description(desc) | |||
# Get a layout by name | |||
def get_named(typestring): | |||
try: | |||
@@ -4,7 +4,7 @@ | |||
Object that represents a NILM database file. | |||
Manages both the SQL database and the PyTables storage backend. | |||
Manages both the SQL database and the table storage backend. | |||
""" | |||
# Need absolute_import so that "import nilmdb" won't pull in nilmdb.py, | |||
@@ -14,7 +14,6 @@ import nilmdb | |||
from nilmdb.utils.printf import * | |||
import sqlite3 | |||
import tables | |||
import time | |||
import sys | |||
import os | |||
@@ -25,6 +24,8 @@ import pyximport | |||
pyximport.install() | |||
from nilmdb.interval import Interval, DBInterval, IntervalSet, IntervalError | |||
from . import bulkdata | |||
# Note about performance and transactions: | |||
# | |||
# Committing a transaction in the default sync mode (PRAGMA synchronous=FULL) | |||
@@ -87,13 +88,6 @@ class StreamError(NilmDBError): | |||
class OverlapError(NilmDBError): | |||
pass | |||
# Helper that lets us pass a Pytables table into bisect | |||
class BisectableTable(object): | |||
def __init__(self, table): | |||
self.table = table | |||
def __getitem__(self, index): | |||
return self.table[index][0] | |||
@nilmdb.utils.must_close() | |||
class NilmDB(object): | |||
verbose = 0 | |||
@@ -109,9 +103,8 @@ class NilmDB(object): | |||
if e.errno != errno.EEXIST: | |||
raise IOError("can't create tree " + self.basepath) | |||
# Our HD5 file goes inside it | |||
h5filename = os.path.abspath(self.basepath + "/data.h5") | |||
self.h5file = tables.openFile(h5filename, "a", "NILM Database") | |||
# Our data goes inside it | |||
self.data = bulkdata.open(self.basepath) | |||
# SQLite database too | |||
sqlfilename = os.path.abspath(self.basepath + "/data.sql") | |||
@@ -143,7 +136,7 @@ class NilmDB(object): | |||
if self.con: | |||
self.con.commit() | |||
self.con.close() | |||
self.h5file.close() | |||
self.data.close() | |||
def _sql_schema_update(self): | |||
cur = self.con.cursor() | |||
@@ -295,35 +288,11 @@ class NilmDB(object): | |||
layout_name: string for nilmdb.layout.get_named(), e.g. 'float32_8' | |||
""" | |||
if path[0] != '/': | |||
raise ValueError("paths must start with /") | |||
[ group, node ] = path.rsplit("/", 1) | |||
if group == '': | |||
raise ValueError("invalid path") | |||
# Get description | |||
try: | |||
desc = nilmdb.layout.get_named(layout_name).description() | |||
except KeyError: | |||
raise ValueError("no such layout") | |||
# Create the bulk storage. Raises ValueError on error, which we | |||
# pass along. | |||
self.data.create(path, layout_name) | |||
# Estimated table size (for PyTables optimization purposes): assume | |||
# 3 months worth of data at 8 KHz. It's OK if this is wrong. | |||
exp_rows = 8000 * 60*60*24*30*3 | |||
# Create the table | |||
try: | |||
table = self.h5file.createTable(group, | |||
node, | |||
description = desc, | |||
expectedrows = exp_rows, | |||
createparents = True) | |||
except AttributeError: | |||
# Trying to create e.g. /foo/bar/baz when /foo/bar is already | |||
# a table raises this error. | |||
raise ValueError("error creating table at that path") | |||
# Insert into SQL database once the PyTables is happy | |||
# Insert into SQL database once the bulk storage is happy | |||
with self.con as con: | |||
con.execute("INSERT INTO streams (path, layout) VALUES (?,?)", | |||
(path, layout_name)) | |||
@@ -369,23 +338,14 @@ class NilmDB(object): | |||
def stream_destroy(self, path): | |||
"""Fully remove a table and all of its data from the database. | |||
No way to undo it! The group structure is removed, if there | |||
are no other tables in it. Metadata is removed.""" | |||
No way to undo it! Metadata is removed.""" | |||
stream_id = self._stream_id(path) | |||
# Delete the cached interval data | |||
self._get_intervals.cache_remove(self, stream_id) | |||
# Delete the data node, and all parent nodes (if they have no | |||
# remaining children) | |||
split_path = path.lstrip('/').split("/") | |||
while split_path: | |||
name = split_path.pop() | |||
where = "/" + "/".join(split_path) | |||
try: | |||
self.h5file.removeNode(where, name, recursive = False) | |||
except tables.NodeError: | |||
break | |||
# Delete the data | |||
self.data.destroy(path) | |||
# Delete metadata, stream, intervals | |||
with self.con as con: | |||
@@ -409,16 +369,14 @@ class NilmDB(object): | |||
raise OverlapError("new data overlaps existing data at range: " | |||
+ str(iset & interval)) | |||
# Insert the data into pytables | |||
table = self.h5file.getNode(path) | |||
# Insert the data | |||
table = self.data.getnode(path) | |||
row_start = table.nrows | |||
table.append(data) | |||
row_end = table.nrows | |||
table.flush() | |||
# Insert the record into the sql database. | |||
# Casts are to convert from numpy.int64. | |||
self._add_interval(stream_id, interval, int(row_start), int(row_end)) | |||
self._add_interval(stream_id, interval, row_start, row_end) | |||
# And that's all | |||
return "ok" | |||
@@ -433,7 +391,7 @@ class NilmDB(object): | |||
# Optimization for the common case where an interval wasn't truncated | |||
if interval.start == interval.db_start: | |||
return interval.db_startpos | |||
return bisect.bisect_left(BisectableTable(table), | |||
return bisect.bisect_left(bulkdata.TimestampOnlyTable(table), | |||
interval.start, | |||
interval.db_startpos, | |||
interval.db_endpos) | |||
@@ -452,7 +410,7 @@ class NilmDB(object): | |||
# want to include the given timestamp in the results. This is | |||
# so a queries like 1:00 -> 2:00 and 2:00 -> 3:00 return | |||
# non-overlapping data. | |||
return bisect.bisect_left(BisectableTable(table), | |||
return bisect.bisect_left(bulkdata.TimestampOnlyTable(table), | |||
interval.end, | |||
interval.db_startpos, | |||
interval.db_endpos) | |||
@@ -476,7 +434,7 @@ class NilmDB(object): | |||
than actually fetching the data. It is not limited by | |||
max_results. | |||
""" | |||
table = self.h5file.getNode(path) | |||
table = self.data.getnode(path) | |||
stream_id = self._stream_id(path) | |||
intervals = self._get_intervals(stream_id) | |||
requested = Interval(start or 0, end or 1e12) | |||
@@ -28,9 +28,13 @@ class TestLayouts(object): | |||
# Some nilmdb.layout tests. Not complete, just fills in missing | |||
# coverage. | |||
def test_layouts(self): | |||
x = nilmdb.layout.get_named("PrepData").description() | |||
y = nilmdb.layout.get_named("float32_8").description() | |||
eq_(repr(x), repr(y)) | |||
x = nilmdb.layout.get_named("PrepData") | |||
y = nilmdb.layout.get_named("float32_8") | |||
eq_(x.count, y.count) | |||
eq_(x.datatype, y.datatype) | |||
y = nilmdb.layout.get_named("float32_7") | |||
ne_(x.count, y.count) | |||
eq_(x.datatype, y.datatype) | |||
def test_parsing(self): | |||
self.real_t_parsing("PrepData", "RawData", "RawNotchedData") | |||
@@ -70,12 +70,14 @@ class Test00Nilmdb(object): # named 00 so it runs first | |||
eq_(db.stream_list(layout="RawData"), [ ["/newton/raw", "RawData"] ]) | |||
eq_(db.stream_list(path="/newton/raw"), [ ["/newton/raw", "RawData"] ]) | |||
# Verify that columns were made right | |||
eq_(len(db.h5file.getNode("/newton/prep").cols), 9) | |||
eq_(len(db.h5file.getNode("/newton/raw").cols), 7) | |||
eq_(len(db.h5file.getNode("/newton/zzz/rawnotch").cols), 10) | |||
assert(not db.h5file.getNode("/newton/prep").colindexed["timestamp"]) | |||
assert(not db.h5file.getNode("/newton/prep").colindexed["c1"]) | |||
# Verify that columns were made right (pytables specific) | |||
if "h5file" in db.data.__dict__: | |||
h5file = db.data.h5file | |||
eq_(len(h5file.getNode("/newton/prep").cols), 9) | |||
eq_(len(h5file.getNode("/newton/raw").cols), 7) | |||
eq_(len(h5file.getNode("/newton/zzz/rawnotch").cols), 10) | |||
assert(not h5file.getNode("/newton/prep").colindexed["timestamp"]) | |||
assert(not h5file.getNode("/newton/prep").colindexed["c1"]) | |||
# Set / get metadata | |||
eq_(db.stream_get_metadata("/newton/prep"), {}) | |||