# Fixed record size bulk data storage

# Need absolute_import so that "import nilmdb" won't pull in
# nilmdb.py, but will pull in the parent nilmdb module instead.
from __future__ import absolute_import
from __future__ import division
import nilmdb
from nilmdb.utils.printf import *

import os
import cPickle as pickle
import struct
import mmap
import re

# Up to 256 open file descriptors at any given time
# (table_cache_size tables, each with up to fd_cache_size mapped files).
# These variables are global so they can be used in decorator arguments.
table_cache_size = 16
fd_cache_size = 16

@nilmdb.utils.must_close(wrap_verify = True)
class BulkData(object):
    def __init__(self, basepath, **kwargs):
        self.basepath = basepath
        self.root = os.path.join(self.basepath, "data")

        # Tuneables
        if "file_size" in kwargs:
            self.file_size = kwargs["file_size"]
        else:
            # Default to approximately 128 MiB per file
            self.file_size = 128 * 1024 * 1024

        if "files_per_dir" in kwargs:
            self.files_per_dir = kwargs["files_per_dir"]
        else:
            # 32768 files per dir should work even on FAT32
            self.files_per_dir = 32768

        # Make root path
        if not os.path.isdir(self.root):
            os.mkdir(self.root)

    def close(self):
        self.getnode.cache_remove_all()

    def _encode_filename(self, path):
        # Encode all paths to UTF-8, regardless of
        # sys.getfilesystemencoding(), because we want to be able to
        # represent all code points and the user will never be directly
        # exposed to filenames.  We can then do path manipulations on
        # the UTF-8 directly.
        if isinstance(path, unicode):
            return path.encode('utf-8')
        return path

    def create(self, unicodepath, layout_name):
        """
        unicodepath: path to the data (e.g. u'/newton/prep').
          Paths must contain at least two elements, e.g.:
            /newton/prep
            /newton/raw
            /newton/upstairs/prep
            /newton/upstairs/raw

        layout_name: string for nilmdb.server.layout.get_named(),
          e.g. 'float32_8'
        """
        path = self._encode_filename(unicodepath)

        if path[0] != '/':
            raise ValueError("paths must start with /")
        [ group, node ] = path.rsplit("/", 1)
        if group == '':
            raise ValueError("invalid path; path must contain at least one "
                             "folder")

        # Get the layout, and build a format string for the struct module
        try:
            layout = nilmdb.server.layout.get_named(layout_name)
            struct_fmt = '<d'   # Little endian, double timestamp
            struct_mapping = {
                "int8":    'b',
                "uint8":   'B',
                "int16":   'h',
                "uint16":  'H',
                "int32":   'i',
                "uint32":  'I',
                "int64":   'q',
                "uint64":  'Q',
                "float32": 'f',
                "float64": 'd',
                }
            struct_fmt += struct_mapping[layout.datatype] * layout.count
        except KeyError:
            raise ValueError("no such layout, or bad data types")

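        # As a hypothetical example: a layout named 'float32_8'
        # (datatype "float32", count 8) would produce the format string
        # '<dffffffff' -- one little-endian float64 timestamp followed
        # by eight float32 values, a packed row size of 8 + 8*4 = 40
        # bytes.
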
        # Create the table.  Note that we make a distinction here
        # between NilmDB paths (always Unix style, split apart
        # manually) and OS paths (built up with os.path.join).

        # Make directories leading up to this one
        elements = path.lstrip('/').split('/')
        for i in range(len(elements)):
            ospath = os.path.join(self.root, *elements[0:i])
            if Table.exists(ospath):
                raise ValueError("path is subdir of existing node")
            if not os.path.isdir(ospath):
                os.mkdir(ospath)

        # Make the final dir
        ospath = os.path.join(self.root, *elements)
        if os.path.isdir(ospath):
            raise ValueError("subdirs of this path already exist")
        os.mkdir(ospath)

        # Write the format string to a file
        Table.create(ospath, struct_fmt, self.file_size, self.files_per_dir)

        # Open and cache it
        self.getnode(unicodepath)

        # Success
        return

    def destroy(self, unicodepath):
        """Fully remove all data at a particular path.  There is no
        way to undo this!  The group/path structure is removed, too."""
        path = self._encode_filename(unicodepath)

        # Get OS path
        elements = path.lstrip('/').split('/')
        ospath = os.path.join(self.root, *elements)

        # Remove the Table object from the cache
        self.getnode.cache_remove(self, unicodepath)

        # Remove the contents of the target directory
        if not Table.exists(ospath):
            raise ValueError("nothing at that path")
        for (root, dirs, files) in os.walk(ospath, topdown = False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                os.rmdir(os.path.join(root, name))

        # Remove empty parent directories
        for i in reversed(range(len(elements))):
            ospath = os.path.join(self.root, *elements[0:i+1])
            try:
                os.rmdir(ospath)
            except OSError:
                break

    # Cache open tables
    @nilmdb.utils.lru_cache(size = table_cache_size,
                            onremove = lambda x: x.close())
    def getnode(self, unicodepath):
        """Return a Table object corresponding to the given database
        path, which must exist."""
        path = self._encode_filename(unicodepath)
        elements = path.lstrip('/').split('/')
        ospath = os.path.join(self.root, *elements)
        return Table(ospath)

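# A minimal usage sketch (hypothetical paths; assumes a 'float32_8'
# layout is known to nilmdb.server.layout):
#   db = BulkData("/tmp/testdb")
#   db.create(u'/newton/prep', 'float32_8')
#   table = db.getnode(u'/newton/prep')
#   table.append([ [1234567890.0] + [0.0] * 8 ])
#   first_row = table[0]
#   db.close()
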
@nilmdb.utils.must_close(wrap_verify = True)
class Table(object):
    """Tools to help access a single table (data at a specific OS path)."""
    # See design.md for design details

    # Class methods, to help keep format details in this class.
    @classmethod
    def exists(cls, root):
        """Return True if a table appears to exist at this OS path"""
        return os.path.isfile(os.path.join(root, "_format"))

    @classmethod
    def create(cls, root, struct_fmt, file_size, files_per_dir):
        """Initialize a table at the given OS path.
        'struct_fmt' is a struct module format description"""

        # Calculate rows per file so that each file is approximately
        # file_size bytes.
        packer = struct.Struct(struct_fmt)
        rows_per_file = max(file_size // packer.size, 1)

        fmt = { "rows_per_file": rows_per_file,
                "files_per_dir": files_per_dir,
                "struct_fmt": struct_fmt,
                "version": 1 }
        with open(os.path.join(root, "_format"), "wb") as f:
            pickle.dump(fmt, f, 2)

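    # To illustrate the sizing math with hypothetical numbers: for
    # struct_fmt '<dffffffff' (40 bytes per row) and the default
    # file_size of 128 MiB, rows_per_file = 134217728 // 40 = 3355443,
    # so each full data file holds 134217720 bytes.
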
    # Normal methods
    def __init__(self, root):
        """'root' is the full OS path to the directory of this table"""
        self.root = root

        # Load the format and build the packer
        with open(os.path.join(self.root, "_format"), "rb") as f:
            fmt = pickle.load(f)

        if fmt["version"] != 1: # pragma: no cover (just future proofing)
            raise NotImplementedError("version " + str(fmt["version"]) +
                                      " bulk data store not supported")

        self.rows_per_file = fmt["rows_per_file"]
        self.files_per_dir = fmt["files_per_dir"]
        self.packer = struct.Struct(fmt["struct_fmt"])
        self.file_size = self.packer.size * self.rows_per_file

        # Find nrows
        self.nrows = self._get_nrows()

    def close(self):
        self.mmap_open.cache_remove_all()

    # Internal helpers
    def _get_nrows(self):
        """Find nrows by locating the numerically greatest filename
        and using its size."""
        # Note that this just finds an 'nrows' that is guaranteed to be
        # greater than the row number of any piece of data that
        # currently exists, not necessarily of all data that _ever_
        # existed.
        regex = re.compile("^[0-9a-f]{4,}$")

        # Find the last directory.  We sort and loop through all of
        # them, starting with the numerically greatest, because the
        # dirs could be empty if something was deleted.
        subdirs = sorted(filter(regex.search, os.listdir(self.root)),
                         key = lambda x: int(x, 16), reverse = True)

        for subdir in subdirs:
            # Now find the last file in that dir
            path = os.path.join(self.root, subdir)
            files = filter(regex.search, os.listdir(path))
            if not files: # pragma: no cover (shouldn't occur)
                # Empty dir: try the next one
                continue

            # Find the numerical max
            filename = max(files, key = lambda x: int(x, 16))
            offset = os.path.getsize(os.path.join(self.root,
                                                  subdir, filename))

            # Convert to row number
            return self._row_from_offset(subdir, filename, offset)

        # No files, so no data
        return 0

    def _offset_from_row(self, row):
        """Return a (subdir, filename, offset, count) tuple:

        subdir: subdirectory for the file
        filename: the filename that contains the specified row
        offset: byte offset of the specified row within the file
        count: number of rows (starting at offset) that fit in the file
        """
        filenum = row // self.rows_per_file
        # It's OK if these format specifiers are too short; the
        # filenames will just get longer, and they are always compared
        # numerically (via int(x, 16)) rather than lexicographically.
        dirname = sprintf("%04x", filenum // self.files_per_dir)
        filename = sprintf("%04x", filenum % self.files_per_dir)
        offset = (row % self.rows_per_file) * self.packer.size
        count = self.rows_per_file - (row % self.rows_per_file)
        return (dirname, filename, offset, count)

    def _row_from_offset(self, subdir, filename, offset):
        """Return the row number that corresponds to the given
        'subdir/filename' and byte offset within that file."""
        if (offset % self.packer.size) != 0: # pragma: no cover (shouldn't occur)
            raise ValueError("file offset is not a multiple of data size")
        filenum = int(subdir, 16) * self.files_per_dir + int(filename, 16)
        row = (filenum * self.rows_per_file) + (offset // self.packer.size)
        return row

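    # A worked example with hypothetical parameters rows_per_file =
    # 1000, files_per_dir = 32768, and a 40-byte row: row 2500 lives in
    # file number 2500 // 1000 = 2, i.e. subdir "0000", filename
    # "0002", at byte offset (2500 % 1000) * 40 = 20000, with
    # 1000 - 500 = 500 rows remaining in that file.
    # _row_from_offset("0000", "0002", 20000) inverts this back to
    # row (0 * 32768 + 2) * 1000 + 20000 // 40 = 2500.
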
    # Cache open files
    @nilmdb.utils.lru_cache(size = fd_cache_size,
                            keys = slice(0, 3), # exclude newsize
                            onremove = lambda x: x.close())
    def mmap_open(self, subdir, filename, newsize = None):
        """Open and map a given 'subdir/filename' (relative to
        self.root).  The map is automatically closed when it is
        evicted from the cache.

        If 'newsize' is provided, the file is truncated to the given
        size before the mapping is returned.  (Note that the LRU cache
        on this function means the truncate will only happen if the
        object isn't already cached; mmap.resize should be used too.)"""
        try:
            os.mkdir(os.path.join(self.root, subdir))
        except OSError:
            pass
        f = open(os.path.join(self.root, subdir, filename), "a+", 0)
        if newsize is not None:
            # mmap can't map a zero-length file, so this allows the
            # caller to set the file size between file creation and
            # mmap.
            f.truncate(newsize)
        mm = mmap.mmap(f.fileno(), 0)
        return mm

    def mmap_open_resize(self, subdir, filename, newsize):
        """Open and map a given 'subdir/filename' (relative to
        self.root).  The file is resized to the given size."""
        # Pass the new size to mmap_open
        mm = self.mmap_open(subdir, filename, newsize)
        # In case we got a cached copy, we need to call mm.resize too.
        mm.resize(newsize)
        return mm

    def append(self, data):
        """Append the data and flush it to disk.
        'data' is a nested Python list [[row],[row],[...]]."""
        remaining = len(data)
        dataiter = iter(data)
        while remaining:
            # See how many rows will fit in the current file, and open it
            (subdir, fname, offset, count) = self._offset_from_row(self.nrows)
            if count > remaining:
                count = remaining
            newsize = offset + count * self.packer.size
            mm = self.mmap_open_resize(subdir, fname, newsize)
            mm.seek(offset)

            # Write the data
            for i in xrange(count):
                row = dataiter.next()
                mm.write(self.packer.pack(*row))
            remaining -= count
            self.nrows += count

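    # For example (hypothetical 'float32_8' table): each row is a list
    # of one timestamp followed by eight values, so
    #   table.append([ [ts, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0] ])
    # packs and writes a single 40-byte record at row table.nrows.
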
    def __getitem__(self, key):
        """Extract data and return it.  Supports simple indexing
        (table[n]) and range slices (table[n:m]).  Returns a nested
        Python list [[row],[row],[...]]."""

        # Handle simple slices
        if isinstance(key, slice):
            # Fall back to brute force if the slice isn't simple
            if ((key.step is not None and key.step != 1) or
                key.start is None or
                key.stop is None or
                key.start >= key.stop or
                key.start < 0 or
                key.stop > self.nrows):
                return [ self[x] for x in xrange(*key.indices(self.nrows)) ]

            ret = []
            row = key.start
            remaining = key.stop - key.start
            while remaining:
                (subdir, filename, offset, count) = self._offset_from_row(row)
                if count > remaining:
                    count = remaining
                mm = self.mmap_open(subdir, filename)
                for i in xrange(count):
                    ret.append(list(self.packer.unpack_from(mm, offset)))
                    offset += self.packer.size
                remaining -= count
                row += count
            return ret

        # Handle single points
        if key < 0 or key >= self.nrows:
            raise IndexError("Index out of range")
        (subdir, filename, offset, count) = self._offset_from_row(key)
        mm = self.mmap_open(subdir, filename)
        # unpack_from ignores the mmap object's current seek position
        return list(self.packer.unpack_from(mm, offset))

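    # For example (hypothetical usage): table[0] returns one row, e.g.
    # [ts, v1, ..., v8], while table[0:100] returns a list of up to 100
    # such rows.  A slice with a step, like table[0:100:2], takes the
    # brute-force path and fetches each row individually.
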
    def _remove_rows(self, subdir, filename, start, stop):
        """Helper to mark specific rows as removed from a file,
        potentially removing or truncating the file itself."""
        # Import an existing list of deleted rows for this file
        datafile = os.path.join(self.root, subdir, filename)
        cachefile = datafile + ".removed"
        try:
            with open(cachefile, "rb") as f:
                ranges = pickle.load(f)
            cachefile_present = True
        except Exception:
            ranges = []
            cachefile_present = False

        # Append our new range and sort
        ranges.append((start, stop))
        ranges.sort()

        # Merge adjacent ranges into "merged"
        merged = []
        prev = None
        for new in ranges:
            if prev is None:
                # No previous range, so remember this one
                prev = new
            elif prev[1] == new[0]:
                # Previous range connects to this new one; extend prev
                prev = (prev[0], new[1])
            else:
                # Not connected; append previous and start again
                merged.append(prev)
                prev = new
        if prev is not None:
            merged.append(prev)

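        # To illustrate the merge (hypothetical ranges): if the file
        # already had removed ranges [(0, 5), (20, 30)] and we now
        # remove (5, 20), the sorted list [(0, 5), (5, 20), (20, 30)]
        # collapses to [(0, 30)], since each range's end meets the
        # next range's start.
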
        # If the ranges cover the whole file, we can delete it now.
        # Note that the last file in a table may be only partially
        # full (smaller than self.rows_per_file).  We purposely leave
        # those files around rather than deleting them, because the
        # remainder will be filled on a subsequent append(), and things
        # are generally easier if we don't have to special-case that.
        if (len(merged) == 1 and
            merged[0][0] == 0 and merged[0][1] == self.rows_per_file):
            # Close the potentially open file in the mmap_open LRU cache
            self.mmap_open.cache_remove(self, subdir, filename)

            # Delete the files
            os.remove(datafile)
            if cachefile_present:
                os.remove(cachefile)

            # Try deleting the subdir, too
            try:
                os.rmdir(os.path.join(self.root, subdir))
            except OSError:
                pass
        else:
            # Update the cache.  Try to do it atomically.
            nilmdb.utils.atomic.replace_file(cachefile,
                                             pickle.dumps(merged, 2))

    def remove(self, start, stop):
        """Remove the specified rows [start, stop) from this table.

        If a file is left empty, it is fully removed.  Otherwise, a
        parallel data file is used to remember which rows have been
        removed, and the file is otherwise untouched."""
        if start < 0 or start > stop or stop > self.nrows:
            raise IndexError("Index out of range")

        row = start
        remaining = stop - start
        while remaining:
            # Loop through each file that we need to touch
            (subdir, filename, offset, count) = self._offset_from_row(row)
            if count > remaining:
                count = remaining
            row_offset = offset // self.packer.size
            # Mark the rows as being removed
            self._remove_rows(subdir, filename, row_offset,
                              row_offset + count)
            remaining -= count
            row += count

class TimestampOnlyTable(object):
    """Helper that lets us pass a Table object into bisect, by
    returning only the timestamp when a particular row is requested."""
    def __init__(self, table):
        self.table = table
    def __getitem__(self, index):
        return self.table[index][0]
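
# A minimal sketch of the intended bisect usage (hypothetical variable
# names; assumes 'table' is an open Table whose rows start with a
# timestamp):
#   import bisect
#   ts_table = TimestampOnlyTable(table)
#   i = bisect.bisect_left(ts_table, some_timestamp, 0, table.nrows)
# 'i' is then the row number of the first row whose timestamp is >=
# some_timestamp.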