First pass at Python implementation of rocket

This commit is contained in:
Jim Paris 2013-03-03 13:27:47 -05:00
parent 9b6de6ecb7
commit 4406d51a98
4 changed files with 121 additions and 95 deletions

View File

@ -23,6 +23,7 @@ lint:
test: test:
ifeq ($(INSIDE_EMACS), t) ifeq ($(INSIDE_EMACS), t)
# Use the slightly more flexible script # Use the slightly more flexible script
python setup.py build_ext --inplace
python tests/runtests.py python tests/runtests.py
else else
# Let setup.py check dependencies, build stuff, and run the test # Let setup.py check dependencies, build stuff, and run the test

View File

@ -9,19 +9,9 @@ from nilmdb.utils.printf import *
import os import os
import cPickle as pickle import cPickle as pickle
import struct
import mmap
import re import re
# If we have the faulthandler module, use it. All of the mmap stuff from . import pyrocket as rocket
# might trigger a SIGSEGV or SIGBUS if we're not careful, and
# faulthandler will give a traceback in that case. (the Python
# interpreter will still die either way).
try: # pragma: no cover
import faulthandler
faulthandler.enable()
except: # pragma: no cover
pass
# Up to 256 open file descriptors at any given time. # Up to 256 open file descriptors at any given time.
# These variables are global so they can be used in the decorator arguments. # These variables are global so they can be used in the decorator arguments.
@ -83,26 +73,6 @@ class BulkData(object):
raise ValueError("invalid path; path must contain at least one " raise ValueError("invalid path; path must contain at least one "
"folder") "folder")
# Get layout, and build format string for struct module
try:
layout = nilmdb.server.layout.get_named(layout_name)
struct_fmt = '<d' # Little endian, double timestamp
struct_mapping = {
"int8": 'b',
"uint8": 'B',
"int16": 'h',
"uint16": 'H',
"int32": 'i',
"uint32": 'I',
"int64": 'q',
"uint64": 'Q',
"float32": 'f',
"float64": 'd',
}
struct_fmt += struct_mapping[layout.datatype] * layout.count
except KeyError:
raise ValueError("no such layout, or bad data types")
# Create the table. Note that we make a distinction here # Create the table. Note that we make a distinction here
# between NilmDB paths (always Unix style, split apart # between NilmDB paths (always Unix style, split apart
# manually) and OS paths (built up with os.path.join) # manually) and OS paths (built up with os.path.join)
@ -122,11 +92,16 @@ class BulkData(object):
raise ValueError("subdirs of this path already exist") raise ValueError("subdirs of this path already exist")
os.mkdir(ospath) os.mkdir(ospath)
# Write format string to file try:
Table.create(ospath, struct_fmt, self.file_size, self.files_per_dir) # Write format string to file
Table.create(ospath, layout_name, self.file_size,
self.files_per_dir)
# Open and cache it # Open and cache it
self.getnode(unicodepath) self.getnode(unicodepath)
except:
os.rmdir(ospath)
raise
# Success # Success
return return
@ -173,8 +148,8 @@ class BulkData(object):
@nilmdb.utils.must_close(wrap_verify = False) @nilmdb.utils.must_close(wrap_verify = False)
class File(object): class File(object):
"""Object representing a single file on disk. Data can be appended, """Object representing a single file on disk. Data can be appended
or the self.mmap handle can be used for random reads.""" or extracted using the rocket functions."""
def __init__(self, root, subdir, filename): def __init__(self, root, subdir, filename):
# Create path if it doesn't exist # Create path if it doesn't exist
@ -190,46 +165,22 @@ class File(object):
self._f.seek(0, 2) self._f.seek(0, 2)
self.size = self._f.tell() self.size = self._f.tell()
# Open mmap object
self.mmap = None
self._mmap_reopen()
def _mmap_reopen(self):
if self.size == 0:
# Don't mmap if the file is empty; it would fail
pass
elif self.mmap is None:
# Not opened yet, so open it
self.mmap = mmap.mmap(self._f.fileno(), 0)
else:
# Already opened, so just resize it
self.mmap.resize(self.size)
def close(self): def close(self):
if self.mmap is not None:
self.mmap.close()
self._f.close() self._f.close()
def append(self, data): # pragma: no cover (below version used instead) def append_rocket_iter(self, rocket, rows, dataiter):
# Write data, flush it, and resize our mmap accordingly
self._f.write(data)
self._f.flush()
self.size += len(data)
self._mmap_reopen()
def append_pack_iter(self, count, packer, dataiter):
# An optimized verison of append, to avoid flushing the file
# and resizing the mmap after each data point.
try: try:
rows = [] for i in xrange(rows):
for i in xrange(count): rocket.append_list(self._f, [dataiter.next()])
row = dataiter.next()
rows.append(packer(*row))
self._f.write("".join(rows))
finally: finally:
self._f.flush() self._f.flush()
self.size = self._f.tell() self.size = self._f.tell()
self._mmap_reopen()
def extract_rocket_list(self, rocket, offset, rows):
return rocket.extract_list(self._f, offset, rows)
def extract_rocket_string(self, rocket, offset, rows):
return rocket.extract_string(self._f, offset, rows)
@nilmdb.utils.must_close(wrap_verify = False) @nilmdb.utils.must_close(wrap_verify = False)
class Table(object): class Table(object):
@ -243,19 +194,19 @@ class Table(object):
return os.path.isfile(os.path.join(root, "_format")) return os.path.isfile(os.path.join(root, "_format"))
@classmethod @classmethod
def create(cls, root, struct_fmt, file_size, files_per_dir): def create(cls, root, layout, file_size, files_per_dir):
"""Initialize a table at the given OS path. """Initialize a table at the given OS path with the
'struct_fmt' is a Struct module format description""" given layout string"""
# Calculate rows per file so that each file is approximately # Calculate rows per file so that each file is approximately
# file_size bytes. # file_size bytes.
packer = struct.Struct(struct_fmt) rkt = rocket.Rocket(layout)
rows_per_file = max(file_size // packer.size, 1) rows_per_file = max(file_size // rkt.binary_size, 1)
fmt = { "rows_per_file": rows_per_file, fmt = { "rows_per_file": rows_per_file,
"files_per_dir": files_per_dir, "files_per_dir": files_per_dir,
"struct_fmt": struct_fmt, "layout": layout,
"version": 1 } "version": 2 }
with open(os.path.join(root, "_format"), "wb") as f: with open(os.path.join(root, "_format"), "wb") as f:
pickle.dump(fmt, f, 2) pickle.dump(fmt, f, 2)
@ -264,18 +215,18 @@ class Table(object):
"""'root' is the full OS path to the directory of this table""" """'root' is the full OS path to the directory of this table"""
self.root = root self.root = root
# Load the format and build packer # Load the format and build rocket
with open(os.path.join(self.root, "_format"), "rb") as f: with open(os.path.join(self.root, "_format"), "rb") as f:
fmt = pickle.load(f) fmt = pickle.load(f)
if fmt["version"] != 1: # pragma: no cover (just future proofing) if fmt["version"] != 2: # pragma: no cover (just future proofing)
raise NotImplementedError("version " + fmt["version"] + raise NotImplementedError("version " + fmt["version"] +
" bulk data store not supported") " bulk data store not supported")
self.rows_per_file = fmt["rows_per_file"] self.rows_per_file = fmt["rows_per_file"]
self.files_per_dir = fmt["files_per_dir"] self.files_per_dir = fmt["files_per_dir"]
self.packer = struct.Struct(fmt["struct_fmt"]) self.rocket = rocket.Rocket(fmt["layout"])
self.file_size = self.packer.size * self.rows_per_file self.file_size = self.rocket.binary_size * self.rows_per_file
# Find nrows # Find nrows
self.nrows = self._get_nrows() self.nrows = self._get_nrows()
@ -330,17 +281,19 @@ class Table(object):
# will just get longer but will still sort correctly. # will just get longer but will still sort correctly.
dirname = sprintf("%04x", filenum // self.files_per_dir) dirname = sprintf("%04x", filenum // self.files_per_dir)
filename = sprintf("%04x", filenum % self.files_per_dir) filename = sprintf("%04x", filenum % self.files_per_dir)
offset = (row % self.rows_per_file) * self.packer.size offset = (row % self.rows_per_file) * self.rocket.binary_size
count = self.rows_per_file - (row % self.rows_per_file) count = self.rows_per_file - (row % self.rows_per_file)
return (dirname, filename, offset, count) return (dirname, filename, offset, count)
def _row_from_offset(self, subdir, filename, offset): def _row_from_offset(self, subdir, filename, offset):
"""Return the row number that corresponds to the given """Return the row number that corresponds to the given
'subdir/filename' and byte-offset within that file.""" 'subdir/filename' and byte-offset within that file."""
if (offset % self.packer.size) != 0: # pragma: no cover; shouldn't occur if (offset % self.rocket.binary_size) != 0: # pragma: no cover
# shouldn't occur, unless there is some corruption somewhere
raise ValueError("file offset is not a multiple of data size") raise ValueError("file offset is not a multiple of data size")
filenum = int(subdir, 16) * self.files_per_dir + int(filename, 16) filenum = int(subdir, 16) * self.files_per_dir + int(filename, 16)
row = (filenum * self.rows_per_file) + (offset // self.packer.size) row = ((filenum * self.rows_per_file) +
(offset // self.rocket.binary_size))
return row return row
# Cache open files # Cache open files
@ -365,10 +318,33 @@ class Table(object):
f = self.file_open(subdir, fname) f = self.file_open(subdir, fname)
# Write the data # Write the data
f.append_pack_iter(count, self.packer.pack, dataiter) f.append_rocket_iter(self.rocket, count, dataiter)
remaining -= count remaining -= count
self.nrows += count self.nrows += count
def get_string(self, start, stop):
    """Extract data corresponding to Python range [start:stop) and
    return it as a single formatted string.

    The string is assembled from per-file chunks produced by the
    rocket's extract_string; rows may span several on-disk files,
    so we walk file-by-file until the range is exhausted.

    Raises IndexError if either bound is None or the range is empty
    or falls outside [0, self.nrows].
    """
    if (start is None or
        stop is None or
        start >= stop or
        start < 0 or
        stop > self.nrows):
        raise IndexError("Index out of range")
    # BUG FIX: the original initialized 'ret_chunks' but then used an
    # undefined name 'ret', raising NameError on first use.  Use one
    # consistent local for the accumulated chunks.
    chunks = []
    row = start
    remaining = stop - start
    while remaining:
        (subdir, filename, offset, count) = self._offset_from_row(row)
        # One file may not cover the whole remaining range; clamp.
        if count > remaining:
            count = remaining
        f = self.file_open(subdir, filename)
        chunks.append(f.extract_rocket_string(self.rocket, offset, count))
        remaining -= count
        row += count
    return "".join(chunks)
def __getitem__(self, key): def __getitem__(self, key):
"""Extract data and return it. Supports simple indexing """Extract data and return it. Supports simple indexing
(table[n]) and range slices (table[n:m]). Returns a nested (table[n]) and range slices (table[n:m]). Returns a nested
@ -392,10 +368,8 @@ class Table(object):
(subdir, filename, offset, count) = self._offset_from_row(row) (subdir, filename, offset, count) = self._offset_from_row(row)
if count > remaining: if count > remaining:
count = remaining count = remaining
mm = self.file_open(subdir, filename).mmap f = self.file_open(subdir, filename)
for i in xrange(count): ret.extend(f.extract_rocket_list(self.rocket, offset, count))
ret.append(list(self.packer.unpack_from(mm, offset)))
offset += self.packer.size
remaining -= count remaining -= count
row += count row += count
return ret return ret
@ -404,9 +378,8 @@ class Table(object):
if key < 0 or key >= self.nrows: if key < 0 or key >= self.nrows:
raise IndexError("Index out of range") raise IndexError("Index out of range")
(subdir, filename, offset, count) = self._offset_from_row(key) (subdir, filename, offset, count) = self._offset_from_row(key)
mm = self.file_open(subdir, filename).mmap f = self.file_open(subdir, filename)
# unpack_from ignores the mmap object's current seek position return f.extract_rocket_list(self.rocket, offset, 1)[0]
return list(self.packer.unpack_from(mm, offset))
def _remove_rows(self, subdir, filename, start, stop): def _remove_rows(self, subdir, filename, start, stop):
"""Helper to mark specific rows as being removed from a """Helper to mark specific rows as being removed from a
@ -485,7 +458,7 @@ class Table(object):
(subdir, filename, offset, count) = self._offset_from_row(row) (subdir, filename, offset, count) = self._offset_from_row(row)
if count > remaining: if count > remaining:
count = remaining count = remaining
row_offset = offset // self.packer.size row_offset = offset // self.rocket.binary_size
# Mark the rows as being removed # Mark the rows as being removed
self._remove_rows(subdir, filename, row_offset, row_offset + count) self._remove_rows(subdir, filename, row_offset, row_offset + count)
remaining -= count remaining -= count

54
nilmdb/server/pyrocket.py Normal file
View File

@ -0,0 +1,54 @@
# Python implementation of the "rocket" data parsing interface
import struct
class Rocket(object):
    """Pack and unpack binary rows of data for a single table layout.

    A layout string has the form "<datatype>_<count>", e.g. "uint16_8".
    Each binary row is a little-endian double timestamp followed by
    <count> values of <datatype>.
    """

    # Mapping from layout datatype names to struct format characters.
    _STRUCT_MAPPING = {
        "int8": 'b',
        "uint8": 'B',
        "int16": 'h',
        "uint16": 'H',
        "int32": 'i',
        "uint32": 'I',
        "int64": 'q',
        "uint64": 'Q',
        "float32": 'f',
        "float64": 'd',
    }

    def __init__(self, layout):
        """Build a packer for the given layout string.

        Raises ValueError if the layout string is malformed or names
        an unknown data type.
        """
        self.layout = layout
        # For packing/unpacking into a binary file.
        # This will change in the C version.
        try:
            # maxsplit=1: a valid layout has exactly one underscore.
            # (The original used split('_', 2), which let a two-
            # underscore string raise an unpack error outside the try.)
            (self.ltype, lcount) = layout.split('_', 1)
            self.lcount = int(lcount)
            # Little endian, double timestamp, then the data columns
            struct_fmt = ('<d' +
                          self._STRUCT_MAPPING[self.ltype] * self.lcount)
        except (KeyError, ValueError):
            raise ValueError("no such layout, or bad data types")
        self.packer = struct.Struct(struct_fmt)

    @property
    def binary_size(self):
        """Return size of one row of data in the binary file, in bytes"""
        return self.packer.size

    def append_list(self, file, data):
        """Pack each row in the list 'data' and append it to 'file',
        then flush so the file size is immediately accurate."""
        for row in data:
            file.write(self.packer.pack(*row))
        file.flush()

    def extract_list(self, file, offset, count):
        """Extract 'count' rows of data from 'file' at byte offset
        'offset'.  Return a list of lists [[row],[row],...]"""
        ret = []
        file.seek(offset)
        # 'range' rather than 'xrange': equivalent in a loop, and
        # works on both Python 2 and 3.
        for i in range(count):
            data = file.read(self.binary_size)
            ret.append(list(self.packer.unpack(data)))
        return ret

    def extract_string(self, file, offset, rows):
        """Extract 'rows' rows of data from 'file' at byte offset
        'offset' and return them as a formatted string.

        BUG FIX: the original signature misnamed the first parameter
        'rocket' (the caller passes a file object) and silently
        returned None via 'pass', which would crash later in a string
        join.  Fail loudly until this is implemented.
        """
        raise NotImplementedError("extract_string not yet implemented")

View File

@ -35,8 +35,6 @@ class TestBulkData(object):
data.create("/foo", "uint16_8") data.create("/foo", "uint16_8")
with assert_raises(ValueError): with assert_raises(ValueError):
data.create("foo/bar", "uint16_8") data.create("foo/bar", "uint16_8")
with assert_raises(ValueError):
data.create("/foo/bar", "uint8_8")
data.create("/foo/bar", "uint16_8") data.create("/foo/bar", "uint16_8")
data.create(u"/foo/baz/quux", "float64_16") data.create(u"/foo/baz/quux", "float64_16")
with assert_raises(ValueError): with assert_raises(ValueError):