Compare commits
12 Commits
nilmdb-1.5
...
nilmdb-1.6
Author | SHA1 | Date | |
---|---|---|---|
49d04db1d6 | |||
ea838d05ae | |||
f2a48bdb2a | |||
6d14e0b8aa | |||
b31b9327b9 | |||
b98ff1331a | |||
00e6ba1124 | |||
01029230c9 | |||
ecc4e5ef9d | |||
23f31c472b | |||
a1e2746360 | |||
1c40d59a52 |
@@ -421,3 +421,20 @@ and has all of the same functions. It adds three new functions:
|
||||
It is significantly faster! It is about 20 times faster to decimate a
|
||||
stream with `nilm-decimate` when the filter code is using the new
|
||||
binary/numpy interface.
|
||||
|
||||
|
||||
WSGI interface & chunked requests
|
||||
---------------------------------
|
||||
|
||||
mod_wsgi requires "WSGIChunkedRequest On" to handle
|
||||
"Transfer-encoding: Chunked" requests. However, `/stream/insert`
|
||||
doesn't handle this correctly right now, because:
|
||||
|
||||
- The `cherrpy.request.body.read()` call needs to be fixed for chunked requests
|
||||
|
||||
- We don't want to just buffer endlessly in the server, and it will
|
||||
require some thought on how to handle data in chunks (what to do about
|
||||
interval endpoints).
|
||||
|
||||
It is probably better to just keep the endpoint management on the client
|
||||
side, so leave "WSGIChunkedRequest off" for now.
|
||||
|
50
extras/fix-oversize-files.py
Normal file
50
extras/fix-oversize-files.py
Normal file
@@ -0,0 +1,50 @@
|
||||
#!/usr/bin/python
|
||||
|
||||
import os
|
||||
import sys
|
||||
import cPickle as pickle
|
||||
import argparse
|
||||
import fcntl
|
||||
import re
|
||||
from nilmdb.client.numpyclient import layout_to_dtype
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description = """
|
||||
Fix database corruption where binary writes caused too much data to be
|
||||
written to the file. Truncates files to the correct length. This was
|
||||
fixed by b98ff1331a515ad47fd3203615e835b529b039f9.
|
||||
""")
|
||||
parser.add_argument("path", action="store", help='Database root path')
|
||||
parser.add_argument("-y", "--yes", action="store_true", help='Fix them')
|
||||
args = parser.parse_args()
|
||||
|
||||
lock = os.path.join(args.path, "data.lock")
|
||||
with open(lock, "w") as f:
|
||||
fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
|
||||
fix = {}
|
||||
|
||||
for (path, dirs, files) in os.walk(args.path):
|
||||
if "_format" in files:
|
||||
with open(os.path.join(path, "_format")) as format:
|
||||
fmt = pickle.load(format)
|
||||
rowsize = layout_to_dtype(fmt["layout"]).itemsize
|
||||
maxsize = rowsize * fmt["rows_per_file"]
|
||||
fix[path] = maxsize
|
||||
if maxsize < 128000000: # sanity check
|
||||
raise Exception("bad maxsize " + str(maxsize))
|
||||
|
||||
for fixpath in fix:
|
||||
for (path, dirs, files) in os.walk(fixpath):
|
||||
for fn in files:
|
||||
if not re.match("^[0-9a-f]{4,}$", fn):
|
||||
continue
|
||||
fn = os.path.join(path, fn)
|
||||
size = os.path.getsize(fn)
|
||||
maxsize = fix[fixpath]
|
||||
if size > maxsize:
|
||||
diff = size - maxsize
|
||||
print diff, "too big:", fn
|
||||
if args.yes:
|
||||
with open(fn, "a+") as dbfile:
|
||||
dbfile.truncate(maxsize)
|
@@ -6,7 +6,6 @@ import nilmdb.utils
|
||||
import nilmdb.client.httpclient
|
||||
from nilmdb.client.errors import ClientError
|
||||
|
||||
import re
|
||||
import time
|
||||
import simplejson as json
|
||||
import contextlib
|
||||
@@ -66,12 +65,8 @@ class Client(object):
|
||||
params["layout"] = layout
|
||||
if extended:
|
||||
params["extended"] = 1
|
||||
def sort_streams_nicely(x):
|
||||
"""Human-friendly sort (/stream/2 before /stream/10)"""
|
||||
num = lambda t: int(t) if t.isdigit() else t
|
||||
key = lambda k: [ num(c) for c in re.split('([0-9]+)', k[0]) ]
|
||||
return sorted(x, key = key)
|
||||
return sort_streams_nicely(self.http.get("stream/list", params))
|
||||
streams = self.http.get("stream/list", params)
|
||||
return nilmdb.utils.sort.sort_human(streams, key = lambda s: s[0])
|
||||
|
||||
def stream_get_metadata(self, path, keys = None):
|
||||
params = { "path": path }
|
||||
@@ -122,7 +117,10 @@ class Client(object):
|
||||
params["start"] = timestamp_to_string(start)
|
||||
if end is not None:
|
||||
params["end"] = timestamp_to_string(end)
|
||||
return self.http.post("stream/remove", params)
|
||||
total = 0
|
||||
for count in self.http.post_gen("stream/remove", params):
|
||||
total += int(count)
|
||||
return total
|
||||
|
||||
@contextlib.contextmanager
|
||||
def stream_insert_context(self, path, start = None, end = None):
|
||||
@@ -146,6 +144,7 @@ class Client(object):
|
||||
ctx = StreamInserter(self, path, start, end)
|
||||
yield ctx
|
||||
ctx.finalize()
|
||||
ctx.destroy()
|
||||
|
||||
def stream_insert(self, path, data, start = None, end = None):
|
||||
"""Insert rows of data into a stream. data should be a string
|
||||
@@ -295,6 +294,15 @@ class StreamInserter(object):
|
||||
self._block_data = []
|
||||
self._block_len = 0
|
||||
|
||||
self.destroyed = False
|
||||
|
||||
def destroy(self):
|
||||
"""Ensure this object can't be used again without raising
|
||||
an error"""
|
||||
def error(*args, **kwargs):
|
||||
raise Exception("don't reuse this context object")
|
||||
self._send_block = self.insert = self.finalize = self.send = error
|
||||
|
||||
def insert(self, data):
|
||||
"""Insert a chunk of ASCII formatted data in string form. The
|
||||
overall data must consist of lines terminated by '\\n'."""
|
||||
@@ -441,7 +449,7 @@ class StreamInserter(object):
|
||||
self._interval_start = end_ts
|
||||
|
||||
# Double check endpoints
|
||||
if start_ts is None or end_ts is None:
|
||||
if (start_ts is None or end_ts is None) or (start_ts == end_ts):
|
||||
# If the block has no non-comment lines, it's OK
|
||||
try:
|
||||
self._get_first_noncomment(block)
|
||||
|
@@ -137,5 +137,14 @@ class HTTPClient(object):
|
||||
"""Simple GET (parameters in URL) returning a generator"""
|
||||
return self._req_gen("GET", url, params, binary = binary)
|
||||
|
||||
def post_gen(self, url, params = None):
|
||||
"""Simple POST (parameters in body) returning a generator"""
|
||||
if self.post_json:
|
||||
return self._req_gen("POST", url, None,
|
||||
json.dumps(params),
|
||||
{ 'Content-type': 'application/json' })
|
||||
else:
|
||||
return self._req_gen("POST", url, None, params)
|
||||
|
||||
# Not much use for a POST or PUT generator, since they don't
|
||||
# return much data.
|
||||
|
@@ -98,6 +98,7 @@ class NumpyClient(nilmdb.client.client.Client):
|
||||
ctx = StreamInserterNumpy(self, path, start, end, dtype)
|
||||
yield ctx
|
||||
ctx.finalize()
|
||||
ctx.destroy()
|
||||
|
||||
def stream_insert_numpy(self, path, data, start = None, end = None,
|
||||
layout = None):
|
||||
@@ -133,16 +134,8 @@ class StreamInserterNumpy(nilmdb.client.client.StreamInserter):
|
||||
contiguous interval and may be None. 'dtype' is the Numpy
|
||||
dtype for this stream.
|
||||
"""
|
||||
self.last_response = None
|
||||
|
||||
super(StreamInserterNumpy, self).__init__(client, path, start, end)
|
||||
self._dtype = dtype
|
||||
self._client = client
|
||||
self._path = path
|
||||
|
||||
# Start and end for the overall contiguous interval we're
|
||||
# filling
|
||||
self._interval_start = start
|
||||
self._interval_end = end
|
||||
|
||||
# Max rows to send at once
|
||||
self._max_rows = self._max_data // self._dtype.itemsize
|
||||
@@ -250,9 +243,12 @@ class StreamInserterNumpy(nilmdb.client.client.StreamInserter):
|
||||
# Next block continues where this one ended
|
||||
self._interval_start = end_ts
|
||||
|
||||
# If we have no endpoints, it's because we had no data to send.
|
||||
if start_ts is None or end_ts is None:
|
||||
return
|
||||
# If we have no endpoints, or equal endpoints, it's OK as long
|
||||
# as there's no data to send
|
||||
if (start_ts is None or end_ts is None) or (start_ts == end_ts):
|
||||
if len(array) == 0:
|
||||
return
|
||||
raise ClientError("have data to send, but invalid start/end times")
|
||||
|
||||
# Send it
|
||||
data = array.tostring()
|
||||
|
@@ -1,6 +1,7 @@
|
||||
from __future__ import print_function
|
||||
from nilmdb.utils.printf import *
|
||||
import nilmdb.client
|
||||
import sys
|
||||
|
||||
def setup(self, sub):
|
||||
cmd = sub.add_parser("extract", help="Extract data",
|
||||
@@ -24,6 +25,8 @@ def setup(self, sub):
|
||||
).completer = self.complete.time
|
||||
|
||||
group = cmd.add_argument_group("Output format")
|
||||
group.add_argument("-B", "--binary", action="store_true",
|
||||
help="Raw binary output")
|
||||
group.add_argument("-b", "--bare", action="store_true",
|
||||
help="Exclude timestamps from output lines")
|
||||
group.add_argument("-a", "--annotate", action="store_true",
|
||||
@@ -42,6 +45,11 @@ def cmd_extract_verify(self):
|
||||
if self.args.start > self.args.end:
|
||||
self.parser.error("start is after end")
|
||||
|
||||
if self.args.binary:
|
||||
if (self.args.bare or self.args.annotate or self.args.markup or
|
||||
self.args.timestamp_raw or self.args.count):
|
||||
self.parser.error("--binary cannot be combined with other options")
|
||||
|
||||
def cmd_extract(self):
|
||||
streams = self.client.stream_list(self.args.path)
|
||||
if len(streams) != 1:
|
||||
@@ -60,16 +68,23 @@ def cmd_extract(self):
|
||||
printf("# end: %s\n", time_string(self.args.end))
|
||||
|
||||
printed = False
|
||||
if self.args.binary:
|
||||
printer = sys.stdout.write
|
||||
else:
|
||||
printer = print
|
||||
bare = self.args.bare
|
||||
count = self.args.count
|
||||
for dataline in self.client.stream_extract(self.args.path,
|
||||
self.args.start,
|
||||
self.args.end,
|
||||
self.args.count,
|
||||
self.args.markup):
|
||||
if self.args.bare and not self.args.count:
|
||||
self.args.markup,
|
||||
self.args.binary):
|
||||
if bare and not count:
|
||||
# Strip timestamp (first element). Doesn't make sense
|
||||
# if we are only returning a count.
|
||||
dataline = ' '.join(dataline.split(' ')[1:])
|
||||
print(dataline)
|
||||
printer(dataline)
|
||||
printed = True
|
||||
if not printed:
|
||||
if self.args.annotate:
|
||||
|
@@ -675,6 +675,7 @@ class NilmDB(object):
|
||||
|
||||
# Count how many were removed
|
||||
removed += row_end - row_start
|
||||
remaining -= row_end - row_start
|
||||
|
||||
if restart is not None:
|
||||
break
|
||||
|
@@ -468,7 +468,7 @@ static PyObject *Rocket_append_binary(Rocket *self, PyObject *args)
|
||||
}
|
||||
|
||||
/* Write binary data */
|
||||
if (fwrite(data, data_len, 1, self->file) != 1) {
|
||||
if (fwrite(data, self->binary_size, rows, self->file) != rows) {
|
||||
PyErr_SetFromErrno(PyExc_OSError);
|
||||
return NULL;
|
||||
}
|
||||
|
@@ -347,24 +347,34 @@ class Stream(NilmApp):
|
||||
# /stream/remove?path=/newton/prep&start=1234567890.0&end=1234567899.0
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.json_in()
|
||||
@cherrypy.tools.json_out()
|
||||
@exception_to_httperror(NilmDBError)
|
||||
@cherrypy.tools.CORS_allow(methods = ["POST"])
|
||||
@chunked_response
|
||||
@response_type("application/x-json-stream")
|
||||
def remove(self, path, start = None, end = None):
|
||||
"""
|
||||
Remove data from the backend database. Removes all data in
|
||||
the interval [start, end). Returns the number of data points
|
||||
removed.
|
||||
the interval [start, end).
|
||||
|
||||
Returns the number of data points removed. Since this is a potentially
|
||||
long-running operation, multiple numbers may be returned as the
|
||||
data gets removed from the backend database. The total number of
|
||||
points removed is the sum of all of these numbers.
|
||||
"""
|
||||
(start, end) = self._get_times(start, end)
|
||||
total_removed = 0
|
||||
while True:
|
||||
(removed, restart) = self.db.stream_remove(path, start, end)
|
||||
total_removed += removed
|
||||
if restart is None:
|
||||
break
|
||||
start = restart
|
||||
return total_removed
|
||||
|
||||
if len(self.db.stream_list(path = path)) != 1:
|
||||
raise cherrypy.HTTPError("404", "No such stream: " + path)
|
||||
|
||||
@workaround_cp_bug_1200
|
||||
def content(start, end):
|
||||
# Note: disable chunked responses to see tracebacks from here.
|
||||
while True:
|
||||
(removed, restart) = self.db.stream_remove(path, start, end)
|
||||
yield json.dumps(removed) + "\r\n"
|
||||
if restart is None:
|
||||
break
|
||||
start = restart
|
||||
return content(start, end)
|
||||
|
||||
# /stream/intervals?path=/newton/prep
|
||||
# /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0
|
||||
|
@@ -13,3 +13,4 @@ import nilmdb.utils.time
|
||||
import nilmdb.utils.iterator
|
||||
import nilmdb.utils.interval
|
||||
import nilmdb.utils.lock
|
||||
import nilmdb.utils.sort
|
||||
|
@@ -1,4 +1,5 @@
|
||||
import os
|
||||
import errno
|
||||
from math import log
|
||||
|
||||
def human_size(num):
|
||||
@@ -16,10 +17,17 @@ def human_size(num):
|
||||
return '1 byte'
|
||||
|
||||
def du(path):
|
||||
"""Like du -sb, returns total size of path in bytes."""
|
||||
size = os.path.getsize(path)
|
||||
if os.path.isdir(path):
|
||||
for thisfile in os.listdir(path):
|
||||
filepath = os.path.join(path, thisfile)
|
||||
size += du(filepath)
|
||||
return size
|
||||
"""Like du -sb, returns total size of path in bytes. Ignore
|
||||
errors that might occur if we encounter broken symlinks or
|
||||
files in the process of being removed."""
|
||||
try:
|
||||
size = os.path.getsize(path)
|
||||
if os.path.isdir(path):
|
||||
for thisfile in os.listdir(path):
|
||||
filepath = os.path.join(path, thisfile)
|
||||
size += du(filepath)
|
||||
return size
|
||||
except OSError as e: # pragma: no cover
|
||||
if e.errno != errno.ENOENT:
|
||||
raise
|
||||
return 0
|
||||
|
18
nilmdb/utils/sort.py
Normal file
18
nilmdb/utils/sort.py
Normal file
@@ -0,0 +1,18 @@
|
||||
import re
|
||||
|
||||
def sort_human(items, key = None):
|
||||
"""Human-friendly sort (/stream/2 before /stream/10)"""
|
||||
def to_num(val):
|
||||
try:
|
||||
return int(val)
|
||||
except Exception:
|
||||
return val
|
||||
|
||||
def human_key(text):
|
||||
if key:
|
||||
text = key(text)
|
||||
# Break into character and numeric chunks.
|
||||
chunks = re.split(r'([0-9]+)', text)
|
||||
return [ to_num(c) for c in chunks ]
|
||||
|
||||
return sorted(items, key = human_key)
|
@@ -105,16 +105,19 @@ class TestClient(object):
|
||||
client.http.post("/stream/list")
|
||||
client = nilmdb.client.Client(url = testurl)
|
||||
|
||||
# Create three streams
|
||||
# Create four streams
|
||||
client.stream_create("/newton/prep", "float32_8")
|
||||
client.stream_create("/newton/raw", "uint16_6")
|
||||
client.stream_create("/newton/zzz/rawnotch", "uint16_9")
|
||||
client.stream_create("/newton/zzz/rawnotch2", "uint16_9")
|
||||
client.stream_create("/newton/zzz/rawnotch11", "uint16_9")
|
||||
|
||||
# Verify we got 3 streams
|
||||
# Verify we got 4 streams in the right order
|
||||
eq_(client.stream_list(), [ ["/newton/prep", "float32_8"],
|
||||
["/newton/raw", "uint16_6"],
|
||||
["/newton/zzz/rawnotch", "uint16_9"]
|
||||
["/newton/zzz/rawnotch2", "uint16_9"],
|
||||
["/newton/zzz/rawnotch11", "uint16_9"]
|
||||
])
|
||||
|
||||
# Match just one type or one path
|
||||
eq_(client.stream_list(layout="uint16_6"),
|
||||
[ ["/newton/raw", "uint16_6"] ])
|
||||
@@ -327,6 +330,10 @@ class TestClient(object):
|
||||
2525.169921875, 8350.83984375, 3724.699951171875,
|
||||
1355.3399658203125, 2039.0))
|
||||
|
||||
# Just get some coverage
|
||||
with assert_raises(ClientError) as e:
|
||||
client.http.post("/stream/remove", { "path": "/none" })
|
||||
|
||||
client.close()
|
||||
|
||||
def test_client_06_generators(self):
|
||||
@@ -613,8 +620,12 @@ class TestClient(object):
|
||||
with client.stream_insert_context("/empty/test", end = 950):
|
||||
pass
|
||||
|
||||
# Equal start and end is OK as long as there's no data
|
||||
with client.stream_insert_context("/empty/test", start=9, end=9):
|
||||
pass
|
||||
|
||||
# Try various things that might cause problems
|
||||
with client.stream_insert_context("/empty/test", 1000, 1050):
|
||||
with client.stream_insert_context("/empty/test", 1000, 1050) as ctx:
|
||||
ctx.finalize() # inserts [1000, 1050]
|
||||
ctx.finalize() # nothing
|
||||
ctx.finalize() # nothing
|
||||
|
@@ -601,6 +601,14 @@ class TestCmdline(object):
|
||||
self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01")
|
||||
self.match("43200\n")
|
||||
|
||||
# test binary mode
|
||||
self.fail("extract -c -B /newton/prep -s min -e max")
|
||||
self.contain("binary cannot be combined")
|
||||
self.fail("extract -m -B /newton/prep -s min -e max")
|
||||
self.contain("binary cannot be combined")
|
||||
self.ok("extract -B /newton/prep -s min -e max")
|
||||
eq_(len(self.captured), 43200 * (8 + 8*4))
|
||||
|
||||
# markup for 3 intervals, plus extra markup lines whenever we had
|
||||
# a "restart" from the nilmdb.stream_extract function
|
||||
self.ok("extract -m /newton/prep --start 2000-01-01 --end 2020-01-01")
|
||||
|
@@ -28,7 +28,10 @@ def setup_module():
|
||||
recursive_unlink(testdb)
|
||||
|
||||
# Start web app on a custom port
|
||||
test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(testdb)
|
||||
test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(
|
||||
testdb, bulkdata_args = { "file_size" : 16384,
|
||||
"files_per_dir" : 3 } )
|
||||
|
||||
test_server = nilmdb.server.Server(test_db, host = "127.0.0.1",
|
||||
port = 32180, stoppable = False,
|
||||
fast_shutdown = True,
|
||||
@@ -179,6 +182,17 @@ class TestNumpyClient(object):
|
||||
assert(np.array_equal(a,b))
|
||||
assert(np.array_equal(a,c))
|
||||
|
||||
# Make sure none of the files are greater than 16384 bytes as
|
||||
# we configured with the bulkdata_args above.
|
||||
datapath = os.path.join(testdb, "data")
|
||||
for (dirpath, dirnames, filenames) in os.walk(datapath):
|
||||
for f in filenames:
|
||||
fn = os.path.join(dirpath, f)
|
||||
size = os.path.getsize(fn)
|
||||
if size > 16384:
|
||||
raise AssertionError(sprintf("%s is too big: %d > %d\n",
|
||||
fn, size, 16384))
|
||||
|
||||
nilmdb.client.numpyclient.StreamInserterNumpy._max_data = old_max_data
|
||||
client.close()
|
||||
|
||||
@@ -295,8 +309,25 @@ class TestNumpyClient(object):
|
||||
with client.stream_insert_numpy_context("/empty/test", end = 950):
|
||||
pass
|
||||
|
||||
# Equal start and end is OK as long as there's no data
|
||||
with assert_raises(ClientError) as e:
|
||||
with client.stream_insert_numpy_context("/empty/test",
|
||||
start=9, end=9) as ctx:
|
||||
ctx.insert([[9, 9]])
|
||||
ctx.finalize()
|
||||
in_("have data to send, but invalid start/end times", str(e.exception))
|
||||
|
||||
with client.stream_insert_numpy_context("/empty/test",
|
||||
start=9, end=9) as ctx:
|
||||
pass
|
||||
|
||||
# reusing a context object is bad
|
||||
with assert_raises(Exception) as e:
|
||||
ctx.insert([[9, 9]])
|
||||
|
||||
# Try various things that might cause problems
|
||||
with client.stream_insert_numpy_context("/empty/test", 1000, 1050):
|
||||
with client.stream_insert_numpy_context("/empty/test",
|
||||
1000, 1050) as ctx:
|
||||
ctx.finalize() # inserts [1000, 1050]
|
||||
ctx.finalize() # nothing
|
||||
ctx.finalize() # nothing
|
||||
|
Reference in New Issue
Block a user