Compare commits
1 Commits
nilmdb-1.4
...
nilmdb-1.4
Author | SHA1 | Date | |
---|---|---|---|
89be6f5931 |
@@ -171,7 +171,8 @@ class Client(object):
|
|||||||
params["end"] = timestamp_to_string(end)
|
params["end"] = timestamp_to_string(end)
|
||||||
return self.http.get_gen("stream/intervals", params)
|
return self.http.get_gen("stream/intervals", params)
|
||||||
|
|
||||||
def stream_extract(self, path, start = None, end = None, count = False):
|
def stream_extract(self, path, start = None, end = None,
|
||||||
|
count = False, markup = False):
|
||||||
"""
|
"""
|
||||||
Extract data from a stream. Returns a generator that yields
|
Extract data from a stream. Returns a generator that yields
|
||||||
lines of ASCII-formatted data that matches the database
|
lines of ASCII-formatted data that matches the database
|
||||||
@@ -179,6 +180,9 @@ class Client(object):
|
|||||||
|
|
||||||
Specify count = True to return a count of matching data points
|
Specify count = True to return a count of matching data points
|
||||||
rather than the actual data. The output format is unchanged.
|
rather than the actual data. The output format is unchanged.
|
||||||
|
|
||||||
|
Specify markup = True to include comments in the returned data
|
||||||
|
that indicate interval starts and ends.
|
||||||
"""
|
"""
|
||||||
params = {
|
params = {
|
||||||
"path": path,
|
"path": path,
|
||||||
@@ -189,6 +193,8 @@ class Client(object):
|
|||||||
params["end"] = timestamp_to_string(end)
|
params["end"] = timestamp_to_string(end)
|
||||||
if count:
|
if count:
|
||||||
params["count"] = 1
|
params["count"] = 1
|
||||||
|
if markup:
|
||||||
|
params["markup"] = 1
|
||||||
return self.http.get_gen("stream/extract", params)
|
return self.http.get_gen("stream/extract", params)
|
||||||
|
|
||||||
def stream_count(self, path, start = None, end = None):
|
def stream_count(self, path, start = None, end = None):
|
||||||
|
@@ -29,6 +29,8 @@ def setup(self, sub):
|
|||||||
group.add_argument("-a", "--annotate", action="store_true",
|
group.add_argument("-a", "--annotate", action="store_true",
|
||||||
help="Include comments with some information "
|
help="Include comments with some information "
|
||||||
"about the stream")
|
"about the stream")
|
||||||
|
group.add_argument("-m", "--markup", action="store_true",
|
||||||
|
help="Include comments with interval starts and ends")
|
||||||
group.add_argument("-T", "--timestamp-raw", action="store_true",
|
group.add_argument("-T", "--timestamp-raw", action="store_true",
|
||||||
help="Show raw timestamps in annotated information")
|
help="Show raw timestamps in annotated information")
|
||||||
group.add_argument("-c", "--count", action="store_true",
|
group.add_argument("-c", "--count", action="store_true",
|
||||||
@@ -61,7 +63,8 @@ def cmd_extract(self):
|
|||||||
for dataline in self.client.stream_extract(self.args.path,
|
for dataline in self.client.stream_extract(self.args.path,
|
||||||
self.args.start,
|
self.args.start,
|
||||||
self.args.end,
|
self.args.end,
|
||||||
self.args.count):
|
self.args.count,
|
||||||
|
self.args.markup):
|
||||||
if self.args.bare and not self.args.count:
|
if self.args.bare and not self.args.count:
|
||||||
# Strip timestamp (first element). Doesn't make sense
|
# Strip timestamp (first element). Doesn't make sense
|
||||||
# if we are only returning a count.
|
# if we are only returning a count.
|
||||||
|
@@ -12,6 +12,7 @@ Manages both the SQL database and the table storage backend.
|
|||||||
from __future__ import absolute_import
|
from __future__ import absolute_import
|
||||||
import nilmdb.utils
|
import nilmdb.utils
|
||||||
from nilmdb.utils.printf import *
|
from nilmdb.utils.printf import *
|
||||||
|
from nilmdb.utils.time import timestamp_to_string
|
||||||
|
|
||||||
from nilmdb.utils.interval import IntervalError
|
from nilmdb.utils.interval import IntervalError
|
||||||
from nilmdb.server.interval import Interval, DBInterval, IntervalSet
|
from nilmdb.server.interval import Interval, DBInterval, IntervalSet
|
||||||
@@ -533,7 +534,8 @@ class NilmDB(object):
|
|||||||
dbinterval.db_startpos,
|
dbinterval.db_startpos,
|
||||||
dbinterval.db_endpos)
|
dbinterval.db_endpos)
|
||||||
|
|
||||||
def stream_extract(self, path, start = None, end = None, count = False):
|
def stream_extract(self, path, start = None, end = None,
|
||||||
|
count = False, markup = False):
|
||||||
"""
|
"""
|
||||||
Returns (data, restart) tuple.
|
Returns (data, restart) tuple.
|
||||||
|
|
||||||
@@ -546,10 +548,14 @@ class NilmDB(object):
|
|||||||
and a new request with a start time of 'restart' will fetch
|
and a new request with a start time of 'restart' will fetch
|
||||||
the next block of data.
|
the next block of data.
|
||||||
|
|
||||||
count, if true, means to not return raw data, but just the count
|
'count', if true, means to not return raw data, but just the count
|
||||||
of rows that would have been returned. This is much faster
|
of rows that would have been returned. This is much faster
|
||||||
than actually fetching the data. It is not limited by
|
than actually fetching the data. It is not limited by
|
||||||
max_results.
|
max_results.
|
||||||
|
|
||||||
|
'markup', if true, indicates that returned data should be
|
||||||
|
marked with a comment denoting when a particular interval
|
||||||
|
starts, and another comment when an interval ends.
|
||||||
"""
|
"""
|
||||||
stream_id = self._stream_id(path)
|
stream_id = self._stream_id(path)
|
||||||
table = self.data.getnode(path)
|
table = self.data.getnode(path)
|
||||||
@@ -578,14 +584,26 @@ class NilmDB(object):
|
|||||||
row_end = row_max
|
row_end = row_max
|
||||||
restart = table[row_max]
|
restart = table[row_max]
|
||||||
|
|
||||||
|
# Add markup
|
||||||
|
if markup:
|
||||||
|
result.append("# interval-start " +
|
||||||
|
timestamp_to_string(interval.start) + "\n")
|
||||||
|
|
||||||
# Gather these results up
|
# Gather these results up
|
||||||
result.append(table.get_data(row_start, row_end))
|
result.append(table.get_data(row_start, row_end))
|
||||||
|
|
||||||
# Count them
|
# Count them
|
||||||
remaining -= row_end - row_start
|
remaining -= row_end - row_start
|
||||||
|
|
||||||
|
# Add markup, and exit if restart is set.
|
||||||
if restart is not None:
|
if restart is not None:
|
||||||
|
if markup:
|
||||||
|
result.append("# interval-end " +
|
||||||
|
timestamp_to_string(restart) + "\n")
|
||||||
break
|
break
|
||||||
|
if markup:
|
||||||
|
result.append("# interval-end " +
|
||||||
|
timestamp_to_string(interval.end) + "\n")
|
||||||
|
|
||||||
if count:
|
if count:
|
||||||
return matched
|
return matched
|
||||||
|
@@ -402,14 +402,18 @@ class Stream(NilmApp):
|
|||||||
@cherrypy.expose
|
@cherrypy.expose
|
||||||
@chunked_response
|
@chunked_response
|
||||||
@response_type("text/plain")
|
@response_type("text/plain")
|
||||||
def extract(self, path, start = None, end = None, count = False):
|
def extract(self, path, start = None, end = None,
|
||||||
|
count = False, markup = False):
|
||||||
"""
|
"""
|
||||||
Extract data from backend database. Streams the resulting
|
Extract data from backend database. Streams the resulting
|
||||||
entries as ASCII text lines separated by newlines. This may
|
entries as ASCII text lines separated by newlines. This may
|
||||||
make multiple requests to the nilmdb backend to avoid causing
|
make multiple requests to the nilmdb backend to avoid causing
|
||||||
it to block for too long.
|
it to block for too long.
|
||||||
|
|
||||||
Add count=True to return a count rather than actual data.
|
If 'count' is True, returns a count rather than actual data.
|
||||||
|
|
||||||
|
If 'markup' is True, adds comments to the stream denoting each
|
||||||
|
interval's start and end timestamp.
|
||||||
"""
|
"""
|
||||||
if start is not None:
|
if start is not None:
|
||||||
start = string_to_timestamp(start)
|
start = string_to_timestamp(start)
|
||||||
@@ -428,21 +432,23 @@ class Stream(NilmApp):
|
|||||||
raise cherrypy.HTTPError("404 Not Found", "No such stream")
|
raise cherrypy.HTTPError("404 Not Found", "No such stream")
|
||||||
|
|
||||||
@workaround_cp_bug_1200
|
@workaround_cp_bug_1200
|
||||||
def content(start, end, count):
|
def content(start, end):
|
||||||
# Note: disable chunked responses to see tracebacks from here.
|
# Note: disable chunked responses to see tracebacks from here.
|
||||||
if count:
|
if count:
|
||||||
matched = self.db.stream_extract(path, start, end, count)
|
matched = self.db.stream_extract(path, start, end,
|
||||||
|
count = True)
|
||||||
yield sprintf("%d\n", matched)
|
yield sprintf("%d\n", matched)
|
||||||
return
|
return
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
(data, restart) = self.db.stream_extract(path, start, end)
|
(data, restart) = self.db.stream_extract(
|
||||||
|
path, start, end, count = False, markup = markup)
|
||||||
yield data
|
yield data
|
||||||
|
|
||||||
if restart is None:
|
if restart is None:
|
||||||
return
|
return
|
||||||
start = restart
|
start = restart
|
||||||
return content(start, end, count)
|
return content(start, end)
|
||||||
|
|
||||||
class Exiter(object):
|
class Exiter(object):
|
||||||
"""App that exits the server, for testing"""
|
"""App that exits the server, for testing"""
|
||||||
|
28
tests/data/extract-8
Normal file
28
tests/data/extract-8
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
# interval-start 1332496919900000
|
||||||
|
1332496919900000 2.523050e+05 2.254020e+05 4.779410e+03 3.638030e+03 8.138070e+03 4.334460e+03 1.083780e+03 3.743730e+03
|
||||||
|
1332496919908333 2.551190e+05 2.237870e+05 5.965640e+03 2.076350e+03 9.468790e+03 3.693880e+03 1.247860e+03 3.393680e+03
|
||||||
|
1332496919916667 2.616370e+05 2.247980e+05 4.848970e+03 2.315620e+03 9.323300e+03 4.225460e+03 1.805780e+03 2.593050e+03
|
||||||
|
1332496919925000 2.606460e+05 2.251300e+05 3.061360e+03 3.951840e+03 7.662910e+03 5.341410e+03 1.986520e+03 2.276780e+03
|
||||||
|
1332496919933333 2.559710e+05 2.235030e+05 4.096030e+03 3.296970e+03 7.827080e+03 5.452120e+03 2.492520e+03 2.929450e+03
|
||||||
|
1332496919941667 2.579260e+05 2.217080e+05 5.472320e+03 1.555700e+03 8.495760e+03 4.491140e+03 2.379780e+03 3.741710e+03
|
||||||
|
1332496919950000 2.610180e+05 2.242350e+05 4.669770e+03 1.876190e+03 8.366680e+03 3.677510e+03 9.021690e+02 3.549040e+03
|
||||||
|
1332496919958333 2.569150e+05 2.274650e+05 2.785070e+03 3.751930e+03 7.440320e+03 3.964860e+03 -3.227860e+02 2.460890e+03
|
||||||
|
1332496919966667 2.509510e+05 2.262000e+05 3.772710e+03 3.131950e+03 8.159860e+03 4.539860e+03 7.375190e+02 2.126750e+03
|
||||||
|
1332496919975000 2.556710e+05 2.223720e+05 5.826200e+03 8.715560e+02 9.120240e+03 4.545110e+03 2.804310e+03 2.721000e+03
|
||||||
|
1332496919983333 2.649730e+05 2.214860e+05 5.839130e+03 4.659180e+02 8.628300e+03 3.934870e+03 2.972490e+03 3.773730e+03
|
||||||
|
1332496919991667 2.652170e+05 2.233920e+05 3.718770e+03 2.834970e+03 7.209900e+03 3.460260e+03 1.324930e+03 4.075960e+03
|
||||||
|
# interval-end 1332496919991668
|
||||||
|
# interval-start 1332496920000000
|
||||||
|
1332496920000000 2.564370e+05 2.244300e+05 4.011610e+03 3.475340e+03 7.495890e+03 3.388940e+03 2.613970e+02 3.731260e+03
|
||||||
|
1332496920008333 2.539630e+05 2.241670e+05 5.621070e+03 1.548010e+03 9.165170e+03 3.522930e+03 1.058930e+03 2.996960e+03
|
||||||
|
1332496920016667 2.585080e+05 2.249300e+05 6.011400e+03 8.188660e+02 9.039950e+03 4.482440e+03 2.490390e+03 2.679340e+03
|
||||||
|
1332496920025000 2.596270e+05 2.260220e+05 4.474500e+03 2.423020e+03 7.414190e+03 5.071970e+03 2.439380e+03 2.962960e+03
|
||||||
|
1332496920033333 2.551870e+05 2.246320e+05 4.738570e+03 3.398040e+03 7.395120e+03 4.726450e+03 1.839030e+03 3.393530e+03
|
||||||
|
1332496920041667 2.571020e+05 2.216230e+05 6.144130e+03 1.441090e+03 8.756480e+03 3.495320e+03 1.869940e+03 3.752530e+03
|
||||||
|
1332496920050000 2.636530e+05 2.217700e+05 6.221770e+03 7.389620e+02 9.547600e+03 2.666820e+03 1.462660e+03 3.332570e+03
|
||||||
|
1332496920058333 2.636130e+05 2.252560e+05 4.477120e+03 2.437450e+03 8.510210e+03 3.855630e+03 9.594420e+02 2.387180e+03
|
||||||
|
1332496920066667 2.553500e+05 2.262640e+05 4.283720e+03 3.923940e+03 7.912470e+03 5.466520e+03 1.284990e+03 2.093720e+03
|
||||||
|
1332496920075000 2.527270e+05 2.246090e+05 5.851930e+03 2.491980e+03 8.540630e+03 5.623050e+03 2.339780e+03 3.007140e+03
|
||||||
|
1332496920083333 2.584750e+05 2.235780e+05 5.924870e+03 1.394480e+03 8.779620e+03 4.544180e+03 2.132030e+03 3.849760e+03
|
||||||
|
1332496920091667 2.615630e+05 2.246090e+05 4.336140e+03 2.455750e+03 8.055380e+03 3.469110e+03 6.278730e+02 3.664200e+03
|
||||||
|
# interval-end 1332496920100000
|
@@ -596,6 +596,8 @@ class TestCmdline(object):
|
|||||||
test(6, "10:00:30", "10:00:31", extra="-b")
|
test(6, "10:00:30", "10:00:31", extra="-b")
|
||||||
test(7, "10:00:30", "10:00:30.999", extra="-a -T")
|
test(7, "10:00:30", "10:00:30.999", extra="-a -T")
|
||||||
test(7, "10:00:30", "10:00:30.999", extra="-a --timestamp-raw")
|
test(7, "10:00:30", "10:00:30.999", extra="-a --timestamp-raw")
|
||||||
|
test(8, "10:01:59.9", "10:02:00.1", extra="--markup")
|
||||||
|
test(8, "10:01:59.9", "10:02:00.1", extra="-m")
|
||||||
|
|
||||||
# all data put in by tests
|
# all data put in by tests
|
||||||
self.ok("extract -a /newton/prep --start 2000-01-01 --end 2020-01-01")
|
self.ok("extract -a /newton/prep --start 2000-01-01 --end 2020-01-01")
|
||||||
@@ -603,6 +605,11 @@ class TestCmdline(object):
|
|||||||
self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01")
|
self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01")
|
||||||
self.match("43200\n")
|
self.match("43200\n")
|
||||||
|
|
||||||
|
# markup for 3 intervals, plus extra markup lines whenever we had
|
||||||
|
# a "restart" from the nilmdb.stream_extract function
|
||||||
|
self.ok("extract -m /newton/prep --start 2000-01-01 --end 2020-01-01")
|
||||||
|
lines_(self.captured, 43210)
|
||||||
|
|
||||||
def test_09_truncated(self):
|
def test_09_truncated(self):
|
||||||
# Test truncated responses by overriding the nilmdb max_results
|
# Test truncated responses by overriding the nilmdb max_results
|
||||||
server_stop()
|
server_stop()
|
||||||
|
Reference in New Issue
Block a user