Now the goal is 128 MiB files, rather than a specific length.tags/nilmdb-0.1
@@ -1 +1 @@ | |||
- Merge adjacent intervals on insert (maybe with client help?) | |||
- Change BulkData filenames to a 2-layer structure (0000/0000) |
@@ -12,22 +12,23 @@ import struct | |||
import fnmatch | |||
import mmap | |||
# Up to 256 open file descriptors at any given time | |||
# Up to 256 open file descriptors at any given time. | |||
# Global to this module so they can be used in the decorator arguments. | |||
table_cache_size = 16 | |||
fd_cache_size = 16 | |||
# uint16_6 (raw) files will be 40 MiB each | |||
# float32_8 (prep) files will be 320 MiB each | |||
# float64_16 files would be be 1088 MiB each | |||
# At 8000 Hz, this is 30k files per year. | |||
default_rows_per_file = 8 * 1024 * 1024 | |||
@nilmdb.utils.must_close() | |||
class BulkData(object): | |||
def __init__(self, basepath): | |||
def __init__(self, basepath, | |||
# Default to approximately 128 MiB per file | |||
default_file_size = 128 * 1024 * 1024, | |||
): | |||
self.basepath = basepath | |||
self.root = os.path.join(self.basepath, "data") | |||
# Tuneables | |||
self.default_file_size = default_file_size | |||
# Make root path | |||
if not os.path.isdir(self.root): | |||
os.mkdir(self.root) | |||
@@ -104,7 +105,7 @@ class BulkData(object): | |||
os.mkdir(ospath) | |||
# Write format string to file | |||
Table.create(ospath, struct_fmt) | |||
Table.create(ospath, struct_fmt, self.default_file_size) | |||
# Open and cache it | |||
self.getnode(unicodepath) | |||
@@ -122,7 +123,7 @@ class BulkData(object): | |||
ospath = os.path.join(self.root, *elements) | |||
# Remove Table object from cache | |||
self.getnode.cache_remove(self, ospath) | |||
self.getnode.cache_remove(self, unicodepath) | |||
# Remove the contents of the target directory | |||
table_exists = os.path.isfile(os.path.join(ospath, "format")) | |||
@@ -161,10 +162,16 @@ class Table(object): | |||
return os.path.isfile(os.path.join(root, "format")) | |||
@classmethod | |||
def create(cls, root, struct_fmt): | |||
def create(cls, root, struct_fmt, file_size): | |||
"""Initialize a table at the given OS path. | |||
'struct_fmt' is a Struct module format description""" | |||
format = { "rows_per_file": default_rows_per_file, | |||
# Calculate rows per file so that each file is approximately | |||
# file_size bytes. | |||
packer = struct.Struct(struct_fmt) | |||
rows_per_file = max(file_size // packer.size, 1) | |||
format = { "rows_per_file": rows_per_file, | |||
"struct_fmt": struct_fmt } | |||
with open(os.path.join(root, "format"), "wb") as f: | |||
pickle.dump(format, f, 2) | |||
@@ -92,7 +92,8 @@ class OverlapError(NilmDBError): | |||
class NilmDB(object): | |||
verbose = 0 | |||
def __init__(self, basepath, sync=True, max_results=None): | |||
def __init__(self, basepath, sync=True, max_results=None, | |||
bulkdata_args={}): | |||
# set up path | |||
self.basepath = os.path.abspath(basepath) | |||
@@ -104,7 +105,7 @@ class NilmDB(object): | |||
raise IOError("can't create tree " + self.basepath) | |||
# Our data goes inside it | |||
self.data = bulkdata.BulkData(self.basepath) | |||
self.data = bulkdata.BulkData(self.basepath, **bulkdata_args) | |||
# SQLite database too | |||
sqlfilename = os.path.join(self.basepath, "data.sql") | |||
@@ -341,7 +342,7 @@ class NilmDB(object): | |||
No way to undo it! Metadata is removed.""" | |||
stream_id = self._stream_id(path) | |||
# Delete the cached interval data | |||
# Delete the cached interval data (if it was cached) | |||
self._get_intervals.cache_remove(self, stream_id) | |||
# Delete the data | |||
@@ -22,10 +22,12 @@ from test_helpers import * | |||
testdb = "tests/cmdline-testdb" | |||
def server_start(max_results = None): | |||
def server_start(max_results = None, bulkdata_args = {}): | |||
global test_server, test_db | |||
# Start web app on a custom port | |||
test_db = nilmdb.NilmDB(testdb, sync = False, max_results = max_results) | |||
test_db = nilmdb.NilmDB(testdb, sync = False, | |||
max_results = max_results, | |||
bulkdata_args = bulkdata_args) | |||
test_server = nilmdb.Server(test_db, host = "127.0.0.1", | |||
port = 12380, stoppable = False, | |||
fast_shutdown = True, | |||
@@ -540,3 +542,52 @@ class TestCmdline(object): | |||
self.ok(u"metadata /düsseldorf/raw --update 'α=β ε τ α'") | |||
self.ok(u"metadata /düsseldorf/raw") | |||
self.match(u"α=β ε τ α\nγ=δ\n") | |||
self.ok(u"destroy /düsseldorf/raw") | |||
def test_cmdline_12_files(self): | |||
# Test BulkData's ability to split into multiple files, | |||
# by forcing the file size to be really small. | |||
server_stop() | |||
server_start(bulkdata_args = { "default_file_size" : 1000 }) | |||
# Fill data | |||
self.ok("create /newton/prep float32_8") | |||
os.environ['TZ'] = "UTC" | |||
with open("tests/data/prep-20120323T1004-timestamped") as input: | |||
self.ok("insert --none /newton/prep", input) | |||
# Extract it | |||
self.ok("extract /newton/prep --start '2000-01-01' " + | |||
"--end '2012-03-23 10:04:01'") | |||
lines_(self.captured, 120) | |||
self.ok("extract /newton/prep --start '2000-01-01' " + | |||
"--end '2022-03-23 10:04:01'") | |||
lines_(self.captured, 14400) | |||
# Make sure there were lots of files generated in the database | |||
# dir | |||
nfiles = 0 | |||
for (dirpath, dirnames, filenames) in os.walk(testdb): | |||
nfiles += len(filenames) | |||
assert(nfiles > 500) | |||
# Make sure we can restart the server with a different file | |||
# size and have it still work | |||
server_stop() | |||
server_start() | |||
self.ok("extract /newton/prep --start '2000-01-01' " + | |||
"--end '2022-03-23 10:04:01'") | |||
lines_(self.captured, 14400) | |||
# Now recreate the data one more time and make sure there are | |||
# fewer files. | |||
self.ok("destroy /newton/prep") | |||
self.ok("create /newton/prep float32_8") | |||
os.environ['TZ'] = "UTC" | |||
with open("tests/data/prep-20120323T1004-timestamped") as input: | |||
self.ok("insert --none /newton/prep", input) | |||
nfiles = 0 | |||
for (dirpath, dirnames, filenames) in os.walk(testdb): | |||
nfiles += len(filenames) | |||
assert(nfiles < 50) |