@@ -16,6 +16,12 @@ import mmap | |||
table_cache_size = 16 | |||
fd_cache_size = 16 | |||
# uint16_6 (raw) files will be 40 MiB each | |||
# float32_8 (prep) files will be 320 MiB each | |||
# float64_16 files would be be 1088 MiB each | |||
# At 8000 Hz, this is 30k files per year. | |||
default_rows_per_file = 8 * 1024 * 1024 | |||
@nilmdb.utils.must_close() | |||
class BulkData(object): | |||
def __init__(self, basepath): | |||
@@ -81,26 +87,24 @@ class BulkData(object): | |||
# Create the table. Note that we make a distinction here | |||
# between NilmDB paths (always Unix style, split apart | |||
# manually) and OS paths (built up with os.path.join) | |||
try: | |||
# Make directories leading up to this one | |||
elements = path.lstrip('/').split('/') | |||
for i in range(len(elements)): | |||
ospath = os.path.join(self.root, *elements[0:i]) | |||
if Table.exists(ospath): | |||
raise ValueError("path is subdir of existing node") | |||
if not os.path.isdir(ospath): | |||
os.mkdir(ospath) | |||
# Make the final dir | |||
ospath = os.path.join(self.root, *elements) | |||
if os.path.isdir(ospath): | |||
raise ValueError("subdirs of this path already exist") | |||
os.mkdir(ospath) | |||
# Write format string to file | |||
Table.create(ospath, struct_fmt) | |||
except OSError as e: | |||
raise ValueError("error creating table at that path: " + e.strerror) | |||
# Make directories leading up to this one | |||
elements = path.lstrip('/').split('/') | |||
for i in range(len(elements)): | |||
ospath = os.path.join(self.root, *elements[0:i]) | |||
if Table.exists(ospath): | |||
raise ValueError("path is subdir of existing node") | |||
if not os.path.isdir(ospath): | |||
os.mkdir(ospath) | |||
# Make the final dir | |||
ospath = os.path.join(self.root, *elements) | |||
if os.path.isdir(ospath): | |||
raise ValueError("subdirs of this path already exist") | |||
os.mkdir(ospath) | |||
# Write format string to file | |||
Table.create(ospath, struct_fmt) | |||
# Open and cache it | |||
self.getnode(unicodepath) | |||
@@ -121,7 +125,8 @@ class BulkData(object): | |||
self.getnode.cache_remove(self, ospath) | |||
# Remove the contents of the target directory | |||
if not os.path.isfile(os.path.join(ospath, "format")): | |||
table_exists = os.path.isfile(os.path.join(ospath, "format")) | |||
if not table_exists: # pragma: no cover; only happens with corrupt fs | |||
raise ValueError("nothing at that path") | |||
for file in os.listdir(ospath): | |||
os.remove(os.path.join(ospath, file)) | |||
@@ -159,7 +164,7 @@ class Table(object): | |||
def create(cls, root, struct_fmt): | |||
"""Initialize a table at the given OS path. | |||
'struct_fmt' is a Struct module format description""" | |||
format = { "rows_per_file": 4 * 1024 * 1024, | |||
format = { "rows_per_file": default_rows_per_file, | |||
"struct_fmt": struct_fmt } | |||
with open(os.path.join(root, "format"), "wb") as f: | |||
pickle.dump(format, f, 2) | |||
@@ -211,7 +216,7 @@ class Table(object): | |||
"""Return the row number that corresponds to the given | |||
filename and byte-offset within that file.""" | |||
filenum = int(filename, 16) | |||
if (offset % self.packer.size) != 0: | |||
if (offset % self.packer.size) != 0: # pragma: no cover; shouldn't occur | |||
raise ValueError("file offset is not a multiple of data size") | |||
row = (filenum * self.rows_per_file) + (offset // self.packer.size) | |||
return row | |||
@@ -294,7 +299,7 @@ class Table(object): | |||
return ret | |||
# Handle single points | |||
if key < 0 or key >= self.nrows: | |||
if key < 0 or key >= self.nrows: # pragma: no cover (shouldn't occur) | |||
raise IndexError("Index out of range") | |||
(filename, offset, count) = self._fnoffset_from_row(key) | |||
mm = self.mmap_open(filename) | |||
@@ -51,17 +51,21 @@ def workaround_cp_bug_1200(func): # pragma: no cover (just a workaround) | |||
traceback.format_exc()) | |||
return wrapper | |||
def exception_to_httperror(response = "400 Bad Request"): | |||
"""Return a decorator that catches Exception and throws | |||
a HTTPError describing it instead""" | |||
def exception_to_httperror(*expected): | |||
"""Return a decorator that catches expected exceptions and throws | |||
a HTTPError describing it instead. This lets us still raise proper | |||
500 Internal Server Errors when it's something stranger.""" | |||
def decorator(func): | |||
@functools.wraps(func) | |||
def wrapper(*args, **kwargs): | |||
try: | |||
return func(*args, **kwargs) | |||
except (SyntaxError, OSError, IOError): | |||
# Raise these as usual | |||
raise | |||
except Exception as e: | |||
message = sprintf("%s: %s", type(e).__name__, str(e)) | |||
raise cherrypy.HTTPError(response, message) | |||
raise cherrypy.HTTPError("400 Bad Request", message) | |||
return wrapper | |||
return decorator | |||
@@ -118,7 +122,7 @@ class Stream(NilmApp): | |||
# /stream/create?path=/newton/prep&layout=PrepData | |||
@cherrypy.expose | |||
@cherrypy.tools.json_out() | |||
@exception_to_httperror() | |||
@exception_to_httperror(ValueError, nilmdb.nilmdb.NilmDBError) | |||
def create(self, path, layout): | |||
"""Create a new stream in the database. Provide path | |||
and one of the nilmdb.layout.layouts keys. | |||
@@ -17,6 +17,7 @@ import cStringIO | |||
import simplejson as json | |||
import unittest | |||
import warnings | |||
import resource | |||
from test_helpers import * | |||
@@ -69,7 +70,7 @@ class TestClient(object): | |||
eq_(distutils.version.StrictVersion(version), | |||
distutils.version.StrictVersion(test_server.version)) | |||
def test_client_2_nilmdb(self): | |||
def test_client_2_createlist(self): | |||
# Basic stream tests, like those in test_nilmdb:test_stream | |||
client = nilmdb.Client(url = "http://localhost:12380/") | |||
@@ -99,6 +100,20 @@ class TestClient(object): | |||
eq_(client.stream_list(layout="RawData"), [ ["/newton/raw", "RawData"] ]) | |||
eq_(client.stream_list(path="/newton/raw"), [ ["/newton/raw", "RawData"] ]) | |||
# Try messing with resource limits to trigger errors and get | |||
# more coverage. Here, make it so we can only create files 1 | |||
# byte in size, which will trigger an IOError in the server when | |||
# we create a table. | |||
limit = resource.getrlimit(resource.RLIMIT_FSIZE) | |||
resource.setrlimit(resource.RLIMIT_FSIZE, (1, limit[1])) | |||
with assert_raises(ServerError) as e: | |||
client.stream_create("/newton/hello", "RawData") | |||
resource.setrlimit(resource.RLIMIT_FSIZE, limit) | |||
def test_client_3_metadata(self): | |||
client = nilmdb.Client(url = "http://localhost:12380/") | |||
# Set / get metadata | |||
eq_(client.stream_get_metadata("/newton/prep"), {}) | |||
eq_(client.stream_get_metadata("/newton/raw"), {}) | |||
@@ -128,7 +143,7 @@ class TestClient(object): | |||
with assert_raises(ClientError): | |||
client.stream_update_metadata("/newton/prep", [1,2,3]) | |||
def test_client_3_insert(self): | |||
def test_client_4_insert(self): | |||
client = nilmdb.Client(url = "http://localhost:12380/") | |||
datetime_tz.localtz_set("America/New_York") | |||
@@ -203,14 +218,14 @@ class TestClient(object): | |||
in_("400 Bad Request", str(e.exception)) | |||
in_("OverlapError", str(e.exception)) | |||
def test_client_4_extract(self): | |||
def test_client_5_extract(self): | |||
# Misc tests for extract. Most of them are in test_cmdline. | |||
client = nilmdb.Client(url = "http://localhost:12380/") | |||
for x in client.stream_extract("/newton/prep", 123, 123): | |||
raise Exception("shouldn't be any data for this request") | |||
def test_client_5_generators(self): | |||
def test_client_6_generators(self): | |||
# A lot of the client functionality is already tested by test_cmdline, | |||
# but this gets a bit more coverage that cmdline misses. | |||
client = nilmdb.Client(url = "http://localhost:12380/") | |||
@@ -259,7 +274,7 @@ class TestClient(object): | |||
in_("404 Not Found", str(e.exception)) | |||
in_("No such stream", str(e.exception)) | |||
def test_client_6_chunked(self): | |||
def test_client_7_chunked(self): | |||
# Make sure that /stream/intervals and /stream/extract | |||
# properly return streaming, chunked response. Pokes around | |||
# in client.http internals a bit to look at the response | |||
@@ -282,7 +297,7 @@ class TestClient(object): | |||
if "transfer-encoding: chunked" not in client.http._headers.lower(): | |||
warnings.warn("Non-chunked HTTP response for /stream/extract") | |||
def test_client_7_unicode(self): | |||
def test_client_8_unicode(self): | |||
# Basic Unicode tests | |||
client = nilmdb.Client(url = "http://localhost:12380/") | |||