Compare commits

...

8 Commits

Author SHA1 Message Date
89be6f5931 Add option to include interval start/end markup on extract
When enabled, lines like "# interval-start 1234567890123456" and "#
interval-end 1234567890123456" will be added to the data output.  Note
that there may be an "interval-end" timestamp followed by an identical
"interval-start" timestamp, if the response at the nilmdb level was
split up into multiple chunks.

In general, assume contiguous data if previous_interval_end ==
new_interval_start.
2013-03-19 14:23:33 -04:00
4cdef3285d Destroy now requires that all data has been previously removed.
Added new flag "-R" to command line to perform an automatic removal.
This should be the last of the ways in which a single command could
block the nilmdb thread for a long time.
2013-03-18 19:39:03 -04:00
bcd82c4d59 Limit the number of rows removed per call to nilmdb.stream_remove
Server class will retry as needed, as with stream_extract and
stream_intervals.
2013-03-18 18:22:45 -04:00
caf63ab01f Fix stream_extract/stream_intervals restart around timestamp == 0. 2013-03-18 18:20:25 -04:00
2d72891162 Accept "min" and "max" as timestamps on command line 2013-03-18 18:19:24 -04:00
cda2ac3e77 Don't return a mutable interval from IntervalSet.intersection()
Instead, always take the subset, which creates a new interval.
Also adds a small optimization by moving the 'if orig' check outside the
loop.
2013-03-18 18:16:35 -04:00
57d3d60f6a Fix relative import problems 2013-03-18 16:27:27 -04:00
d6b5befe76 Don't use filenames as default arg completion 2013-03-16 17:27:58 -04:00
11 changed files with 221 additions and 79 deletions

View File

@@ -17,4 +17,4 @@ _nilmtool_argcomplete() {
unset COMPREPLY unset COMPREPLY
fi fi
} }
complete -o nospace -o default -F _nilmtool_argcomplete nilmtool complete -o nospace -F _nilmtool_argcomplete nilmtool

View File

