Compare commits

...

7 Commits

Author SHA1 Message Date
4cdef3285d Destroy now requires that all data has been previously removed.
Added new flag "-R" to command line to perform an automatic removal.
This should be the last of the ways in which a single command could
block the nilmdb thread for a long time.
2013-03-18 19:39:03 -04:00
bcd82c4d59 Limit the number of rows removed per call to nilmdb.stream_remove
Server class will retry as needed, as with stream_extract and
stream_intervals.
2013-03-18 18:22:45 -04:00
caf63ab01f Fix stream_extract/stream_intervals restart around timestamp == 0. 2013-03-18 18:20:25 -04:00
2d72891162 Accept "min" and "max" as timestamps on command line 2013-03-18 18:19:24 -04:00
cda2ac3e77 Don't return a mutable interval from IntervalSet.intersection()
Instead, always take the subset, which creates a new interval.
Also adds a small optimization by moving the 'if orig' check outside the
loop.
2013-03-18 18:16:35 -04:00
57d3d60f6a Fix relative import problems 2013-03-18 16:27:27 -04:00
d6b5befe76 Don't use filenames as default arg completion 2013-03-16 17:27:58 -04:00
9 changed files with 143 additions and 69 deletions

View File

@@ -17,4 +17,4 @@ _nilmtool_argcomplete() {
unset COMPREPLY
fi
}
complete -o nospace -o default -F _nilmtool_argcomplete nilmtool
complete -o nospace -F _nilmtool_argcomplete nilmtool

View File

@@ -97,7 +97,7 @@ class Client(object):
return self.http.post("stream/create", params)
def stream_destroy(self, path):
"""Delete stream and its contents"""
"""Delete stream. Fails if any data is still present."""
params = { "path": path }
return self.http.post("stream/destroy", params)

View File

