Compare commits
10 Commits
nilmdb-1.3
...
nilmdb-1.3
Author | SHA1 | Date | |
---|---|---|---|
09bc7eb48c | |||
b77f07a4cd | |||
59f0076306 | |||
83bc5bc775 | |||
6b1dfec828 | |||
d827f41fa5 | |||
7eca587fdf | |||
a351bc1b10 | |||
1d61d61a81 | |||
755255030b |
2
Makefile
2
Makefile
@@ -43,4 +43,4 @@ clean::
|
||||
gitclean::
|
||||
git clean -dXf
|
||||
|
||||
.PHONY: all version build dist sdist install docs lint test clean
|
||||
.PHONY: all version build dist sdist install docs lint test clean gitclean
|
||||
|
@@ -147,13 +147,18 @@ class Client(object):
|
||||
ctx.insert(chunk)
|
||||
return ctx.last_response
|
||||
|
||||
def stream_intervals(self, path, start = None, end = None):
|
||||
def stream_intervals(self, path, start = None, end = None, diffpath = None):
|
||||
"""
|
||||
Return a generator that yields each stream interval.
|
||||
|
||||
If diffpath is not None, yields only interval ranges that are
|
||||
present in 'path' but not in 'diffpath'.
|
||||
"""
|
||||
params = {
|
||||
"path": path
|
||||
}
|
||||
if diffpath is not None:
|
||||
params["diffpath"] = diffpath
|
||||
if start is not None:
|
||||
params["start"] = float_time_to_string(start)
|
||||
if end is not None:
|
||||
@@ -225,6 +230,7 @@ class StreamInserter(object):
|
||||
# See design.md for a discussion of how much data to send. This
|
||||
# is a soft limit -- we might send up to twice as much or so
|
||||
_max_data = 2 * 1024 * 1024
|
||||
_max_data_after_send = 64 * 1024
|
||||
|
||||
# Delta to add to the final timestamp, if "end" wasn't given
|
||||
_end_epsilon = 1e-6
|
||||
@@ -270,6 +276,10 @@ class StreamInserter(object):
|
||||
# Send the block once we have enough data
|
||||
if self._block_len >= maxdata:
|
||||
self._send_block(final = False)
|
||||
if self._block_len >= self._max_data_after_send: # pragma: no cover
|
||||
raise ValueError("too much data left over after trying"
|
||||
" to send intermediate block; is it"
|
||||
" missing newlines or malformed?")
|
||||
|
||||
def update_start(self, start):
|
||||
"""Update the start time for the next contiguous interval.
|
||||
@@ -366,7 +376,7 @@ class StreamInserter(object):
|
||||
(spos, epos) = self._get_last_noncomment(block)
|
||||
end_ts = extract_timestamp(block[spos:epos])
|
||||
except (ValueError, IndexError):
|
||||
# If we found no timestamp, give up; we'll send this
|
||||
# If we found no timestamp, give up; we could send this
|
||||
# block later when we have more data.
|
||||
return
|
||||
if spos == 0:
|
||||
@@ -398,3 +408,5 @@ class StreamInserter(object):
|
||||
"start": float_time_to_string(start_ts),
|
||||
"end": float_time_to_string(end_ts) }
|
||||
self.last_response = self._http.put("stream/insert", block, params)
|
||||
|
||||
return
|
||||
|
@@ -14,7 +14,8 @@ from argparse import ArgumentDefaultsHelpFormatter as def_form
|
||||
# Valid subcommands. Defined in separate files just to break
|
||||
# things up -- they're still called with Cmdline as self.
|
||||
subcommands = [ "help", "info", "create", "list", "metadata",
|
||||
"insert", "extract", "remove", "destroy" ]
|
||||
"insert", "extract", "remove", "destroy",
|
||||
"intervals" ]
|
||||
|
||||
# Import the subcommand modules
|
||||
subcmd_mods = {}
|
||||
|
62
nilmdb/cmdline/intervals.py
Normal file
62
nilmdb/cmdline/intervals.py
Normal file
@@ -0,0 +1,62 @@
|
||||
from nilmdb.utils.printf import *
|
||||
import nilmdb.utils.time
|
||||
|
||||
import fnmatch
|
||||
import argparse
|
||||
from argparse import ArgumentDefaultsHelpFormatter as def_form
|
||||
|
||||
def setup(self, sub):
    """Register the 'intervals' subcommand on the subparser set 'sub'.

    Declares the positional stream path, the optional --diff path,
    the --start / --end limits (converted with self.arg_time) and the
    --timestamp-raw flag, and installs cmd_intervals_verify and
    cmd_intervals as the 'verify' and 'handler' defaults.

    Returns the newly created subcommand parser.
    """
    # Layout whitespace in this text is cosmetic; argparse re-wraps
    # the description when it prints the help.
    description = """
                  List intervals in a stream, similar to
                  'list --detail path'.

                  If '--diff diffpath' is provided, only
                  interval ranges that are present in 'path'
                  and not present in 'diffpath' are printed.
                  """
    parser = sub.add_parser("intervals", help="List intervals",
                            formatter_class=def_form,
                            description=description)
    parser.set_defaults(verify=cmd_intervals_verify,
                        handler=cmd_intervals)

    # Which stream to list, and which one (if any) to subtract
    streams = parser.add_argument_group("Stream selection")
    streams.add_argument("path", metavar="PATH",
                         help="List intervals for this path")
    streams.add_argument("-d", "--diff", metavar="PATH",
                         help="Subtract intervals from this path")

    # Optional time range limiting the listing
    details = parser.add_argument_group("Interval details")
    details.add_argument("-s", "--start",
                         metavar="TIME", type=self.arg_time,
                         help="Starting timestamp for intervals "
                              "(free-form, inclusive)")
    details.add_argument("-e", "--end",
                         metavar="TIME", type=self.arg_time,
                         help="Ending timestamp for intervals "
                              "(free-form, noninclusive)")

    # Output formatting
    misc = parser.add_argument_group("Misc options")
    misc.add_argument("-T", "--timestamp-raw", action="store_true",
                      help="Show raw timestamps when printing times")

    return parser
|
||||
|
||||
def cmd_intervals_verify(self):
    """Reject a --start that does not come strictly before --end.

    Nothing is checked unless both limits were supplied.  On failure
    the problem is reported through self.parser.error().
    """
    args = self.args
    if args.start is None or args.end is None:
        return
    if args.start >= args.end:
        self.parser.error("start must precede end")
|
||||
|
||||
def cmd_intervals(self):
    """List intervals in a stream

    Prints one "[ start -> end ]" line per interval returned by
    self.client.stream_intervals() for the selected path, time range
    and optional diff path.  A ClientError is reported via self.die().
    """
    args = self.args
    timeutil = nilmdb.utils.time
    # Raw timestamps on request, otherwise the formatted form
    show = (timeutil.float_time_to_string if args.timestamp_raw
            else timeutil.format_time)

    try:
        found = self.client.stream_intervals(
            args.path, args.start, args.end, args.diff)
        for (begin, finish) in found:
            printf("[ %s -> %s ]\n", show(begin), show(finish))
    except nilmdb.client.ClientError as err:
        self.die("error listing intervals: %s", str(err))
|
||||
|
@@ -367,22 +367,27 @@ class Table(object):
|
||||
) = f.append_string(count, data, data_offset, linenum,
|
||||
start, end, last_timestamp)
|
||||
except rocket.ParseError as e:
|
||||
(linenum, errtype, obj) = e.args
|
||||
(linenum, colnum, errtype, obj) = e.args
|
||||
where = "line %d, column %d: " % (linenum, colnum)
|
||||
# Extract out the error line, add column marker
|
||||
try:
|
||||
bad = data.splitlines()[linenum-1]
|
||||
badptr = ' ' * (colnum - 1) + '^'
|
||||
except IndexError: # pragma: no cover
|
||||
bad = ""
|
||||
if errtype == rocket.ERR_NON_MONOTONIC:
|
||||
err = sprintf("line %d: timestamp is not monotonically "
|
||||
"increasing", linenum)
|
||||
err = "timestamp is not monotonically increasing"
|
||||
elif errtype == rocket.ERR_OUT_OF_INTERVAL:
|
||||
if obj < start:
|
||||
err = sprintf("line %d: Data timestamp %s < "
|
||||
"start time %s", linenum,
|
||||
err = sprintf("Data timestamp %s < start time %s",
|
||||
ftts(obj), ftts(start))
|
||||
else:
|
||||
err = sprintf("line %d: Data timestamp %s >= "
|
||||
"end time %s", linenum,
|
||||
err = sprintf("Data timestamp %s >= end time %s",
|
||||
ftts(obj), ftts(end))
|
||||
else:
|
||||
err = sprintf("line %d: %s", linenum, str(obj))
|
||||
raise ValueError("error parsing input data: " + err)
|
||||
err = str(obj)
|
||||
raise ValueError("error parsing input data: " +
|
||||
where + err + "\n" + bad + "\n" + badptr)
|
||||
tot_rows += added_rows
|
||||
except Exception:
|
||||
# Some failure, so try to roll things back by truncating or
|
||||
|
@@ -20,6 +20,8 @@ Intervals are half-open, ie. they include data points with timestamps
|
||||
# and ends directly in the tree, like bxinterval did.
|
||||
|
||||
from ..utils.time import float_time_to_string as ftts
|
||||
from ..utils.iterator import imerge
|
||||
import itertools
|
||||
|
||||
cimport rbtree
|
||||
cdef extern from "stdint.h":
|
||||
@@ -264,21 +266,15 @@ cdef class IntervalSet:
|
||||
|
||||
def __and__(self, other not None):
|
||||
"""
|
||||
Compute a new IntervalSet from the intersection of two others
|
||||
Compute a new IntervalSet from the intersection of this
|
||||
IntervalSet with one other interval.
|
||||
|
||||
Output intervals are built as subsets of the intervals in the
|
||||
first argument (self).
|
||||
"""
|
||||
out = IntervalSet()
|
||||
|
||||
if not isinstance(other, IntervalSet):
|
||||
for i in self.intersection(other):
|
||||
out.tree.insert(rbtree.RBNode(i.start, i.end, i))
|
||||
else:
|
||||
for x in other:
|
||||
for i in self.intersection(x):
|
||||
out.tree.insert(rbtree.RBNode(i.start, i.end, i))
|
||||
|
||||
return out
|
||||
|
||||
def intersection(self, Interval interval not None, orig = False):
|
||||
@@ -313,6 +309,62 @@ cdef class IntervalSet:
|
||||
else:
|
||||
yield subset
|
||||
|
||||
def set_difference(self, IntervalSet other not None,
                   Interval bounds = None):
    """
    Generate the set difference (self \\ other): every range that
    is covered by this IntervalSet but not by 'other'.

    When 'bounds' is given, only the part of each set that
    intersects the interval 'bounds' is considered.

    This is a generator.  Each interval it yields is produced by
    calling subset() on an interval taken from 'self'.
    """
    # Tags identifying what kind of endpoint a merged entry is.
    SELF_START, OTHER_START, SELF_END, OTHER_END = 0, 1, 2, 3

    def endpoints(intervals, start_tag, end_tag):
        # Emit a (timestamp, tag, interval) entry for each end of
        # every interval, so the two sets can be merged by time.
        for interval in intervals:
            yield interval.start, start_tag, interval
            yield interval.end, end_tag, interval

    if bounds is None:
        bounds = Interval(-1e12, 1e12)
    mine = endpoints(self.intersection(bounds), SELF_START, SELF_END)
    theirs = endpoints(other.intersection(bounds),
                       OTHER_START, OTHER_END)

    # Walk all endpoints in merged order, tracking which interval
    # of each set is currently open and where the pending output
    # range (if any) began.
    current_self = None
    current_other = None
    pending = None
    for (when, tag, interval) in imerge(mine, theirs):
        if tag == SELF_START:
            current_self = interval
            if current_other is None:
                pending = when
        elif tag == OTHER_START:
            current_other = interval
            # 'other' begins covering here; flush any non-empty
            # range accumulated so far.
            if pending is not None and pending != when:
                yield current_self.subset(pending, when)
            pending = None
        elif tag == SELF_END:
            if pending is not None and pending != when:
                yield current_self.subset(pending, when)
            pending = None
            current_self = None
        elif tag == OTHER_END:
            current_other = None
            # 'other' stops covering; resume output if an interval
            # of 'self' is still open.
            if current_self:
                pending = when
|
||||
|
||||
cpdef intersects(self, Interval other):
|
||||
"""Return True if this IntervalSet intersects another interval"""
|
||||
for n in self.tree.intersect(other.start, other.end):
|
||||
|
@@ -290,8 +290,8 @@ class NilmDB(object):
|
||||
query = "SELECT streams.path, streams.layout"
|
||||
if extended:
|
||||
query += ", min(ranges.start_time), max(ranges.end_time) "
|
||||
query += ", sum(ranges.end_pos - ranges.start_pos) "
|
||||
query += ", sum(ranges.end_time - ranges.start_time) "
|
||||
query += ", coalesce(sum(ranges.end_pos - ranges.start_pos), 0) "
|
||||
query += ", coalesce(sum(ranges.end_time - ranges.start_time), 0) "
|
||||
query += " FROM streams"
|
||||
if extended:
|
||||
query += " LEFT JOIN ranges ON streams.id = ranges.stream_id"
|
||||
@@ -306,8 +306,13 @@ class NilmDB(object):
|
||||
result = self.con.execute(query, params).fetchall()
|
||||
return [ list(x) for x in result ]
|
||||
|
||||
def stream_intervals(self, path, start = None, end = None):
|
||||
def stream_intervals(self, path, start = None, end = None, diffpath = None):
|
||||
"""
|
||||
List all intervals in 'path' between 'start' and 'end'. If
|
||||
'diffpath' is not none, list instead the set-difference
|
||||
between the intervals in the two streams; i.e. all interval
|
||||
ranges that are present in 'path' but not 'diffpath'.
|
||||
|
||||
Returns (intervals, restart) tuple.
|
||||
|
||||
intervals is a list of [start,end] timestamps of all intervals
|
||||
@@ -321,10 +326,17 @@ class NilmDB(object):
|
||||
"""
|
||||
stream_id = self._stream_id(path)
|
||||
intervals = self._get_intervals(stream_id)
|
||||
if diffpath:
|
||||
diffstream_id = self._stream_id(diffpath)
|
||||
diffintervals = self._get_intervals(diffstream_id)
|
||||
(start, end) = self._check_user_times(start, end)
|
||||
requested = Interval(start, end)
|
||||
result = []
|
||||
for n, i in enumerate(intervals.intersection(requested)):
|
||||
if diffpath:
|
||||
getter = intervals.set_difference(diffintervals, requested)
|
||||
else:
|
||||
getter = intervals.intersection(requested)
|
||||
for n, i in enumerate(getter):
|
||||
if n >= self.max_results:
|
||||
restart = i.start
|
||||
break
|
||||
|
@@ -117,12 +117,12 @@ class Rocket(object):
|
||||
try:
|
||||
(ts, row) = self.layoutparser.parse(line)
|
||||
except ValueError as e:
|
||||
raise ParseError(linenum, ERR_UNKNOWN, e)
|
||||
raise ParseError(linenum, 0, ERR_UNKNOWN, e)
|
||||
if ts <= last_timestamp:
|
||||
raise ParseError(linenum, ERR_NON_MONOTONIC, ts)
|
||||
raise ParseError(linenum, 0, ERR_NON_MONOTONIC, ts)
|
||||
last_timestamp = ts
|
||||
if ts < start or ts >= end:
|
||||
raise ParseError(linenum, ERR_OUT_OF_INTERVAL, ts)
|
||||
raise ParseError(linenum, 0, ERR_OUT_OF_INTERVAL, ts)
|
||||
self.append_iter(1, [row])
|
||||
written += 1
|
||||
return (written, indata.tell(), last_timestamp, linenum)
|
||||
|
@@ -2,6 +2,7 @@
|
||||
#include <structmember.h>
|
||||
#include <endian.h>
|
||||
|
||||
#include <ctype.h>
|
||||
#include <stdint.h>
|
||||
|
||||
/* Values missing from stdint.h */
|
||||
@@ -18,7 +19,7 @@
|
||||
|
||||
/* Somewhat arbitrary, just so we can use fixed sizes for strings
|
||||
etc. */
|
||||
static const int MAX_LAYOUT_COUNT = 64;
|
||||
static const int MAX_LAYOUT_COUNT = 128;
|
||||
|
||||
/* Error object and constants */
|
||||
static PyObject *ParseError;
|
||||
@@ -35,20 +36,20 @@ static void add_parseerror_codes(PyObject *module)
|
||||
}
|
||||
|
||||
/* Helpers to raise ParseErrors. Use "return raise_str(...)" etc. */
|
||||
static PyObject *raise_str(int linenum, int code, const char *string)
|
||||
static PyObject *raise_str(int line, int col, int code, const char *string)
|
||||
{
|
||||
PyObject *o;
|
||||
o = Py_BuildValue("(iis)", linenum, code, string);
|
||||
o = Py_BuildValue("(iiis)", line, col, code, string);
|
||||
if (o != NULL) {
|
||||
PyErr_SetObject(ParseError, o);
|
||||
Py_DECREF(o);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
static PyObject *raise_num(int linenum, int code, double num)
|
||||
static PyObject *raise_num(int line, int col, int code, double num)
|
||||
{
|
||||
PyObject *o;
|
||||
o = Py_BuildValue("(iid)", linenum, code, num);
|
||||
o = Py_BuildValue("(iiid)", line, col, code, num);
|
||||
if (o != NULL) {
|
||||
PyErr_SetObject(ParseError, o);
|
||||
Py_DECREF(o);
|
||||
@@ -352,6 +353,7 @@ static PyObject *Rocket_append_string(Rocket *self, PyObject *args)
|
||||
int count;
|
||||
const char *data;
|
||||
int offset;
|
||||
const char *linestart;
|
||||
int linenum;
|
||||
double start;
|
||||
double end;
|
||||
@@ -374,14 +376,22 @@ static PyObject *Rocket_append_string(Rocket *self, PyObject *args)
|
||||
&start, &end, &last_timestamp))
|
||||
return NULL;
|
||||
|
||||
/* Skip spaces, but don't skip over a newline. */
|
||||
#define SKIP_BLANK(buf) do { \
|
||||
while (isspace(*buf)) { \
|
||||
if (*buf == '\n') \
|
||||
break; \
|
||||
buf++; \
|
||||
} } while(0)
|
||||
|
||||
const char *buf = &data[offset];
|
||||
while (written < count && *buf)
|
||||
{
|
||||
linestart = buf;
|
||||
linenum++;
|
||||
|
||||
/* Skip leading whitespace and commented lines */
|
||||
while (*buf == ' ' || *buf == '\t')
|
||||
buf++;
|
||||
SKIP_BLANK(buf);
|
||||
if (*buf == '#') {
|
||||
while (*buf && *buf != '\n')
|
||||
buf++;
|
||||
@@ -393,12 +403,15 @@ static PyObject *Rocket_append_string(Rocket *self, PyObject *args)
|
||||
/* Extract timestamp */
|
||||
t64.d = strtod(buf, &endptr);
|
||||
if (endptr == buf)
|
||||
return raise_str(linenum, ERR_OTHER, "bad timestamp");
|
||||
return raise_str(linenum, buf - linestart + 1,
|
||||
ERR_OTHER, "bad timestamp");
|
||||
if (t64.d <= last_timestamp)
|
||||
return raise_num(linenum, ERR_NON_MONOTONIC, t64.d);
|
||||
return raise_num(linenum, buf - linestart + 1,
|
||||
ERR_NON_MONOTONIC, t64.d);
|
||||
last_timestamp = t64.d;
|
||||
if (t64.d < start || t64.d >= end)
|
||||
return raise_num(linenum, ERR_OUT_OF_INTERVAL, t64.d);
|
||||
return raise_num(linenum, buf - linestart + 1,
|
||||
ERR_OUT_OF_INTERVAL, t64.d);
|
||||
t64.u = le64toh(t64.u);
|
||||
if (fwrite(&t64.u, 8, 1, self->file) != 1)
|
||||
goto err;
|
||||
@@ -410,23 +423,31 @@ static PyObject *Rocket_append_string(Rocket *self, PyObject *args)
|
||||
case LAYOUT_TYPE_##type: \
|
||||
/* parse and write in a loop */ \
|
||||
for (i = 0; i < self->layout_count; i++) { \
|
||||
parsetype = parsefunc(buf, &endptr); \
|
||||
if (endptr == buf) \
|
||||
/* skip non-newlines */ \
|
||||
SKIP_BLANK(buf); \
|
||||
if (*buf == '\n') \
|
||||
goto wrong_number_of_values; \
|
||||
/* parse number */ \
|
||||
parsetype = parsefunc(buf, &endptr); \
|
||||
if (*endptr && !isspace(*endptr)) \
|
||||
goto cant_parse_value; \
|
||||
/* check limits */ \
|
||||
if (type##_MIN != type##_MAX && \
|
||||
(parsetype < type##_MIN || \
|
||||
parsetype > type##_MAX)) \
|
||||
goto value_out_of_range; \
|
||||
/* convert to disk representation */ \
|
||||
realtype = parsetype; \
|
||||
disktype = letoh(disktype); \
|
||||
/* write it */ \
|
||||
if (fwrite(&disktype, bytes, \
|
||||
1, self->file) != 1) \
|
||||
goto err; \
|
||||
/* advance buf */ \
|
||||
buf = endptr; \
|
||||
} \
|
||||
/* Skip trailing whitespace and comments */ \
|
||||
while (*buf == ' ' || *buf == '\t') \
|
||||
buf++; \
|
||||
SKIP_BLANK(buf); \
|
||||
if (*buf == '#') \
|
||||
while (*buf && *buf != '\n') \
|
||||
buf++; \
|
||||
@@ -466,12 +487,18 @@ static PyObject *Rocket_append_string(Rocket *self, PyObject *args)
|
||||
err:
|
||||
PyErr_SetFromErrno(PyExc_OSError);
|
||||
return NULL;
|
||||
cant_parse_value:
|
||||
return raise_str(linenum, buf - linestart + 1,
|
||||
ERR_OTHER, "can't parse value");
|
||||
wrong_number_of_values:
|
||||
return raise_str(linenum, ERR_OTHER, "wrong number of values");
|
||||
return raise_str(linenum, buf - linestart + 1,
|
||||
ERR_OTHER, "wrong number of values");
|
||||
value_out_of_range:
|
||||
return raise_str(linenum, ERR_OTHER, "value out of range");
|
||||
return raise_str(linenum, buf - linestart + 1,
|
||||
ERR_OTHER, "value out of range");
|
||||
extra_data_on_line:
|
||||
return raise_str(linenum, ERR_OTHER, "extra data on line");
|
||||
return raise_str(linenum, buf - linestart + 1,
|
||||
ERR_OTHER, "extra data on line");
|
||||
}
|
||||
|
||||
/****
|
||||
|
@@ -332,16 +332,22 @@ class Stream(NilmApp):
|
||||
|
||||
# /stream/intervals?path=/newton/prep
|
||||
# /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0
|
||||
# /stream/intervals?path=/newton/prep&diffpath=/newton/prep2
|
||||
@cherrypy.expose
|
||||
@chunked_response
|
||||
@response_type("application/x-json-stream")
|
||||
def intervals(self, path, start = None, end = None):
|
||||
def intervals(self, path, start = None, end = None, diffpath = None):
|
||||
"""
|
||||
Get intervals from backend database. Streams the resulting
|
||||
intervals as JSON strings separated by CR LF pairs. This may
|
||||
make multiple requests to the nilmdb backend to avoid causing
|
||||
it to block for too long.
|
||||
|
||||
Returns intervals between 'start' and 'end' belonging to
|
||||
'path'. If 'diffpath' is provided, the set-difference between
|
||||
intervals in 'path' and intervals in 'diffpath' are
|
||||
returned instead.
|
||||
|
||||
Note that the response type is the non-standard
|
||||
'application/x-json-stream' for lack of a better option.
|
||||
"""
|
||||
@@ -355,15 +361,18 @@ class Stream(NilmApp):
|
||||
raise cherrypy.HTTPError("400 Bad Request",
|
||||
"start must precede end")
|
||||
|
||||
streams = self.db.stream_list(path = path)
|
||||
if len(streams) != 1:
|
||||
raise cherrypy.HTTPError("404 Not Found", "No such stream")
|
||||
if len(self.db.stream_list(path = path)) != 1:
|
||||
raise cherrypy.HTTPError("404", "No such stream: " + path)
|
||||
|
||||
if diffpath and len(self.db.stream_list(path = diffpath)) != 1:
|
||||
raise cherrypy.HTTPError("404", "No such stream: " + diffpath)
|
||||
|
||||
@workaround_cp_bug_1200
|
||||
def content(start, end):
|
||||
# Note: disable chunked responses to see tracebacks from here.
|
||||
while True:
|
||||
(ints, restart) = self.db.stream_intervals(path, start, end)
|
||||
(ints, restart) = self.db.stream_intervals(path, start, end,
|
||||
diffpath)
|
||||
response = ''.join([ json.dumps(i) + "\r\n" for i in ints ])
|
||||
yield response
|
||||
if restart == 0:
|
||||
|
@@ -10,3 +10,4 @@ from nilmdb.utils import atomic
|
||||
import nilmdb.utils.threadsafety
|
||||
import nilmdb.utils.fallocate
|
||||
import nilmdb.utils.time
|
||||
import nilmdb.utils.iterator
|
||||
|
36
nilmdb/utils/iterator.py
Normal file
36
nilmdb/utils/iterator.py
Normal file
@@ -0,0 +1,36 @@
|
||||
# Misc iterator tools
|
||||
|
||||
# Iterator merging, based on http://code.activestate.com/recipes/491285/
|
||||
import heapq
|
||||
def imerge(*iterables):
    '''Merge multiple sorted inputs into a single sorted output.

    Equivalent to: sorted(itertools.chain(*iterables))

    The merge is lazy: items are pulled from the inputs only as the
    result is consumed.  Each input must already be sorted.

    >>> list(imerge([1,3,5,7], [0,2,4,8], [5,10,15,20], [], [25]))
    [0, 1, 2, 3, 4, 5, 5, 7, 8, 10, 15, 20, 25]

    '''
    # heapq.merge is the standard-library form of the same recipe.
    # Using it avoids the Python-2-only iterator ".next" attribute
    # and the private heapq._siftup helper, and equal items are
    # emitted in input order instead of by comparing bound methods.
    for item in heapq.merge(*iterables):
        yield item
|
@@ -394,7 +394,7 @@ class TestCmdline(object):
|
||||
self.fail("insert -s 20120323T1004 -e 20120323T1006 /newton/prep",
|
||||
input)
|
||||
self.contain("error parsing input data")
|
||||
self.contain("line 7:")
|
||||
self.contain("line 7")
|
||||
self.contain("timestamp is not monotonically increasing")
|
||||
|
||||
# insert pre-timestamped data, from stdin
|
||||
@@ -436,6 +436,15 @@ class TestCmdline(object):
|
||||
self.fail("insert -t -r 120 -f /newton/raw "
|
||||
"tests/data/prep-20120323T1004")
|
||||
self.contain("error parsing input data")
|
||||
self.contain("can't parse value")
|
||||
|
||||
# too few rows per line
|
||||
self.ok("create /insert/test float32_20")
|
||||
self.fail("insert -t -r 120 -f /insert/test "
|
||||
"tests/data/prep-20120323T1004")
|
||||
self.contain("error parsing input data")
|
||||
self.contain("wrong number of values")
|
||||
self.ok("destroy /insert/test")
|
||||
|
||||
# empty data does nothing
|
||||
self.ok("insert -t -r 120 --start '03/23/2012 06:05:00' /newton/prep "
|
||||
@@ -893,3 +902,56 @@ class TestCmdline(object):
|
||||
# See if we can extract it all
|
||||
self.ok("extract /newton/prep --start 2000-01-01 --end 2020-01-01")
|
||||
lines_(self.captured, 15600)
|
||||
|
||||
def test_15_intervals_diff(self):
|
||||
# Test "intervals" and "intervals --diff" command.
|
||||
os.environ['TZ'] = "UTC"
|
||||
|
||||
self.ok("create /diff/1 uint8_1")
|
||||
self.match("")
|
||||
self.ok("intervals /diff/1")
|
||||
self.match("")
|
||||
self.ok("intervals /diff/1 --diff /diff/1")
|
||||
self.match("")
|
||||
self.ok("intervals --diff /diff/1 /diff/1")
|
||||
self.match("")
|
||||
self.fail("intervals /diff/2")
|
||||
self.fail("intervals /diff/1 -d /diff/2")
|
||||
|
||||
self.ok("create /diff/2 uint8_1")
|
||||
self.ok("intervals -T /diff/1 -d /diff/2")
|
||||
self.match("")
|
||||
self.ok("insert -s 01-01-2000 -e 01-01-2001 /diff/1 /dev/null")
|
||||
|
||||
self.ok("intervals /diff/1")
|
||||
self.match("[ Sat, 01 Jan 2000 00:00:00.000000 +0000 -"
|
||||
"> Mon, 01 Jan 2001 00:00:00.000000 +0000 ]\n")
|
||||
|
||||
self.ok("intervals /diff/1 -d /diff/2")
|
||||
self.match("[ Sat, 01 Jan 2000 00:00:00.000000 +0000 -"
|
||||
"> Mon, 01 Jan 2001 00:00:00.000000 +0000 ]\n")
|
||||
|
||||
self.ok("insert -s 01-01-2000 -e 01-01-2001 /diff/2 /dev/null")
|
||||
self.ok("intervals /diff/1 -d /diff/2")
|
||||
self.match("")
|
||||
|
||||
self.ok("insert -s 01-01-2001 -e 01-01-2002 /diff/1 /dev/null")
|
||||
self.ok("insert -s 01-01-2002 -e 01-01-2003 /diff/2 /dev/null")
|
||||
self.ok("intervals /diff/1 -d /diff/2")
|
||||
self.match("[ Mon, 01 Jan 2001 00:00:00.000000 +0000 -"
|
||||
"> Tue, 01 Jan 2002 00:00:00.000000 +0000 ]\n")
|
||||
|
||||
self.ok("insert -s 01-01-2004 -e 01-01-2005 /diff/1 /dev/null")
|
||||
self.ok("intervals /diff/1 -d /diff/2")
|
||||
self.match("[ Mon, 01 Jan 2001 00:00:00.000000 +0000 -"
|
||||
"> Tue, 01 Jan 2002 00:00:00.000000 +0000 ]\n"
|
||||
"[ Thu, 01 Jan 2004 00:00:00.000000 +0000 -"
|
||||
"> Sat, 01 Jan 2005 00:00:00.000000 +0000 ]\n")
|
||||
|
||||
self.fail("intervals -s 01-01-2003 -e 01-01-2000 /diff/1 -d /diff/2")
|
||||
self.ok("intervals -s 01-01-2003 -e 01-01-2008 /diff/1 -d /diff/2")
|
||||
self.match("[ Thu, 01 Jan 2004 00:00:00.000000 +0000 -"
|
||||
"> Sat, 01 Jan 2005 00:00:00.000000 +0000 ]\n")
|
||||
|
||||
self.ok("destroy /diff/1")
|
||||
self.ok("destroy /diff/2")
|
||||
|
@@ -208,64 +208,89 @@ class TestInterval:
|
||||
makeset(" [-|-----|"))
|
||||
|
||||
|
||||
def test_intervalset_intersect(self):
|
||||
def test_intervalset_intersect_difference(self):
|
||||
# Test intersection (&)
|
||||
with assert_raises(TypeError): # was AttributeError
|
||||
x = makeset("[--)") & 1234
|
||||
|
||||
# Intersection with interval
|
||||
eq_(makeset("[---|---)[)") &
|
||||
list(makeset(" [------) "))[0],
|
||||
makeset(" [-----) "))
|
||||
def do_test(a, b, c, d):
|
||||
# a & b == c
|
||||
ab = IntervalSet()
|
||||
for x in b:
|
||||
for i in (a & x):
|
||||
ab += i
|
||||
eq_(ab,c)
|
||||
|
||||
# Intersection with sets
|
||||
eq_(makeset("[---------)") &
|
||||
makeset(" [---) "),
|
||||
makeset(" [---) "))
|
||||
# a \ b == d
|
||||
eq_(IntervalSet(a.set_difference(b)), d)
|
||||
|
||||
eq_(makeset(" [---) ") &
|
||||
makeset("[---------)"),
|
||||
makeset(" [---) "))
|
||||
|
||||
eq_(makeset(" [-----)") &
|
||||
makeset(" [-----) "),
|
||||
makeset(" [--) "))
|
||||
|
||||
eq_(makeset(" [--) [--)") &
|
||||
# Intersection with intervals
|
||||
do_test(makeset("[---|---)[)"),
|
||||
makeset(" [------) "),
|
||||
makeset(" [-) [-) "))
|
||||
makeset(" [-----) "), # intersection
|
||||
makeset("[-) [)")) # difference
|
||||
|
||||
eq_(makeset(" [---)") &
|
||||
do_test(makeset("[---------)"),
|
||||
makeset(" [---) "),
|
||||
makeset(" [---) "), # intersection
|
||||
makeset("[) [----)")) # difference
|
||||
|
||||
do_test(makeset(" [---) "),
|
||||
makeset("[---------)"),
|
||||
makeset(" [---) "), # intersection
|
||||
makeset(" ")) # difference
|
||||
|
||||
do_test(makeset(" [-----)"),
|
||||
makeset(" [-----) "),
|
||||
makeset(" [--) "), # intersection
|
||||
makeset(" [--)")) # difference
|
||||
|
||||
do_test(makeset(" [--) [--)"),
|
||||
makeset(" [------) "),
|
||||
makeset(" [-) [-) "), # intersection
|
||||
makeset(" [) [)")) # difference
|
||||
|
||||
do_test(makeset(" [---)"),
|
||||
makeset(" [--) "),
|
||||
makeset(" "))
|
||||
makeset(" "), # intersection
|
||||
makeset(" [---)")) # difference
|
||||
|
||||
eq_(makeset(" [-|---)") &
|
||||
do_test(makeset(" [-|---)"),
|
||||
makeset(" [-----|-) "),
|
||||
makeset(" [----) "))
|
||||
makeset(" [----) "), # intersection
|
||||
makeset(" [)")) # difference
|
||||
|
||||
eq_(makeset(" [-|-) ") &
|
||||
do_test(makeset(" [-|-) "),
|
||||
makeset(" [-|--|--) "),
|
||||
makeset(" [---) "))
|
||||
makeset(" [---) "), # intersection
|
||||
makeset(" ")) # difference
|
||||
|
||||
do_test(makeset("[-)[-)[-)[)"),
|
||||
makeset(" [) [|)[) "),
|
||||
makeset(" [) [) "), # intersection
|
||||
makeset("[) [-) [)[)")) # difference
|
||||
|
||||
# Border cases -- will give different results if intervals are
|
||||
# half open or fully closed. Right now, they are half open,
|
||||
# although that's a little messy since the database intervals
|
||||
# often contain a data point at the endpoint.
|
||||
half_open = True
|
||||
if half_open:
|
||||
eq_(makeset(" [---)") &
|
||||
# half open or fully closed. In nilmdb, they are half open.
|
||||
do_test(makeset(" [---)"),
|
||||
makeset(" [----) "),
|
||||
makeset(" "))
|
||||
eq_(makeset(" [----)[--)") &
|
||||
makeset(" "), # intersection
|
||||
makeset(" [---)")) # difference
|
||||
|
||||
do_test(makeset(" [----)[--)"),
|
||||
makeset("[-) [--) [)"),
|
||||
makeset(" [) [-) [)"))
|
||||
else:
|
||||
eq_(makeset(" [---)") &
|
||||
makeset(" [----) "),
|
||||
makeset(" . "))
|
||||
eq_(makeset(" [----)[--)") &
|
||||
makeset("[-) [--) [)"),
|
||||
makeset(" [) [-). [)"))
|
||||
makeset(" [) [-) [)"), # intersection
|
||||
makeset(" [-) [-) ")) # difference
|
||||
|
||||
# Set difference with bounds
|
||||
a = makeset(" [----)[--)")
|
||||
b = makeset("[-) [--) [)")
|
||||
c = makeset("[----) ")
|
||||
d = makeset(" [-) ")
|
||||
eq_(a.set_difference(b, list(c)[0]), d)
|
||||
|
||||
# Empty second set
|
||||
eq_(a.set_difference(IntervalSet()), a)
|
||||
|
||||
class TestIntervalDB:
|
||||
def test_dbinterval(self):
|
||||
@@ -371,4 +396,3 @@ class TestIntervalSpeed:
|
||||
aplotter.plot(speeds.keys(), speeds.values(), plot_slope=True)
|
||||
yappi.stop()
|
||||
yappi.print_stats(sort_type=yappi.SORTTYPE_TTOT, limit=10)
|
||||
|
||||
|
Reference in New Issue
Block a user