Compare commits
10 Commits
nilmdb-1.3
...
nilmdb-1.3
Author | SHA1 | Date | |
---|---|---|---|
09bc7eb48c | |||
b77f07a4cd | |||
59f0076306 | |||
83bc5bc775 | |||
6b1dfec828 | |||
d827f41fa5 | |||
7eca587fdf | |||
a351bc1b10 | |||
1d61d61a81 | |||
755255030b |
2
Makefile
2
Makefile
@@ -43,4 +43,4 @@ clean::
|
|||||||
gitclean::
|
gitclean::
|
||||||
git clean -dXf
|
git clean -dXf
|
||||||
|
|
||||||
.PHONY: all version build dist sdist install docs lint test clean
|
.PHONY: all version build dist sdist install docs lint test clean gitclean
|
||||||
|
@@ -147,13 +147,18 @@ class Client(object):
|
|||||||
ctx.insert(chunk)
|
ctx.insert(chunk)
|
||||||
return ctx.last_response
|
return ctx.last_response
|
||||||
|
|
||||||
def stream_intervals(self, path, start = None, end = None):
|
def stream_intervals(self, path, start = None, end = None, diffpath = None):
|
||||||
"""
|
"""
|
||||||
Return a generator that yields each stream interval.
|
Return a generator that yields each stream interval.
|
||||||
|
|
||||||
|
If diffpath is not None, yields only interval ranges that are
|
||||||
|
present in 'path' but not in 'diffpath'.
|
||||||
"""
|
"""
|
||||||
params = {
|
params = {
|
||||||
"path": path
|
"path": path
|
||||||
}
|
}
|
||||||
|
if diffpath is not None:
|
||||||
|
params["diffpath"] = diffpath
|
||||||
if start is not None:
|
if start is not None:
|
||||||
params["start"] = float_time_to_string(start)
|
params["start"] = float_time_to_string(start)
|
||||||
if end is not None:
|
if end is not None:
|
||||||
@@ -225,6 +230,7 @@ class StreamInserter(object):
|
|||||||
# See design.md for a discussion of how much data to send. This
|
# See design.md for a discussion of how much data to send. This
|
||||||
# is a soft limit -- we might send up to twice as much or so
|
# is a soft limit -- we might send up to twice as much or so
|
||||||
_max_data = 2 * 1024 * 1024
|
_max_data = 2 * 1024 * 1024
|
||||||
|
_max_data_after_send = 64 * 1024
|
||||||
|
|
||||||
# Delta to add to the final timestamp, if "end" wasn't given
|
# Delta to add to the final timestamp, if "end" wasn't given
|
||||||
_end_epsilon = 1e-6
|
_end_epsilon = 1e-6
|
||||||
@@ -270,6 +276,10 @@ class StreamInserter(object):
|
|||||||
# Send the block once we have enough data
|
# Send the block once we have enough data
|
||||||
if self._block_len >= maxdata:
|
if self._block_len >= maxdata:
|
||||||
self._send_block(final = False)
|
self._send_block(final = False)
|
||||||
|
if self._block_len >= self._max_data_after_send: # pragma: no cover
|
||||||
|
raise ValueError("too much data left over after trying"
|
||||||
|
" to send intermediate block; is it"
|
||||||
|
" missing newlines or malformed?")
|
||||||
|
|
||||||
def update_start(self, start):
|
def update_start(self, start):
|
||||||
"""Update the start time for the next contiguous interval.
|
"""Update the start time for the next contiguous interval.
|
||||||
@@ -366,7 +376,7 @@ class StreamInserter(object):
|
|||||||
(spos, epos) = self._get_last_noncomment(block)
|
(spos, epos) = self._get_last_noncomment(block)
|
||||||
end_ts = extract_timestamp(block[spos:epos])
|
end_ts = extract_timestamp(block[spos:epos])
|
||||||
except (ValueError, IndexError):
|
except (ValueError, IndexError):
|
||||||
# If we found no timestamp, give up; we'll send this
|
# If we found no timestamp, give up; we could send this
|
||||||
# block later when we have more data.
|
# block later when we have more data.
|
||||||
return
|
return
|
||||||
if spos == 0:
|
if spos == 0:
|
||||||
@@ -398,3 +408,5 @@ class StreamInserter(object):
|
|||||||
"start": float_time_to_string(start_ts),
|
"start": float_time_to_string(start_ts),
|
||||||
"end": float_time_to_string(end_ts) }
|
"end": float_time_to_string(end_ts) }
|
||||||
self.last_response = self._http.put("stream/insert", block, params)
|
self.last_response = self._http.put("stream/insert", block, params)
|
||||||
|
|
||||||
|
return
|
||||||
|
@@ -14,7 +14,8 @@ from argparse import ArgumentDefaultsHelpFormatter as def_form
|
|||||||
# Valid subcommands. Defined in separate files just to break
|
# Valid subcommands. Defined in separate files just to break
|
||||||
# things up -- they're still called with Cmdline as self.
|
# things up -- they're still called with Cmdline as self.
|
||||||
subcommands = [ "help", "info", "create", "list", "metadata",
|
subcommands = [ "help", "info", "create", "list", "metadata",
|
||||||
"insert", "extract", "remove", "destroy" ]
|
"insert", "extract", "remove", "destroy",
|
||||||
|
"intervals" ]
|
||||||
|
|
||||||
# Import the subcommand modules
|
# Import the subcommand modules
|
||||||
subcmd_mods = {}
|
subcmd_mods = {}
|
||||||
|
62
nilmdb/cmdline/intervals.py
Normal file
62
nilmdb/cmdline/intervals.py
Normal file
@@ -0,0 +1,62 @@
|
|||||||
|
from nilmdb.utils.printf import *
|
||||||
|
import nilmdb.utils.time
|
||||||
|
|
||||||
|
import fnmatch
|
||||||
|
import argparse
|
||||||
|
from argparse import ArgumentDefaultsHelpFormatter as def_form
|
||||||
|
|
||||||
|
def setup(self, sub):
|
||||||
|
cmd = sub.add_parser("intervals", help="List intervals",
|
||||||
|
formatter_class = def_form,
|
||||||
|
description="""
|
||||||
|
List intervals in a stream, similar to
|
||||||
|
'list --detail path'.
|
||||||
|
|
||||||
|
If '--diff diffpath' is provided, only
|
||||||
|
interval ranges that are present in 'path'
|
||||||
|
and not present in 'diffpath' are printed.
|
||||||
|
""")
|
||||||
|
cmd.set_defaults(verify = cmd_intervals_verify,
|
||||||
|
handler = cmd_intervals)
|
||||||
|
|
||||||
|
group = cmd.add_argument_group("Stream selection")
|
||||||
|
group.add_argument("path", metavar="PATH",
|
||||||
|
help="List intervals for this path")
|
||||||
|
group.add_argument("-d", "--diff", metavar="PATH",
|
||||||
|
help="Subtract intervals from this path")
|
||||||
|
|
||||||
|
group = cmd.add_argument_group("Interval details")
|
||||||
|
group.add_argument("-s", "--start",
|
||||||
|
metavar="TIME", type=self.arg_time,
|
||||||
|
help="Starting timestamp for intervals "
|
||||||
|
"(free-form, inclusive)")
|
||||||
|
group.add_argument("-e", "--end",
|
||||||
|
metavar="TIME", type=self.arg_time,
|
||||||
|
help="Ending timestamp for intervals "
|
||||||
|
"(free-form, noninclusive)")
|
||||||
|
|
||||||
|
group = cmd.add_argument_group("Misc options")
|
||||||
|
group.add_argument("-T", "--timestamp-raw", action="store_true",
|
||||||
|
help="Show raw timestamps when printing times")
|
||||||
|
|
||||||
|
return cmd
|
||||||
|
|
||||||
|
def cmd_intervals_verify(self):
|
||||||
|
if self.args.start is not None and self.args.end is not None:
|
||||||
|
if self.args.start >= self.args.end:
|
||||||
|
self.parser.error("start must precede end")
|
||||||
|
|
||||||
|
def cmd_intervals(self):
|
||||||
|
"""List intervals in a stream"""
|
||||||
|
if self.args.timestamp_raw:
|
||||||
|
time_string = nilmdb.utils.time.float_time_to_string
|
||||||
|
else:
|
||||||
|
time_string = nilmdb.utils.time.format_time
|
||||||
|
|
||||||
|
try:
|
||||||
|
for (start, end) in self.client.stream_intervals(
|
||||||
|
self.args.path, self.args.start, self.args.end, self.args.diff):
|
||||||
|
printf("[ %s -> %s ]\n", time_string(start), time_string(end))
|
||||||
|
except nilmdb.client.ClientError as e:
|
||||||
|
self.die("error listing intervals: %s", str(e))
|
||||||
|
|
@@ -367,22 +367,27 @@ class Table(object):
|
|||||||
) = f.append_string(count, data, data_offset, linenum,
|
) = f.append_string(count, data, data_offset, linenum,
|
||||||
start, end, last_timestamp)
|
start, end, last_timestamp)
|
||||||
except rocket.ParseError as e:
|
except rocket.ParseError as e:
|
||||||
(linenum, errtype, obj) = e.args
|
(linenum, colnum, errtype, obj) = e.args
|
||||||
|
where = "line %d, column %d: " % (linenum, colnum)
|
||||||
|
# Extract out the error line, add column marker
|
||||||
|
try:
|
||||||
|
bad = data.splitlines()[linenum-1]
|
||||||
|
badptr = ' ' * (colnum - 1) + '^'
|
||||||
|
except IndexError: # pragma: no cover
|
||||||
|
bad = ""
|
||||||
if errtype == rocket.ERR_NON_MONOTONIC:
|
if errtype == rocket.ERR_NON_MONOTONIC:
|
||||||
err = sprintf("line %d: timestamp is not monotonically "
|
err = "timestamp is not monotonically increasing"
|
||||||
"increasing", linenum)
|
|
||||||
elif errtype == rocket.ERR_OUT_OF_INTERVAL:
|
elif errtype == rocket.ERR_OUT_OF_INTERVAL:
|
||||||
if obj < start:
|
if obj < start:
|
||||||
err = sprintf("line %d: Data timestamp %s < "
|
err = sprintf("Data timestamp %s < start time %s",
|
||||||
"start time %s", linenum,
|
|
||||||
ftts(obj), ftts(start))
|
ftts(obj), ftts(start))
|
||||||
else:
|
else:
|
||||||
err = sprintf("line %d: Data timestamp %s >= "
|
err = sprintf("Data timestamp %s >= end time %s",
|
||||||
"end time %s", linenum,
|
|
||||||
ftts(obj), ftts(end))
|
ftts(obj), ftts(end))
|
||||||
else:
|
else:
|
||||||
err = sprintf("line %d: %s", linenum, str(obj))
|
err = str(obj)
|
||||||
raise ValueError("error parsing input data: " + err)
|
raise ValueError("error parsing input data: " +
|
||||||
|
where + err + "\n" + bad + "\n" + badptr)
|
||||||
tot_rows += added_rows
|
tot_rows += added_rows
|
||||||
except Exception:
|
except Exception:
|
||||||
# Some failure, so try to roll things back by truncating or
|
# Some failure, so try to roll things back by truncating or
|
||||||
|
@@ -20,6 +20,8 @@ Intervals are half-open, ie. they include data points with timestamps
|
|||||||
# and ends directly in the tree, like bxinterval did.
|
# and ends directly in the tree, like bxinterval did.
|
||||||
|
|
||||||
from ..utils.time import float_time_to_string as ftts
|
from ..utils.time import float_time_to_string as ftts
|
||||||
|
from ..utils.iterator import imerge
|
||||||
|
import itertools
|
||||||
|
|
||||||
cimport rbtree
|
cimport rbtree
|
||||||
cdef extern from "stdint.h":
|
cdef extern from "stdint.h":
|
||||||
@@ -264,21 +266,15 @@ cdef class IntervalSet:
|
|||||||
|
|
||||||
def __and__(self, other not None):
|
def __and__(self, other not None):
|
||||||
"""
|
"""
|
||||||
Compute a new IntervalSet from the intersection of two others
|
Compute a new IntervalSet from the intersection of this
|
||||||
|
IntervalSet with one other interval.
|
||||||
|
|
||||||
Output intervals are built as subsets of the intervals in the
|
Output intervals are built as subsets of the intervals in the
|
||||||
first argument (self).
|
first argument (self).
|
||||||
"""
|
"""
|
||||||
out = IntervalSet()
|
out = IntervalSet()
|
||||||
|
|
||||||
if not isinstance(other, IntervalSet):
|
|
||||||
for i in self.intersection(other):
|
for i in self.intersection(other):
|
||||||
out.tree.insert(rbtree.RBNode(i.start, i.end, i))
|
out.tree.insert(rbtree.RBNode(i.start, i.end, i))
|
||||||
else:
|
|
||||||
for x in other:
|
|
||||||
for i in self.intersection(x):
|
|
||||||
out.tree.insert(rbtree.RBNode(i.start, i.end, i))
|
|
||||||
|
|
||||||
return out
|
return out
|
||||||
|
|
||||||
def intersection(self, Interval interval not None, orig = False):
|
def intersection(self, Interval interval not None, orig = False):
|
||||||
@@ -313,6 +309,62 @@ cdef class IntervalSet:
|
|||||||
else:
|
else:
|
||||||
yield subset
|
yield subset
|
||||||
|
|
||||||
|
def set_difference(self, IntervalSet other not None,
|
||||||
|
Interval bounds = None):
|
||||||
|
"""
|
||||||
|
Compute the difference (self \\ other) between this
|
||||||
|
IntervalSet and the given IntervalSet; i.e., the ranges
|
||||||
|
that are present in 'self' but not 'other'.
|
||||||
|
|
||||||
|
If 'bounds' is not None, results are limited to the range
|
||||||
|
specified by the interval 'bounds'.
|
||||||
|
|
||||||
|
Returns a generator that yields each interval in turn.
|
||||||
|
Output intervals are built as subsets of the intervals in the
|
||||||
|
first argument (self).
|
||||||
|
"""
|
||||||
|
# Iterate through all starts and ends in sorted order. Add a
|
||||||
|
# tag to the iterator so that we can figure out which one they
|
||||||
|
# were, after sorting.
|
||||||
|
def decorate(it, key_start, key_end):
|
||||||
|
for i in it:
|
||||||
|
yield i.start, key_start, i
|
||||||
|
yield i.end, key_end, i
|
||||||
|
if bounds is None:
|
||||||
|
bounds = Interval(-1e12, 1e12)
|
||||||
|
self_iter = decorate(self.intersection(bounds), 0, 2)
|
||||||
|
other_iter = decorate(other.intersection(bounds), 1, 3)
|
||||||
|
|
||||||
|
# Now iterate over the timestamps of each start and end.
|
||||||
|
# At each point, evaluate which type of end it is, to determine
|
||||||
|
# how to build up the output intervals.
|
||||||
|
self_interval = None
|
||||||
|
other_interval = None
|
||||||
|
out_start = None
|
||||||
|
for (ts, k, i) in imerge(self_iter, other_iter):
|
||||||
|
if k == 0:
|
||||||
|
# start self interval
|
||||||
|
self_interval = i
|
||||||
|
if other_interval is None:
|
||||||
|
out_start = ts
|
||||||
|
elif k == 1:
|
||||||
|
# start other interval
|
||||||
|
other_interval = i
|
||||||
|
if out_start is not None and out_start != ts:
|
||||||
|
yield self_interval.subset(out_start, ts)
|
||||||
|
out_start = None
|
||||||
|
elif k == 2:
|
||||||
|
# end self interval
|
||||||
|
if out_start is not None and out_start != ts:
|
||||||
|
yield self_interval.subset(out_start, ts)
|
||||||
|
out_start = None
|
||||||
|
self_interval = None
|
||||||
|
elif k == 3:
|
||||||
|
# end other interval
|
||||||
|
other_interval = None
|
||||||
|
if self_interval:
|
||||||
|
out_start = ts
|
||||||
|
|
||||||
cpdef intersects(self, Interval other):
|
cpdef intersects(self, Interval other):
|
||||||
"""Return True if this IntervalSet intersects another interval"""
|
"""Return True if this IntervalSet intersects another interval"""
|
||||||
for n in self.tree.intersect(other.start, other.end):
|
for n in self.tree.intersect(other.start, other.end):
|
||||||
|
@@ -290,8 +290,8 @@ class NilmDB(object):
|
|||||||
query = "SELECT streams.path, streams.layout"
|
query = "SELECT streams.path, streams.layout"
|
||||||
if extended:
|
if extended:
|
||||||
query += ", min(ranges.start_time), max(ranges.end_time) "
|
query += ", min(ranges.start_time), max(ranges.end_time) "
|
||||||
query += ", sum(ranges.end_pos - ranges.start_pos) "
|
query += ", coalesce(sum(ranges.end_pos - ranges.start_pos), 0) "
|
||||||
query += ", sum(ranges.end_time - ranges.start_time) "
|
query += ", coalesce(sum(ranges.end_time - ranges.start_time), 0) "
|
||||||
query += " FROM streams"
|
query += " FROM streams"
|
||||||
if extended:
|
if extended:
|
||||||
query += " LEFT JOIN ranges ON streams.id = ranges.stream_id"
|
query += " LEFT JOIN ranges ON streams.id = ranges.stream_id"
|
||||||
@@ -306,8 +306,13 @@ class NilmDB(object):
|
|||||||
result = self.con.execute(query, params).fetchall()
|
result = self.con.execute(query, params).fetchall()
|
||||||
return [ list(x) for x in result ]
|
return [ list(x) for x in result ]
|
||||||
|
|
||||||
def stream_intervals(self, path, start = None, end = None):
|
def stream_intervals(self, path, start = None, end = None, diffpath = None):
|
||||||
"""
|
"""
|
||||||
|
List all intervals in 'path' between 'start' and 'end'. If
|
||||||
|
'diffpath' is not none, list instead the set-difference
|
||||||
|
between the intervals in the two streams; i.e. all interval
|
||||||
|
ranges that are present in 'path' but not 'path2'.
|
||||||
|
|
||||||
Returns (intervals, restart) tuple.
|
Returns (intervals, restart) tuple.
|
||||||
|
|
||||||
intervals is a list of [start,end] timestamps of all intervals
|
intervals is a list of [start,end] timestamps of all intervals
|
||||||
@@ -321,10 +326,17 @@ class NilmDB(object):
|
|||||||
"""
|
"""
|
||||||
stream_id = self._stream_id(path)
|
stream_id = self._stream_id(path)
|
||||||
intervals = self._get_intervals(stream_id)
|
intervals = self._get_intervals(stream_id)
|
||||||
|
if diffpath:
|
||||||
|
diffstream_id = self._stream_id(diffpath)
|
||||||
|
diffintervals = self._get_intervals(diffstream_id)
|
||||||
(start, end) = self._check_user_times(start, end)
|
(start, end) = self._check_user_times(start, end)
|
||||||
requested = Interval(start, end)
|
requested = Interval(start, end)
|
||||||
result = []
|
result = []
|
||||||
for n, i in enumerate(intervals.intersection(requested)):
|
if diffpath:
|
||||||
|
getter = intervals.set_difference(diffintervals, requested)
|
||||||
|
else:
|
||||||
|
getter = intervals.intersection(requested)
|
||||||
|
for n, i in enumerate(getter):
|
||||||
if n >= self.max_results:
|
if n >= self.max_results:
|
||||||
restart = i.start
|
restart = i.start
|
||||||
break
|
break
|
||||||
|
@@ -117,12 +117,12 @@ class Rocket(object):
|
|||||||
try:
|
try:
|
||||||
(ts, row) = self.layoutparser.parse(line)
|
(ts, row) = self.layoutparser.parse(line)
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
raise ParseError(linenum, ERR_UNKNOWN, e)
|
raise ParseError(linenum, 0, ERR_UNKNOWN, e)
|
||||||
if ts <= last_timestamp:
|
if ts <= last_timestamp:
|
||||||
raise ParseError(linenum, ERR_NON_MONOTONIC, ts)
|
raise ParseError(linenum, 0, ERR_NON_MONOTONIC, ts)
|
||||||
last_timestamp = ts
|
last_timestamp = ts
|
||||||
if ts < start or ts >= end:
|
if ts < start or ts >= end:
|
||||||
raise ParseError(linenum, ERR_OUT_OF_INTERVAL, ts)
|
raise ParseError(linenum, 0, ERR_OUT_OF_INTERVAL, ts)
|
||||||
self.append_iter(1, [row])
|
self.append_iter(1, [row])
|
||||||
written += 1
|
written += 1
|
||||||
return (written, indata.tell(), last_timestamp, linenum)
|
return (written, indata.tell(), last_timestamp, linenum)
|
||||||
|
@@ -2,6 +2,7 @@
|
|||||||
#include <structmember.h>
|
#include <structmember.h>
|
||||||
#include <endian.h>
|
#include <endian.h>
|
||||||
|
|
||||||
|
#include <ctype.h>
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
|
|
||||||
/* Values missing from stdint.h */
|
/* Values missing from stdint.h */
|
||||||
@@ -18,7 +19,7 @@
|
|||||||
|
|
||||||
/* Somewhat arbitrary, just so we can use fixed sizes for strings
|
/* Somewhat arbitrary, just so we can use fixed sizes for strings
|
||||||
etc. */
|
etc. */
|
||||||
static const int MAX_LAYOUT_COUNT = 64;
|
static const int MAX_LAYOUT_COUNT = 128;
|
||||||
|
|
||||||
/* Error object and constants */
|
/* Error object and constants */
|
||||||
static PyObject *ParseError;
|
static PyObject *ParseError;
|
||||||
@@ -35,20 +36,20 @@ static void add_parseerror_codes(PyObject *module)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Helpers to raise ParseErrors. Use "return raise_str(...)" etc. */
|
/* Helpers to raise ParseErrors. Use "return raise_str(...)" etc. */
|
||||||
static PyObject *raise_str(int linenum, int code, const char *string)
|
static PyObject *raise_str(int line, int col, int code, const char *string)
|
||||||
{
|
{
|
||||||
PyObject *o;
|
PyObject *o;
|
||||||
o = Py_BuildValue("(iis)", linenum, code, string);
|
o = Py_BuildValue("(iiis)", line, col, code, string);
|
||||||
if (o != NULL) {
|
if (o != NULL) {
|
||||||
PyErr_SetObject(ParseError, o);
|
PyErr_SetObject(ParseError, o);
|
||||||
Py_DECREF(o);
|
Py_DECREF(o);
|
||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
static PyObject *raise_num(int linenum, int code, double num)
|
static PyObject *raise_num(int line, int col, int code, double num)
|
||||||
{
|
{
|
||||||
PyObject *o;
|
PyObject *o;
|
||||||
o = Py_BuildValue("(iid)", linenum, code, num);
|
o = Py_BuildValue("(iiid)", line, col, code, num);
|
||||||
if (o != NULL) {
|
if (o != NULL) {
|
||||||
PyErr_SetObject(ParseError, o);
|
PyErr_SetObject(ParseError, o);
|
||||||
Py_DECREF(o);
|
Py_DECREF(o);
|
||||||
@@ -352,6 +353,7 @@ static PyObject *Rocket_append_string(Rocket *self, PyObject *args)
|
|||||||
int count;
|
int count;
|
||||||
const char *data;
|
const char *data;
|
||||||
int offset;
|
int offset;
|
||||||
|
const char *linestart;
|
||||||
int linenum;
|
int linenum;
|
||||||
double start;
|
double start;
|
||||||
double end;
|
double end;
|
||||||
@@ -374,14 +376,22 @@ static PyObject *Rocket_append_string(Rocket *self, PyObject *args)
|
|||||||
&start, &end, &last_timestamp))
|
&start, &end, &last_timestamp))
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
||||||
|
/* Skip spaces, but don't skip over a newline. */
|
||||||
|
#define SKIP_BLANK(buf) do { \
|
||||||
|
while (isspace(*buf)) { \
|
||||||
|
if (*buf == '\n') \
|
||||||
|
break; \
|
||||||
|
buf++; \
|
||||||
|
} } while(0)
|
||||||
|
|
||||||
const char *buf = &data[offset];
|
const char *buf = &data[offset];
|
||||||
while (written < count && *buf)
|
while (written < count && *buf)
|
||||||
{
|
{
|
||||||
|
linestart = buf;
|
||||||
linenum++;
|
linenum++;
|
||||||
|
|
||||||
/* Skip leading whitespace and commented lines */
|
/* Skip leading whitespace and commented lines */
|
||||||
while (*buf == ' ' || *buf == '\t')
|
SKIP_BLANK(buf);
|
||||||
buf++;
|
|
||||||
if (*buf == '#') {
|
if (*buf == '#') {
|
||||||
while (*buf && *buf != '\n')
|
while (*buf && *buf != '\n')
|
||||||
buf++;
|
buf++;
|
||||||
@@ -393,12 +403,15 @@ static PyObject *Rocket_append_string(Rocket *self, PyObject *args)
|
|||||||
/* Extract timestamp */
|
/* Extract timestamp */
|
||||||
t64.d = strtod(buf, &endptr);
|
t64.d = strtod(buf, &endptr);
|
||||||
if (endptr == buf)
|
if (endptr == buf)
|
||||||
return raise_str(linenum, ERR_OTHER, "bad timestamp");
|
return raise_str(linenum, buf - linestart + 1,
|
||||||
|
ERR_OTHER, "bad timestamp");
|
||||||
if (t64.d <= last_timestamp)
|
if (t64.d <= last_timestamp)
|
||||||
return raise_num(linenum, ERR_NON_MONOTONIC, t64.d);
|
return raise_num(linenum, buf - linestart + 1,
|
||||||
|
ERR_NON_MONOTONIC, t64.d);
|
||||||
last_timestamp = t64.d;
|
last_timestamp = t64.d;
|
||||||
if (t64.d < start || t64.d >= end)
|
if (t64.d < start || t64.d >= end)
|
||||||
return raise_num(linenum, ERR_OUT_OF_INTERVAL, t64.d);
|
return raise_num(linenum, buf - linestart + 1,
|
||||||
|
ERR_OUT_OF_INTERVAL, t64.d);
|
||||||
t64.u = le64toh(t64.u);
|
t64.u = le64toh(t64.u);
|
||||||
if (fwrite(&t64.u, 8, 1, self->file) != 1)
|
if (fwrite(&t64.u, 8, 1, self->file) != 1)
|
||||||
goto err;
|
goto err;
|
||||||
@@ -410,23 +423,31 @@ static PyObject *Rocket_append_string(Rocket *self, PyObject *args)
|
|||||||
case LAYOUT_TYPE_##type: \
|
case LAYOUT_TYPE_##type: \
|
||||||
/* parse and write in a loop */ \
|
/* parse and write in a loop */ \
|
||||||
for (i = 0; i < self->layout_count; i++) { \
|
for (i = 0; i < self->layout_count; i++) { \
|
||||||
parsetype = parsefunc(buf, &endptr); \
|
/* skip non-newlines */ \
|
||||||
if (endptr == buf) \
|
SKIP_BLANK(buf); \
|
||||||
|
if (*buf == '\n') \
|
||||||
goto wrong_number_of_values; \
|
goto wrong_number_of_values; \
|
||||||
|
/* parse number */ \
|
||||||
|
parsetype = parsefunc(buf, &endptr); \
|
||||||
|
if (*endptr && !isspace(*endptr)) \
|
||||||
|
goto cant_parse_value; \
|
||||||
|
/* check limits */ \
|
||||||
if (type##_MIN != type##_MAX && \
|
if (type##_MIN != type##_MAX && \
|
||||||
(parsetype < type##_MIN || \
|
(parsetype < type##_MIN || \
|
||||||
parsetype > type##_MAX)) \
|
parsetype > type##_MAX)) \
|
||||||
goto value_out_of_range; \
|
goto value_out_of_range; \
|
||||||
|
/* convert to disk representation */ \
|
||||||
realtype = parsetype; \
|
realtype = parsetype; \
|
||||||
disktype = letoh(disktype); \
|
disktype = letoh(disktype); \
|
||||||
|
/* write it */ \
|
||||||
if (fwrite(&disktype, bytes, \
|
if (fwrite(&disktype, bytes, \
|
||||||
1, self->file) != 1) \
|
1, self->file) != 1) \
|
||||||
goto err; \
|
goto err; \
|
||||||
|
/* advance buf */ \
|
||||||
buf = endptr; \
|
buf = endptr; \
|
||||||
} \
|
} \
|
||||||
/* Skip trailing whitespace and comments */ \
|
/* Skip trailing whitespace and comments */ \
|
||||||
while (*buf == ' ' || *buf == '\t') \
|
SKIP_BLANK(buf); \
|
||||||
buf++; \
|
|
||||||
if (*buf == '#') \
|
if (*buf == '#') \
|
||||||
while (*buf && *buf != '\n') \
|
while (*buf && *buf != '\n') \
|
||||||
buf++; \
|
buf++; \
|
||||||
@@ -466,12 +487,18 @@ static PyObject *Rocket_append_string(Rocket *self, PyObject *args)
|
|||||||
err:
|
err:
|
||||||
PyErr_SetFromErrno(PyExc_OSError);
|
PyErr_SetFromErrno(PyExc_OSError);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
cant_parse_value:
|
||||||
|
return raise_str(linenum, buf - linestart + 1,
|
||||||
|
ERR_OTHER, "can't parse value");
|
||||||
wrong_number_of_values:
|
wrong_number_of_values:
|
||||||
return raise_str(linenum, ERR_OTHER, "wrong number of values");
|
return raise_str(linenum, buf - linestart + 1,
|
||||||
|
ERR_OTHER, "wrong number of values");
|
||||||
value_out_of_range:
|
value_out_of_range:
|
||||||
return raise_str(linenum, ERR_OTHER, "value out of range");
|
return raise_str(linenum, buf - linestart + 1,
|
||||||
|
ERR_OTHER, "value out of range");
|
||||||
extra_data_on_line:
|
extra_data_on_line:
|
||||||
return raise_str(linenum, ERR_OTHER, "extra data on line");
|
return raise_str(linenum, buf - linestart + 1,
|
||||||
|
ERR_OTHER, "extra data on line");
|
||||||
}
|
}
|
||||||
|
|
||||||
/****
|
/****
|
||||||
|
@@ -332,16 +332,22 @@ class Stream(NilmApp):
|
|||||||
|
|
||||||
# /stream/intervals?path=/newton/prep
|
# /stream/intervals?path=/newton/prep
|
||||||
# /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0
|
# /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0
|
||||||
|
# /stream/intervals?path=/newton/prep&diffpath=/newton/prep2
|
||||||
@cherrypy.expose
|
@cherrypy.expose
|
||||||
@chunked_response
|
@chunked_response
|
||||||
@response_type("application/x-json-stream")
|
@response_type("application/x-json-stream")
|
||||||
def intervals(self, path, start = None, end = None):
|
def intervals(self, path, start = None, end = None, diffpath = None):
|
||||||
"""
|
"""
|
||||||
Get intervals from backend database. Streams the resulting
|
Get intervals from backend database. Streams the resulting
|
||||||
intervals as JSON strings separated by CR LF pairs. This may
|
intervals as JSON strings separated by CR LF pairs. This may
|
||||||
make multiple requests to the nilmdb backend to avoid causing
|
make multiple requests to the nilmdb backend to avoid causing
|
||||||
it to block for too long.
|
it to block for too long.
|
||||||
|
|
||||||
|
Returns intervals between 'start' and 'end' belonging to
|
||||||
|
'path'. If 'diff' is provided, the set-difference between
|
||||||
|
intervals in 'path' and intervals in 'diffpath' are
|
||||||
|
returned instead.
|
||||||
|
|
||||||
Note that the response type is the non-standard
|
Note that the response type is the non-standard
|
||||||
'application/x-json-stream' for lack of a better option.
|
'application/x-json-stream' for lack of a better option.
|
||||||
"""
|
"""
|
||||||
@@ -355,15 +361,18 @@ class Stream(NilmApp):
|
|||||||
raise cherrypy.HTTPError("400 Bad Request",
|
raise cherrypy.HTTPError("400 Bad Request",
|
||||||
"start must precede end")
|
"start must precede end")
|
||||||
|
|
||||||
streams = self.db.stream_list(path = path)
|
if len(self.db.stream_list(path = path)) != 1:
|
||||||
if len(streams) != 1:
|
raise cherrypy.HTTPError("404", "No such stream: " + path)
|
||||||
raise cherrypy.HTTPError("404 Not Found", "No such stream")
|
|
||||||
|
if diffpath and len(self.db.stream_list(path = diffpath)) != 1:
|
||||||
|
raise cherrypy.HTTPError("404", "No such stream: " + diffpath)
|
||||||
|
|
||||||
@workaround_cp_bug_1200
|
@workaround_cp_bug_1200
|
||||||
def content(start, end):
|
def content(start, end):
|
||||||
# Note: disable chunked responses to see tracebacks from here.
|
# Note: disable chunked responses to see tracebacks from here.
|
||||||
while True:
|
while True:
|
||||||
(ints, restart) = self.db.stream_intervals(path, start, end)
|
(ints, restart) = self.db.stream_intervals(path, start, end,
|
||||||
|
diffpath)
|
||||||
response = ''.join([ json.dumps(i) + "\r\n" for i in ints ])
|
response = ''.join([ json.dumps(i) + "\r\n" for i in ints ])
|
||||||
yield response
|
yield response
|
||||||
if restart == 0:
|
if restart == 0:
|
||||||
|
@@ -10,3 +10,4 @@ from nilmdb.utils import atomic
|
|||||||
import nilmdb.utils.threadsafety
|
import nilmdb.utils.threadsafety
|
||||||
import nilmdb.utils.fallocate
|
import nilmdb.utils.fallocate
|
||||||
import nilmdb.utils.time
|
import nilmdb.utils.time
|
||||||
|
import nilmdb.utils.iterator
|
||||||
|
36
nilmdb/utils/iterator.py
Normal file
36
nilmdb/utils/iterator.py
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
# Misc iterator tools
|
||||||
|
|
||||||
|
# Iterator merging, based on http://code.activestate.com/recipes/491285/
|
||||||
|
import heapq
|
||||||
|
def imerge(*iterables):
|
||||||
|
'''Merge multiple sorted inputs into a single sorted output.
|
||||||
|
|
||||||
|
Equivalent to: sorted(itertools.chain(*iterables))
|
||||||
|
|
||||||
|
>>> list(imerge([1,3,5,7], [0,2,4,8], [5,10,15,20], [], [25]))
|
||||||
|
[0, 1, 2, 3, 4, 5, 5, 7, 8, 10, 15, 20, 25]
|
||||||
|
|
||||||
|
'''
|
||||||
|
heappop, siftup, _Stop = heapq.heappop, heapq._siftup, StopIteration
|
||||||
|
|
||||||
|
h = []
|
||||||
|
h_append = h.append
|
||||||
|
for it in map(iter, iterables):
|
||||||
|
try:
|
||||||
|
next = it.next
|
||||||
|
h_append([next(), next])
|
||||||
|
except _Stop:
|
||||||
|
pass
|
||||||
|
heapq.heapify(h)
|
||||||
|
|
||||||
|
while 1:
|
||||||
|
try:
|
||||||
|
while 1:
|
||||||
|
v, next = s = h[0] # raises IndexError when h is empty
|
||||||
|
yield v
|
||||||
|
s[0] = next() # raises StopIteration when exhausted
|
||||||
|
siftup(h, 0) # restore heap condition
|
||||||
|
except _Stop:
|
||||||
|
heappop(h) # remove empty iterator
|
||||||
|
except IndexError:
|
||||||
|
return
|
@@ -394,7 +394,7 @@ class TestCmdline(object):
|
|||||||
self.fail("insert -s 20120323T1004 -e 20120323T1006 /newton/prep",
|
self.fail("insert -s 20120323T1004 -e 20120323T1006 /newton/prep",
|
||||||
input)
|
input)
|
||||||
self.contain("error parsing input data")
|
self.contain("error parsing input data")
|
||||||
self.contain("line 7:")
|
self.contain("line 7")
|
||||||
self.contain("timestamp is not monotonically increasing")
|
self.contain("timestamp is not monotonically increasing")
|
||||||
|
|
||||||
# insert pre-timestamped data, from stdin
|
# insert pre-timestamped data, from stdin
|
||||||
@@ -436,6 +436,15 @@ class TestCmdline(object):
|
|||||||
self.fail("insert -t -r 120 -f /newton/raw "
|
self.fail("insert -t -r 120 -f /newton/raw "
|
||||||
"tests/data/prep-20120323T1004")
|
"tests/data/prep-20120323T1004")
|
||||||
self.contain("error parsing input data")
|
self.contain("error parsing input data")
|
||||||
|
self.contain("can't parse value")
|
||||||
|
|
||||||
|
# too few rows per line
|
||||||
|
self.ok("create /insert/test float32_20")
|
||||||
|
self.fail("insert -t -r 120 -f /insert/test "
|
||||||
|
"tests/data/prep-20120323T1004")
|
||||||
|
self.contain("error parsing input data")
|
||||||
|
self.contain("wrong number of values")
|
||||||
|
self.ok("destroy /insert/test")
|
||||||
|
|
||||||
# empty data does nothing
|
# empty data does nothing
|
||||||
self.ok("insert -t -r 120 --start '03/23/2012 06:05:00' /newton/prep "
|
self.ok("insert -t -r 120 --start '03/23/2012 06:05:00' /newton/prep "
|
||||||
@@ -893,3 +902,56 @@ class TestCmdline(object):
|
|||||||
# See if we can extract it all
|
# See if we can extract it all
|
||||||
self.ok("extract /newton/prep --start 2000-01-01 --end 2020-01-01")
|
self.ok("extract /newton/prep --start 2000-01-01 --end 2020-01-01")
|
||||||
lines_(self.captured, 15600)
|
lines_(self.captured, 15600)
|
||||||
|
|
||||||
|
def test_15_intervals_diff(self):
|
||||||
|
# Test "intervals" and "intervals --diff" command.
|
||||||
|
os.environ['TZ'] = "UTC"
|
||||||
|
|
||||||
|
self.ok("create /diff/1 uint8_1")
|
||||||
|
self.match("")
|
||||||
|
self.ok("intervals /diff/1")
|
||||||
|
self.match("")
|
||||||
|
self.ok("intervals /diff/1 --diff /diff/1")
|
||||||
|
self.match("")
|
||||||
|
self.ok("intervals --diff /diff/1 /diff/1")
|
||||||
|
self.match("")
|
||||||
|
self.fail("intervals /diff/2")
|
||||||
|
self.fail("intervals /diff/1 -d /diff/2")
|
||||||
|
|
||||||
|
self.ok("create /diff/2 uint8_1")
|
||||||
|
self.ok("intervals -T /diff/1 -d /diff/2")
|
||||||
|
self.match("")
|
||||||
|
self.ok("insert -s 01-01-2000 -e 01-01-2001 /diff/1 /dev/null")
|
||||||
|
|
||||||
|
self.ok("intervals /diff/1")
|
||||||
|
self.match("[ Sat, 01 Jan 2000 00:00:00.000000 +0000 -"
|
||||||
|
"> Mon, 01 Jan 2001 00:00:00.000000 +0000 ]\n")
|
||||||
|
|
||||||
|
self.ok("intervals /diff/1 -d /diff/2")
|
||||||
|
self.match("[ Sat, 01 Jan 2000 00:00:00.000000 +0000 -"
|
||||||
|
"> Mon, 01 Jan 2001 00:00:00.000000 +0000 ]\n")
|
||||||
|
|
||||||
|
self.ok("insert -s 01-01-2000 -e 01-01-2001 /diff/2 /dev/null")
|
||||||
|
self.ok("intervals /diff/1 -d /diff/2")
|
||||||
|
self.match("")
|
||||||
|
|
||||||
|
self.ok("insert -s 01-01-2001 -e 01-01-2002 /diff/1 /dev/null")
|
||||||
|
self.ok("insert -s 01-01-2002 -e 01-01-2003 /diff/2 /dev/null")
|
||||||
|
self.ok("intervals /diff/1 -d /diff/2")
|
||||||
|
self.match("[ Mon, 01 Jan 2001 00:00:00.000000 +0000 -"
|
||||||
|
"> Tue, 01 Jan 2002 00:00:00.000000 +0000 ]\n")
|
||||||
|
|
||||||
|
self.ok("insert -s 01-01-2004 -e 01-01-2005 /diff/1 /dev/null")
|
||||||
|
self.ok("intervals /diff/1 -d /diff/2")
|
||||||
|
self.match("[ Mon, 01 Jan 2001 00:00:00.000000 +0000 -"
|
||||||
|
"> Tue, 01 Jan 2002 00:00:00.000000 +0000 ]\n"
|
||||||
|
"[ Thu, 01 Jan 2004 00:00:00.000000 +0000 -"
|
||||||
|
"> Sat, 01 Jan 2005 00:00:00.000000 +0000 ]\n")
|
||||||
|
|
||||||
|
self.fail("intervals -s 01-01-2003 -e 01-01-2000 /diff/1 -d /diff/2")
|
||||||
|
self.ok("intervals -s 01-01-2003 -e 01-01-2008 /diff/1 -d /diff/2")
|
||||||
|
self.match("[ Thu, 01 Jan 2004 00:00:00.000000 +0000 -"
|
||||||
|
"> Sat, 01 Jan 2005 00:00:00.000000 +0000 ]\n")
|
||||||
|
|
||||||
|
self.ok("destroy /diff/1")
|
||||||
|
self.ok("destroy /diff/2")
|
||||||
|
@@ -208,64 +208,89 @@ class TestInterval:
|
|||||||
makeset(" [-|-----|"))
|
makeset(" [-|-----|"))
|
||||||
|
|
||||||
|
|
||||||
def test_intervalset_intersect(self):
|
def test_intervalset_intersect_difference(self):
|
||||||
# Test intersection (&)
|
# Test intersection (&)
|
||||||
with assert_raises(TypeError): # was AttributeError
|
with assert_raises(TypeError): # was AttributeError
|
||||||
x = makeset("[--)") & 1234
|
x = makeset("[--)") & 1234
|
||||||
|
|
||||||
# Intersection with interval
|
def do_test(a, b, c, d):
|
||||||
eq_(makeset("[---|---)[)") &
|
# a & b == c
|
||||||
list(makeset(" [------) "))[0],
|
ab = IntervalSet()
|
||||||
makeset(" [-----) "))
|
for x in b:
|
||||||
|
for i in (a & x):
|
||||||
|
ab += i
|
||||||
|
eq_(ab,c)
|
||||||
|
|
||||||
# Intersection with sets
|
# a \ b == d
|
||||||
eq_(makeset("[---------)") &
|
eq_(IntervalSet(a.set_difference(b)), d)
|
||||||
makeset(" [---) "),
|
|
||||||
makeset(" [---) "))
|
|
||||||
|
|
||||||
eq_(makeset(" [---) ") &
|
# Intersection with intervals
|
||||||
makeset("[---------)"),
|
do_test(makeset("[---|---)[)"),
|
||||||
makeset(" [---) "))
|
|
||||||
|
|
||||||
eq_(makeset(" [-----)") &
|
|
||||||
makeset(" [-----) "),
|
|
||||||
makeset(" [--) "))
|
|
||||||
|
|
||||||
eq_(makeset(" [--) [--)") &
|
|
||||||
makeset(" [------) "),
|
makeset(" [------) "),
|
||||||
makeset(" [-) [-) "))
|
makeset(" [-----) "), # intersection
|
||||||
|
makeset("[-) [)")) # difference
|
||||||
|
|
||||||
eq_(makeset(" [---)") &
|
do_test(makeset("[---------)"),
|
||||||
|
makeset(" [---) "),
|
||||||
|
makeset(" [---) "), # intersection
|
||||||
|
makeset("[) [----)")) # difference
|
||||||
|
|
||||||
|
do_test(makeset(" [---) "),
|
||||||
|
makeset("[---------)"),
|
||||||
|
makeset(" [---) "), # intersection
|
||||||
|
makeset(" ")) # difference
|
||||||
|
|
||||||
|
do_test(makeset(" [-----)"),
|
||||||
|
makeset(" [-----) "),
|
||||||
|
makeset(" [--) "), # intersection
|
||||||
|
makeset(" [--)")) # difference
|
||||||
|
|
||||||
|
do_test(makeset(" [--) [--)"),
|
||||||
|
makeset(" [------) "),
|
||||||
|
makeset(" [-) [-) "), # intersection
|
||||||
|
makeset(" [) [)")) # difference
|
||||||
|
|
||||||
|
do_test(makeset(" [---)"),
|
||||||
makeset(" [--) "),
|
makeset(" [--) "),
|
||||||
makeset(" "))
|
makeset(" "), # intersection
|
||||||
|
makeset(" [---)")) # difference
|
||||||
|
|
||||||
eq_(makeset(" [-|---)") &
|
do_test(makeset(" [-|---)"),
|
||||||
makeset(" [-----|-) "),
|
makeset(" [-----|-) "),
|
||||||
makeset(" [----) "))
|
makeset(" [----) "), # intersection
|
||||||
|
makeset(" [)")) # difference
|
||||||
|
|
||||||
eq_(makeset(" [-|-) ") &
|
do_test(makeset(" [-|-) "),
|
||||||
makeset(" [-|--|--) "),
|
makeset(" [-|--|--) "),
|
||||||
makeset(" [---) "))
|
makeset(" [---) "), # intersection
|
||||||
|
makeset(" ")) # difference
|
||||||
|
|
||||||
|
do_test(makeset("[-)[-)[-)[)"),
|
||||||
|
makeset(" [) [|)[) "),
|
||||||
|
makeset(" [) [) "), # intersection
|
||||||
|
makeset("[) [-) [)[)")) # difference
|
||||||
|
|
||||||
# Border cases -- will give different results if intervals are
|
# Border cases -- will give different results if intervals are
|
||||||
# half open or fully closed. Right now, they are half open,
|
# half open or fully closed. In nilmdb, they are half open.
|
||||||
# although that's a little messy since the database intervals
|
do_test(makeset(" [---)"),
|
||||||
# often contain a data point at the endpoint.
|
|
||||||
half_open = True
|
|
||||||
if half_open:
|
|
||||||
eq_(makeset(" [---)") &
|
|
||||||
makeset(" [----) "),
|
makeset(" [----) "),
|
||||||
makeset(" "))
|
makeset(" "), # intersection
|
||||||
eq_(makeset(" [----)[--)") &
|
makeset(" [---)")) # difference
|
||||||
|
|
||||||
|
do_test(makeset(" [----)[--)"),
|
||||||
makeset("[-) [--) [)"),
|
makeset("[-) [--) [)"),
|
||||||
makeset(" [) [-) [)"))
|
makeset(" [) [-) [)"), # intersection
|
||||||
else:
|
makeset(" [-) [-) ")) # difference
|
||||||
eq_(makeset(" [---)") &
|
|
||||||
makeset(" [----) "),
|
# Set difference with bounds
|
||||||
makeset(" . "))
|
a = makeset(" [----)[--)")
|
||||||
eq_(makeset(" [----)[--)") &
|
b = makeset("[-) [--) [)")
|
||||||
makeset("[-) [--) [)"),
|
c = makeset("[----) ")
|
||||||
makeset(" [) [-). [)"))
|
d = makeset(" [-) ")
|
||||||
|
eq_(a.set_difference(b, list(c)[0]), d)
|
||||||
|
|
||||||
|
# Empty second set
|
||||||
|
eq_(a.set_difference(IntervalSet()), a)
|
||||||
|
|
||||||
class TestIntervalDB:
|
class TestIntervalDB:
|
||||||
def test_dbinterval(self):
|
def test_dbinterval(self):
|
||||||
@@ -371,4 +396,3 @@ class TestIntervalSpeed:
|
|||||||
aplotter.plot(speeds.keys(), speeds.values(), plot_slope=True)
|
aplotter.plot(speeds.keys(), speeds.values(), plot_slope=True)
|
||||||
yappi.stop()
|
yappi.stop()
|
||||||
yappi.print_stats(sort_type=yappi.SORTTYPE_TTOT, limit=10)
|
yappi.print_stats(sort_type=yappi.SORTTYPE_TTOT, limit=10)
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user