@@ -7,11 +7,14 @@ def setup(self, sub):
cmd = sub.add_parser("destroy", help="Delete a stream and all data",
formatter_class = def_form,
description="""
Destroy the stream at the specified path. All
data and metadata related to the stream is
permanently deleted.
Destroy the stream at the specified path.
The stream must be empty. All metadata
related to the stream is permanently deleted.
""")
cmd.set_defaults(handler = cmd_destroy)
group = cmd.add_argument_group("Options")
group.add_argument("-R", "--remove", action="store_true",
help="Remove all data before destroying stream")
group = cmd.add_argument_group("Required arguments")
group.add_argument("path",
help="Path of the stream to delete, e.g. /foo/bar",
@@ -20,6 +23,11 @@ def setup(self, sub):
def cmd_destroy(self):
"""Destroy stream"""
if self.args.remove:
try:
count = self.client.stream_remove(self.args.path)
except nilmdb.client.ClientError as e:
self.die("error removing data: %s", str(e))
try:
self.client.stream_destroy(self.args.path)
except nilmdb.client.ClientError as e:

View File

@@ -286,23 +286,18 @@ cdef class IntervalSet:
(potentially) subsetted to make the one that is being
returned.
"""
if not isinstance(interval, Interval):
raise TypeError("bad type")
for n in self.tree.intersect(interval.start, interval.end):
i = n.obj
if i:
if i.start >= interval.start and i.end <= interval.end:
if orig:
yield (i, i)
else:
yield i
else:
subset = i.subset(max(i.start, interval.start),
min(i.end, interval.end))
if orig:
yield (subset, i)
else:
yield subset
if orig:
for n in self.tree.intersect(interval.start, interval.end):
i = n.obj
subset = i.subset(max(i.start, interval.start),
min(i.end, interval.end))
yield (subset, i)
else:
for n in self.tree.intersect(interval.start, interval.end):
i = n.obj
subset = i.subset(max(i.start, interval.start),
min(i.end, interval.end))
yield subset
cpdef intersects(self, Interval other):
"""Return True if this IntervalSet intersects another interval"""

View File

@@ -83,7 +83,18 @@ class NilmDB(object):
verbose = 0
def __init__(self, basepath, max_results=None,
bulkdata_args=None):
max_removals=None, bulkdata_args=None):
"""Initialize NilmDB at the given basepath.
Other arguments are for debugging / testing:
'max_results' is the max rows to send in a single
stream_intervals or stream_extract response.
'max_removals' is the max rows to delete at once
in stream_remove.
'bulkdata_args' is kwargs for the bulkdata module.
"""
if bulkdata_args is None:
bulkdata_args = {}
@@ -113,11 +124,11 @@ class NilmDB(object):
self.con.execute("PRAGMA journal_mode=WAL")
# Approximate largest number of elements that we want to send
# in a single reply (for stream_intervals, stream_extract)
if max_results:
self.max_results = max_results
else:
self.max_results = 16384
# in a single reply (for stream_intervals, stream_extract).
self.max_results = max_results or 16384
# Remove up to this many rows per call to stream_remove.
self.max_removals = max_removals or 1048576
def get_basepath(self):
return self.basepath
@@ -334,14 +345,14 @@ class NilmDB(object):
Returns (intervals, restart) tuple.
intervals is a list of [start,end] timestamps of all intervals
'intervals' is a list of [start,end] timestamps of all intervals
that exist for path, between start and end.
restart, if nonzero, means that there were too many results to
return in a single request. The data is complete from the
starting timestamp to the point at which it was truncated,
and a new request with a start time of 'restart' will fetch
the next block of data.
'restart', if not None, means that there were too many results
to return in a single request. The data is complete from the
starting timestamp to the point at which it was truncated, and
a new request with a start time of 'restart' will fetch the
next block of data.
"""
stream_id = self._stream_id(path)
intervals = self._get_intervals(stream_id)
@@ -363,7 +374,7 @@ class NilmDB(object):
break
result.append([i.start, i.end])
else:
restart = 0
restart = None
return (result, restart)
def stream_create(self, path, layout_name):
@@ -439,17 +450,22 @@ class NilmDB(object):
(newpath, stream_id))
def stream_destroy(self, path):
"""Fully remove a table and all of its data from the database.
No way to undo it! Metadata is removed."""
"""Fully remove a table from the database. Fails if there is
any interval data present; remove it first. Metadata is
also removed."""
stream_id = self._stream_id(path)
# Delete the cached interval data (if it was cached)
# Verify that no intervals are present, and clear the cache
iset = self._get_intervals(stream_id)
if len(iset):
raise NilmDBError("all intervals must be removed before "
"destroying a stream")
self._get_intervals.cache_remove(self, stream_id)
# Delete the data
# Delete the bulkdata storage
self.data.destroy(path)
# Delete metadata, stream, intervals
# Delete metadata, stream, intervals (should be none)
with self.con as con:
con.execute("DELETE FROM metadata WHERE stream_id=?", (stream_id,))
con.execute("DELETE FROM ranges WHERE stream_id=?", (stream_id,))
@@ -521,10 +537,10 @@ class NilmDB(object):
"""
Returns (data, restart) tuple.
data is ASCII-formatted data from the database, formatted
'data' is ASCII-formatted data from the database, formatted
according to the layout of the stream.
restart, if nonzero, means that there were too many results to
'restart', if not None, means that there were too many results to
return in a single request. The data is complete from the
starting timestamp to the point at which it was truncated,
and a new request with a start time of 'restart' will fetch
@@ -543,7 +559,7 @@ class NilmDB(object):
result = []
matched = 0
remaining = self.max_results
restart = 0
restart = None
for interval in intervals.intersection(requested):
# Reading single rows from the table is too slow, so
# we use two bisections to find both the starting and
@@ -568,7 +584,7 @@ class NilmDB(object):
# Count them
remaining -= row_end - row_start
if restart:
if restart is not None:
break
if count:
@@ -578,9 +594,17 @@ class NilmDB(object):
def stream_remove(self, path, start = None, end = None):
"""
Remove data from the specified time interval within a stream.
Removes all data in the interval [start, end), and intervals
are truncated or split appropriately. Returns the number of
data points removed.
Removes data in the interval [start, end), and intervals are
truncated or split appropriately.
Returns a (removed, restart) tuple.
'removed' is the number of data points that were removed.
'restart', if not None, means there were too many rows to
remove in a single request. This function should be called
again with a start time of 'restart' to complete the removal.
"""
stream_id = self._stream_id(path)
table = self.data.getnode(path)
@@ -588,6 +612,8 @@ class NilmDB(object):
(start, end) = self._check_user_times(start, end)
to_remove = Interval(start, end)
removed = 0
remaining = self.max_removals
restart = None
# Can't remove intervals from within the iterator, so we need to
# remember what's currently in the intersection now.
@@ -598,6 +624,13 @@ class NilmDB(object):
row_start = self._find_start(table, dbint)
row_end = self._find_end(table, dbint)
# Shorten it if we'll hit the maximum number of removals
row_max = row_start + remaining
if row_max < row_end:
row_end = row_max
dbint.end = table[row_max]
restart = dbint.end
# Adjust the DBInterval to match the newly found ends
dbint.db_start = dbint.start
dbint.db_end = dbint.end
@@ -613,4 +646,7 @@ class NilmDB(object):
# Count how many were removed
removed += row_end - row_start
return removed
if restart is not None:
break
return (removed, restart)

View File

@@ -210,7 +210,7 @@ class Stream(NilmApp):
@exception_to_httperror(NilmDBError)
@cherrypy.tools.CORS_allow(methods = ["POST"])
def destroy(self, path):
"""Delete a stream and its associated data."""
"""Delete a stream. Fails if any data is still present."""
return self.db.stream_destroy(path)
# /stream/rename?oldpath=/newton/prep&newpath=/newton/prep/1
@@ -339,7 +339,14 @@ class Stream(NilmApp):
if start >= end:
raise cherrypy.HTTPError("400 Bad Request",
"start must precede end")
return self.db.stream_remove(path, start, end)
total_removed = 0
while True:
(removed, restart) = self.db.stream_remove(path, start, end)
total_removed += removed
if restart is None:
break
start = restart
return total_removed
# /stream/intervals?path=/newton/prep
# /stream/intervals?path=/newton/prep&start=1234567890.0&end=1234567899.0
@@ -386,7 +393,7 @@ class Stream(NilmApp):
diffpath)
response = ''.join([ json.dumps(i) + "\r\n" for i in ints ])
yield response
if restart == 0:
if restart is None:
break
start = restart
return content(start, end)
@@ -432,7 +439,7 @@ class Stream(NilmApp):
(data, restart) = self.db.stream_extract(path, start, end)
yield data
if restart == 0:
if restart is None:
return
start = restart
return content(start, end, count)

View File

@@ -1,3 +1,5 @@
from __future__ import absolute_import
from nilmdb.utils import datetime_tz
import re
import time
@@ -58,6 +60,11 @@ def parse_time(toparse):
timestamp, the current local timezone is assumed (e.g. from the TZ
env var).
"""
if toparse == "min":
return min_timestamp
if toparse == "max":
return max_timestamp
# If string isn't "now" and doesn't contain at least 4 digits,
# consider it invalid. smartparse might otherwise accept
# empty strings and strings with just separators.

