|
- # Fixed record size bulk data storage
-
- from __future__ import absolute_import
- import nilmdb
- from nilmdb.printf import *
-
- import tables
- import os
- import sys
- import re
-
class BulkData(object):
    """Fixed record size bulk data storage rooted at ``<basepath>/data``.

    Each table lives in its own directory below the root and carries a
    ``format`` file holding the ``struct`` format string for one row.
    close() must be called before the object is discarded; __del__
    complains on stderr otherwise.
    """

    # Layout datatype name -> ``struct`` format character.
    _STRUCT_CODES = {
        "int8": 'b',
        "uint8": 'B',
        "int16": 'h',
        "uint16": 'H',
        "int32": 'i',
        "uint32": 'I',
        "int64": 'q',
        "uint64": 'Q',
        "float32": 'f',
        "float64": 'd',
    }

    def __init__(self, basepath):
        """Open the store below `basepath`, creating `<basepath>/data`
        if it does not exist yet."""
        self.basepath = basepath
        self.root = self.basepath + "/data"

        # Make root path
        if not os.path.isdir(self.root):
            os.mkdir(self.root)

        # The mere presence of "opened" is the "still open" flag that
        # __del__ and close() look at.
        self.opened = True
        self.tablecache = {}

    def __del__(self):
        if "opened" in self.__dict__: # pragma: no cover
            fprintf(sys.stderr,
                    "error: BulkData.close() wasn't called, path %s",
                    self.basepath)

    def close(self):
        """Close every cached table and mark the store as closed.

        Safe to call more than once: later calls find an empty cache
        and no "opened" flag, and do nothing.
        """
        for table in self.tablecache.values():
            table.close()
        # Drop the closed tables so they are neither closed twice nor
        # handed out again by getnode().
        self.tablecache.clear()
        self.__dict__.pop("opened", None)

    def create(self, path, layout_name):
        """
        path: path to the data (e.g. '/newton/prep').
        Paths must contain at least two elements, e.g.:
           /newton/prep
           /newton/raw
           /newton/upstairs/prep
           /newton/upstairs/raw

        layout_name: string for nilmdb.layout.get_named(), e.g. 'float32_8'

        Raises ValueError if the path is empty, does not start with
        '/', has fewer than two elements or ends in '/', if the layout
        lookup or its datatype fails with KeyError, or if the table
        directory / format file cannot be created.
        """
        # startswith() rather than path[0], so that an empty path is
        # reported as ValueError instead of leaking an IndexError.
        if not path.startswith('/'):
            raise ValueError("paths must start with /")
        group, node = path.rsplit("/", 1)
        if group == '' or node == '':
            raise ValueError("invalid path")

        # Get layout, and build format string for struct module
        try:
            layout = nilmdb.layout.get_named(layout_name)
            # Little endian, double timestamp, then `count` values of
            # the layout's datatype.
            desc = '<d' + self._STRUCT_CODES[layout.datatype] * layout.count
        except KeyError:
            raise ValueError("no such layout, or bad data types")

        ## XXXX TODO: Fix inside this "try:" so for
        # path /foo/bar, we can't create /foo/bar/baz:

        # Create the table
        try:
            # Create path
            if os.path.isdir(self.root + path):
                raise OSError()
            os.makedirs(self.root + path)

            # Write format string to file
            with open(self.root + path + "/format", "w") as f:
                f.write(desc + "\n")
        except (OSError, IOError):
            # open() raises IOError on Python 2, which is not an
            # OSError there; on Python 3 the two names are aliases.
            raise ValueError("error creating table at that path")

        # Open and cache it
        self.getnode(path)

        # Success
        return

    def destroy(self, path):
        """Fully remove all data at a particular path.  No way to undo
        it!  The group/path structure is removed, too."""
        # Delete the data node, and all parent nodes (if they have no
        # remaining children)

        ### XXX TODO: Remove path recursively, then try to rmdir on all parents
        ### up to self.root or until we hit an error, whichever is first.

        # NOTE(review): self.h5file is not assigned anywhere in this
        # class as visible here, so this loop presumably fails with
        # AttributeError until the TODO above is done -- confirm.
        # The matching tablecache entry is not evicted either.
        split_path = path.lstrip('/').split("/")
        while split_path:
            name = split_path.pop()
            where = "/" + "/".join(split_path)
            try:
                self.h5file.removeNode(where, name, recursive = False)
            except tables.NodeError:
                break

    def getnode(self, path):
        """Return the Table for `path`, building and caching it on
        first use."""
        # NOTE(review): relies on self.h5file, which this class never
        # sets in the visible code -- confirm where it comes from.
        if path not in self.tablecache:
            self.tablecache[path] = Table(self.h5file.getNode(path))
        return self.tablecache[path]
-
class Table(object):
    """Access helper for one table (the data stored at a single path).

    Every operation is forwarded to the wrapped table object.
    """

    def __init__(self, table):
        # Underlying table object that all calls are delegated to.
        self.table = table

    def close(self):
        """Close the table.  There is currently nothing to release."""
        return None

    @property
    def nrows(self):
        """Total number of rows, as a plain int."""
        count = self.table.nrows
        return int(count)

    def append(self, data):
        """Add rows to the table and flush them to disk right away.

        data is a nested Python list [[row],[row],[...]]
        """
        backing = self.table
        backing.append(data)
        backing.flush()

    def __getitem__(self, val):
        """Fetch rows by simple index (table[n]) or by range slice
        (table[n:m]); the lookup is handed unchanged to the wrapped
        table.  Returns a nested Python list [[row],[row],[...]]"""
        lookup = self.table.__getitem__
        return lookup(val)
-
class TimestampOnlyTable(object):
    """Adapter that makes a table usable with bisect: indexing it
    yields just the timestamp (first element) of the requested row."""

    def __init__(self, table):
        self.table = table

    def __getitem__(self, index):
        row = self.table[index]
        return row[0]
|