Compare commits

...

9 Commits

Author SHA1 Message Date
204a6ecb15 Optimize bulkdata.append() by postponing flushes & mmap resize
Rather than flushing and resizing after each row is written to the
file, have the file object iterate by itself and do all of the
writes.  Only flush and resize the mmap after finishing.  This should
be pretty safe to do, especially since nothing is concurrent at the
moment.
2013-03-01 16:30:49 -05:00
5db3b186a4 Make test_mustclose more complete 2013-03-01 16:30:22 -05:00
fe640cf421 Remove must_close verification wrappers on bulkdata
At this point we know that the close() behavior is correct, so it's
not worth slowing everything down for these checks.
2013-03-01 16:11:44 -05:00
ca67c79fe4 Improve test_layout_speed 2013-03-01 16:04:10 -05:00
8917bcd4bf Fix test case failures due to increased client chunk size 2013-03-01 16:04:00 -05:00
a75ec98673 Slight speed improvements in layout.pyx 2013-03-01 16:03:38 -05:00
e476338d61 Remove outdated numpy dependency 2013-03-01 16:03:19 -05:00
d752b882f2 Bump up block sizes in client
This will help amortize the sqlite synchronization costs.
2013-02-28 21:11:57 -05:00
ade27773e6 Add --nosync option to nilmdb-server script 2013-02-28 20:45:08 -05:00
8 changed files with 55 additions and 37 deletions

View File

@@ -220,7 +220,7 @@ class StreamInserter(object):
# These are soft limits -- actual data might be rounded up.
# We send when we have a certain amount of data queued, or
# when a certain amount of time has passed since the last send.
_max_data = 1048576
_max_data = 2 * 1024 * 1024
_max_time = 30
# Delta to add to the final timestamp, if "end" wasn't given

View File

@@ -25,6 +25,9 @@ def main():
default = os.path.join(os.getcwd(), "db"))
group.add_argument('-q', '--quiet', help = 'Silence output',
action = 'store_true')
group.add_argument('-n', '--nosync', help = 'Use asynchronous '
'commits for sqlite transactions',
action = 'store_true', default = False)
group = parser.add_argument_group("Debug options")
group.add_argument('-y', '--yappi', help = 'Run under yappi profiler and '
@@ -35,7 +38,8 @@ def main():
# Create database object. Needs to be serialized before passing
# to the Server.
db = nilmdb.utils.serializer_proxy(nilmdb.NilmDB)(args.database)
db = nilmdb.utils.serializer_proxy(nilmdb.NilmDB)(args.database,
sync = not args.nosync)
# Configure the server
if args.quiet:

View File

@@ -28,7 +28,7 @@ except: # pragma: no cover
table_cache_size = 16
fd_cache_size = 16
@nilmdb.utils.must_close(wrap_verify = True)
@nilmdb.utils.must_close(wrap_verify = False)
class BulkData(object):
def __init__(self, basepath, **kwargs):
self.basepath = basepath
@@ -171,7 +171,7 @@ class BulkData(object):
ospath = os.path.join(self.root, *elements)
return Table(ospath)
@nilmdb.utils.must_close(wrap_verify = True)
@nilmdb.utils.must_close(wrap_verify = False)
class File(object):
"""Object representing a single file on disk. Data can be appended,
or the self.mmap handle can be used for random reads."""
@@ -210,14 +210,26 @@ class File(object):
self.mmap.close()
self._f.close()
def append(self, data):
def append(self, data): # pragma: no cover (below version used instead)
# Write data, flush it, and resize our mmap accordingly
self._f.write(data)
self._f.flush()
self.size += len(data)
self._mmap_reopen()
@nilmdb.utils.must_close(wrap_verify = True)
def append_pack_iter(self, count, packer, dataiter):
# An optimized verison of append, to avoid flushing the file
# and resizing the mmap after each data point.
try:
for i in xrange(count):
row = dataiter.next()
self._f.write(packer(*row))
finally:
self._f.flush()
self.size = self._f.tell()
self._mmap_reopen()
@nilmdb.utils.must_close(wrap_verify = False)
class Table(object):
"""Tools to help access a single table (data at a specific OS path)."""
# See design.md for design details
@@ -351,9 +363,7 @@ class Table(object):
f = self.file_open(subdir, fname)
# Write the data
for i in xrange(count):
row = dataiter.next()
f.append(self.packer.pack(*row))
f.append_pack_iter(count, self.packer.pack, dataiter)
remaining -= count
self.nrows += count

View File

@@ -4,7 +4,6 @@ import time
import sys
import inspect
import cStringIO
import numpy as np
cdef enum:
max_value_count = 64
@@ -42,10 +41,12 @@ class Layout:
if datatype == 'uint16':
self.parse = self.parse_uint16
self.format = self.format_uint16
self.format_str = "%.6f" + " %d" * self.count
self.format = self.format_generic
elif datatype == 'float32' or datatype == 'float64':
self.parse = self.parse_float64
self.format = self.format_float64
self.format_str = "%.6f" + " %f" * self.count
self.format = self.format_generic
else:
raise KeyError("invalid type")
@@ -57,15 +58,15 @@ class Layout:
cdef double ts
# Return doubles even in float32 case, since they're going into
# a Python array which would upconvert to double anyway.
result = []
result = [0] * (self.count + 1)
cdef char *end
ts = libc.stdlib.strtod(text, &end)
if end == text:
raise ValueError("bad timestamp")
result.append(ts)
result[0] = ts
for n in range(self.count):
text = end
result.append(libc.stdlib.strtod(text, &end))
result[n+1] = libc.stdlib.strtod(text, &end)
if end == text:
raise ValueError("wrong number of values")
n = 0
@@ -79,18 +80,18 @@ class Layout:
cdef int n
cdef double ts
cdef int v
result = []
cdef char *end
result = [0] * (self.count + 1)
ts = libc.stdlib.strtod(text, &end)
if end == text:
raise ValueError("bad timestamp")
result.append(ts)
result[0] = ts
for n in range(self.count):
text = end
v = libc.stdlib.strtol(text, &end, 10)
if v < 0 or v > 65535:
raise ValueError("value out of range")
result.append(v)
result[n+1] = v
if end == text:
raise ValueError("wrong number of values")
n = 0
@@ -101,25 +102,12 @@ class Layout:
return (ts, result)
# Formatters
def format_float64(self, d):
def format_generic(self, d):
n = len(d) - 1
if n != self.count:
raise ValueError("wrong number of values for layout type: "
"got %d, wanted %d" % (n, self.count))
s = "%.6f" % d[0]
for i in range(n):
s += " %f" % d[i+1]
return s + "\n"
def format_uint16(self, d):
n = len(d) - 1
if n != self.count:
raise ValueError("wrong number of values for layout type: "
"got %d, wanted %d" % (n, self.count))
s = "%.6f" % d[0]
for i in range(n):
s += " %d" % d[i+1]
return s + "\n"
return (self.format_str % tuple(d)) + "\n"
# Get a layout by name
def get_named(typestring):

View File

@@ -400,7 +400,7 @@ class Server(object):
'server.socket_host': host,
'server.socket_port': port,
'engine.autoreload_on': False,
'server.max_request_body_size': 4*1024*1024,
'server.max_request_body_size': 8*1024*1024,
})
if self.embedded:
cherrypy.config.update({ 'environment': 'embedded' })