View File

@@ -375,6 +375,7 @@ class TestClient(object):
# Delete streams that exist
for stream in client.stream_list():
client.stream_remove(stream[0])
client.stream_destroy(stream[0])
# Database is empty
@@ -506,6 +507,10 @@ class TestClient(object):
[ 109, 118 ],
[ 200, 300 ] ])
# destroy stream (try without removing data first)
with assert_raises(ClientError):
client.stream_destroy("/context/test")
client.stream_remove("/context/test")
client.stream_destroy("/context/test")
client.close()
@@ -600,6 +605,7 @@ class TestClient(object):
])
# Clean up
client.stream_remove("/empty/test")
client.stream_destroy("/empty/test")
client.close()
@@ -635,8 +641,9 @@ class TestClient(object):
eq_(connections(), (1, 5))
# Clean up
c.stream_remove("/persist/test")
c.stream_destroy("/persist/test")
eq_(connections(), (1, 6))
eq_(connections(), (1, 7))
def test_client_13_timestamp_rounding(self):
# Test potentially bad timestamps (due to floating point
@@ -661,5 +668,6 @@ class TestClient(object):
# Server will round this and give an error on finalize()
ctx.insert("299999999.99 1\n")
client.stream_remove("/rounding/test")
client.stream_destroy("/rounding/test")
client.close()

View File