@@ -97,7 +97,7 @@ class Client(object):
return self.http.post("stream/create", params) return self.http.post("stream/create", params)
def stream_destroy(self, path): def stream_destroy(self, path):
"""Delete stream and its contents""" """Delete stream. Fails if any data is still present."""
params = { "path": path } params = { "path": path }
return self.http.post("stream/destroy", params) return self.http.post("stream/destroy", params)
@@ -171,7 +171,8 @@ class Client(object):
params["end"] = timestamp_to_string(end) params["end"] = timestamp_to_string(end)
return self.http.get_gen("stream/intervals", params) return self.http.get_gen("stream/intervals", params)
def stream_extract(self, path, start = None, end = None, count = False): def stream_extract(self, path, start = None, end = None,
count = False, markup = False):
""" """
Extract data from a stream. Returns a generator that yields Extract data from a stream. Returns a generator that yields
lines of ASCII-formatted data that matches the database lines of ASCII-formatted data that matches the database
@@ -179,6 +180,9 @@ class Client(object):
Specify count = True to return a count of matching data points Specify count = True to return a count of matching data points
rather than the actual data. The output format is unchanged. rather than the actual data. The output format is unchanged.
Specify markup = True to include comments in the returned data
that indicate interval starts and ends.
""" """
params = { params = {
"path": path, "path": path,
@@ -189,6 +193,8 @@ class Client(object):
params["end"] = timestamp_to_string(end) params["end"] = timestamp_to_string(end)
if count: if count:
params["count"] = 1 params["count"] = 1
if markup:
params["markup"] = 1
return self.http.get_gen("stream/extract", params) return self.http.get_gen("stream/extract", params)
def stream_count(self, path, start = None, end = None): def stream_count(self, path, start = None, end = None):

View File

@@ -7,11 +7,14 @@ def setup(self, sub):
cmd = sub.add_parser("destroy", help="Delete a stream and all data", cmd = sub.add_parser("destroy", help="Delete a stream and all data",
formatter_class = def_form, formatter_class = def_form,
description=""" description="""
Destroy the stream at the specified path. All Destroy the stream at the specified path.
data and metadata related to the stream is The stream must be empty. All metadata
permanently deleted. related to the stream is permanently deleted.
""") """)
cmd.set_defaults(handler = cmd_destroy) cmd.set_defaults(handler = cmd_destroy)
group = cmd.add_argument_group("Options")
group.add_argument("-R", "--remove", action="store_true",
help="Remove all data before destroying stream")
group = cmd.add_argument_group("Required arguments") group = cmd.add_argument_group("Required arguments")
group.add_argument("path", group.add_argument("path",
help="Path of the stream to delete, e.g. /foo/bar", help="Path of the stream to delete, e.g. /foo/bar",
@@ -20,6 +23,11 @@ def setup(self, sub):
def cmd_destroy(self): def cmd_destroy(self):
"""Destroy stream""" """Destroy stream"""
if self.args.remove:
try:
count = self.client.stream_remove(self.args.path)
except nilmdb.client.ClientError as e:
self.die("error removing data: %s", str(e))
try: try:
self.client.stream_destroy(self.args.path) self.client.stream_destroy(self.args.path)
except nilmdb.client.ClientError as e: except nilmdb.client.ClientError as e:

View File

@@ -29,6 +29,8 @@ def setup(self, sub):
group.add_argument("-a", "--annotate", action="store_true", group.add_argument("-a", "--annotate", action="store_true",
help="Include comments with some information " help="Include comments with some information "
"about the stream") "about the stream")
group.add_argument("-m", "--markup", action="store_true",
help="Include comments with interval starts and ends")
group.add_argument("-T", "--timestamp-raw", action="store_true", group.add_argument("-T", "--timestamp-raw", action="store_true",
help="Show raw timestamps in annotated information") help="Show raw timestamps in annotated information")
group.add_argument("-c", "--count", action="store_true", group.add_argument("-c", "--count", action="store_true",
@@ -61,7 +63,8 @@ def cmd_extract(self):
for dataline in self.client.stream_extract(self.args.path, for dataline in self.client.stream_extract(self.args.path,
self.args.start, self.args.start,
self.args.end, self.args.end,
self.args.count): self.args.count,
self.args.markup):
if self.args.bare and not self.args.count: if self.args.bare and not self.args.count:
# Strip timestamp (first element). Doesn't make sense # Strip timestamp (first element). Doesn't make sense
# if we are only returning a count. # if we are only returning a count.

View File

@@ -286,22 +286,17 @@ cdef class IntervalSet:
(potentially) subsetted to make the one that is being (potentially) subsetted to make the one that is being
returned. returned.
""" """
if not isinstance(interval, Interval): if orig:
raise TypeError("bad type")
for n in self.tree.intersect(interval.start, interval.end): for n in self.tree.intersect(interval.start, interval.end):
i = n.obj i = n.obj
if i:
if i.start >= interval.start and i.end <= interval.end:
if orig:
yield (i, i)
else:
yield i
else:
subset = i.subset(max(i.start, interval.start), subset = i.subset(max(i.start, interval.start),
min(i.end, interval.end)) min(i.end, interval.end))
if orig:
yield (subset, i) yield (subset, i)
else: else:
for n in self.tree.intersect(interval.start, interval.end):
i = n.obj
subset = i.subset(max(i.start, interval.start),
min(i.end, interval.end))
yield subset yield subset
cpdef intersects(self, Interval other): cpdef intersects(self, Interval other):

View File

@@ -12,6 +12,7 @@ Manages both the SQL database and the table storage backend.
from __future__ import absolute_import from __future__ import absolute_import
import nilmdb.utils import nilmdb.utils
from nilmdb.utils.printf import * from nilmdb.utils.printf import *
from nilmdb.utils.time import timestamp_to_string
from nilmdb.utils.interval import IntervalError from nilmdb.utils.interval import IntervalError
from nilmdb.server.interval import Interval, DBInterval, IntervalSet from nilmdb.server.interval import Interval, DBInterval, IntervalSet
@@ -83,7 +84,18 @@ class NilmDB(object):
verbose = 0 verbose = 0
def __init__(self, basepath, max_results=None, def __init__(self, basepath, max_results=None,
bulkdata_args=None): max_removals=None, bulkdata_args=None):
"""Initialize NilmDB at the given basepath.
Other arguments are for debugging / testing:
'max_results' is the max rows to send in a single
stream_intervals or stream_extract response.
'max_removals' is the max rows to delete at once
in stream_move.
'bulkdata_args' is kwargs for the bulkdata module.
"""
if bulkdata_args is None: if bulkdata_args is None:
bulkdata_args = {} bulkdata_args = {}
@@ -113,11 +125,11 @@ class NilmDB(object):
self.con.execute("PRAGMA journal_mode=WAL") self.con.execute("PRAGMA journal_mode=WAL")
# Approximate largest number of elements that we want to send # Approximate largest number of elements that we want to send
# in a single reply (for stream_intervals, stream_extract) # in a single reply (for stream_intervals, stream_extract).
if max_results: self.max_results = max_results or 16384
self.max_results = max_results
else: # Remove up to this many rows per call to stream_remove.
self.max_results = 16384 self.max_removals = max_removals or 1048576
def get_basepath(self): def get_basepath(self):
return self.basepath return self.basepath
@@ -334,14 +346,14 @@ class NilmDB(object):
Returns (intervals, restart) tuple. Returns (intervals, restart) tuple.
intervals is a list of [start,end] timestamps of all intervals 'intervals' is a list of [start,end] timestamps of all intervals
that exist for path, between start and end. that exist for path, between start and end.
restart, if nonzero, means that there were too many results to 'restart', if not None, means that there were too many results
return in a single request. The data is complete from the to return in a single request. The data is complete from the
starting timestamp to the point at which it was truncated, starting timestamp to the point at which it was truncated, and
and a new request with a start time of 'restart' will fetch a new request with a start time of 'restart' will fetch the
the next block of data. next block of data.
""" """
stream_id = self._stream_id(path) stream_id = self._stream_id(path)
intervals = self._get_intervals(stream_id) intervals = self._get_intervals(stream_id)
@@ -363,7 +375,7 @@ class NilmDB(object):
break break
result.append([i.start, i.end]) result.append([i.start, i.end])
else: else:
restart = 0 restart = None
return (result, restart) return (result, restart)
def stream_create(self, path, layout_name): def stream_create(self, path, layout_name):
@@ -439,17 +451,22 @@ class NilmDB(object):
(newpath, stream_id)) (newpath, stream_id))
def stream_destroy(self, path): def stream_destroy(self, path):
"""Fully remove a table and all of its data from the database. """Fully remove a table from the database. Fails if there are
No way to undo it! Metadata is removed.""" any intervals data present; remove them first. Metadata is
also removed."""
stream_id = self._stream_id(path) stream_id = self._stream_id(path)
# Delete the cached interval data (if it was cached) # Verify that no intervals are present, and clear the cache
iset = self._get_intervals(stream_id)
if len(iset):
raise NilmDBError("all intervals must be removed before "
"destroying a stream")
self._get_intervals.cache_remove(self, stream_id) self._get_intervals.cache_remove(self, stream_id)
# Delete the data # Delete the bulkdata storage
self.data.destroy(path) self.data.destroy(path)
# Delete metadata, stream, intervals # Delete metadata, stream, intervals (should be none)
with self.con as con: with self.con as con:
con.execute("DELETE FROM metadata WHERE stream_id=?", (stream_id,)) con.execute("DELETE FROM metadata WHERE stream_id=?", (stream_id,))
con.execute("DELETE FROM ranges WHERE stream_id=?", (stream_id,)) con.execute("DELETE FROM ranges WHERE stream_id=?", (stream_id,))
@@ -517,23 +534,28 @@ class NilmDB(object):
dbinterval.db_startpos, dbinterval.db_startpos,
dbinterval.db_endpos) dbinterval.db_endpos)
def stream_extract(self, path, start = None, end = None, count = False): def stream_extract(self, path, start = None, end = None,
count = False, markup = False):
""" """
Returns (data, restart) tuple. Returns (data, restart) tuple.
data is ASCII-formatted data from the database, formatted 'data' is ASCII-formatted data from the database, formatted
according to the layout of the stream. according to the layout of the stream.
restart, if nonzero, means that there were too many results to 'restart', if not None, means that there were too many results to
return in a single request. The data is complete from the return in a single request. The data is complete from the
starting timestamp to the point at which it was truncated, starting timestamp to the point at which it was truncated,
and a new request with a start time of 'restart' will fetch and a new request with a start time of 'restart' will fetch
the next block of data. the next block of data.
count, if true, means to not return raw data, but just the count 'count', if true, means to not return raw data, but just the count
of rows that would have been returned. This is much faster of rows that would have been returned. This is much faster
than actually fetching the data. It is not limited by than actually fetching the data. It is not limited by
max_results. max_results.
'markup', if true, indicates that returned data should be
marked with a comment denoting when a particular interval
starts, and another comment when an interval ends.
""" """
stream_id = self._stream_id(path) stream_id = self._stream_id(path)
table = self.data.getnode(path) table = self.data.getnode(path)
@@ -543,7 +565,7 @@ class NilmDB(object):
result = [] result = []
matched = 0 matched = 0
remaining = self.max_results remaining = self.max_results
restart = 0 restart = None
for interval in intervals.intersection(requested): for interval in intervals.intersection(requested):
# Reading single rows from the table is too slow, so # Reading single rows from the table is too slow, so
# we use two bisections to find both the starting and # we use two bisections to find both the starting and
@@ -562,14 +584,26 @@ class NilmDB(object):
row_end = row_max row_end = row_max
restart = table[row_max] restart = table[row_max]
# Add markup
if markup:
result.append("# interval-start " +
timestamp_to_string(interval.start) + "\n")
# Gather these results up # Gather these results up
result.append(table.get_data(row_start, row_end)) result.append(table.get_data(row_start, row_end))
# Count them # Count them
remaining -= row_end - row_start remaining -= row_end - row_start
if restart: # Add markup, and exit if restart is set.
if restart is not None:
if markup:
result.append("# interval-end " +
timestamp_to_string(restart) + "\n")
break break
if markup:
result.append("# interval-end " +
timestamp_to_string(interval.end) + "\n")
if count: if count:
return matched return matched
@@ -578,9 +612,17 @@ class NilmDB(object):
def stream_remove(self, path, start = None, end = None): def stream_remove(self, path, start = None, end = None):
""" """
Remove data from the specified time interval within a stream. Remove data from the specified time interval within a stream.
Removes all data in the interval [start, end), and intervals
are truncated or split appropriately. Returns the number of Removes data in the interval [start, end), and intervals are
data points removed. truncated or split appropriately.
Returns a (removed, restart) tuple.
'removed' is the number of data points that were removed.
'restart', if not None, means there were too many rows to
remove in a single request. This function should be called
again with a start time of 'restart' to complete the removal.
""" """
stream_id = self._stream_id(path) stream_id = self._stream_id(path)
table = self.data.getnode(path) table = self.data.getnode(path)
@@ -588,6 +630,8 @@ class NilmDB(object):
(start, end) = self._check_user_times(start, end) (start, end) = self._check_user_times(start, end)
to_remove = Interval(start, end) to_remove = Interval(start, end)
removed = 0 removed = 0
remaining = self.max_removals
restart = None
# Can't remove intervals from within the iterator, so we need to # Can't remove intervals from within the iterator, so we need to
# remember what's currently in the intersection now. # remember what's currently in the intersection now.
@@ -598,6 +642,13 @@ class NilmDB(object):
row_start = self._find_start(table, dbint) row_start = self._find_start(table, dbint)
row_end = self._find_end(table, dbint) row_end = self._find_end(table, dbint)
# Shorten it if we'll hit the maximum number of removals
row_max = row_start + remaining
if row_max < row_end:
row_end = row_max
dbint.end = table[row_max]
restart = dbint.end
# Adjust the DBInterval to match the newly found ends # Adjust the DBInterval to match the newly found ends
dbint.db_start = dbint.start dbint.db_start = dbint.start
dbint.db_end = dbint.end dbint.db_end = dbint.end
@@ -613,4 +664,7 @@ class NilmDB(object):
# Count how many were removed # Count how many were removed
removed += row_end - row_start removed += row_end - row_start
return removed if restart is not None:
break
return (removed, restart)

View File

@@ -210,7 +210,7 @@ class Stream(NilmApp):
@exception_to_httperror(NilmDBError) @exception_to_httperror(NilmDBError)
@cherrypy.tools.CORS_allow(methods = ["POST"]) @cherrypy.tools.CORS_allow(methods = ["POST"])
def destroy(self, path): def destroy(self, path):
"""Delete a stream and its associated data.""" """Delete a stream. Fails if any data is still present."""
return self.db.stream_destroy(path) return self.db.stream_destroy(path)
# /stream/rename?oldpath=/newton/prep&newpath=/newton/prep/1 # /stream/rename?oldpath=/newton/prep&newpath=/newton/prep/1
@@ -339,7 +339,14 @@ class Stream(NilmApp):
if start >= end: if start >= end:
raise cherrypy.HTTPError("400 Bad Request", raise cherrypy.HTTPError("400 Bad Request",
"start must precede end") "start must precede end")
return self.db.stream_remove(path, start, end) total_removed = 0
while True:
(removed, restart) = self.db.stream_remove(path, start, end)
total_removed += removed
if restart is None:
break
start = restart
return total_removed
# /stream/intervals?path=/newton/prep # /stream/intervals?path=/newton/prep
# /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0 # /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0
@@ -386,7 +393,7 @@ class Stream(NilmApp):
diffpath) diffpath)
response = ''.join([ json.dumps(i) + "\r\n" for i in ints ]) response = ''.join([ json.dumps(i) + "\r\n" for i in ints ])
yield response yield response
if restart == 0: if restart is None:
break break
start = restart start = restart
return content(start, end) return content(start, end)
@@ -395,14 +402,18 @@ class Stream(NilmApp):
@cherrypy.expose @cherrypy.expose
@chunked_response @chunked_response
@response_type("text/plain") @response_type("text/plain")
def extract(self, path, start = None, end = None, count = False): def extract(self, path, start = None, end = None,
count = False, markup = False):
""" """
Extract data from backend database. Streams the resulting Extract data from backend database. Streams the resulting
entries as ASCII text lines separated by newlines. This may entries as ASCII text lines separated by newlines. This may
make multiple requests to the nilmdb backend to avoid causing make multiple requests to the nilmdb backend to avoid causing
it to block for too long. it to block for too long.
Add count=True to return a count rather than actual data. If 'count' is True, returns a count rather than actual data.
If 'markup' is True, adds comments to the stream denoting each
interval's start and end timestamp.
""" """
if start is not None: if start is not None:
start = string_to_timestamp(start) start = string_to_timestamp(start)
@@ -421,21 +432,23 @@ class Stream(NilmApp):
raise cherrypy.HTTPError("404 Not Found", "No such stream") raise cherrypy.HTTPError("404 Not Found", "No such stream")
@workaround_cp_bug_1200 @workaround_cp_bug_1200
def content(start, end, count): def content(start, end):
# Note: disable chunked responses to see tracebacks from here. # Note: disable chunked responses to see tracebacks from here.
if count: if count:
matched = self.db.stream_extract(path, start, end, count) matched = self.db.stream_extract(path, start, end,
count = True)
yield sprintf("%d\n", matched) yield sprintf("%d\n", matched)
return return
while True: while True:
(data, restart) = self.db.stream_extract(path, start, end) (data, restart) = self.db.stream_extract(
path, start, end, count = False, markup = markup)
yield data yield data
if restart == 0: if restart is None:
return return
start = restart start = restart
return content(start, end, count) return content(start, end)
class Exiter(object): class Exiter(object):
"""App that exits the server, for testing""" """App that exits the server, for testing"""

View File

@@ -1,3 +1,5 @@
from __future__ import absolute_import
from nilmdb.utils import datetime_tz from nilmdb.utils import datetime_tz
import re import re
import time import time
@@ -58,6 +60,11 @@ def parse_time(toparse):
timestamp, the current local timezone is assumed (e.g. from the TZ timestamp, the current local timezone is assumed (e.g. from the TZ
env var). env var).
""" """
if toparse == "min":
return min_timestamp
if toparse == "max":
return max_timestamp
# If string isn't "now" and doesn't contain at least 4 digits, # If string isn't "now" and doesn't contain at least 4 digits,
# consider it invalid. smartparse might otherwise accept # consider it invalid. smartparse might otherwise accept
# empty strings and strings with just separators. # empty strings and strings with just separators.

28
tests/data/extract-8 Normal file
View File

@@ -0,0 +1,28 @@
# interval-start 1332496919900000
1332496919900000 2.523050e+05 2.254020e+05 4.779410e+03 3.638030e+03 8.138070e+03 4.334460e+03 1.083780e+03 3.743730e+03
1332496919908333 2.551190e+05 2.237870e+05 5.965640e+03 2.076350e+03 9.468790e+03 3.693880e+03 1.247860e+03 3.393680e+03
1332496919916667 2.616370e+05 2.247980e+05 4.848970e+03 2.315620e+03 9.323300e+03 4.225460e+03 1.805780e+03 2.593050e+03
1332496919925000 2.606460e+05 2.251300e+05 3.061360e+03 3.951840e+03 7.662910e+03 5.341410e+03 1.986520e+03 2.276780e+03
1332496919933333 2.559710e+05 2.235030e+05 4.096030e+03 3.296970e+03 7.827080e+03 5.452120e+03 2.492520e+03 2.929450e+03
1332496919941667 2.579260e+05 2.217080e+05 5.472320e+03 1.555700e+03 8.495760e+03 4.491140e+03 2.379780e+03 3.741710e+03
1332496919950000 2.610180e+05 2.242350e+05 4.669770e+03 1.876190e+03 8.366680e+03 3.677510e+03 9.021690e+02 3.549040e+03
1332496919958333 2.569150e+05 2.274650e+05 2.785070e+03 3.751930e+03 7.440320e+03 3.964860e+03 -3.227860e+02 2.460890e+03
1332496919966667 2.509510e+05 2.262000e+05 3.772710e+03 3.131950e+03 8.159860e+03 4.539860e+03 7.375190e+02 2.126750e+03
1332496919975000 2.556710e+05 2.223720e+05 5.826200e+03 8.715560e+02 9.120240e+03 4.545110e+03 2.804310e+03 2.721000e+03
1332496919983333 2.649730e+05 2.214860e+05 5.839130e+03 4.659180e+02 8.628300e+03 3.934870e+03 2.972490e+03 3.773730e+03
1332496919991667 2.652170e+05 2.233920e+05 3.718770e+03 2.834970e+03 7.209900e+03 3.460260e+03 1.324930e+03 4.075960e+03
# interval-end 1332496919991668
# interval-start 1332496920000000
1332496920000000 2.564370e+05 2.244300e+05 4.011610e+03 3.475340e+03 7.495890e+03 3.388940e+03 2.613970e+02 3.731260e+03
1332496920008333 2.539630e+05 2.241670e+05 5.621070e+03 1.548010e+03 9.165170e+03 3.522930e+03 1.058930e+03 2.996960e+03
1332496920016667 2.585080e+05 2.249300e+05 6.011400e+03 8.188660e+02 9.039950e+03 4.482440e+03 2.490390e+03 2.679340e+03
1332496920025000 2.596270e+05 2.260220e+05 4.474500e+03 2.423020e+03 7.414190e+03 5.071970e+03 2.439380e+03 2.962960e+03
1332496920033333 2.551870e+05 2.246320e+05 4.738570e+03 3.398040e+03 7.395120e+03 4.726450e+03 1.839030e+03 3.393530e+03
1332496920041667 2.571020e+05 2.216230e+05 6.144130e+03 1.441090e+03 8.756480e+03 3.495320e+03 1.869940e+03 3.752530e+03
1332496920050000 2.636530e+05 2.217700e+05 6.221770e+03 7.389620e+02 9.547600e+03 2.666820e+03 1.462660e+03 3.332570e+03
1332496920058333 2.636130e+05 2.252560e+05 4.477120e+03 2.437450e+03 8.510210e+03 3.855630e+03 9.594420e+02 2.387180e+03
1332496920066667 2.553500e+05 2.262640e+05 4.283720e+03 3.923940e+03 7.912470e+03 5.466520e+03 1.284990e+03 2.093720e+03
1332496920075000 2.527270e+05 2.246090e+05 5.851930e+03 2.491980e+03 8.540630e+03 5.623050e+03 2.339780e+03 3.007140e+03
1332496920083333 2.584750e+05 2.235780e+05 5.924870e+03 1.394480e+03 8.779620e+03 4.544180e+03 2.132030e+03 3.849760e+03
1332496920091667 2.615630e+05 2.246090e+05 4.336140e+03 2.455750e+03 8.055380e+03 3.469110e+03 6.278730e+02 3.664200e+03
# interval-end 1332496920100000

View File

@@ -375,6 +375,7 @@ class TestClient(object):
# Delete streams that exist # Delete streams that exist
for stream in client.stream_list(): for stream in client.stream_list():
client.stream_remove(stream[0])
client.stream_destroy(stream[0]) client.stream_destroy(stream[0])
# Database is empty # Database is empty
@@ -506,6 +507,10 @@ class TestClient(object):
[ 109, 118 ], [ 109, 118 ],
[ 200, 300 ] ]) [ 200, 300 ] ])
# destroy stream (try without removing data first)
with assert_raises(ClientError):
client.stream_destroy("/context/test")
client.stream_remove("/context/test")
client.stream_destroy("/context/test") client.stream_destroy("/context/test")
client.close() client.close()
@@ -600,6 +605,7 @@ class TestClient(object):
]) ])
# Clean up # Clean up
client.stream_remove("/empty/test")
client.stream_destroy("/empty/test") client.stream_destroy("/empty/test")
client.close() client.close()
@@ -635,8 +641,9 @@ class TestClient(object):
eq_(connections(), (1, 5)) eq_(connections(), (1, 5))
# Clean up # Clean up
c.stream_remove("/persist/test")
c.stream_destroy("/persist/test") c.stream_destroy("/persist/test")
eq_(connections(), (1, 6)) eq_(connections(), (1, 7))
def test_client_13_timestamp_rounding(self): def test_client_13_timestamp_rounding(self):
# Test potentially bad timestamps (due to floating point # Test potentially bad timestamps (due to floating point
@@ -661,5 +668,6 @@ class TestClient(object):
# Server will round this and give an error on finalize() # Server will round this and give an error on finalize()
ctx.insert("299999999.99 1\n") ctx.insert("299999999.99 1\n")
client.stream_remove("/rounding/test")
client.stream_destroy("/rounding/test") client.stream_destroy("/rounding/test")
client.close() client.close()