View File

@@ -20,6 +20,7 @@ import unittest
import warnings
import resource
import time
import re
from testutil.helpers import *
@@ -172,6 +173,10 @@ class TestClient(object):
def test_client_04_insert(self):
client = nilmdb.Client(url = testurl)
# Limit _max_data to 1 MB, since our test file is 1.5 MB
old_max_data = nilmdb.client.client.StreamInserter._max_data
nilmdb.client.client.StreamInserter._max_data = 1 * 1024 * 1024
datetime_tz.localtz_set("America/New_York")
testfile = "tests/data/prep-20120323T1000"
@@ -244,8 +249,9 @@ class TestClient(object):
in_("400 Bad Request", str(e.exception))
# Client chunks the input, so the exact timestamp here might change
# if the chunk positions change.
in_("Data timestamp 1332511271.016667 >= end time 1332511201.0",
str(e.exception))
assert(re.search("Data timestamp 13325[0-9]+\.[0-9]+ "
">= end time 1332511201.0", str(e.exception))
is not None)
# Now do the real load
data = timestamper.TimestamperRate(testfile, start, 120)
@@ -264,6 +270,7 @@ class TestClient(object):
in_("400 Bad Request", str(e.exception))
in_("verlap", str(e.exception))
nilmdb.client.client.StreamInserter._max_data = old_max_data
client.close()
def test_client_05_extractremove(self):

View File

@@ -246,7 +246,7 @@ class TestLayoutSpeed:
parser = Parser(layout)
formatter = Formatter(layout)
parser.parse(data)
data = formatter.format(parser.data)
formatter.format(parser.data)
elapsed = time.time() - start
printf("roundtrip %s: %d ms, %.1f μs/row, %d rows/sec\n",
layout,
@@ -264,3 +264,8 @@ class TestLayoutSpeed:
return [ sprintf("%d", random.randint(0,65535))
for x in range(10) ]
do_speedtest("uint16_10", datagen)
def datagen():
return [ sprintf("%d", random.randint(0,65535))
for x in range(6) ]
do_speedtest("uint16_6", datagen)

View File

@@ -34,6 +34,10 @@ class Bar:
def __del__(self):
fprintf(err, "Deleting\n")
@classmethod
def baz(self):
fprintf(err, "Baz\n")
def close(self):
fprintf(err, "Closing\n")