Compare commits
9 Commits
nilmdb-1.2
...
nilmdb-1.2
Author | SHA1 | Date | |
---|---|---|---|
204a6ecb15 | |||
5db3b186a4 | |||
fe640cf421 | |||
ca67c79fe4 | |||
8917bcd4bf | |||
a75ec98673 | |||
e476338d61 | |||
d752b882f2 | |||
ade27773e6 |
@@ -220,7 +220,7 @@ class StreamInserter(object):
|
|||||||
# These are soft limits -- actual data might be rounded up.
|
# These are soft limits -- actual data might be rounded up.
|
||||||
# We send when we have a certain amount of data queued, or
|
# We send when we have a certain amount of data queued, or
|
||||||
# when a certain amount of time has passed since the last send.
|
# when a certain amount of time has passed since the last send.
|
||||||
_max_data = 1048576
|
_max_data = 2 * 1024 * 1024
|
||||||
_max_time = 30
|
_max_time = 30
|
||||||
|
|
||||||
# Delta to add to the final timestamp, if "end" wasn't given
|
# Delta to add to the final timestamp, if "end" wasn't given
|
||||||
|
@@ -25,6 +25,9 @@ def main():
|
|||||||
default = os.path.join(os.getcwd(), "db"))
|
default = os.path.join(os.getcwd(), "db"))
|
||||||
group.add_argument('-q', '--quiet', help = 'Silence output',
|
group.add_argument('-q', '--quiet', help = 'Silence output',
|
||||||
action = 'store_true')
|
action = 'store_true')
|
||||||
|
group.add_argument('-n', '--nosync', help = 'Use asynchronous '
|
||||||
|
'commits for sqlite transactions',
|
||||||
|
action = 'store_true', default = False)
|
||||||
|
|
||||||
group = parser.add_argument_group("Debug options")
|
group = parser.add_argument_group("Debug options")
|
||||||
group.add_argument('-y', '--yappi', help = 'Run under yappi profiler and '
|
group.add_argument('-y', '--yappi', help = 'Run under yappi profiler and '
|
||||||
@@ -35,7 +38,8 @@ def main():
|
|||||||
|
|
||||||
# Create database object. Needs to be serialized before passing
|
# Create database object. Needs to be serialized before passing
|
||||||
# to the Server.
|
# to the Server.
|
||||||
db = nilmdb.utils.serializer_proxy(nilmdb.NilmDB)(args.database)
|
db = nilmdb.utils.serializer_proxy(nilmdb.NilmDB)(args.database,
|
||||||
|
sync = not args.nosync)
|
||||||
|
|
||||||
# Configure the server
|
# Configure the server
|
||||||
if args.quiet:
|
if args.quiet:
|
||||||
|
@@ -28,7 +28,7 @@ except: # pragma: no cover
|
|||||||
table_cache_size = 16
|
table_cache_size = 16
|
||||||
fd_cache_size = 16
|
fd_cache_size = 16
|
||||||
|
|
||||||
@nilmdb.utils.must_close(wrap_verify = True)
|
@nilmdb.utils.must_close(wrap_verify = False)
|
||||||
class BulkData(object):
|
class BulkData(object):
|
||||||
def __init__(self, basepath, **kwargs):
|
def __init__(self, basepath, **kwargs):
|
||||||
self.basepath = basepath
|
self.basepath = basepath
|
||||||
@@ -171,7 +171,7 @@ class BulkData(object):
|
|||||||
ospath = os.path.join(self.root, *elements)
|
ospath = os.path.join(self.root, *elements)
|
||||||
return Table(ospath)
|
return Table(ospath)
|
||||||
|
|
||||||
@nilmdb.utils.must_close(wrap_verify = True)
|
@nilmdb.utils.must_close(wrap_verify = False)
|
||||||
class File(object):
|
class File(object):
|
||||||
"""Object representing a single file on disk. Data can be appended,
|
"""Object representing a single file on disk. Data can be appended,
|
||||||
or the self.mmap handle can be used for random reads."""
|
or the self.mmap handle can be used for random reads."""
|
||||||
@@ -210,14 +210,26 @@ class File(object):
|
|||||||
self.mmap.close()
|
self.mmap.close()
|
||||||
self._f.close()
|
self._f.close()
|
||||||
|
|
||||||
def append(self, data):
|
def append(self, data): # pragma: no cover (below version used instead)
|
||||||
# Write data, flush it, and resize our mmap accordingly
|
# Write data, flush it, and resize our mmap accordingly
|
||||||
self._f.write(data)
|
self._f.write(data)
|
||||||
self._f.flush()
|
self._f.flush()
|
||||||
self.size += len(data)
|
self.size += len(data)
|
||||||
self._mmap_reopen()
|
self._mmap_reopen()
|
||||||
|
|
||||||
@nilmdb.utils.must_close(wrap_verify = True)
|
def append_pack_iter(self, count, packer, dataiter):
|
||||||
|
# An optimized verison of append, to avoid flushing the file
|
||||||
|
# and resizing the mmap after each data point.
|
||||||
|
try:
|
||||||
|
for i in xrange(count):
|
||||||
|
row = dataiter.next()
|
||||||
|
self._f.write(packer(*row))
|
||||||
|
finally:
|
||||||
|
self._f.flush()
|
||||||
|
self.size = self._f.tell()
|
||||||
|
self._mmap_reopen()
|
||||||
|
|
||||||
|
@nilmdb.utils.must_close(wrap_verify = False)
|
||||||
class Table(object):
|
class Table(object):
|
||||||
"""Tools to help access a single table (data at a specific OS path)."""
|
"""Tools to help access a single table (data at a specific OS path)."""
|
||||||
# See design.md for design details
|
# See design.md for design details
|
||||||
@@ -351,9 +363,7 @@ class Table(object):
|
|||||||
f = self.file_open(subdir, fname)
|
f = self.file_open(subdir, fname)
|
||||||
|
|
||||||
# Write the data
|
# Write the data
|
||||||
for i in xrange(count):
|
f.append_pack_iter(count, self.packer.pack, dataiter)
|
||||||
row = dataiter.next()
|
|
||||||
f.append(self.packer.pack(*row))
|
|
||||||
remaining -= count
|
remaining -= count
|
||||||
self.nrows += count
|
self.nrows += count
|
||||||
|
|
||||||
|
@@ -4,7 +4,6 @@ import time
|
|||||||
import sys
|
import sys
|
||||||
import inspect
|
import inspect
|
||||||
import cStringIO
|
import cStringIO
|
||||||
import numpy as np
|
|
||||||
|
|
||||||
cdef enum:
|
cdef enum:
|
||||||
max_value_count = 64
|
max_value_count = 64
|
||||||
@@ -42,10 +41,12 @@ class Layout:
|
|||||||
|
|
||||||
if datatype == 'uint16':
|
if datatype == 'uint16':
|
||||||
self.parse = self.parse_uint16
|
self.parse = self.parse_uint16
|
||||||
self.format = self.format_uint16
|
self.format_str = "%.6f" + " %d" * self.count
|
||||||
|
self.format = self.format_generic
|
||||||
elif datatype == 'float32' or datatype == 'float64':
|
elif datatype == 'float32' or datatype == 'float64':
|
||||||
self.parse = self.parse_float64
|
self.parse = self.parse_float64
|
||||||
self.format = self.format_float64
|
self.format_str = "%.6f" + " %f" * self.count
|
||||||
|
self.format = self.format_generic
|
||||||
else:
|
else:
|
||||||
raise KeyError("invalid type")
|
raise KeyError("invalid type")
|
||||||
|
|
||||||
@@ -57,15 +58,15 @@ class Layout:
|
|||||||
cdef double ts
|
cdef double ts
|
||||||
# Return doubles even in float32 case, since they're going into
|
# Return doubles even in float32 case, since they're going into
|
||||||
# a Python array which would upconvert to double anyway.
|
# a Python array which would upconvert to double anyway.
|
||||||
result = []
|
result = [0] * (self.count + 1)
|
||||||
cdef char *end
|
cdef char *end
|
||||||
ts = libc.stdlib.strtod(text, &end)
|
ts = libc.stdlib.strtod(text, &end)
|
||||||
if end == text:
|
if end == text:
|
||||||
raise ValueError("bad timestamp")
|
raise ValueError("bad timestamp")
|
||||||
result.append(ts)
|
result[0] = ts
|
||||||
for n in range(self.count):
|
for n in range(self.count):
|
||||||
text = end
|
text = end
|
||||||
result.append(libc.stdlib.strtod(text, &end))
|
result[n+1] = libc.stdlib.strtod(text, &end)
|
||||||
if end == text:
|
if end == text:
|
||||||
raise ValueError("wrong number of values")
|
raise ValueError("wrong number of values")
|
||||||
n = 0
|
n = 0
|
||||||
@@ -79,18 +80,18 @@ class Layout:
|
|||||||
cdef int n
|
cdef int n
|
||||||
cdef double ts
|
cdef double ts
|
||||||
cdef int v
|
cdef int v
|
||||||
result = []
|
|
||||||
cdef char *end
|
cdef char *end
|
||||||
|
result = [0] * (self.count + 1)
|
||||||
ts = libc.stdlib.strtod(text, &end)
|
ts = libc.stdlib.strtod(text, &end)
|
||||||
if end == text:
|
if end == text:
|
||||||
raise ValueError("bad timestamp")
|
raise ValueError("bad timestamp")
|
||||||
result.append(ts)
|
result[0] = ts
|
||||||
for n in range(self.count):
|
for n in range(self.count):
|
||||||
text = end
|
text = end
|
||||||
v = libc.stdlib.strtol(text, &end, 10)
|
v = libc.stdlib.strtol(text, &end, 10)
|
||||||
if v < 0 or v > 65535:
|
if v < 0 or v > 65535:
|
||||||
raise ValueError("value out of range")
|
raise ValueError("value out of range")
|
||||||
result.append(v)
|
result[n+1] = v
|
||||||
if end == text:
|
if end == text:
|
||||||
raise ValueError("wrong number of values")
|
raise ValueError("wrong number of values")
|
||||||
n = 0
|
n = 0
|
||||||
@@ -101,25 +102,12 @@ class Layout:
|
|||||||
return (ts, result)
|
return (ts, result)
|
||||||
|
|
||||||
# Formatters
|
# Formatters
|
||||||
def format_float64(self, d):
|
def format_generic(self, d):
|
||||||
n = len(d) - 1
|
n = len(d) - 1
|
||||||
if n != self.count:
|
if n != self.count:
|
||||||
raise ValueError("wrong number of values for layout type: "
|
raise ValueError("wrong number of values for layout type: "
|
||||||
"got %d, wanted %d" % (n, self.count))
|
"got %d, wanted %d" % (n, self.count))
|
||||||
s = "%.6f" % d[0]
|
return (self.format_str % tuple(d)) + "\n"
|
||||||
for i in range(n):
|
|
||||||
s += " %f" % d[i+1]
|
|
||||||
return s + "\n"
|
|
||||||
|
|
||||||
def format_uint16(self, d):
|
|
||||||
n = len(d) - 1
|
|
||||||
if n != self.count:
|
|
||||||
raise ValueError("wrong number of values for layout type: "
|
|
||||||
"got %d, wanted %d" % (n, self.count))
|
|
||||||
s = "%.6f" % d[0]
|
|
||||||
for i in range(n):
|
|
||||||
s += " %d" % d[i+1]
|
|
||||||
return s + "\n"
|
|
||||||
|
|
||||||
# Get a layout by name
|
# Get a layout by name
|
||||||
def get_named(typestring):
|
def get_named(typestring):
|
||||||
|
@@ -400,7 +400,7 @@ class Server(object):
|
|||||||
'server.socket_host': host,
|
'server.socket_host': host,
|
||||||
'server.socket_port': port,
|
'server.socket_port': port,
|
||||||
'engine.autoreload_on': False,
|
'engine.autoreload_on': False,
|
||||||
'server.max_request_body_size': 4*1024*1024,
|
'server.max_request_body_size': 8*1024*1024,
|
||||||
})
|
})
|
||||||
if self.embedded:
|
if self.embedded:
|
||||||
cherrypy.config.update({ 'environment': 'embedded' })
|
cherrypy.config.update({ 'environment': 'embedded' })
|
||||||
|
@@ -20,6 +20,7 @@ import unittest
|
|||||||
import warnings
|
import warnings
|
||||||
import resource
|
import resource
|
||||||
import time
|
import time
|
||||||
|
import re
|
||||||
|
|
||||||
from testutil.helpers import *
|
from testutil.helpers import *
|
||||||
|
|
||||||
@@ -172,6 +173,10 @@ class TestClient(object):
|
|||||||
def test_client_04_insert(self):
|
def test_client_04_insert(self):
|
||||||
client = nilmdb.Client(url = testurl)
|
client = nilmdb.Client(url = testurl)
|
||||||
|
|
||||||
|
# Limit _max_data to 1 MB, since our test file is 1.5 MB
|
||||||
|
old_max_data = nilmdb.client.client.StreamInserter._max_data
|
||||||
|
nilmdb.client.client.StreamInserter._max_data = 1 * 1024 * 1024
|
||||||
|
|
||||||
datetime_tz.localtz_set("America/New_York")
|
datetime_tz.localtz_set("America/New_York")
|
||||||
|
|
||||||
testfile = "tests/data/prep-20120323T1000"
|
testfile = "tests/data/prep-20120323T1000"
|
||||||
@@ -244,8 +249,9 @@ class TestClient(object):
|
|||||||
in_("400 Bad Request", str(e.exception))
|
in_("400 Bad Request", str(e.exception))
|
||||||
# Client chunks the input, so the exact timestamp here might change
|
# Client chunks the input, so the exact timestamp here might change
|
||||||
# if the chunk positions change.
|
# if the chunk positions change.
|
||||||
in_("Data timestamp 1332511271.016667 >= end time 1332511201.0",
|
assert(re.search("Data timestamp 13325[0-9]+\.[0-9]+ "
|
||||||
str(e.exception))
|
">= end time 1332511201.0", str(e.exception))
|
||||||
|
is not None)
|
||||||
|
|
||||||
# Now do the real load
|
# Now do the real load
|
||||||
data = timestamper.TimestamperRate(testfile, start, 120)
|
data = timestamper.TimestamperRate(testfile, start, 120)
|
||||||
@@ -264,6 +270,7 @@ class TestClient(object):
|
|||||||
in_("400 Bad Request", str(e.exception))
|
in_("400 Bad Request", str(e.exception))
|
||||||
in_("verlap", str(e.exception))
|
in_("verlap", str(e.exception))
|
||||||
|
|
||||||
|
nilmdb.client.client.StreamInserter._max_data = old_max_data
|
||||||
client.close()
|
client.close()
|
||||||
|
|
||||||
def test_client_05_extractremove(self):
|
def test_client_05_extractremove(self):
|
||||||
|
@@ -246,7 +246,7 @@ class TestLayoutSpeed:
|
|||||||
parser = Parser(layout)
|
parser = Parser(layout)
|
||||||
formatter = Formatter(layout)
|
formatter = Formatter(layout)
|
||||||
parser.parse(data)
|
parser.parse(data)
|
||||||
data = formatter.format(parser.data)
|
formatter.format(parser.data)
|
||||||
elapsed = time.time() - start
|
elapsed = time.time() - start
|
||||||
printf("roundtrip %s: %d ms, %.1f μs/row, %d rows/sec\n",
|
printf("roundtrip %s: %d ms, %.1f μs/row, %d rows/sec\n",
|
||||||
layout,
|
layout,
|
||||||
@@ -264,3 +264,8 @@ class TestLayoutSpeed:
|
|||||||
return [ sprintf("%d", random.randint(0,65535))
|
return [ sprintf("%d", random.randint(0,65535))
|
||||||
for x in range(10) ]
|
for x in range(10) ]
|
||||||
do_speedtest("uint16_10", datagen)
|
do_speedtest("uint16_10", datagen)
|
||||||
|
|
||||||
|
def datagen():
|
||||||
|
return [ sprintf("%d", random.randint(0,65535))
|
||||||
|
for x in range(6) ]
|
||||||
|
do_speedtest("uint16_6", datagen)
|
||||||
|
@@ -34,6 +34,10 @@ class Bar:
|
|||||||
def __del__(self):
|
def __del__(self):
|
||||||
fprintf(err, "Deleting\n")
|
fprintf(err, "Deleting\n")
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def baz(self):
|
||||||
|
fprintf(err, "Baz\n")
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
fprintf(err, "Closing\n")
|
fprintf(err, "Closing\n")
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user