Compare commits
15 commits: nilmdb-1.5 ... nilmdb-1.6

Commits (SHA1):
2317894355
539c92226c
77c766d85d
49d04db1d6
ea838d05ae
f2a48bdb2a
6d14e0b8aa
b31b9327b9
b98ff1331a
00e6ba1124
01029230c9
ecc4e5ef9d
23f31c472b
a1e2746360
1c40d59a52
@@ -421,3 +421,20 @@ and has all of the same functions. It adds three new functions:
 It is significantly faster!  It is about 20 times faster to decimate a
 stream with `nilm-decimate` when the filter code is using the new
 binary/numpy interface.
+
+
+WSGI interface & chunked requests
+---------------------------------
+
+mod_wsgi requires "WSGIChunkedRequest On" to handle
+"Transfer-encoding: Chunked" requests.  However, `/stream/insert`
+doesn't handle this correctly right now, because:
+
+- The `cherrypy.request.body.read()` call needs to be fixed for chunked requests
+
+- We don't want to just buffer endlessly in the server, and it will
+  require some thought on how to handle data in chunks (what to do about
+  interval endpoints).
+
+It is probably better to just keep the endpoint management on the client
+side, so leave "WSGIChunkedRequest off" for now.
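As a rough illustration of the faster path, filter code driven through the numpy client might be structured as below. This is a sketch only: stream_insert_numpy_context appears in the diffs below, but the URL, the stream paths, and the stream_extract_numpy call are assumptions, not code from this changeset.

    import nilmdb.client.numpyclient

    # Sketch: process whole numpy blocks instead of parsing ASCII
    # line by line.  URL and stream paths are examples.
    client = nilmdb.client.numpyclient.NumpyClient("http://localhost/nilmdb/")
    with client.stream_insert_numpy_context("/newton/prep-dec") as ctx:
        for array in client.stream_extract_numpy("/newton/prep"):
            ctx.insert(array[::4])   # e.g. keep every 4th row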
extras/fix-oversize-files.py (new file, 50 lines)
@@ -0,0 +1,50 @@
+#!/usr/bin/python
+
+import os
+import sys
+import cPickle as pickle
+import argparse
+import fcntl
+import re
+from nilmdb.client.numpyclient import layout_to_dtype
+
+parser = argparse.ArgumentParser(
+    description = """
+Fix database corruption where binary writes caused too much data to be
+written to the file.  Truncates files to the correct length.  This was
+fixed by b98ff1331a515ad47fd3203615e835b529b039f9.
+""")
+parser.add_argument("path", action="store", help='Database root path')
+parser.add_argument("-y", "--yes", action="store_true", help='Fix them')
+args = parser.parse_args()
+
+lock = os.path.join(args.path, "data.lock")
+with open(lock, "w") as f:
+    fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
+
+    fix = {}
+
+    for (path, dirs, files) in os.walk(args.path):
+        if "_format" in files:
+            with open(os.path.join(path, "_format")) as format:
+                fmt = pickle.load(format)
+            rowsize = layout_to_dtype(fmt["layout"]).itemsize
+            maxsize = rowsize * fmt["rows_per_file"]
+            fix[path] = maxsize
+            if maxsize < 128000000: # sanity check
+                raise Exception("bad maxsize " + str(maxsize))
+
+    for fixpath in fix:
+        for (path, dirs, files) in os.walk(fixpath):
+            for fn in files:
+                if not re.match("^[0-9a-f]{4,}$", fn):
+                    continue
+                fn = os.path.join(path, fn)
+                size = os.path.getsize(fn)
+                maxsize = fix[fixpath]
+                if size > maxsize:
+                    diff = size - maxsize
+                    print diff, "too big:", fn
+                    if args.yes:
+                        with open(fn, "a+") as dbfile:
+                            dbfile.truncate(maxsize)
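A usage note: run the script against the database root once without options to report oversize files, then again with -y to truncate them, e.g. `python fix-oversize-files.py /home/nilmdb/db` (path is an example). It is Python 2 code (cPickle, print statement), and it takes an exclusive non-blocking lock on data.lock, which will fail if a server still holds it.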
@@ -6,7 +6,6 @@ import nilmdb.utils
 import nilmdb.client.httpclient
 from nilmdb.client.errors import ClientError
 
-import re
 import time
 import simplejson as json
 import contextlib
@@ -66,12 +65,8 @@ class Client(object):
             params["layout"] = layout
         if extended:
             params["extended"] = 1
-        def sort_streams_nicely(x):
-            """Human-friendly sort (/stream/2 before /stream/10)"""
-            num = lambda t: int(t) if t.isdigit() else t
-            key = lambda k: [ num(c) for c in re.split('([0-9]+)', k[0]) ]
-            return sorted(x, key = key)
-        return sort_streams_nicely(self.http.get("stream/list", params))
+        streams = self.http.get("stream/list", params)
+        return nilmdb.utils.sort.sort_human(streams, key = lambda s: s[0])
 
     def stream_get_metadata(self, path, keys = None):
         params = { "path": path }
@@ -122,7 +117,10 @@ class Client(object):
             params["start"] = timestamp_to_string(start)
         if end is not None:
             params["end"] = timestamp_to_string(end)
-        return self.http.post("stream/remove", params)
+        total = 0
+        for count in self.http.post_gen("stream/remove", params):
+            total += int(count)
+        return total
 
     @contextlib.contextmanager
     def stream_insert_context(self, path, start = None, end = None):
@@ -146,6 +144,7 @@ class Client(object):
         ctx = StreamInserter(self, path, start, end)
         yield ctx
         ctx.finalize()
+        ctx.destroy()
 
     def stream_insert(self, path, data, start = None, end = None):
         """Insert rows of data into a stream.  data should be a string
@@ -295,6 +294,15 @@ class StreamInserter(object):
         self._block_data = []
         self._block_len = 0
 
+        self.destroyed = False
+
+    def destroy(self):
+        """Ensure this object can't be used again without raising
+        an error"""
+        def error(*args, **kwargs):
+            raise Exception("don't reuse this context object")
+        self._send_block = self.insert = self.finalize = self.send = error
+
     def insert(self, data):
         """Insert a chunk of ASCII formatted data in string form.  The
         overall data must consist of lines terminated by '\\n'."""
@@ -441,7 +449,7 @@ class StreamInserter(object):
             self._interval_start = end_ts
 
         # Double check endpoints
-        if start_ts is None or end_ts is None:
+        if (start_ts is None or end_ts is None) or (start_ts == end_ts):
             # If the block has no non-comment lines, it's OK
             try:
                 self._get_first_noncomment(block)
@@ -137,5 +137,14 @@ class HTTPClient(object):
         """Simple GET (parameters in URL) returning a generator"""
         return self._req_gen("GET", url, params, binary = binary)
 
+    def post_gen(self, url, params = None):
+        """Simple POST (parameters in body) returning a generator"""
+        if self.post_json:
+            return self._req_gen("POST", url, None,
+                                 json.dumps(params),
+                                 { 'Content-type': 'application/json' })
+        else:
+            return self._req_gen("POST", url, None, params)
+
     # Not much use for a POST or PUT generator, since they don't
     # return much data.
@@ -98,6 +98,7 @@ class NumpyClient(nilmdb.client.client.Client):
         ctx = StreamInserterNumpy(self, path, start, end, dtype)
         yield ctx
         ctx.finalize()
+        ctx.destroy()
 
     def stream_insert_numpy(self, path, data, start = None, end = None,
                             layout = None):
@@ -133,16 +134,8 @@ class StreamInserterNumpy(nilmdb.client.client.StreamInserter):
         contiguous interval and may be None.  'dtype' is the Numpy
         dtype for this stream.
         """
-        self.last_response = None
+        super(StreamInserterNumpy, self).__init__(client, path, start, end)
         self._dtype = dtype
-        self._client = client
-        self._path = path
-
-        # Start and end for the overall contiguous interval we're
-        # filling
-        self._interval_start = start
-        self._interval_end = end
 
         # Max rows to send at once
         self._max_rows = self._max_data // self._dtype.itemsize
@@ -250,9 +243,12 @@ class StreamInserterNumpy(nilmdb.client.client.StreamInserter):
             # Next block continues where this one ended
             self._interval_start = end_ts
 
-        # If we have no endpoints, it's because we had no data to send.
-        if start_ts is None or end_ts is None:
-            return
+        # If we have no endpoints, or equal endpoints, it's OK as long
+        # as there's no data to send
+        if (start_ts is None or end_ts is None) or (start_ts == end_ts):
+            if len(array) == 0:
+                return
+            raise ClientError("have data to send, but invalid start/end times")
 
         # Send it
         data = array.tostring()
@@ -1,6 +1,7 @@
 from __future__ import print_function
 from nilmdb.utils.printf import *
 import nilmdb.client
+import sys
 
 def setup(self, sub):
     cmd = sub.add_parser("extract", help="Extract data",
@@ -24,6 +25,8 @@ def setup(self, sub):
                        ).completer = self.complete.time
 
     group = cmd.add_argument_group("Output format")
+    group.add_argument("-B", "--binary", action="store_true",
+                       help="Raw binary output")
     group.add_argument("-b", "--bare", action="store_true",
                        help="Exclude timestamps from output lines")
     group.add_argument("-a", "--annotate", action="store_true",
@@ -42,6 +45,11 @@ def cmd_extract_verify(self):
     if self.args.start > self.args.end:
         self.parser.error("start is after end")
 
+    if self.args.binary:
+        if (self.args.bare or self.args.annotate or self.args.markup or
+            self.args.timestamp_raw or self.args.count):
+            self.parser.error("--binary cannot be combined with other options")
+
 def cmd_extract(self):
     streams = self.client.stream_list(self.args.path)
     if len(streams) != 1:
@@ -60,16 +68,23 @@ def cmd_extract(self):
             printf("# end: %s\n", time_string(self.args.end))
 
     printed = False
+    if self.args.binary:
+        printer = sys.stdout.write
+    else:
+        printer = print
+    bare = self.args.bare
+    count = self.args.count
     for dataline in self.client.stream_extract(self.args.path,
                                                self.args.start,
                                                self.args.end,
                                                self.args.count,
-                                               self.args.markup):
-        if self.args.bare and not self.args.count:
+                                               self.args.markup,
+                                               self.args.binary):
+        if bare and not count:
             # Strip timestamp (first element).  Doesn't make sense
             # if we are only returning a count.
             dataline = ' '.join(dataline.split(' ')[1:])
-        print(dataline)
+        printer(dataline)
         printed = True
     if not printed:
         if self.args.annotate:
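For what it's worth, the raw output from the new --binary flag can be read back with numpy. The following is a sketch that assumes a float32_8 stream (an int64 timestamp plus eight float32 values per row, matching the 8 + 8*4 byte row size checked in the tests further down); byte order and the file name here are assumptions.

    import numpy as np

    # Sketch: parse `extract -B` output for a float32_8 stream.
    dtype = np.dtype([('timestamp', '<i8'), ('data', '<f4', (8,))])
    rows = np.fromfile("extract.bin", dtype=dtype)
    print(rows['timestamp'][:5], rows['data'][0])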
@@ -21,5 +21,8 @@ def cmd_info(self):
     printf("Server URL: %s\n", self.client.geturl())
     dbinfo = self.client.dbinfo()
     printf("Server database path: %s\n", dbinfo["path"])
-    printf("Server database size: %s\n", human_size(dbinfo["size"]))
-    printf("Server database free space: %s\n", human_size(dbinfo["free"]))
+    for (desc, field) in [("used by NilmDB", "size"),
+                          ("used by other", "other"),
+                          ("reserved", "reserved"),
+                          ("free", "free")]:
+        printf("Server disk space %s: %s\n", desc, human_size(dbinfo[field]))
@@ -19,8 +19,8 @@ from . import rocket
 
 # Up to 256 open file descriptors at any given time.
 # These variables are global so they can be used in the decorator arguments.
-table_cache_size = 16
-fd_cache_size = 16
+table_cache_size = 32
+fd_cache_size = 8
 
 @nilmdb.utils.must_close(wrap_verify = False)
 class BulkData(object):
@@ -176,7 +176,7 @@ class NilmDB(object):
             raise NilmDBError("start must precede end")
         return (start, end)
 
-    @nilmdb.utils.lru_cache(size = 16)
+    @nilmdb.utils.lru_cache(size = 64)
     def _get_intervals(self, stream_id):
         """
         Return a mutable IntervalSet corresponding to the given stream ID.
@@ -675,6 +675,7 @@ class NilmDB(object):
 
             # Count how many were removed
             removed += row_end - row_start
+            remaining -= row_end - row_start
 
             if restart is not None:
                 break
@@ -28,7 +28,7 @@ const static char __long_ok[1 - 2*!(sizeof(int64_t) ==
 
 /* Somewhat arbitrary, just so we can use fixed sizes for strings
    etc. */
-static const int MAX_LAYOUT_COUNT = 128;
+static const int MAX_LAYOUT_COUNT = 1024;
 
 /* Error object and constants */
 static PyObject *ParseError;
@@ -468,7 +468,7 @@ static PyObject *Rocket_append_binary(Rocket *self, PyObject *args)
     }
 
     /* Write binary data */
-    if (fwrite(data, data_len, 1, self->file) != 1) {
+    if (fwrite(data, self->binary_size, rows, self->file) != rows) {
         PyErr_SetFromErrno(PyExc_OSError);
         return NULL;
     }
@@ -167,9 +167,13 @@ class Root(NilmApp):
         """Return a dictionary with the database path,
         size of the database in bytes, and free disk space in bytes"""
         path = self.db.get_basepath()
+        usage = psutil.disk_usage(path)
+        dbsize = nilmdb.utils.du(path)
         return { "path": path,
-                 "size": nilmdb.utils.du(path),
-                 "free": psutil.disk_usage(path).free }
+                 "size": dbsize,
+                 "other": usage.used - dbsize,
+                 "reserved": usage.total - usage.used - usage.free,
+                 "free": usage.free }
 
 class Stream(NilmApp):
     """Stream-specific operations"""
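The four fields reported here partition the filesystem: NilmDB's own data (du of the database path), other used space, space reserved by the filesystem, and free space. A quick sanity-check sketch, not part of this changeset; the path is an example:

    import psutil
    import nilmdb.utils

    path = "/home/nilmdb/db"              # example path
    usage = psutil.disk_usage(path)
    dbsize = nilmdb.utils.du(path)
    other = usage.used - dbsize
    reserved = usage.total - usage.used - usage.free
    # size + other + reserved + free covers the whole partition:
    assert dbsize + other + reserved + usage.free == usage.total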
@@ -347,24 +351,34 @@ class Stream(NilmApp):
     # /stream/remove?path=/newton/prep&start=1234567890.0&end=1234567899.0
     @cherrypy.expose
     @cherrypy.tools.json_in()
-    @cherrypy.tools.json_out()
-    @exception_to_httperror(NilmDBError)
     @cherrypy.tools.CORS_allow(methods = ["POST"])
+    @chunked_response
+    @response_type("application/x-json-stream")
     def remove(self, path, start = None, end = None):
         """
         Remove data from the backend database.  Removes all data in
-        the interval [start, end).  Returns the number of data points
-        removed.
+        the interval [start, end).
+
+        Returns the number of data points removed.  Since this is a potentially
+        long-running operation, multiple numbers may be returned as the
+        data gets removed from the backend database.  The total number of
+        points removed is the sum of all of these numbers.
         """
         (start, end) = self._get_times(start, end)
-        total_removed = 0
-        while True:
-            (removed, restart) = self.db.stream_remove(path, start, end)
-            total_removed += removed
-            if restart is None:
-                break
-            start = restart
-        return total_removed
+
+        if len(self.db.stream_list(path = path)) != 1:
+            raise cherrypy.HTTPError("404", "No such stream: " + path)
+
+        @workaround_cp_bug_1200
+        def content(start, end):
+            # Note: disable chunked responses to see tracebacks from here.
+            while True:
+                (removed, restart) = self.db.stream_remove(path, start, end)
+                yield json.dumps(removed) + "\r\n"
+                if restart is None:
+                    break
+                start = restart
+        return content(start, end)
 
     # /stream/intervals?path=/newton/prep
     # /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0
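With this change the response is an application/x-json-stream body: one JSON-encoded count per "\r\n"-terminated line, which the client sums. A hypothetical direct consumer is sketched below; the real client goes through HTTPClient.post_gen, and the URL and parameter handling here are assumptions.

    import requests

    resp = requests.post("http://localhost/nilmdb/stream/remove",
                         data={"path": "/newton/prep"}, stream=True)
    # Sum the streamed counts to get the total points removed.
    total = sum(int(line) for line in resp.iter_lines() if line)
    print("points removed:", total)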
@@ -13,3 +13,4 @@ import nilmdb.utils.time
 import nilmdb.utils.iterator
 import nilmdb.utils.interval
 import nilmdb.utils.lock
+import nilmdb.utils.sort
@@ -1,4 +1,5 @@
 import os
+import errno
 from math import log
 
 def human_size(num):
@@ -16,10 +17,17 @@ def human_size(num):
         return '1 byte'
 
 def du(path):
-    """Like du -sb, returns total size of path in bytes."""
-    size = os.path.getsize(path)
-    if os.path.isdir(path):
-        for thisfile in os.listdir(path):
-            filepath = os.path.join(path, thisfile)
-            size += du(filepath)
-    return size
+    """Like du -sb, returns total size of path in bytes.  Ignore
+    errors that might occur if we encounter broken symlinks or
+    files in the process of being removed."""
+    try:
+        size = os.path.getsize(path)
+        if os.path.isdir(path):
+            for thisfile in os.listdir(path):
+                filepath = os.path.join(path, thisfile)
+                size += du(filepath)
+        return size
+    except OSError as e: # pragma: no cover
+        if e.errno != errno.ENOENT:
+            raise
+        return 0
nilmdb/utils/sort.py (new file, 18 lines)
@@ -0,0 +1,18 @@
+import re
+
+def sort_human(items, key = None):
+    """Human-friendly sort (/stream/2 before /stream/10)"""
+    def to_num(val):
+        try:
+            return int(val)
+        except Exception:
+            return val
+
+    def human_key(text):
+        if key:
+            text = key(text)
+        # Break into character and numeric chunks.
+        chunks = re.split(r'([0-9]+)', text)
+        return [ to_num(c) for c in chunks ]
+
+    return sorted(items, key = human_key)
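For example, numeric chunks compare as integers, so "2" sorts before "10":

    from nilmdb.utils.sort import sort_human

    print(sort_human(["/stream/10", "/stream/2", "/stream/1"]))
    # -> ['/stream/1', '/stream/2', '/stream/10']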
@@ -105,16 +105,19 @@ class TestClient(object):
             client.http.post("/stream/list")
         client = nilmdb.client.Client(url = testurl)
 
-        # Create three streams
+        # Create four streams
         client.stream_create("/newton/prep", "float32_8")
         client.stream_create("/newton/raw", "uint16_6")
-        client.stream_create("/newton/zzz/rawnotch", "uint16_9")
+        client.stream_create("/newton/zzz/rawnotch2", "uint16_9")
+        client.stream_create("/newton/zzz/rawnotch11", "uint16_9")
 
-        # Verify we got 3 streams
+        # Verify we got 4 streams in the right order
         eq_(client.stream_list(), [ ["/newton/prep", "float32_8"],
                                     ["/newton/raw", "uint16_6"],
-                                    ["/newton/zzz/rawnotch", "uint16_9"]
+                                    ["/newton/zzz/rawnotch2", "uint16_9"],
+                                    ["/newton/zzz/rawnotch11", "uint16_9"]
                                   ])
 
         # Match just one type or one path
         eq_(client.stream_list(layout="uint16_6"),
             [ ["/newton/raw", "uint16_6"] ])
@@ -327,6 +330,10 @@ class TestClient(object):
                       2525.169921875, 8350.83984375, 3724.699951171875,
                       1355.3399658203125, 2039.0))
 
+        # Just get some coverage
+        with assert_raises(ClientError) as e:
+            client.http.post("/stream/remove", { "path": "/none" })
+
         client.close()
 
     def test_client_06_generators(self):
@@ -613,8 +620,12 @@ class TestClient(object):
         with client.stream_insert_context("/empty/test", end = 950):
             pass
 
+        # Equal start and end is OK as long as there's no data
+        with client.stream_insert_context("/empty/test", start=9, end=9):
+            pass
+
         # Try various things that might cause problems
-        with client.stream_insert_context("/empty/test", 1000, 1050):
+        with client.stream_insert_context("/empty/test", 1000, 1050) as ctx:
             ctx.finalize() # inserts [1000, 1050]
             ctx.finalize() # nothing
             ctx.finalize() # nothing
@@ -245,8 +245,10 @@ class TestCmdline(object):
         self.contain("Client version: " + nilmdb.__version__)
         self.contain("Server version: " + test_server.version)
         self.contain("Server database path")
-        self.contain("Server database size")
-        self.contain("Server database free space")
+        self.contain("Server disk space used by NilmDB")
+        self.contain("Server disk space used by other")
+        self.contain("Server disk space reserved")
+        self.contain("Server disk space free")
 
     def test_04_createlist(self):
         # Basic stream tests, like those in test_client.
@@ -601,6 +603,14 @@ class TestCmdline(object):
         self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01")
         self.match("43200\n")
 
+        # test binary mode
+        self.fail("extract -c -B /newton/prep -s min -e max")
+        self.contain("binary cannot be combined")
+        self.fail("extract -m -B /newton/prep -s min -e max")
+        self.contain("binary cannot be combined")
+        self.ok("extract -B /newton/prep -s min -e max")
+        eq_(len(self.captured), 43200 * (8 + 8*4))
+
         # markup for 3 intervals, plus extra markup lines whenever we had
         # a "restart" from the nilmdb.stream_extract function
         self.ok("extract -m /newton/prep --start 2000-01-01 --end 2020-01-01")
@@ -28,7 +28,10 @@ def setup_module():
     recursive_unlink(testdb)
 
     # Start web app on a custom port
-    test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(testdb)
+    test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(
+        testdb, bulkdata_args = { "file_size" : 16384,
+                                  "files_per_dir" : 3 } )
+
     test_server = nilmdb.server.Server(test_db, host = "127.0.0.1",
                                        port = 32180, stoppable = False,
                                        fast_shutdown = True,
@@ -179,6 +182,17 @@ class TestNumpyClient(object):
         assert(np.array_equal(a,b))
         assert(np.array_equal(a,c))
 
+        # Make sure none of the files are greater than 16384 bytes as
+        # we configured with the bulkdata_args above.
+        datapath = os.path.join(testdb, "data")
+        for (dirpath, dirnames, filenames) in os.walk(datapath):
+            for f in filenames:
+                fn = os.path.join(dirpath, f)
+                size = os.path.getsize(fn)
+                if size > 16384:
+                    raise AssertionError(sprintf("%s is too big: %d > %d\n",
+                                                 fn, size, 16384))
+
         nilmdb.client.numpyclient.StreamInserterNumpy._max_data = old_max_data
         client.close()
@@ -295,8 +309,25 @@ class TestNumpyClient(object):
         with client.stream_insert_numpy_context("/empty/test", end = 950):
             pass
 
+        # Equal start and end is OK as long as there's no data
+        with assert_raises(ClientError) as e:
+            with client.stream_insert_numpy_context("/empty/test",
+                                                    start=9, end=9) as ctx:
+                ctx.insert([[9, 9]])
+                ctx.finalize()
+        in_("have data to send, but invalid start/end times", str(e.exception))
+
+        with client.stream_insert_numpy_context("/empty/test",
+                                                start=9, end=9) as ctx:
+            pass
+
+        # reusing a context object is bad
+        with assert_raises(Exception) as e:
+            ctx.insert([[9, 9]])
+
         # Try various things that might cause problems
-        with client.stream_insert_numpy_context("/empty/test", 1000, 1050):
+        with client.stream_insert_numpy_context("/empty/test",
+                                                1000, 1050) as ctx:
             ctx.finalize() # inserts [1000, 1050]
             ctx.finalize() # nothing
             ctx.finalize() # nothing