Compare commits
9 Commits
nilmdb-1.2
...
nilmdb-1.2
Author | SHA1 | Date | |
---|---|---|---|
204a6ecb15 | |||
5db3b186a4 | |||
fe640cf421 | |||
ca67c79fe4 | |||
8917bcd4bf | |||
a75ec98673 | |||
e476338d61 | |||
d752b882f2 | |||
ade27773e6 |
@@ -220,7 +220,7 @@ class StreamInserter(object):
|
||||
# These are soft limits -- actual data might be rounded up.
|
||||
# We send when we have a certain amount of data queued, or
|
||||
# when a certain amount of time has passed since the last send.
|
||||
_max_data = 1048576
|
||||
_max_data = 2 * 1024 * 1024
|
||||
_max_time = 30
|
||||
|
||||
# Delta to add to the final timestamp, if "end" wasn't given
|
||||
|
@@ -25,6 +25,9 @@ def main():
|
||||
default = os.path.join(os.getcwd(), "db"))
|
||||
group.add_argument('-q', '--quiet', help = 'Silence output',
|
||||
action = 'store_true')
|
||||
group.add_argument('-n', '--nosync', help = 'Use asynchronous '
|
||||
'commits for sqlite transactions',
|
||||
action = 'store_true', default = False)
|
||||
|
||||
group = parser.add_argument_group("Debug options")
|
||||
group.add_argument('-y', '--yappi', help = 'Run under yappi profiler and '
|
||||
@@ -35,7 +38,8 @@ def main():
|
||||
|
||||
# Create database object. Needs to be serialized before passing
|
||||
# to the Server.
|
||||
db = nilmdb.utils.serializer_proxy(nilmdb.NilmDB)(args.database)
|
||||
db = nilmdb.utils.serializer_proxy(nilmdb.NilmDB)(args.database,
|
||||
sync = not args.nosync)
|
||||
|
||||
# Configure the server
|
||||
if args.quiet:
|
||||
|
@@ -28,7 +28,7 @@ except: # pragma: no cover
|
||||
table_cache_size = 16
|
||||
fd_cache_size = 16
|
||||
|
||||
@nilmdb.utils.must_close(wrap_verify = True)
|
||||
@nilmdb.utils.must_close(wrap_verify = False)
|
||||
class BulkData(object):
|
||||
def __init__(self, basepath, **kwargs):
|
||||
self.basepath = basepath
|
||||
@@ -171,7 +171,7 @@ class BulkData(object):
|
||||
ospath = os.path.join(self.root, *elements)
|
||||
return Table(ospath)
|
||||
|
||||
@nilmdb.utils.must_close(wrap_verify = True)
|
||||
@nilmdb.utils.must_close(wrap_verify = False)
|
||||
class File(object):
|
||||
"""Object representing a single file on disk. Data can be appended,
|
||||
or the self.mmap handle can be used for random reads."""
|
||||
@@ -210,14 +210,26 @@ class File(object):
|
||||
self.mmap.close()
|
||||
self._f.close()
|
||||
|
||||
def append(self, data):
|
||||
def append(self, data): # pragma: no cover (below version used instead)
|
||||
# Write data, flush it, and resize our mmap accordingly
|
||||
self._f.write(data)
|
||||
self._f.flush()
|
||||
self.size += len(data)
|
||||
self._mmap_reopen()
|
||||
|
||||
@nilmdb.utils.must_close(wrap_verify = True)
|
||||
def append_pack_iter(self, count, packer, dataiter):
|
||||
# An optimized verison of append, to avoid flushing the file
|
||||
# and resizing the mmap after each data point.
|
||||
try:
|
||||
for i in xrange(count):
|
||||
row = dataiter.next()
|
||||
self._f.write(packer(*row))
|
||||
finally:
|
||||
self._f.flush()
|
||||
self.size = self._f.tell()
|
||||
self._mmap_reopen()
|
||||
|
||||
@nilmdb.utils.must_close(wrap_verify = False)
|
||||
class Table(object):
|
||||
"""Tools to help access a single table (data at a specific OS path)."""
|
||||
# See design.md for design details
|
||||
@@ -351,9 +363,7 @@ class Table(object):
|
||||
f = self.file_open(subdir, fname)
|
||||
|
||||
# Write the data
|
||||
for i in xrange(count):
|
||||
row = dataiter.next()
|
||||
f.append(self.packer.pack(*row))
|
||||
f.append_pack_iter(count, self.packer.pack, dataiter)
|
||||
remaining -= count
|
||||
self.nrows += count
|
||||
|
||||
|
@@ -4,7 +4,6 @@ import time
|
||||
import sys
|
||||
import inspect
|
||||
import cStringIO
|
||||
import numpy as np
|
||||
|
||||
cdef enum:
|
||||
max_value_count = 64
|
||||
@@ -42,10 +41,12 @@ class Layout:
|
||||
|
||||
if datatype == 'uint16':
|
||||
self.parse = self.parse_uint16
|
||||
self.format = self.format_uint16
|
||||
self.format_str = "%.6f" + " %d" * self.count
|
||||
self.format = self.format_generic
|
||||
elif datatype == 'float32' or datatype == 'float64':
|
||||
self.parse = self.parse_float64
|
||||
self.format = self.format_float64
|
||||
self.format_str = "%.6f" + " %f" * self.count
|
||||
self.format = self.format_generic
|
||||
else:
|
||||
raise KeyError("invalid type")
|
||||
|
||||
@@ -57,15 +58,15 @@ class Layout:
|
||||
cdef double ts
|
||||
# Return doubles even in float32 case, since they're going into
|
||||
# a Python array which would upconvert to double anyway.
|
||||
result = []
|
||||
result = [0] * (self.count + 1)
|
||||
cdef char *end
|
||||
ts = libc.stdlib.strtod(text, &end)
|
||||
if end == text:
|
||||
raise ValueError("bad timestamp")
|
||||
result.append(ts)
|
||||
result[0] = ts
|
||||
for n in range(self.count):
|
||||
text = end
|
||||
result.append(libc.stdlib.strtod(text, &end))
|
||||
result[n+1] = libc.stdlib.strtod(text, &end)
|
||||
if end == text:
|
||||
raise ValueError("wrong number of values")
|
||||
n = 0
|
||||
@@ -79,18 +80,18 @@ class Layout:
|
||||
cdef int n
|
||||
cdef double ts
|
||||
cdef int v
|
||||
result = []
|
||||
cdef char *end
|
||||
result = [0] * (self.count + 1)
|
||||
ts = libc.stdlib.strtod(text, &end)
|
||||
if end == text:
|
||||
raise ValueError("bad timestamp")
|
||||
result.append(ts)
|
||||
result[0] = ts
|
||||
for n in range(self.count):
|
||||
text = end
|
||||
v = libc.stdlib.strtol(text, &end, 10)
|
||||
if v < 0 or v > 65535:
|
||||
raise ValueError("value out of range")
|
||||
result.append(v)
|
||||
result[n+1] = v
|
||||
if end == text:
|
||||
raise ValueError("wrong number of values")
|
||||
n = 0
|
||||
@@ -101,25 +102,12 @@ class Layout:
|
||||
return (ts, result)
|
||||
|
||||
# Formatters
|
||||
def format_float64(self, d):
|
||||
def format_generic(self, d):
|
||||
n = len(d) - 1
|
||||
if n != self.count:
|
||||
raise ValueError("wrong number of values for layout type: "
|
||||
"got %d, wanted %d" % (n, self.count))
|
||||
s = "%.6f" % d[0]
|
||||
for i in range(n):
|
||||
s += " %f" % d[i+1]
|
||||
return s + "\n"
|
||||
|
||||
def format_uint16(self, d):
|
||||
n = len(d) - 1
|
||||
if n != self.count:
|
||||
raise ValueError("wrong number of values for layout type: "
|
||||
"got %d, wanted %d" % (n, self.count))
|
||||
s = "%.6f" % d[0]
|
||||
for i in range(n):
|
||||
s += " %d" % d[i+1]
|
||||
return s + "\n"
|
||||
return (self.format_str % tuple(d)) + "\n"
|
||||
|
||||
# Get a layout by name
|
||||
def get_named(typestring):
|
||||
|
@@ -400,7 +400,7 @@ class Server(object):
|
||||
'server.socket_host': host,
|
||||
'server.socket_port': port,
|
||||
'engine.autoreload_on': False,
|
||||
'server.max_request_body_size': 4*1024*1024,
|
||||
'server.max_request_body_size': 8*1024*1024,
|
||||
})
|
||||
if self.embedded:
|
||||
cherrypy.config.update({ 'environment': 'embedded' })
|
||||
|
@@ -20,6 +20,7 @@ import unittest
|
||||
import warnings
|
||||
import resource
|
||||
import time
|
||||
import re
|
||||
|
||||
from testutil.helpers import *
|
||||
|
||||
@@ -172,6 +173,10 @@ class TestClient(object):
|
||||
def test_client_04_insert(self):
|
||||
client = nilmdb.Client(url = testurl)
|
||||
|
||||
# Limit _max_data to 1 MB, since our test file is 1.5 MB
|
||||
old_max_data = nilmdb.client.client.StreamInserter._max_data
|
||||
nilmdb.client.client.StreamInserter._max_data = 1 * 1024 * 1024
|
||||
|
||||
datetime_tz.localtz_set("America/New_York")
|
||||
|
||||
testfile = "tests/data/prep-20120323T1000"
|
||||
@@ -244,8 +249,9 @@ class TestClient(object):
|
||||
in_("400 Bad Request", str(e.exception))
|
||||
# Client chunks the input, so the exact timestamp here might change
|
||||
# if the chunk positions change.
|
||||
in_("Data timestamp 1332511271.016667 >= end time 1332511201.0",
|
||||
str(e.exception))
|
||||
assert(re.search("Data timestamp 13325[0-9]+\.[0-9]+ "
|
||||
">= end time 1332511201.0", str(e.exception))
|
||||
is not None)
|
||||
|
||||
# Now do the real load
|
||||
data = timestamper.TimestamperRate(testfile, start, 120)
|
||||
@@ -264,6 +270,7 @@ class TestClient(object):
|
||||
in_("400 Bad Request", str(e.exception))
|
||||
in_("verlap", str(e.exception))
|
||||
|
||||
nilmdb.client.client.StreamInserter._max_data = old_max_data
|
||||
client.close()
|
||||
|
||||
def test_client_05_extractremove(self):
|
||||
|
@@ -246,7 +246,7 @@ class TestLayoutSpeed:
|
||||
parser = Parser(layout)
|
||||
formatter = Formatter(layout)
|
||||
parser.parse(data)
|
||||
data = formatter.format(parser.data)
|
||||
formatter.format(parser.data)
|
||||
elapsed = time.time() - start
|
||||
printf("roundtrip %s: %d ms, %.1f μs/row, %d rows/sec\n",
|
||||
layout,
|
||||
@@ -264,3 +264,8 @@ class TestLayoutSpeed:
|
||||
return [ sprintf("%d", random.randint(0,65535))
|
||||
for x in range(10) ]
|
||||
do_speedtest("uint16_10", datagen)
|
||||
|
||||
def datagen():
|
||||
return [ sprintf("%d", random.randint(0,65535))
|
||||
for x in range(6) ]
|
||||
do_speedtest("uint16_6", datagen)
|
||||
|
@@ -34,6 +34,10 @@ class Bar:
|
||||
def __del__(self):
|
||||
fprintf(err, "Deleting\n")
|
||||
|
||||
@classmethod
|
||||
def baz(self):
|
||||
fprintf(err, "Baz\n")
|
||||
|
||||
def close(self):
|
||||
fprintf(err, "Closing\n")
|
||||
|
||||
|
Reference in New Issue
Block a user