Browse Source

More work towards bulkdata

Jim Paris 9 years ago
2 changed files with 84 additions and 36 deletions
  1. +83
  2. +1

+ 83
- 35
nilmdb/ View File

@@ -1,6 +1,7 @@
# Fixed record size bulk data storage

from __future__ import absolute_import
from __future__ import division
import nilmdb
from nilmdb.utils.printf import *

@@ -8,11 +9,13 @@ import os
import sys
import cPickle as pickle
import struct
import fnmatch

# Up to 256 open file descriptors at any given time
table_cache_size = 16
fd_cache_size = 16

class BulkData(object):
def __init__(self, basepath):
self.basepath = basepath
@@ -22,17 +25,8 @@ class BulkData(object):
if not os.path.isdir(self.root):

self.opened = True

def __del__(self):
if "opened" in self.__dict__: # pragma: no cover
"error: BulkData.close() wasn't called, path %s",

def close(self):
del self.opened

def create(self, path, layout_name):
@@ -54,8 +48,8 @@ class BulkData(object):
# Get layout, and build format string for struct module
layout = nilmdb.layout.get_named(layout_name)
desc = '<d' # Little endian, double timestamp
struct_format_mapping = {
struct_fmt = '<d' # Little endian, double timestamp
struct_mapping = {
"int8": 'b',
"uint8": 'B',
"int16": 'h',
@@ -68,7 +62,7 @@ class BulkData(object):
"float64": 'd',
for n in range(layout.count):
desc += struct_format_mapping[layout.datatype]
struct_fmt += struct_mapping[layout.datatype]
except KeyError:
raise ValueError("no such layout, or bad data types")

@@ -80,7 +74,7 @@ class BulkData(object):
elements = path.lstrip('/').split('/')
for i in range(len(elements)):
ospath = os.path.join(self.root, *elements[0:i])
if os.path.isfile(os.path.join(ospath, "format")):
if Table.exists(ospath):
raise ValueError("path is subdir of existing node")
if not os.path.isdir(ospath):
@@ -92,8 +86,7 @@ class BulkData(object):

# Write format string to file
with open(os.path.join(ospath, "format"), "wb") as f:
pickle.dump(desc, f, 2)
Table.create(ospath, struct_fmt)
except OSError as e:
raise ValueError("error creating table at that path: " + e.strerror)

@@ -138,42 +131,96 @@ class BulkData(object):
ospath = os.path.join(self.root, *elements)
return Table(ospath)

class Table(object):
"""Tools to help access a single table (data at a specific path)"""

"""Tools to help access a single table (data at a specific OS path)"""

# Class methods, to help keep format details in this class.
def exists(cls, root):
"""Return True if a table appears to exist at this OS path"""
return os.path.isfile(os.path.join(root, "format"))

def create(cls, root, struct_fmt):
"""Initialize a table at the given OS path.
'struct_fmt' is a Struct module format description"""
format = { "rows_per_file": 4 * 1024 * 1024,
"struct_fmt": struct_fmt }
with open(os.path.join(root, "format"), "wb") as f:
pickle.dump(format, f, 2)

# Normal methods
def __init__(self, root):
"""'root' is the full OS path to the directory of this table"""
self.root = root

# Load the format and build packer
with open(os.path.join(self.root, "format"), "rb") as f:
desc = pickle.load(f)
self.packer = struct.Struct(desc)
with open(self._fullpath("format"), "rb") as f:
format = pickle.load(f)
self.rows_per_file = format["rows_per_file"]
self.packer = struct.Struct(format["struct_fmt"])
self.file_size = self.packer.size * self.rows_per_file

# Find nrows by locating the lexicographically last filename
# and using its size.
pattern = '[0-9a-f]' * 8
allfiles = fnmatch.filter(os.listdir(self.root), pattern)
if allfiles:
filename = max(allfiles)
offset = os.path.getsize(self._fullpath(filename))
self.nrows = self._row_from_fnoffset(filename, offset)
self.nrows = 0

def close(self):

# Internal helpers
def _fullpath(self, filename):
return os.path.join(self.root, filename)

def _fnoffset_from_row(self, row):
"""Return a (filename, offset, count) tuple:

filename: the filename that contains the specified row
offset: byte offset of the specified row within the file
count: number of rows (starting at offste) that fit in the file
filenum = row // self.rows_per_file
filename = sprintf("%08x", filenum)
offset = (row % self.rows_per_file) * self.packer.size
count = self.rows_per_file - (row % self.rows_per_file)
return (filename, offset, count)

def _row_from_fnoffset(self, filename, offset):
"""Return the row number that corresponds to the given
filename and byte-offset within that file."""
filenum = int(filename, 16)
if (offset % self.packer.size) != 0:
raise ValueError("file offset is not a multiple of data size")
row = (filenum * self.rows_per_file) + (offset // self.packer.size)
return row

# Cache open files
@nilmdb.utils.lru_cache(size = fd_cache_size,
onremove = lambda x: self.mmap_close(x))
def mmap_open(self, file):

def mmap_close(self, fd):

def close(self):
"""Open and map a given filename (relative to self.root)"""
f = open(os.path.join(self.root, file), "a+", 0), 2)
mm = mmap.mmap(f.fileno(), f.tell())
return mm

def nrows(self):
"""Return number of rows total"""
return 0
raise NotImplementedError()
return int(self.table.nrows)
def mmap_close(self, mm):
"""Close a mmap object"""

def append(self, data):
"""Append the data and flush it to disk.
data is a nested Python list [[row],[row],[...]]"""
(filename, offset, count) = self._fnoffset_from_row(self.nrows)
### TODO: resize mmap, actually write the data
return 0
raise NotImplementedError()
@@ -183,6 +230,7 @@ class Table(object):
"""Needs to support simple indexing (table[n]) and
range slices (table[n:m]). Returns a nested Python
list [[row],[row],[...]]"""
### TODO: actually read the data.
return []
raise NotImplementedError()
return self.table.__getitem__(val)

+ 1
- 1
setup.cfg View File

@@ -2,7 +2,7 @@
# note: the value doesn't matter, that's why they're empty here
nologcapture= # comment to see cherrypy logs on failure