View File

@@ -21,12 +21,13 @@ from testutil.helpers import *
testdb = "tests/cmdline-testdb" testdb = "tests/cmdline-testdb"
def server_start(max_results = None, bulkdata_args = {}): def server_start(max_results = None, max_removals = None, bulkdata_args = {}):
global test_server, test_db global test_server, test_db
# Start web app on a custom port # Start web app on a custom port
test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)( test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(
testdb, testdb,
max_results = max_results, max_results = max_results,
max_removals = max_removals,
bulkdata_args = bulkdata_args) bulkdata_args = bulkdata_args)
test_server = nilmdb.server.Server(test_db, host = "127.0.0.1", test_server = nilmdb.server.Server(test_db, host = "127.0.0.1",
port = 32180, stoppable = False, port = 32180, stoppable = False,
@@ -233,6 +234,8 @@ class TestCmdline(object):
eq_(parse_time("1333648800.0"), test) eq_(parse_time("1333648800.0"), test)
eq_(parse_time("1333648800000000"), test) eq_(parse_time("1333648800000000"), test)
eq_(parse_time("@1333648800000000"), test) eq_(parse_time("@1333648800000000"), test)
eq_(parse_time("min"), nilmdb.utils.time.min_timestamp)
eq_(parse_time("max"), nilmdb.utils.time.max_timestamp)
with assert_raises(ValueError): with assert_raises(ValueError):
parse_time("@hashtag12345") parse_time("@hashtag12345")
@@ -593,6 +596,8 @@ class TestCmdline(object):
test(6, "10:00:30", "10:00:31", extra="-b") test(6, "10:00:30", "10:00:31", extra="-b")
test(7, "10:00:30", "10:00:30.999", extra="-a -T") test(7, "10:00:30", "10:00:30.999", extra="-a -T")
test(7, "10:00:30", "10:00:30.999", extra="-a --timestamp-raw") test(7, "10:00:30", "10:00:30.999", extra="-a --timestamp-raw")
test(8, "10:01:59.9", "10:02:00.1", extra="--markup")
test(8, "10:01:59.9", "10:02:00.1", extra="-m")
# all data put in by tests # all data put in by tests
self.ok("extract -a /newton/prep --start 2000-01-01 --end 2020-01-01") self.ok("extract -a /newton/prep --start 2000-01-01 --end 2020-01-01")
@@ -600,6 +605,11 @@ class TestCmdline(object):
self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01") self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01")
self.match("43200\n") self.match("43200\n")
# markup for 3 intervals, plus extra markup lines whenever we had
# a "restart" from the nilmdb.stream_extract function
self.ok("extract -m /newton/prep --start 2000-01-01 --end 2020-01-01")
lines_(self.captured, 43210)
def test_09_truncated(self): def test_09_truncated(self):
# Test truncated responses by overriding the nilmdb max_results # Test truncated responses by overriding the nilmdb max_results
server_stop() server_stop()
@@ -699,11 +709,9 @@ class TestCmdline(object):
# Reinsert some data, to verify that no overlaps with deleted # Reinsert some data, to verify that no overlaps with deleted
# data are reported # data are reported
os.environ['TZ'] = "UTC" for minute in ["0", "2"]:
self.ok("insert --timestamp -f --rate 120 /newton/prep" self.ok("insert --timestamp -f --rate 120 /newton/prep"
"tests/data/prep-20120323T1000") " tests/data/prep-20120323T100" + minute)
self.ok("insert -t --filename --rate 120 /newton/prep "
"tests/data/prep-20120323T1002")
def test_11_destroy(self): def test_11_destroy(self):
# Delete records # Delete records
@@ -715,6 +723,9 @@ class TestCmdline(object):
self.fail("destroy /no/such/stream") self.fail("destroy /no/such/stream")
self.contain("No stream at path") self.contain("No stream at path")
self.fail("destroy -R /no/such/stream")
self.contain("No stream at path")
self.fail("destroy asdfasdf") self.fail("destroy asdfasdf")
self.contain("No stream at path") self.contain("No stream at path")
@@ -728,8 +739,14 @@ class TestCmdline(object):
self.ok("list --detail") self.ok("list --detail")
lines_(self.captured, 7) lines_(self.captured, 7)
# Delete some # Fail to destroy because intervals still present
self.ok("destroy /newton/prep") self.fail("destroy /newton/prep")
self.contain("all intervals must be removed")
self.ok("list --detail")
lines_(self.captured, 7)
# Destroy for real
self.ok("destroy -R /newton/prep")
self.ok("list") self.ok("list")
self.match("/newton/raw uint16_6\n" self.match("/newton/raw uint16_6\n"
"/newton/zzz/rawnotch uint16_9\n") "/newton/zzz/rawnotch uint16_9\n")
@@ -740,7 +757,8 @@ class TestCmdline(object):
self.ok("destroy /newton/raw") self.ok("destroy /newton/raw")
self.ok("create /newton/raw uint16_6") self.ok("create /newton/raw uint16_6")
self.ok("destroy /newton/raw") # Specify --remove with no data
self.ok("destroy --remove /newton/raw")
self.ok("list") self.ok("list")
self.match("") self.match("")
@@ -815,7 +833,7 @@ class TestCmdline(object):
# Now recreate the data one more time and make sure there are # Now recreate the data one more time and make sure there are
# fewer files. # fewer files.
self.ok("destroy /newton/prep") self.ok("destroy --remove /newton/prep")
self.fail("destroy /newton/prep") # already destroyed self.fail("destroy /newton/prep") # already destroyed
self.ok("create /newton/prep float32_8") self.ok("create /newton/prep float32_8")
os.environ['TZ'] = "UTC" os.environ['TZ'] = "UTC"
@@ -826,14 +844,16 @@ class TestCmdline(object):
for (dirpath, dirnames, filenames) in os.walk(testdb): for (dirpath, dirnames, filenames) in os.walk(testdb):
nfiles += len(filenames) nfiles += len(filenames)
lt_(nfiles, 50) lt_(nfiles, 50)
self.ok("destroy /newton/prep") # destroy again self.ok("destroy -R /newton/prep") # destroy again
def test_14_remove_files(self): def test_14_remove_files(self):
# Test BulkData's ability to remove when data is split into # Test BulkData's ability to remove when data is split into
# multiple files. Should be a fairly comprehensive test of # multiple files. Should be a fairly comprehensive test of
# remove functionality. # remove functionality.
# Also limit max_removals, to cover more functionality.
server_stop() server_stop()
server_start(bulkdata_args = { "file_size" : 920, # 23 rows per file server_start(max_removals = 4321,
bulkdata_args = { "file_size" : 920, # 23 rows per file
"files_per_dir" : 3 }) "files_per_dir" : 3 })
# Insert data. Just for fun, insert out of order # Insert data. Just for fun, insert out of order
@@ -974,8 +994,8 @@ class TestCmdline(object):
self.match("[ Thu, 01 Jan 2004 00:00:00.000000 +0000 -" self.match("[ Thu, 01 Jan 2004 00:00:00.000000 +0000 -"
"> Sat, 01 Jan 2005 00:00:00.000000 +0000 ]\n") "> Sat, 01 Jan 2005 00:00:00.000000 +0000 ]\n")
self.ok("destroy /diff/1") self.ok("destroy -R /diff/1")
self.ok("destroy /diff/2") self.ok("destroy -R /diff/2")
def test_16_rename(self): def test_16_rename(self):
# Test renaming. Force file size smaller so we get more files # Test renaming. Force file size smaller so we get more files
@@ -1039,7 +1059,7 @@ class TestCmdline(object):
self.fail("rename /foo/bar /xxx/yyy/zzz/www") self.fail("rename /foo/bar /xxx/yyy/zzz/www")
self.contain("path is subdir of existing node") self.contain("path is subdir of existing node")
self.ok("rename /foo/bar /xxx/yyy/mmm") self.ok("rename /foo/bar /xxx/yyy/mmm")
self.ok("destroy /xxx/yyy/zzz") self.ok("destroy -R /xxx/yyy/zzz")
check_path("xxx", "yyy", "mmm") check_path("xxx", "yyy", "mmm")
# Extract it at the final path # Extract it at the final path
@@ -1047,7 +1067,7 @@ class TestCmdline(object):
"--end '2012-03-23 10:04:01'") "--end '2012-03-23 10:04:01'")
eq_(self.captured, extract_before) eq_(self.captured, extract_before)
self.ok("destroy /xxx/yyy/mmm") self.ok("destroy -R /xxx/yyy/mmm")
# Make sure temporary rename dirs weren't left around # Make sure temporary rename dirs weren't left around
for (dirpath, dirnames, filenames) in os.walk(testdb): for (dirpath, dirnames, filenames) in os.walk(testdb):