|
- # Fixed record size bulk data storage
-
- from __future__ import absolute_import
- from __future__ import division
- import nilmdb
- from nilmdb.utils.printf import *
-
- import os
- import sys
- import cPickle as pickle
- import struct
- import fnmatch
- import mmap
-
- # Up to 256 open file descriptors at any given time
- table_cache_size = 16
- fd_cache_size = 16
-
@nilmdb.utils.must_close()
class BulkData(object):
    """Fixed-record-size bulk data storage, rooted at
    'basepath'/data.  Tables are addressed by NilmDB paths (Unix
    style, e.g. '/newton/prep') and are created, opened (with LRU
    caching), and destroyed through this object."""

    def __init__(self, basepath):
        # 'basepath' is the OS path of the database directory; all
        # table data lives under its "data" subdirectory.
        self.basepath = basepath
        self.root = os.path.join(self.basepath, "data")

        # Make root path
        if not os.path.isdir(self.root):
            os.mkdir(self.root)

    def close(self):
        """Close all cached open tables."""
        self.getnode.cache_remove_all()

    def create(self, path, layout_name):
        """Create a new table.

        path: path to the data (e.g. '/newton/prep').
        Paths must contain at least two elements, e.g.:
          /newton/prep
          /newton/raw
          /newton/upstairs/prep
          /newton/upstairs/raw

        layout_name: string for nilmdb.layout.get_named(), e.g. 'float32_8'

        Raises ValueError for a malformed path, an unknown layout,
        or a conflict with an existing node.
        """
        if path[0] != '/':
            raise ValueError("paths must start with /")
        [ group, node ] = path.rsplit("/", 1)
        if group == '':
            raise ValueError("invalid path")

        # Get layout, and build format string for struct module:
        # little-endian, double timestamp, followed by 'count'
        # repetitions of the layout's data type.
        try:
            layout = nilmdb.layout.get_named(layout_name)
            struct_mapping = {
                "int8":    'b',
                "uint8":   'B',
                "int16":   'h',
                "uint16":  'H',
                "int32":   'i',
                "uint32":  'I',
                "int64":   'q',
                "uint64":  'Q',
                "float32": 'f',
                "float64": 'd',
            }
            struct_fmt = '<d' + struct_mapping[layout.datatype] * layout.count
        except KeyError:
            raise ValueError("no such layout, or bad data types")

        # Create the table. Note that we make a distinction here
        # between NilmDB paths (always Unix style, split apart
        # manually) and OS paths (built up with os.path.join)
        try:
            # Make directories leading up to this one
            elements = path.lstrip('/').split('/')
            for i in range(len(elements)):
                ospath = os.path.join(self.root, *elements[0:i])
                if Table.exists(ospath):
                    raise ValueError("path is subdir of existing node")
                if not os.path.isdir(ospath):
                    os.mkdir(ospath)

            # Make the final dir
            ospath = os.path.join(self.root, *elements)
            if os.path.isdir(ospath):
                raise ValueError("subdirs of this path already exist")
            os.mkdir(ospath)

            # Write format string to file
            Table.create(ospath, struct_fmt)
        except OSError as e:
            raise ValueError("error creating table at that path: " + e.strerror)

        # Open and cache it
        self.getnode(path)

        # Success
        return

    def destroy(self, path):
        """Fully remove all data at a particular path.  No way to undo
        it!  The group/path structure is removed, too."""

        # Get OS path
        elements = path.lstrip('/').split('/')
        ospath = os.path.join(self.root, *elements)

        # Remove Table object from cache.  The getnode() cache is
        # keyed on the NilmDB path, so that's what we must pass here;
        # passing the OS path would fail to evict (and close) the
        # open table.
        self.getnode.cache_remove(self, path)

        # Remove the contents of the target directory
        if not os.path.isfile(os.path.join(ospath, "format")):
            raise ValueError("nothing at that path")
        for filename in os.listdir(ospath):
            os.remove(os.path.join(ospath, filename))

        # Remove empty parent directories; stop silently at the first
        # one that is non-empty.
        for i in reversed(range(len(elements))):
            ospath = os.path.join(self.root, *elements[0:i])
            try:
                os.rmdir(ospath)
            except OSError:
                pass

    # Cache open tables
    @nilmdb.utils.lru_cache(size = table_cache_size,
                            onremove = lambda x: x.close())
    def getnode(self, path):
        """Return a Table object corresponding to the given database
        path, which must exist."""
        elements = path.lstrip('/').split('/')
        ospath = os.path.join(self.root, *elements)
        return Table(ospath)
