Compare commits

14 Commits: nilmdb-1.4 ... nilmdb-1.4

89be6f5931
4cdef3285d
bcd82c4d59
caf63ab01f
2d72891162
cda2ac3e77
57d3d60f6a
d6b5befe76
7429c1788d
0ef71c193b
4a50dd015e
22274550ab
4f06d6ae68
c54d8041c3
@@ -186,6 +186,19 @@ IntervalSet speed

- rbtree and interval converted to cython:
  8.4 μs, total 12 s, 134 MB RAM

- Would like to move Interval itself back to Python so other
  non-cythonized code like client code can use it more easily.
  Testing speed with just `test_interval` being tested, with
  `range(5,22)`, using `/usr/bin/time -v python tests/runtests.py`,
  times recorded for 2097152:
  - 52ae397 (Interval in cython):
    12.6133 μs each, ratio 0.866533, total 47 sec, 399 MB RAM
  - 9759dcf (Interval in python):
    21.2937 μs each, ratio 1.462870, total 83 sec, 1107 MB RAM
  That's a huge difference!  Instead, will keep Interval and DBInterval
  cythonized inside nilmdb, and just have an additional copy in
  nilmdb.utils for clients to use.
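A minimal sketch of the client-side usage this split enables, assuming the
nilmdb.utils.interval API added later in this diff (the timestamps are
illustrative microsecond values):

```python
# Pure-Python Interval; no compiled extension needed on the client side.
from nilmdb.utils.interval import Interval

a = Interval(1000000, 2000000)   # half-open [start, end)
b = Interval(1500000, 3000000)
print a.intersects(b)                    # True: the ranges overlap
print repr(a.subset(1000000, 1500000))   # Interval(1000000, 1500000)
```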
Layouts
-------
Current/old design has specific layouts: RawData, PrepData, RawNotchedData.
@@ -17,4 +17,4 @@ _nilmtool_argcomplete() {
         unset COMPREPLY
     fi
 }
-complete -o nospace -o default -F _nilmtool_argcomplete nilmtool
+complete -o nospace -F _nilmtool_argcomplete nilmtool
@@ -97,7 +97,7 @@ class Client(object):
         return self.http.post("stream/create", params)

     def stream_destroy(self, path):
-        """Delete stream and its contents"""
+        """Delete stream.  Fails if any data is still present."""
         params = { "path": path }
         return self.http.post("stream/destroy", params)

@@ -171,7 +171,8 @@ class Client(object):
             params["end"] = timestamp_to_string(end)
         return self.http.get_gen("stream/intervals", params)

-    def stream_extract(self, path, start = None, end = None, count = False):
+    def stream_extract(self, path, start = None, end = None,
+                       count = False, markup = False):
         """
         Extract data from a stream.  Returns a generator that yields
         lines of ASCII-formatted data that matches the database
@@ -179,6 +180,9 @@ class Client(object):

         Specify count = True to return a count of matching data points
         rather than the actual data.  The output format is unchanged.
+
+        Specify markup = True to include comments in the returned data
+        that indicate interval starts and ends.
         """
         params = {
             "path": path,
@@ -189,6 +193,8 @@ class Client(object):
             params["end"] = timestamp_to_string(end)
         if count:
             params["count"] = 1
+        if markup:
+            params["markup"] = 1
         return self.http.get_gen("stream/extract", params)

     def stream_count(self, path, start = None, end = None):
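A hypothetical call using the new markup flag (the comment format matches the
tests/data/extract-8 fixture added later in this diff; the server URL is an
assumption):

```python
import nilmdb.client

client = nilmdb.client.Client("http://localhost/nilmdb/")  # assumed URL
for line in client.stream_extract("/newton/prep", markup = True):
    # With markup = True, comment lines such as
    #   # interval-start 1332496919900000
    #   # interval-end 1332496919991668
    # are interleaved with the data to delimit each interval.
    print line,
```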
@@ -7,11 +7,14 @@ def setup(self, sub):
     cmd = sub.add_parser("destroy", help="Delete a stream and all data",
                          formatter_class = def_form,
                          description="""
-                         Destroy the stream at the specified path.  All
-                         data and metadata related to the stream is
-                         permanently deleted.
+                         Destroy the stream at the specified path.
+                         The stream must be empty.  All metadata
+                         related to the stream is permanently deleted.
                          """)
     cmd.set_defaults(handler = cmd_destroy)
+    group = cmd.add_argument_group("Options")
+    group.add_argument("-R", "--remove", action="store_true",
+                       help="Remove all data before destroying stream")
     group = cmd.add_argument_group("Required arguments")
     group.add_argument("path",
                        help="Path of the stream to delete, e.g. /foo/bar",
@@ -20,6 +23,11 @@ def setup(self, sub):

 def cmd_destroy(self):
     """Destroy stream"""
+    if self.args.remove:
+        try:
+            count = self.client.stream_remove(self.args.path)
+        except nilmdb.client.ClientError as e:
+            self.die("error removing data: %s", str(e))
     try:
         self.client.stream_destroy(self.args.path)
     except nilmdb.client.ClientError as e:
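The new -R flag is just the remove-then-destroy sequence; a sketch of the
equivalent client calls (`client` is a nilmdb.client.Client, and the path is
illustrative):

```python
client.stream_remove("/foo/bar")    # delete all data first
client.stream_destroy("/foo/bar")   # the now-empty stream can be destroyed
```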
@@ -29,6 +29,8 @@ def setup(self, sub):
     group.add_argument("-a", "--annotate", action="store_true",
                        help="Include comments with some information "
                        "about the stream")
+    group.add_argument("-m", "--markup", action="store_true",
+                       help="Include comments with interval starts and ends")
     group.add_argument("-T", "--timestamp-raw", action="store_true",
                        help="Show raw timestamps in annotated information")
     group.add_argument("-c", "--count", action="store_true",
@@ -61,7 +63,8 @@ def cmd_extract(self):
     for dataline in self.client.stream_extract(self.args.path,
                                                self.args.start,
                                                self.args.end,
-                                               self.args.count):
+                                               self.args.count,
+                                               self.args.markup):
         if self.args.bare and not self.args.count:
             # Strip timestamp (first element).  Doesn't make sense
             # if we are only returning a count.
@@ -1,5 +1,9 @@
 """Interval, IntervalSet

+The Interval implemented here is just like
+nilmdb.utils.interval.Interval, except implemented in Cython for
+speed.
+
 Represents an interval of time, and a set of such intervals.

 Intervals are half-open, ie. they include data points with timestamps
@@ -23,6 +27,7 @@ from ..utils.time import min_timestamp as nilmdb_min_timestamp
 from ..utils.time import max_timestamp as nilmdb_max_timestamp
 from ..utils.time import timestamp_to_string
 from ..utils.iterator import imerge
+from ..utils.interval import IntervalError
 import itertools

 cimport rbtree
@@ -30,10 +35,6 @@ from libc.stdint cimport uint64_t, int64_t

 ctypedef int64_t timestamp_t

-class IntervalError(Exception):
-    """Error due to interval overlap, etc"""
-    pass
-
 cdef class Interval:
     """Represents an interval of time."""

@@ -59,17 +60,7 @@ cdef class Interval:

     def __cmp__(self, Interval other):
         """Compare two intervals.  If non-equal, order by start then end"""
-        if not isinstance(other, Interval):
-            raise TypeError("bad type")
-        if self.start == other.start:
-            if self.end < other.end:
-                return -1
-            if self.end > other.end:
-                return 1
-            return 0
-        if self.start < other.start:
-            return -1
-        return 1
+        return cmp(self.start, other.start) or cmp(self.end, other.end)

     cpdef intersects(self, Interval other):
         """Return True if two Interval objects intersect"""
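The one-line __cmp__ above relies on Python 2 semantics: cmp() returns -1, 0,
or 1, and 0 is falsy, so the end comparison is only consulted when the starts
tie.  A standalone illustration:

```python
# Equal starts: "or" falls through to comparing the ends.
assert (cmp(10, 10) or cmp(5, 9)) == -1
# Different starts: the first cmp() is nonzero (truthy); ends are ignored.
assert (cmp(10, 20) or cmp(9, 5)) == -1
```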
@@ -295,80 +286,18 @@ cdef class IntervalSet:
         (potentially) subsetted to make the one that is being
         returned.
         """
-        if not isinstance(interval, Interval):
-            raise TypeError("bad type")
-        for n in self.tree.intersect(interval.start, interval.end):
-            i = n.obj
-            if i:
-                if i.start >= interval.start and i.end <= interval.end:
-                    if orig:
-                        yield (i, i)
-                    else:
-                        yield i
-                else:
-                    subset = i.subset(max(i.start, interval.start),
-                                      min(i.end, interval.end))
-                    if orig:
-                        yield (subset, i)
-                    else:
-                        yield subset
-
-    def set_difference(self, IntervalSet other not None,
-                       Interval bounds = None):
-        """
-        Compute the difference (self \\ other) between this
-        IntervalSet and the given IntervalSet; i.e., the ranges
-        that are present in 'self' but not 'other'.
-
-        If 'bounds' is not None, results are limited to the range
-        specified by the interval 'bounds'.
-
-        Returns a generator that yields each interval in turn.
-        Output intervals are built as subsets of the intervals in the
-        first argument (self).
-        """
-        # Iterate through all starts and ends in sorted order.  Add a
-        # tag to the iterator so that we can figure out which one they
-        # were, after sorting.
-        def decorate(it, key_start, key_end):
-            for i in it:
-                yield i.start, key_start, i
-                yield i.end, key_end, i
-        if bounds is None:
-            bounds = Interval(nilmdb_min_timestamp,
-                              nilmdb_max_timestamp)
-        self_iter = decorate(self.intersection(bounds), 0, 2)
-        other_iter = decorate(other.intersection(bounds), 1, 3)
-
-        # Now iterate over the timestamps of each start and end.
-        # At each point, evaluate which type of end it is, to determine
-        # how to build up the output intervals.
-        self_interval = None
-        other_interval = None
-        out_start = None
-        for (ts, k, i) in imerge(self_iter, other_iter):
-            if k == 0:
-                # start self interval
-                self_interval = i
-                if other_interval is None:
-                    out_start = ts
-            elif k == 1:
-                # start other interval
-                other_interval = i
-                if out_start is not None and out_start != ts:
-                    yield self_interval.subset(out_start, ts)
-                    out_start = None
-            elif k == 2:
-                # end self interval
-                if out_start is not None and out_start != ts:
-                    yield self_interval.subset(out_start, ts)
-                    out_start = None
-                self_interval = None
-            elif k == 3:
-                # end other interval
-                other_interval = None
-                if self_interval:
-                    out_start = ts
+        if orig:
+            for n in self.tree.intersect(interval.start, interval.end):
+                i = n.obj
+                subset = i.subset(max(i.start, interval.start),
+                                  min(i.end, interval.end))
+                yield (subset, i)
+        else:
+            for n in self.tree.intersect(interval.start, interval.end):
+                i = n.obj
+                subset = i.subset(max(i.start, interval.start),
+                                  min(i.end, interval.end))
+                yield subset

     cpdef intersects(self, Interval other):
         """Return True if this IntervalSet intersects another interval"""
@@ -1,204 +0,0 @@
# cython: profile=False

import time
import sys
import inspect
import cStringIO

from ..utils.time import min_timestamp as nilmdb_min_timestamp

cdef enum:
    max_value_count = 64

cimport cython
cimport libc.stdlib
cimport libc.stdio
cimport libc.string

class ParserError(Exception):
    def __init__(self, line, message):
        self.message = "line " + str(line) + ": " + message
        Exception.__init__(self, self.message)

class FormatterError(Exception):
    pass

class Layout:
    """Represents a NILM database layout"""

    def __init__(self, typestring):
        """Initialize this Layout object to handle the specified
        type string"""
        try:
            [ datatype, count ] = typestring.split("_")
        except:
            raise KeyError("invalid layout string")

        try:
            self.count = int(count)
        except ValueError:
            raise KeyError("invalid count")
        if self.count < 1 or self.count > max_value_count:
            raise KeyError("invalid count")

        if datatype == 'uint16':
            self.parse = self.parse_uint16
            self.format_str = "%.6f" + " %d" * self.count
            self.format = self.format_generic
        elif datatype == 'float32':
            self.parse = self.parse_float64
            self.format_str = "%.6f" + " %.6e" * self.count
            self.format = self.format_generic
        elif datatype == 'float64':
            self.parse = self.parse_float64
            self.format_str = "%.6f" + " %.16e" * self.count
            self.format = self.format_generic
        else:
            raise KeyError("invalid type")

        self.datatype = datatype

    # Parsers
    def parse_float64(self, char *text):
        cdef int n
        cdef double ts
        # Return doubles even in float32 case, since they're going into
        # a Python array which would upconvert to double anyway.
        result = [0] * (self.count + 1)
        cdef char *end
        ts = libc.stdlib.strtod(text, &end)
        if end == text:
            raise ValueError("bad timestamp")
        result[0] = ts
        for n in range(self.count):
            text = end
            result[n+1] = libc.stdlib.strtod(text, &end)
            if end == text:
                raise ValueError("wrong number of values")
        n = 0
        while end[n] == ' ':
            n += 1
        if end[n] != '\n' and end[n] != '#' and end[n] != '\0':
            raise ValueError("extra data on line")
        return (ts, result)

    def parse_uint16(self, char *text):
        cdef int n
        cdef double ts
        cdef int v
        cdef char *end
        result = [0] * (self.count + 1)
        ts = libc.stdlib.strtod(text, &end)
        if end == text:
            raise ValueError("bad timestamp")
        result[0] = ts
        for n in range(self.count):
            text = end
            v = libc.stdlib.strtol(text, &end, 10)
            if v < 0 or v > 65535:
                raise ValueError("value out of range")
            result[n+1] = v
            if end == text:
                raise ValueError("wrong number of values")
        n = 0
        while end[n] == ' ':
            n += 1
        if end[n] != '\n' and end[n] != '#' and end[n] != '\0':
            raise ValueError("extra data on line")
        return (ts, result)

    # Formatters
    def format_generic(self, d):
        n = len(d) - 1
        if n != self.count:
            raise ValueError("wrong number of values for layout type: "
                             "got %d, wanted %d" % (n, self.count))
        return (self.format_str % tuple(d)) + "\n"

# Get a layout by name
def get_named(typestring):
    try:
        return Layout(typestring)
    except KeyError:
        compat = { "PrepData": "float32_8",
                   "RawData": "uint16_6",
                   "RawNotchedData": "uint16_9" }
        return Layout(compat[typestring])

class Parser(object):
    """Object that parses and stores ASCII data for inclusion into the
    database"""

    def __init__(self, layout):
        if issubclass(layout.__class__, Layout):
            self.layout = layout
        else:
            try:
                self.layout = get_named(layout)
            except KeyError:
                raise TypeError("unknown layout")

        self.data = []
        self.min_timestamp = None
        self.max_timestamp = None

    def parse(self, textdata):
        """
        Parse the data, provided as lines of text, using the current
        layout, into an internal data structure suitable for a
        pytables 'table.append(parser.data)'.
        """
        cdef double last_ts = nilmdb_min_timestamp
        cdef double ts
        cdef int n = 0, i
        cdef char *line

        indata = cStringIO.StringIO(textdata)
        # Assume any parsing error is a real error.
        # In the future we might want to skip completely empty lines,
        # or partial lines right before EOF?
        try:
            self.data = []
            for pyline in indata:
                line = pyline
                n += 1
                if line[0] == '\#':
                    continue
                (ts, row) = self.layout.parse(line)
                if ts <= last_ts:
                    raise ValueError("timestamp is not "
                                     "monotonically increasing")
                last_ts = ts
                self.data.append(row)
        except (ValueError, IndexError, TypeError) as e:
            raise ParserError(n, "error: " + e.message)

        # Mark timestamp ranges
        if len(self.data):
            self.min_timestamp = self.data[0][0]
            self.max_timestamp = self.data[-1][0]

class Formatter(object):
    """Object that formats database data into ASCII"""

    def __init__(self, layout):
        if issubclass(layout.__class__, Layout):
            self.layout = layout
        else:
            try:
                self.layout = get_named(layout)
            except KeyError:
                raise TypeError("unknown layout")

    def format(self, data):
        """
        Format raw data from the database, using the current layout,
        as lines of ASCII text.
        """
        text = cStringIO.StringIO()
        try:
            for row in data:
                text.write(self.layout.format(row))
        except (ValueError, IndexError, TypeError) as e:
            raise FormatterError("formatting error: " + e.message)
        return text.getvalue()
@@ -12,8 +12,11 @@ Manages both the SQL database and the table storage backend.
 from __future__ import absolute_import
 import nilmdb.utils
 from nilmdb.utils.printf import *
-from nilmdb.server.interval import (Interval, DBInterval,
-                                    IntervalSet, IntervalError)
+from nilmdb.utils.time import timestamp_to_string
+
+from nilmdb.utils.interval import IntervalError
+from nilmdb.server.interval import Interval, DBInterval, IntervalSet

 from nilmdb.server import bulkdata
 from nilmdb.server.errors import NilmDBError, StreamError, OverlapError
@@ -81,7 +84,18 @@ class NilmDB(object):
     verbose = 0

     def __init__(self, basepath, max_results=None,
-                 bulkdata_args=None):
+                 max_removals=None, bulkdata_args=None):
         """Initialize NilmDB at the given basepath.
+        Other arguments are for debugging / testing:
+
+        'max_results' is the max rows to send in a single
+        stream_intervals or stream_extract response.
+
+        'max_removals' is the max rows to delete at once
+        in stream_remove.
+
+        'bulkdata_args' is kwargs for the bulkdata module.
+        """
         if bulkdata_args is None:
             bulkdata_args = {}
@@ -111,11 +125,11 @@ class NilmDB(object):
         self.con.execute("PRAGMA journal_mode=WAL")

         # Approximate largest number of elements that we want to send
-        # in a single reply (for stream_intervals, stream_extract)
-        if max_results:
-            self.max_results = max_results
-        else:
-            self.max_results = 16384
+        # in a single reply (for stream_intervals, stream_extract).
+        self.max_results = max_results or 16384
+
+        # Remove up to this many rows per call to stream_remove.
+        self.max_removals = max_removals or 1048576

     def get_basepath(self):
         return self.basepath
@@ -328,18 +342,18 @@ class NilmDB(object):
         List all intervals in 'path' between 'start' and 'end'.  If
         'diffpath' is not None, list instead the set-difference
         between the intervals in the two streams; i.e. all interval
-        ranges that are present in 'path' but not 'path2'.
+        ranges that are present in 'path' but not 'diffpath'.

         Returns (intervals, restart) tuple.

-        intervals is a list of [start,end] timestamps of all intervals
+        'intervals' is a list of [start,end] timestamps of all intervals
         that exist for path, between start and end.

-        restart, if nonzero, means that there were too many results to
-        return in a single request.  The data is complete from the
-        starting timestamp to the point at which it was truncated,
-        and a new request with a start time of 'restart' will fetch
-        the next block of data.
+        'restart', if not None, means that there were too many results
+        to return in a single request.  The data is complete from the
+        starting timestamp to the point at which it was truncated, and
+        a new request with a start time of 'restart' will fetch the
+        next block of data.
         """
         stream_id = self._stream_id(path)
         intervals = self._get_intervals(stream_id)
@@ -350,7 +364,9 @@ class NilmDB(object):
         requested = Interval(start, end)
         result = []
         if diffpath:
-            getter = intervals.set_difference(diffintervals, requested)
+            getter = nilmdb.utils.interval.set_difference(
+                intervals.intersection(requested),
+                diffintervals.intersection(requested))
         else:
             getter = intervals.intersection(requested)
         for n, i in enumerate(getter):
@@ -359,7 +375,7 @@ class NilmDB(object):
                 break
             result.append([i.start, i.end])
         else:
-            restart = 0
+            restart = None
         return (result, restart)

     def stream_create(self, path, layout_name):
@@ -435,17 +451,22 @@ class NilmDB(object):
                          (newpath, stream_id))

     def stream_destroy(self, path):
-        """Fully remove a table and all of its data from the database.
-        No way to undo it!  Metadata is removed."""
+        """Fully remove a table from the database.  Fails if there are
+        any intervals still present; remove them first.  Metadata is
+        also removed."""
         stream_id = self._stream_id(path)

-        # Delete the cached interval data (if it was cached)
+        # Verify that no intervals are present, and clear the cache
+        iset = self._get_intervals(stream_id)
+        if len(iset):
+            raise NilmDBError("all intervals must be removed before "
+                              "destroying a stream")
         self._get_intervals.cache_remove(self, stream_id)

-        # Delete the data
+        # Delete the bulkdata storage
         self.data.destroy(path)

-        # Delete metadata, stream, intervals
+        # Delete metadata, stream, intervals (should be none)
         with self.con as con:
             con.execute("DELETE FROM metadata WHERE stream_id=?", (stream_id,))
             con.execute("DELETE FROM ranges WHERE stream_id=?", (stream_id,))
@@ -513,23 +534,28 @@ class NilmDB(object):
                                       dbinterval.db_startpos,
                                       dbinterval.db_endpos)

-    def stream_extract(self, path, start = None, end = None, count = False):
+    def stream_extract(self, path, start = None, end = None,
+                       count = False, markup = False):
         """
         Returns (data, restart) tuple.

-        data is ASCII-formatted data from the database, formatted
+        'data' is ASCII-formatted data from the database, formatted
         according to the layout of the stream.

-        restart, if nonzero, means that there were too many results to
+        'restart', if not None, means that there were too many results to
         return in a single request.  The data is complete from the
         starting timestamp to the point at which it was truncated,
         and a new request with a start time of 'restart' will fetch
         the next block of data.

-        count, if true, means to not return raw data, but just the count
+        'count', if true, means to not return raw data, but just the count
         of rows that would have been returned.  This is much faster
         than actually fetching the data.  It is not limited by
         max_results.
+
+        'markup', if true, indicates that returned data should be
+        marked with a comment denoting when a particular interval
+        starts, and another comment when an interval ends.
         """
         stream_id = self._stream_id(path)
         table = self.data.getnode(path)
@@ -539,7 +565,7 @@ class NilmDB(object):
         result = []
         matched = 0
         remaining = self.max_results
-        restart = 0
+        restart = None
         for interval in intervals.intersection(requested):
             # Reading single rows from the table is too slow, so
             # we use two bisections to find both the starting and
@@ -558,14 +584,26 @@ class NilmDB(object):
                 row_end = row_max
                 restart = table[row_max]

+            # Add markup
+            if markup:
+                result.append("# interval-start " +
+                              timestamp_to_string(interval.start) + "\n")
+
             # Gather these results up
             result.append(table.get_data(row_start, row_end))

             # Count them
             remaining -= row_end - row_start

-            if restart:
+            # Add markup, and exit if restart is set.
+            if restart is not None:
+                if markup:
+                    result.append("# interval-end " +
+                                  timestamp_to_string(restart) + "\n")
                 break
+            if markup:
+                result.append("# interval-end " +
+                              timestamp_to_string(interval.end) + "\n")

         if count:
             return matched
@@ -574,9 +612,17 @@ class NilmDB(object):
     def stream_remove(self, path, start = None, end = None):
         """
         Remove data from the specified time interval within a stream.
-        Removes all data in the interval [start, end), and intervals
-        are truncated or split appropriately.  Returns the number of
-        data points removed.
+
+        Removes data in the interval [start, end), and intervals are
+        truncated or split appropriately.
+
+        Returns a (removed, restart) tuple.
+
+        'removed' is the number of data points that were removed.
+
+        'restart', if not None, means there were too many rows to
+        remove in a single request.  This function should be called
+        again with a start time of 'restart' to complete the removal.
         """
         stream_id = self._stream_id(path)
         table = self.data.getnode(path)
@@ -584,6 +630,8 @@ class NilmDB(object):
         (start, end) = self._check_user_times(start, end)
         to_remove = Interval(start, end)
         removed = 0
+        remaining = self.max_removals
+        restart = None

         # Can't remove intervals from within the iterator, so we need to
         # remember what's currently in the intersection now.
@@ -594,6 +642,13 @@ class NilmDB(object):
             row_start = self._find_start(table, dbint)
             row_end = self._find_end(table, dbint)

+            # Shorten it if we'll hit the maximum number of removals
+            row_max = row_start + remaining
+            if row_max < row_end:
+                row_end = row_max
+                dbint.end = table[row_max]
+                restart = dbint.end
+
             # Adjust the DBInterval to match the newly found ends
             dbint.db_start = dbint.start
             dbint.db_end = dbint.end
@@ -609,4 +664,7 @@ class NilmDB(object):
             # Count how many were removed
             removed += row_end - row_start

-        return removed
+            if restart is not None:
+                break
+
+        return (removed, restart)
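Since stream_remove now returns a (removed, restart) tuple, large removals are
drained in a loop.  A sketch of the caller side (the same pattern the server.py
hunk below adopts; `db`, `path`, `start`, and `end` are assumed to be in
scope):

```python
total_removed = 0
while True:
    (removed, restart) = db.stream_remove(path, start, end)
    total_removed += removed
    if restart is None:
        # Everything in [start, end) has been removed.
        break
    # Too many rows for one call; resume where the last call stopped.
    start = restart
```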
@@ -210,7 +210,7 @@ class Stream(NilmApp):
     @exception_to_httperror(NilmDBError)
     @cherrypy.tools.CORS_allow(methods = ["POST"])
     def destroy(self, path):
-        """Delete a stream and its associated data."""
+        """Delete a stream.  Fails if any data is still present."""
         return self.db.stream_destroy(path)

     # /stream/rename?oldpath=/newton/prep&newpath=/newton/prep/1
@@ -339,7 +339,14 @@ class Stream(NilmApp):
         if start >= end:
             raise cherrypy.HTTPError("400 Bad Request",
                                      "start must precede end")
-        return self.db.stream_remove(path, start, end)
+        total_removed = 0
+        while True:
+            (removed, restart) = self.db.stream_remove(path, start, end)
+            total_removed += removed
+            if restart is None:
+                break
+            start = restart
+        return total_removed

     # /stream/intervals?path=/newton/prep
     # /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0
@@ -386,7 +393,7 @@ class Stream(NilmApp):
                                                           diffpath)
                 response = ''.join([ json.dumps(i) + "\r\n" for i in ints ])
                 yield response
-                if restart == 0:
+                if restart is None:
                     break
                 start = restart
         return content(start, end)
@@ -395,14 +402,18 @@ class Stream(NilmApp):
     @cherrypy.expose
     @chunked_response
     @response_type("text/plain")
-    def extract(self, path, start = None, end = None, count = False):
+    def extract(self, path, start = None, end = None,
+                count = False, markup = False):
         """
         Extract data from backend database.  Streams the resulting
         entries as ASCII text lines separated by newlines.  This may
         make multiple requests to the nilmdb backend to avoid causing
         it to block for too long.

-        Add count=True to return a count rather than actual data.
+        If 'count' is True, returns a count rather than actual data.
+
+        If 'markup' is True, adds comments to the stream denoting each
+        interval's start and end timestamp.
         """
         if start is not None:
             start = string_to_timestamp(start)
@@ -421,21 +432,23 @@ class Stream(NilmApp):
             raise cherrypy.HTTPError("404 Not Found", "No such stream")

         @workaround_cp_bug_1200
-        def content(start, end, count):
+        def content(start, end):
             # Note: disable chunked responses to see tracebacks from here.
             if count:
-                matched = self.db.stream_extract(path, start, end, count)
+                matched = self.db.stream_extract(path, start, end,
+                                                 count = True)
                 yield sprintf("%d\n", matched)
                 return

             while True:
-                (data, restart) = self.db.stream_extract(path, start, end)
+                (data, restart) = self.db.stream_extract(
+                    path, start, end, count = False, markup = markup)
                 yield data

-                if restart == 0:
+                if restart is None:
                     return
                 start = restart
-        return content(start, end, count)
+        return content(start, end)

 class Exiter(object):
     """App that exits the server, for testing"""
@@ -11,3 +11,4 @@ import nilmdb.utils.threadsafety
 import nilmdb.utils.fallocate
 import nilmdb.utils.time
 import nilmdb.utils.iterator
+import nilmdb.utils.interval
nilmdb/utils/interval.py (new file, 106 lines)
@@ -0,0 +1,106 @@
"""Interval.  Like nilmdb.server.interval, but re-implemented here
in plain Python so clients have easier access to it.

Intervals are half-open, ie. they include data points with timestamps
[start, end)
"""

import nilmdb.utils.time
import nilmdb.utils.iterator

class IntervalError(Exception):
    """Error due to interval overlap, etc"""
    pass

# Interval
class Interval:
    """Represents an interval of time."""

    def __init__(self, start, end):
        """
        'start' and 'end' are arbitrary numbers that represent time
        """
        if start >= end:
            # Explicitly disallow zero-width intervals (since they're half-open)
            raise IntervalError("start %s must precede end %s" % (start, end))
        self.start = start
        self.end = end

    def __repr__(self):
        s = repr(self.start) + ", " + repr(self.end)
        return self.__class__.__name__ + "(" + s + ")"

    def __str__(self):
        return ("[" + nilmdb.utils.time.timestamp_to_string(self.start) +
                " -> " + nilmdb.utils.time.timestamp_to_string(self.end) + ")")

    def __cmp__(self, other):
        """Compare two intervals.  If non-equal, order by start then end"""
        return cmp(self.start, other.start) or cmp(self.end, other.end)

    def intersects(self, other):
        """Return True if two Interval objects intersect"""
        if not isinstance(other, Interval):
            raise TypeError("need an Interval")
        if self.end <= other.start or self.start >= other.end:
            return False
        return True

    def subset(self, start, end):
        """Return a new Interval that is a subset of this one"""
        # A subclass that tracks additional data might override this.
        if start < self.start or end > self.end:
            raise IntervalError("not a subset")
        return Interval(start, end)

def set_difference(a, b):
    """
    Compute the difference (a \\ b) between the intervals in 'a' and
    the intervals in 'b'; i.e., the ranges that are present in 'a'
    but not 'b'.

    'a' and 'b' must both be iterables.

    Returns a generator that yields each interval in turn.
    Output intervals are built as subsets of the intervals in the
    first argument (a).
    """
    # Iterate through all starts and ends in sorted order.  Add a
    # tag to the iterator so that we can figure out which one they
    # were, after sorting.
    def decorate(it, key_start, key_end):
        for i in it:
            yield i.start, key_start, i
            yield i.end, key_end, i
    a_iter = decorate(iter(a), 0, 2)
    b_iter = decorate(iter(b), 1, 3)

    # Now iterate over the timestamps of each start and end.
    # At each point, evaluate which type of end it is, to determine
    # how to build up the output intervals.
    a_interval = None
    b_interval = None
    out_start = None
    for (ts, k, i) in nilmdb.utils.iterator.imerge(a_iter, b_iter):
        if k == 0:
            # start a interval
            a_interval = i
            if b_interval is None:
                out_start = ts
        elif k == 1:
            # start b interval
            b_interval = i
            if out_start is not None and out_start != ts:
                yield a_interval.subset(out_start, ts)
                out_start = None
        elif k == 2:
            # end a interval
            if out_start is not None and out_start != ts:
                yield a_interval.subset(out_start, ts)
                out_start = None
            a_interval = None
        elif k == 3:
            # end b interval
            b_interval = None
            if a_interval:
                out_start = ts
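A small worked example of the module-level set_difference (a sketch; the
timestamp values are arbitrary and chosen only for illustration):

```python
from nilmdb.utils.interval import Interval, set_difference

a = [ Interval(0, 100) ]
b = [ Interval(40, 60) ]
# Yields subsets of 'a' that 'b' does not cover: [0, 40) and [60, 100).
print [ repr(i) for i in set_difference(a, b) ]
# ['Interval(0, 40)', 'Interval(60, 100)']
```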
@@ -1,5 +1,8 @@
 from __future__ import absolute_import

 from nilmdb.utils import datetime_tz
+import re
+import time
+
 # Range
 min_timestamp = (-2**63)
@@ -36,6 +39,7 @@ def unix_to_timestamp(unix):
     """Convert a Unix timestamp (floating point seconds since epoch)
     into a NILM timestamp (integer microseconds since epoch)"""
     return int(round(unix * 1e6))
+seconds_to_timestamp = unix_to_timestamp

 def timestamp_to_unix(timestamp):
     """Convert a NILM timestamp (integer microseconds since epoch)
@@ -56,6 +60,11 @@ def parse_time(toparse):
     timestamp, the current local timezone is assumed (e.g. from the TZ
     env var).
     """
+    if toparse == "min":
+        return min_timestamp
+    if toparse == "max":
+        return max_timestamp
+
     # If string isn't "now" and doesn't contain at least 4 digits,
     # consider it invalid.  smartparse might otherwise accept
     # empty strings and strings with just separators.
@@ -118,4 +127,4 @@ def parse_time(toparse):

 def now():
     """Return current timestamp"""
-    return unix_to_timestamp(datetime_tz.datetime_tz.utcnow().totimestamp())
+    return unix_to_timestamp(time.time())
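A quick numeric check of the conversion helpers and the new
seconds_to_timestamp alias (the round trip through timestamp_to_unix assumes
it is the exact inverse, as its docstring above suggests; the constant matches
the parse_time test later in this diff):

```python
from nilmdb.utils.time import (unix_to_timestamp, timestamp_to_unix,
                               seconds_to_timestamp)

assert unix_to_timestamp(1333648800.0) == 1333648800000000
assert seconds_to_timestamp(1333648800.0) == 1333648800000000  # new alias
assert timestamp_to_unix(1333648800000000) == 1333648800.0     # assumed inverse
```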
setup.py
@@ -43,7 +43,6 @@ except: pass

 # Use Cython if it's new enough, otherwise use preexisting C files.
 cython_modules = [ 'nilmdb.server.interval',
-                   'nilmdb.server.layout',
                    'nilmdb.server.rbtree' ]
 try:
     import Cython
tests/data/extract-8 (new file, 28 lines)
@@ -0,0 +1,28 @@
# interval-start 1332496919900000
1332496919900000 2.523050e+05 2.254020e+05 4.779410e+03 3.638030e+03 8.138070e+03 4.334460e+03 1.083780e+03 3.743730e+03
1332496919908333 2.551190e+05 2.237870e+05 5.965640e+03 2.076350e+03 9.468790e+03 3.693880e+03 1.247860e+03 3.393680e+03
1332496919916667 2.616370e+05 2.247980e+05 4.848970e+03 2.315620e+03 9.323300e+03 4.225460e+03 1.805780e+03 2.593050e+03
1332496919925000 2.606460e+05 2.251300e+05 3.061360e+03 3.951840e+03 7.662910e+03 5.341410e+03 1.986520e+03 2.276780e+03
1332496919933333 2.559710e+05 2.235030e+05 4.096030e+03 3.296970e+03 7.827080e+03 5.452120e+03 2.492520e+03 2.929450e+03
1332496919941667 2.579260e+05 2.217080e+05 5.472320e+03 1.555700e+03 8.495760e+03 4.491140e+03 2.379780e+03 3.741710e+03
1332496919950000 2.610180e+05 2.242350e+05 4.669770e+03 1.876190e+03 8.366680e+03 3.677510e+03 9.021690e+02 3.549040e+03
1332496919958333 2.569150e+05 2.274650e+05 2.785070e+03 3.751930e+03 7.440320e+03 3.964860e+03 -3.227860e+02 2.460890e+03
1332496919966667 2.509510e+05 2.262000e+05 3.772710e+03 3.131950e+03 8.159860e+03 4.539860e+03 7.375190e+02 2.126750e+03
1332496919975000 2.556710e+05 2.223720e+05 5.826200e+03 8.715560e+02 9.120240e+03 4.545110e+03 2.804310e+03 2.721000e+03
1332496919983333 2.649730e+05 2.214860e+05 5.839130e+03 4.659180e+02 8.628300e+03 3.934870e+03 2.972490e+03 3.773730e+03
1332496919991667 2.652170e+05 2.233920e+05 3.718770e+03 2.834970e+03 7.209900e+03 3.460260e+03 1.324930e+03 4.075960e+03
# interval-end 1332496919991668
# interval-start 1332496920000000
1332496920000000 2.564370e+05 2.244300e+05 4.011610e+03 3.475340e+03 7.495890e+03 3.388940e+03 2.613970e+02 3.731260e+03
1332496920008333 2.539630e+05 2.241670e+05 5.621070e+03 1.548010e+03 9.165170e+03 3.522930e+03 1.058930e+03 2.996960e+03
1332496920016667 2.585080e+05 2.249300e+05 6.011400e+03 8.188660e+02 9.039950e+03 4.482440e+03 2.490390e+03 2.679340e+03
1332496920025000 2.596270e+05 2.260220e+05 4.474500e+03 2.423020e+03 7.414190e+03 5.071970e+03 2.439380e+03 2.962960e+03
1332496920033333 2.551870e+05 2.246320e+05 4.738570e+03 3.398040e+03 7.395120e+03 4.726450e+03 1.839030e+03 3.393530e+03
1332496920041667 2.571020e+05 2.216230e+05 6.144130e+03 1.441090e+03 8.756480e+03 3.495320e+03 1.869940e+03 3.752530e+03
1332496920050000 2.636530e+05 2.217700e+05 6.221770e+03 7.389620e+02 9.547600e+03 2.666820e+03 1.462660e+03 3.332570e+03
1332496920058333 2.636130e+05 2.252560e+05 4.477120e+03 2.437450e+03 8.510210e+03 3.855630e+03 9.594420e+02 2.387180e+03
1332496920066667 2.553500e+05 2.262640e+05 4.283720e+03 3.923940e+03 7.912470e+03 5.466520e+03 1.284990e+03 2.093720e+03
1332496920075000 2.527270e+05 2.246090e+05 5.851930e+03 2.491980e+03 8.540630e+03 5.623050e+03 2.339780e+03 3.007140e+03
1332496920083333 2.584750e+05 2.235780e+05 5.924870e+03 1.394480e+03 8.779620e+03 4.544180e+03 2.132030e+03 3.849760e+03
1332496920091667 2.615630e+05 2.246090e+05 4.336140e+03 2.455750e+03 8.055380e+03 3.469110e+03 6.278730e+02 3.664200e+03
# interval-end 1332496920100000
@@ -7,7 +7,6 @@ test_serializer.py
test_iteratorizer.py

test_timestamper.py
-test_layout.py
test_rbtree.py
test_interval.py
@@ -375,6 +375,7 @@ class TestClient(object):

         # Delete streams that exist
         for stream in client.stream_list():
+            client.stream_remove(stream[0])
             client.stream_destroy(stream[0])

         # Database is empty
@@ -506,6 +507,10 @@ class TestClient(object):
                                 [ 109, 118 ],
                                 [ 200, 300 ] ])

+        # destroy stream (try without removing data first)
+        with assert_raises(ClientError):
+            client.stream_destroy("/context/test")
+        client.stream_remove("/context/test")
         client.stream_destroy("/context/test")
         client.close()
@@ -600,6 +605,7 @@ class TestClient(object):
             ])

         # Clean up
+        client.stream_remove("/empty/test")
         client.stream_destroy("/empty/test")
         client.close()
@@ -635,8 +641,9 @@ class TestClient(object):
         eq_(connections(), (1, 5))

         # Clean up
+        c.stream_remove("/persist/test")
         c.stream_destroy("/persist/test")
-        eq_(connections(), (1, 6))
+        eq_(connections(), (1, 7))

     def test_client_13_timestamp_rounding(self):
         # Test potentially bad timestamps (due to floating point
@@ -661,5 +668,6 @@ class TestClient(object):
             # Server will round this and give an error on finalize()
             ctx.insert("299999999.99 1\n")

+        client.stream_remove("/rounding/test")
         client.stream_destroy("/rounding/test")
         client.close()
@@ -21,12 +21,13 @@ from testutil.helpers import *

 testdb = "tests/cmdline-testdb"

-def server_start(max_results = None, bulkdata_args = {}):
+def server_start(max_results = None, max_removals = None, bulkdata_args = {}):
     global test_server, test_db
     # Start web app on a custom port
     test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(
         testdb,
         max_results = max_results,
+        max_removals = max_removals,
         bulkdata_args = bulkdata_args)
     test_server = nilmdb.server.Server(test_db, host = "127.0.0.1",
                                        port = 32180, stoppable = False,
@@ -233,6 +234,8 @@ class TestCmdline(object):
         eq_(parse_time("1333648800.0"), test)
         eq_(parse_time("1333648800000000"), test)
         eq_(parse_time("@1333648800000000"), test)
+        eq_(parse_time("min"), nilmdb.utils.time.min_timestamp)
+        eq_(parse_time("max"), nilmdb.utils.time.max_timestamp)
         with assert_raises(ValueError):
             parse_time("@hashtag12345")
@@ -593,6 +596,8 @@ class TestCmdline(object):
         test(6, "10:00:30", "10:00:31", extra="-b")
         test(7, "10:00:30", "10:00:30.999", extra="-a -T")
         test(7, "10:00:30", "10:00:30.999", extra="-a --timestamp-raw")
+        test(8, "10:01:59.9", "10:02:00.1", extra="--markup")
+        test(8, "10:01:59.9", "10:02:00.1", extra="-m")

         # all data put in by tests
         self.ok("extract -a /newton/prep --start 2000-01-01 --end 2020-01-01")
@@ -600,6 +605,11 @@ class TestCmdline(object):
         self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01")
         self.match("43200\n")

+        # markup for 3 intervals, plus extra markup lines whenever we had
+        # a "restart" from the nilmdb.stream_extract function
+        self.ok("extract -m /newton/prep --start 2000-01-01 --end 2020-01-01")
+        lines_(self.captured, 43210)
+
     def test_09_truncated(self):
         # Test truncated responses by overriding the nilmdb max_results
         server_stop()
@@ -699,11 +709,9 @@ class TestCmdline(object):

         # Reinsert some data, to verify that no overlaps with deleted
         # data are reported
         os.environ['TZ'] = "UTC"
-        self.ok("insert --timestamp -f --rate 120 /newton/prep "
-                "tests/data/prep-20120323T1000")
-        self.ok("insert -t --filename --rate 120 /newton/prep "
-                "tests/data/prep-20120323T1002")
+        for minute in ["0", "2"]:
+            self.ok("insert --timestamp -f --rate 120 /newton/prep"
+                    " tests/data/prep-20120323T100" + minute)

     def test_11_destroy(self):
         # Delete records
@@ -715,6 +723,9 @@ class TestCmdline(object):
         self.fail("destroy /no/such/stream")
         self.contain("No stream at path")

+        self.fail("destroy -R /no/such/stream")
+        self.contain("No stream at path")
+
         self.fail("destroy asdfasdf")
         self.contain("No stream at path")
@@ -728,8 +739,14 @@ class TestCmdline(object):
         self.ok("list --detail")
         lines_(self.captured, 7)

-        # Delete some
-        self.ok("destroy /newton/prep")
+        # Fail to destroy because intervals still present
+        self.fail("destroy /newton/prep")
+        self.contain("all intervals must be removed")
+        self.ok("list --detail")
+        lines_(self.captured, 7)
+
+        # Destroy for real
+        self.ok("destroy -R /newton/prep")
         self.ok("list")
         self.match("/newton/raw uint16_6\n"
                    "/newton/zzz/rawnotch uint16_9\n")
@@ -740,7 +757,8 @@ class TestCmdline(object):

         self.ok("destroy /newton/raw")
         self.ok("create /newton/raw uint16_6")
-        self.ok("destroy /newton/raw")
+        # Specify --remove with no data
+        self.ok("destroy --remove /newton/raw")
         self.ok("list")
         self.match("")
@@ -815,7 +833,7 @@ class TestCmdline(object):

         # Now recreate the data one more time and make sure there are
         # fewer files.
-        self.ok("destroy /newton/prep")
+        self.ok("destroy --remove /newton/prep")
         self.fail("destroy /newton/prep") # already destroyed
         self.ok("create /newton/prep float32_8")
         os.environ['TZ'] = "UTC"
@@ -826,14 +844,16 @@ class TestCmdline(object):
         for (dirpath, dirnames, filenames) in os.walk(testdb):
             nfiles += len(filenames)
         lt_(nfiles, 50)
-        self.ok("destroy /newton/prep") # destroy again
+        self.ok("destroy -R /newton/prep") # destroy again

     def test_14_remove_files(self):
         # Test BulkData's ability to remove when data is split into
         # multiple files.  Should be a fairly comprehensive test of
         # remove functionality.
+        # Also limit max_removals, to cover more functionality.
         server_stop()
-        server_start(bulkdata_args = { "file_size" : 920, # 23 rows per file
+        server_start(max_removals = 4321,
+                     bulkdata_args = { "file_size" : 920, # 23 rows per file
                                        "files_per_dir" : 3 })

         # Insert data.  Just for fun, insert out of order
@@ -974,8 +994,8 @@ class TestCmdline(object):
         self.match("[ Thu, 01 Jan 2004 00:00:00.000000 +0000 -"
                    "> Sat, 01 Jan 2005 00:00:00.000000 +0000 ]\n")

-        self.ok("destroy /diff/1")
-        self.ok("destroy /diff/2")
+        self.ok("destroy -R /diff/1")
+        self.ok("destroy -R /diff/2")

     def test_16_rename(self):
         # Test renaming.  Force file size smaller so we get more files
@@ -1039,7 +1059,7 @@ class TestCmdline(object):
         self.fail("rename /foo/bar /xxx/yyy/zzz/www")
         self.contain("path is subdir of existing node")
         self.ok("rename /foo/bar /xxx/yyy/mmm")
-        self.ok("destroy /xxx/yyy/zzz")
+        self.ok("destroy -R /xxx/yyy/zzz")
         check_path("xxx", "yyy", "mmm")

         # Extract it at the final path
@@ -1047,7 +1067,7 @@ class TestCmdline(object):
                 "--end '2012-03-23 10:04:01'")
         eq_(self.captured, extract_before)

-        self.ok("destroy /xxx/yyy/mmm")
+        self.ok("destroy -R /xxx/yyy/mmm")

         # Make sure temporary rename dirs weren't left around
         for (dirpath, dirnames, filenames) in os.walk(testdb):
@@ -8,8 +8,11 @@ from nose.tools import *
 from nose.tools import assert_raises
 import itertools

-from nilmdb.server.interval import (Interval, DBInterval,
-                                    IntervalSet, IntervalError)
+from nilmdb.utils.interval import IntervalError
+from nilmdb.server.interval import Interval, DBInterval, IntervalSet
+
+# so we can test them separately
+from nilmdb.utils.interval import Interval as UtilsInterval

 from testutil.helpers import *
 import unittest
@@ -47,6 +50,15 @@ def makeset(string):
     return iset

 class TestInterval:
+    def test_client_interval(self):
+        # Run interval tests against the Python version of Interval.
+        global Interval
+        NilmdbInterval = Interval
+        Interval = UtilsInterval
+        self.test_interval()
+        self.test_interval_intersect()
+        Interval = NilmdbInterval
+
     def test_interval(self):
         # Test Interval class
         os.environ['TZ'] = "America/New_York"
@@ -222,7 +234,7 @@ class TestInterval:
         eq_(ab,c)

         # a \ b == d
-        eq_(IntervalSet(a.set_difference(b)), d)
+        eq_(IntervalSet(nilmdb.utils.interval.set_difference(a,b)), d)

         # Intersection with intervals
         do_test(makeset("[---|---)[)"),
@@ -287,10 +299,11 @@ class TestInterval:
         b = makeset("[-) [--) [)")
         c = makeset("[----) ")
         d = makeset(" [-) ")
-        eq_(a.set_difference(b, list(c)[0]), d)
+        eq_(nilmdb.utils.interval.set_difference(
+            a.intersection(list(c)[0]), b.intersection(list(c)[0])), d)

         # Empty second set
-        eq_(a.set_difference(IntervalSet()), a)
+        eq_(nilmdb.utils.interval.set_difference(a, IntervalSet()), a)

 class TestIntervalDB:
     def test_dbinterval(self):
@@ -379,7 +392,7 @@ class TestIntervalSpeed:
         print
         yappi.start()
         speeds = {}
-        limit = 10 # was 20
+        limit = 22 # was 20
         for j in [ 2**x for x in range(5,limit) ]:
             start = time.time()
             iset = IntervalSet()
@@ -1,266 +0,0 @@
# -*- coding: utf-8 -*-

import nilmdb

from nilmdb.utils.printf import *

from nose.tools import *
from nose.tools import assert_raises
import distutils.version
import itertools
import os
import sys
import random
import unittest

from testutil.helpers import *

from nilmdb.server.layout import *

class TestLayouts(object):
    # Some nilmdb.layout tests.  Not complete, just fills in missing
    # coverage.
    def test_layouts(self):
        x = nilmdb.server.layout.get_named("float32_8")
        y = nilmdb.server.layout.get_named("float32_8")
        eq_(x.count, y.count)
        eq_(x.datatype, y.datatype)
        y = nilmdb.server.layout.get_named("float32_7")
        ne_(x.count, y.count)
        eq_(x.datatype, y.datatype)

    def test_parsing(self):
        self.real_t_parsing("float32_8", "uint16_6", "uint16_9")
        self.real_t_parsing("float32_8", "uint16_6", "uint16_9")
    def real_t_parsing(self, name_prep, name_raw, name_rawnotch):
        # invalid layouts
        with assert_raises(TypeError) as e:
            parser = Parser("NoSuchLayout")
        with assert_raises(TypeError) as e:
            parser = Parser("float32")

        # too little data
        parser = Parser(name_prep)
        data = ( "1234567890.000000 1.1 2.2 3.3 4.4 5.5\n" +
                 "1234567890.100000 1.1 2.2 3.3 4.4 5.5\n")
        with assert_raises(ParserError) as e:
            parser.parse(data)
        in_("error", str(e.exception))

        # too much data
        parser = Parser(name_prep)
        data = ( "1234567890.000000 1.1 2.2 3.3 4.4 5.5 6.6 7.7 8.8 9.9\n" +
                 "1234567890.100000 1.1 2.2 3.3 4.4 5.5 6.6 7.7 8.8 9.9\n")
        with assert_raises(ParserError) as e:
            parser.parse(data)
        in_("error", str(e.exception))

        # just right
        parser = Parser(name_prep)
        data = ( "1234567890.000000 1.1 2.2 3.3 4.4 5.5 6.6 7.7 8.8\n" +
                 "1234567890.100000 1.1 2.2 3.3 4.4 5.5 6.6 7.7 8.8\n")
        parser.parse(data)
        eq_(parser.min_timestamp, 1234567890.0)
        eq_(parser.max_timestamp, 1234567890.1)
        eq_(parser.data, [[1234567890.0,1.1,2.2,3.3,4.4,5.5,6.6,7.7,8.8],
                          [1234567890.1,1.1,2.2,3.3,4.4,5.5,6.6,7.7,8.8]])

        # try uint16_6 too, with clamping
        parser = Parser(name_raw)
        data = ( "1234567890.000000 1 2 3 4 5 6\n" +
                 "1234567890.100000 1 2 3 4 5 6\n" )
        parser.parse(data)
        eq_(parser.data, [[1234567890.0,1,2,3,4,5,6],
                          [1234567890.1,1,2,3,4,5,6]])

        # pass an instantiated class
        parser = Parser(get_named(name_rawnotch))
        data = ( "1234567890.000000 1 2 3 4 5 6 7 8 9\n" +
                 "1234567890.100000 1 2 3 4 5 6 7 8 9\n" )
        parser.parse(data)

        # non-monotonic
        parser = Parser(name_raw)
        data = ( "1234567890.100000 1 2 3 4 5 6\n" +
                 "1234567890.099999 1 2 3 4 5 6\n" )
        with assert_raises(ParserError) as e:
            parser.parse(data)
        in_("not monotonically increasing", str(e.exception))

        parser = Parser(name_raw)
        data = ( "1234567890.100000 1 2 3 4 5 6\n" +
                 "1234567890.100000 1 2 3 4 5 6\n" )
        with assert_raises(ParserError) as e:
            parser.parse(data)
        in_("not monotonically increasing", str(e.exception))

        parser = Parser(name_raw)
        data = ( "1234567890.100000 1 2 3 4 5 6\n" +
                 "1234567890.100001 1 2 3 4 5 6\n" )
        parser.parse(data)

        # uint16_6 with values out of bounds
        parser = Parser(name_raw)
        data = ( "1234567890.000000 1 2 3 4 500000 6\n" +
                 "1234567890.100000 1 2 3 4 5 6\n" )
        with assert_raises(ParserError) as e:
            parser.parse(data)
        in_("value out of range", str(e.exception))

        # Empty data should work but is useless
        parser = Parser(name_raw)
        data = ""
        parser.parse(data)
        assert(parser.min_timestamp is None)
        assert(parser.max_timestamp is None)

    def test_formatting(self):
        self.real_t_formatting("float32_8", "uint16_6", "uint16_9")
        self.real_t_formatting("float32_8", "uint16_6", "uint16_9")
    def real_t_formatting(self, name_prep, name_raw, name_rawnotch):
        # invalid layout
        with assert_raises(TypeError) as e:
            formatter = Formatter("NoSuchLayout")

        # too little data
        formatter = Formatter(name_prep)
        data = [ [ 1234567890.000000, 1.1, 2.2, 3.3, 4.4, 5.5 ],
                 [ 1234567890.100000, 1.1, 2.2, 3.3, 4.4, 5.5 ] ]
        with assert_raises(FormatterError) as e:
            formatter.format(data)
        in_("error", str(e.exception))

        # too much data
        formatter = Formatter(name_prep)
        data = [ [ 1234567890.000000, 1, 2, 3, 4, 5, 6, 7, 8, 9 ],
                 [ 1234567890.100000, 1, 2, 3, 4, 5, 6, 7, 8, 9 ] ]
        with assert_raises(FormatterError) as e:
            formatter.format(data)
        in_("error", str(e.exception))

        # just right
        formatter = Formatter(name_prep)
        data = [ [ 1234567890.000000, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8 ],
                 [ 1234567890.100000, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8 ] ]
        text = formatter.format(data)
        eq_(text,
            "1234567890.000000 1.100000e+00 2.200000e+00 3.300000e+00 "
            "4.400000e+00 5.500000e+00 6.600000e+00 7.700000e+00 "
            "8.800000e+00\n" +
            "1234567890.100000 1.100000e+00 2.200000e+00 3.300000e+00 "
            "4.400000e+00 5.500000e+00 6.600000e+00 7.700000e+00 "
            "8.800000e+00\n")

        # try uint16_6 too
        formatter = Formatter(name_raw)
        data = [ [ 1234567890.000000, 1, 2, 3, 4, 5, 6 ],
                 [ 1234567890.100000, 1, 2, 3, 4, 5, 6 ] ]
        text = formatter.format(data)
        eq_(text,
            "1234567890.000000 1 2 3 4 5 6\n" +
            "1234567890.100000 1 2 3 4 5 6\n")

        # pass an instantiated class
        formatter = Formatter(get_named(name_rawnotch))
        data = [ [ 1234567890.000000, 1, 2, 3, 4, 5, 6, 7, 8, 9 ],
                 [ 1234567890.100000, 1, 2, 3, 4, 5, 6, 7, 8, 9 ] ]
        text = formatter.format(data)
        eq_(text,
            "1234567890.000000 1 2 3 4 5 6 7 8 9\n" +
            "1234567890.100000 1 2 3 4 5 6 7 8 9\n")

        # Empty data should work but is useless
        formatter = Formatter(name_raw)
        data = []
        text = formatter.format(data)
        eq_(text, "")

    def test_roundtrip(self):
        self.real_t_roundtrip("float32_8", "uint16_6", "uint16_9")
        self.real_t_roundtrip("float32_8", "uint16_6", "uint16_9")
    def real_t_roundtrip(self, name_prep, name_raw, name_rawnotch):
        # Verify that textual data passed into the Parser, and then
        # back through the Formatter, then back into the Parser,
        # gives identical parsed representations
        random.seed(12345)

        def do_roundtrip(layout, datagen):
            for i in range(100):
                rows = random.randint(1,100)
                data = ""
                ts = 1234567890
                for r in range(rows):
                    ts += random.uniform(0,1)
                    row = sprintf("%f", ts) + " "
                    row += " ".join(datagen())
                    row += "\n"
                    data += row
                parser1 = Parser(layout)
                formatter = Formatter(layout)
                parser2 = Parser(layout)
                parser1.parse(data)
                parser2.parse(formatter.format(parser1.data))
                eq_(parser1.data, parser2.data)

        def datagen():
            return [ sprintf("%.6e", random.uniform(-1000,1000))
                     for x in range(8) ]
        do_roundtrip(name_prep, datagen)

        def datagen():
            return [ sprintf("%d", random.randint(0,65535))
                     for x in range(6) ]
        do_roundtrip(name_raw, datagen)

        def datagen():
            return [ sprintf("%d", random.randint(0,65535))
                     for x in range(9) ]
        do_roundtrip(name_rawnotch, datagen)

class TestLayoutSpeed:
    @unittest.skip("this is slow")
    def test_layout_speed(self):
        import time

        random.seed(54321)

        def do_speedtest(layout, datagen, rows = 5000, times = 100):
            # Build data once
            data = ""
            ts = 1234567890
            for r in range(rows):
                ts += random.uniform(0,1)
                row = sprintf("%f", ts) + " "
                row += " ".join(datagen())
                row += "\n"
                data += row

            # Do lots of roundtrips
            start = time.time()
            for i in range(times):
                parser = Parser(layout)
                formatter = Formatter(layout)
                parser.parse(data)
                formatter.format(parser.data)
            elapsed = time.time() - start
            printf("roundtrip %s: %d ms, %.1f μs/row, %d rows/sec\n",
                   layout,
                   elapsed * 1e3,
                   (elapsed * 1e6) / (rows * times),
                   (rows * times) / elapsed)

        print ""
        def datagen():
            return [ sprintf("%.6e", random.uniform(-1000,1000))
                     for x in range(10) ]
        do_speedtest("float32_10", datagen)

        def datagen():
            return [ sprintf("%d", random.randint(0,65535))
                     for x in range(10) ]
        do_speedtest("uint16_10", datagen)

        def datagen():
            return [ sprintf("%d", random.randint(0,65535))
                     for x in range(6) ]
        do_speedtest("uint16_6", datagen)