@@ -21,12 +21,13 @@ from testutil.helpers import *
testdb = "tests/cmdline-testdb"
def server_start(max_results = None, bulkdata_args = {}):
def server_start(max_results = None, max_removals = None, bulkdata_args = {}):
global test_server, test_db
# Start web app on a custom port
test_db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(
testdb,
max_results = max_results,
max_removals = max_removals,
bulkdata_args = bulkdata_args)
test_server = nilmdb.server.Server(test_db, host = "127.0.0.1",
port = 32180, stoppable = False,
@@ -233,6 +234,8 @@ class TestCmdline(object):
eq_(parse_time("1333648800.0"), test)
eq_(parse_time("1333648800000000"), test)
eq_(parse_time("@1333648800000000"), test)
eq_(parse_time("min"), nilmdb.utils.time.min_timestamp)
eq_(parse_time("max"), nilmdb.utils.time.max_timestamp)
with assert_raises(ValueError):
parse_time("@hashtag12345")
@@ -699,11 +702,9 @@ class TestCmdline(object):
# Reinsert some data, to verify that no overlaps with deleted
# data are reported
os.environ['TZ'] = "UTC"
self.ok("insert --timestamp -f --rate 120 /newton/prep "
"tests/data/prep-20120323T1000")
self.ok("insert -t --filename --rate 120 /newton/prep "
"tests/data/prep-20120323T1002")
for minute in ["0", "2"]:
self.ok("insert --timestamp -f --rate 120 /newton/prep"
" tests/data/prep-20120323T100" + minute)
def test_11_destroy(self):
# Delete records
@@ -715,6 +716,9 @@ class TestCmdline(object):
self.fail("destroy /no/such/stream")
self.contain("No stream at path")
self.fail("destroy -R /no/such/stream")
self.contain("No stream at path")
self.fail("destroy asdfasdf")
self.contain("No stream at path")
@@ -728,8 +732,14 @@ class TestCmdline(object):
self.ok("list --detail")
lines_(self.captured, 7)
# Delete some
self.ok("destroy /newton/prep")
# Fail to destroy because intervals still present
self.fail("destroy /newton/prep")
self.contain("all intervals must be removed")
self.ok("list --detail")
lines_(self.captured, 7)
# Destroy for real
self.ok("destroy -R /newton/prep")
self.ok("list")
self.match("/newton/raw uint16_6\n"
"/newton/zzz/rawnotch uint16_9\n")
@@ -740,7 +750,8 @@ class TestCmdline(object):
self.ok("destroy /newton/raw")
self.ok("create /newton/raw uint16_6")
self.ok("destroy /newton/raw")
# Specify --remove with no data
self.ok("destroy --remove /newton/raw")
self.ok("list")
self.match("")
@@ -815,7 +826,7 @@ class TestCmdline(object):
# Now recreate the data one more time and make sure there are
# fewer files.
self.ok("destroy /newton/prep")
self.ok("destroy --remove /newton/prep")
self.fail("destroy /newton/prep") # already destroyed
self.ok("create /newton/prep float32_8")
os.environ['TZ'] = "UTC"
@@ -826,14 +837,16 @@ class TestCmdline(object):
for (dirpath, dirnames, filenames) in os.walk(testdb):
nfiles += len(filenames)
lt_(nfiles, 50)
self.ok("destroy /newton/prep") # destroy again
self.ok("destroy -R /newton/prep") # destroy again
def test_14_remove_files(self):
# Test BulkData's ability to remove when data is split into
# multiple files. Should be a fairly comprehensive test of
# remove functionality.
# Also limit max_removals, to cover more functionality.
server_stop()
server_start(bulkdata_args = { "file_size" : 920, # 23 rows per file
server_start(max_removals = 4321,
bulkdata_args = { "file_size" : 920, # 23 rows per file
"files_per_dir" : 3 })
# Insert data. Just for fun, insert out of order
@@ -974,8 +987,8 @@ class TestCmdline(object):
self.match("[ Thu, 01 Jan 2004 00:00:00.000000 +0000 -"
"> Sat, 01 Jan 2005 00:00:00.000000 +0000 ]\n")
self.ok("destroy /diff/1")
self.ok("destroy /diff/2")
self.ok("destroy -R /diff/1")
self.ok("destroy -R /diff/2")
def test_16_rename(self):
# Test renaming. Force file size smaller so we get more files
@@ -1039,7 +1052,7 @@ class TestCmdline(object):
self.fail("rename /foo/bar /xxx/yyy/zzz/www")
self.contain("path is subdir of existing node")
self.ok("rename /foo/bar /xxx/yyy/mmm")
self.ok("destroy /xxx/yyy/zzz")
self.ok("destroy -R /xxx/yyy/zzz")
check_path("xxx", "yyy", "mmm")
# Extract it at the final path
@@ -1047,7 +1060,7 @@ class TestCmdline(object):
"--end '2012-03-23 10:04:01'")
eq_(self.captured, extract_before)
self.ok("destroy /xxx/yyy/mmm")
self.ok("destroy -R /xxx/yyy/mmm")
# Make sure temporary rename dirs weren't left around
for (dirpath, dirnames, filenames) in os.walk(testdb):