-
@nilmdb.utils.must_close()
class Table(object):
    """Tools to help access a single table (data at a specific OS path).

    On disk, a table is a directory containing a pickled "format"
    file (struct format string + rows per file) plus a sequence of
    fixed-size data files named with 8-digit lowercase hex numbers.
    """

    # Class methods, to help keep format details in this class.
    @classmethod
    def exists(cls, root):
        """Return True if a table appears to exist at this OS path"""
        return os.path.isfile(os.path.join(root, "format"))

    @classmethod
    def create(cls, root, struct_fmt):
        """Initialize a table at the given OS path.
        'struct_fmt' is a Struct module format description"""
        format = { "rows_per_file": 4 * 1024 * 1024,
                   "struct_fmt": struct_fmt }
        with open(os.path.join(root, "format"), "wb") as f:
            pickle.dump(format, f, 2)

    # Normal methods
    def __init__(self, root):
        """'root' is the full OS path to the directory of this table"""
        self.root = root

        # Load the format and build packer
        with open(self._fullpath("format"), "rb") as f:
            format = pickle.load(f)
        self.rows_per_file = format["rows_per_file"]
        self.packer = struct.Struct(format["struct_fmt"])
        self.file_size = self.packer.size * self.rows_per_file

        # Find nrows by locating the lexicographically last filename
        # and using its size.  (Filenames are fixed-width hex, so
        # lexicographic order matches numeric order.)
        pattern = '[0-9a-f]' * 8
        allfiles = fnmatch.filter(os.listdir(self.root), pattern)
        if allfiles:
            filename = max(allfiles)
            offset = os.path.getsize(self._fullpath(filename))
            self.nrows = self._row_from_fnoffset(filename, offset)
        else:
            self.nrows = 0

    def close(self):
        """Close all cached open file mappings."""
        self.mmap_open.cache_remove_all()

    # Internal helpers
    def _fullpath(self, filename):
        return os.path.join(self.root, filename)

    def _fnoffset_from_row(self, row):
        """Return a (filename, offset, count) tuple:

        filename: the filename that contains the specified row
        offset: byte offset of the specified row within the file
        count: number of rows (starting at offset) that fit in the file
        """
        filenum = row // self.rows_per_file
        filename = sprintf("%08x", filenum)
        offset = (row % self.rows_per_file) * self.packer.size
        count = self.rows_per_file - (row % self.rows_per_file)
        return (filename, offset, count)

    def _row_from_fnoffset(self, filename, offset):
        """Return the row number that corresponds to the given
        filename and byte-offset within that file."""
        filenum = int(filename, 16)
        if (offset % self.packer.size) != 0:
            raise ValueError("file offset is not a multiple of data size")
        row = (filenum * self.rows_per_file) + (offset // self.packer.size)
        return row

    # Cache open files
    @nilmdb.utils.lru_cache(size = fd_cache_size,
                            onremove = lambda x: x.close())
    def mmap_open(self, file, newsize = None):
        """Open and map a given filename (relative to self.root).
        Will be automatically closed when evicted from the cache.

        If 'newsize' is provided, the file is truncated to the given
        size before the mapping is returned.  (Note that the LRU cache
        on this function means the truncate will only happen if the
        object isn't already cached; mmap.resize should be used too)"""
        # Unbuffered append mode: creates the file if missing, never
        # truncates existing data.
        f = open(os.path.join(self.root, file), "a+", 0)
        if newsize is not None:
            # mmap can't map a zero-length file, so this allows the
            # caller to set the filesize between file creation and
            # mmap.
            f.truncate(newsize)
        mm = mmap.mmap(f.fileno(), 0)
        return mm

    def append(self, data):
        """Append the data and flush it to disk.
        data is a nested Python list [[row],[row],[...]]"""
        remaining = len(data)
        dataiter = iter(data)
        while remaining:
            # See how many rows we can fit into the current file, and open it
            (filename, offset, count) = self._fnoffset_from_row(self.nrows)
            if count > remaining:
                count = remaining
            newsize = offset + count * self.packer.size
            mm = self.mmap_open(filename, newsize)
            mm.seek(offset)

            # Extend the file to the target length. We specified
            # newsize when opening, but that may have been ignored if
            # the mmap_open returned a cached object.
            mm.resize(newsize)

            # Write the data.  Use the next() builtin (available since
            # Python 2.6) rather than the Python-2-only .next() method.
            for i in range(count):
                row = next(dataiter)
                mm.write(self.packer.pack(*row))
            remaining -= count
            self.nrows += count

    def __getitem__(self, val):
        """Needs to support simple indexing (table[n]) and
        range slices (table[n:m]).  Returns a nested Python
        list [[row],[row],[...]]"""
        ### TODO: actually read the data.
        return []
-
class TimestampOnlyTable(object):
    """Adapter that exposes only the timestamp column (element 0 of
    each row) of a wrapped table, so the table can be handed directly
    to bisect-style search routines."""

    def __init__(self, table):
        # Keep a reference to the underlying row-indexable table.
        self.table = table

    def __getitem__(self, index):
        row = self.table[index]
        return row[0]
|