Compare commits

...

9 Commits

7 changed files with 137 additions and 114 deletions

View File

@@ -162,9 +162,12 @@ class StreamInserterNumpy(nilmdb.client.client.StreamInserter):
elif array.ndim == 2: elif array.ndim == 2:
# Convert to structured array # Convert to structured array
sarray = numpy.zeros(array.shape[0], dtype=self._dtype) sarray = numpy.zeros(array.shape[0], dtype=self._dtype)
try:
sarray['timestamp'] = array[:,0] sarray['timestamp'] = array[:,0]
# Need the squeeze in case sarray['data'] is 1 dimensional # Need the squeeze in case sarray['data'] is 1 dimensional
sarray['data'] = numpy.squeeze(array[:,1:]) sarray['data'] = numpy.squeeze(array[:,1:])
except (IndexError, ValueError):
raise ValueError("wrong number of fields for this data type")
array = sarray array = sarray
else: else:
raise ValueError("wrong number of dimensions in array") raise ValueError("wrong number of dimensions in array")

View File

@@ -81,6 +81,12 @@ class Cmdline(object):
def __init__(self, argv = None): def __init__(self, argv = None):
self.argv = argv or sys.argv[1:] self.argv = argv or sys.argv[1:]
try:
# Assume command line arguments are encoded with stdin's encoding,
# and reverse it. Won't be needed in Python 3, but for now..
self.argv = [ x.decode(sys.stdin.encoding) for x in self.argv ]
except Exception: # pragma: no cover
pass
self.client = None self.client = None
self.def_url = os.environ.get("NILMDB_URL", "http://localhost/nilmdb/") self.def_url = os.environ.get("NILMDB_URL", "http://localhost/nilmdb/")
self.subcmd = {} self.subcmd = {}

View File

@@ -1,5 +1,6 @@
from nilmdb.utils.printf import * from nilmdb.utils.printf import *
import nilmdb.client import nilmdb.client
import fnmatch
from argparse import ArgumentDefaultsHelpFormatter as def_form from argparse import ArgumentDefaultsHelpFormatter as def_form
@@ -10,25 +11,39 @@ def setup(self, sub):
Destroy the stream at the specified path. Destroy the stream at the specified path.
The stream must be empty. All metadata The stream must be empty. All metadata
related to the stream is permanently deleted. related to the stream is permanently deleted.
Wildcards and multiple paths are supported.
""") """)
cmd.set_defaults(handler = cmd_destroy) cmd.set_defaults(handler = cmd_destroy)
group = cmd.add_argument_group("Options") group = cmd.add_argument_group("Options")
group.add_argument("-R", "--remove", action="store_true", group.add_argument("-R", "--remove", action="store_true",
help="Remove all data before destroying stream") help="Remove all data before destroying stream")
group.add_argument("-q", "--quiet", action="store_true",
help="Don't display names when destroying "
"multiple paths")
group = cmd.add_argument_group("Required arguments") group = cmd.add_argument_group("Required arguments")
group.add_argument("path", group.add_argument("path", nargs='+',
help="Path of the stream to delete, e.g. /foo/bar", help="Path of the stream to delete, e.g. /foo/bar/*",
).completer = self.complete.path ).completer = self.complete.path
return cmd return cmd
def cmd_destroy(self): def cmd_destroy(self):
"""Destroy stream""" """Destroy stream"""
streams = [ s[0] for s in self.client.stream_list() ]
paths = []
for path in self.args.path:
new = fnmatch.filter(streams, path)
if not new:
self.die("error: no stream matched path: %s", path)
paths.extend(new)
for path in paths:
if not self.args.quiet and len(paths) > 1:
printf("Destroying %s\n", path)
try:
if self.args.remove: if self.args.remove:
try: count = self.client.stream_remove(path)
count = self.client.stream_remove(self.args.path) self.client.stream_destroy(path)
except nilmdb.client.ClientError as e:
self.die("error removing data: %s", str(e))
try:
self.client.stream_destroy(self.args.path)
except nilmdb.client.ClientError as e: except nilmdb.client.ClientError as e:
self.die("error destroying stream: %s", str(e)) self.die("error destroying stream: %s", str(e))

View File

@@ -10,22 +10,16 @@ def setup(self, sub):
formatter_class = def_form, formatter_class = def_form,
description=""" description="""
List streams available in the database, List streams available in the database,
optionally filtering by layout or path. Wildcards optionally filtering by path. Wildcards
are accepted. are accepted; non-matching paths or wildcards
are ignored.
""") """)
cmd.set_defaults(verify = cmd_list_verify, cmd.set_defaults(verify = cmd_list_verify,
handler = cmd_list) handler = cmd_list)
group = cmd.add_argument_group("Stream filtering") group = cmd.add_argument_group("Stream filtering")
group.add_argument("-p", "--path", metavar="PATH", default="*", group.add_argument("path", metavar="PATH", default=["*"], nargs='*',
help="Match only this path (-p can be omitted)",
).completer = self.complete.path ).completer = self.complete.path
group.add_argument("path_positional", default="*",
nargs="?", help=argparse.SUPPRESS,
).completer = self.complete.path
group.add_argument("-l", "--layout", default="*",
help="Match only this stream layout",
).completer = self.complete.layout
group = cmd.add_argument_group("Interval info") group = cmd.add_argument_group("Interval info")
group.add_argument("-E", "--ext", action="store_true", group.add_argument("-E", "--ext", action="store_true",
@@ -49,20 +43,12 @@ def setup(self, sub):
group = cmd.add_argument_group("Misc options") group = cmd.add_argument_group("Misc options")
group.add_argument("-T", "--timestamp-raw", action="store_true", group.add_argument("-T", "--timestamp-raw", action="store_true",
help="Show raw timestamps when printing times") help="Show raw timestamps when printing times")
group.add_argument("-l", "--layout", action="store_true",
help="Show layout type next to path name")
return cmd return cmd
def cmd_list_verify(self): def cmd_list_verify(self):
# A hidden "path_positional" argument lets the user leave off the
# "-p" when specifying the path. Handle it here.
got_opt = self.args.path != "*"
got_pos = self.args.path_positional != "*"
if got_pos:
if got_opt:
self.parser.error("too many paths specified")
else:
self.args.path = self.args.path_positional
if self.args.start is not None and self.args.end is not None: if self.args.start is not None and self.args.end is not None:
if self.args.start >= self.args.end: if self.args.start >= self.args.end:
self.parser.error("start must precede end") self.parser.error("start must precede end")
@@ -80,13 +66,16 @@ def cmd_list(self):
else: else:
time_string = nilmdb.utils.time.timestamp_to_human time_string = nilmdb.utils.time.timestamp_to_human
for argpath in self.args.path:
for stream in streams: for stream in streams:
(path, layout, int_min, int_max, rows, time) = stream[:6] (path, layout, int_min, int_max, rows, time) = stream[:6]
if not (fnmatch.fnmatch(path, self.args.path) and if not fnmatch.fnmatch(path, argpath):
fnmatch.fnmatch(layout, self.args.layout)):
continue continue
if self.args.layout:
printf("%s %s\n", path, layout) printf("%s %s\n", path, layout)
else:
printf("%s\n", path)
if self.args.ext: if self.args.ext:
if int_min is None or int_max is None: if int_min is None or int_max is None:
@@ -102,7 +91,8 @@ def cmd_list(self):
printed = False printed = False
for (start, end) in self.client.stream_intervals( for (start, end) in self.client.stream_intervals(
path, self.args.start, self.args.end): path, self.args.start, self.args.end):
printf(" [ %s -> %s ]\n", time_string(start), time_string(end)) printf(" [ %s -> %s ]\n",
time_string(start), time_string(end))
printed = True printed = True
if not printed: if not printed:
printf(" (no intervals)\n") printf(" (no intervals)\n")

View File

@@ -1,17 +1,19 @@
from nilmdb.utils.printf import * from nilmdb.utils.printf import *
import nilmdb.client import nilmdb.client
import fnmatch
def setup(self, sub): def setup(self, sub):
cmd = sub.add_parser("remove", help="Remove data", cmd = sub.add_parser("remove", help="Remove data",
description=""" description="""
Remove all data from a specified time range within a Remove all data from a specified time range within a
stream. stream. If multiple streams or wildcards are provided,
the same time range is removed from all streams.
""") """)
cmd.set_defaults(handler = cmd_remove) cmd.set_defaults(handler = cmd_remove)
group = cmd.add_argument_group("Data selection") group = cmd.add_argument_group("Data selection")
group.add_argument("path", group.add_argument("path", nargs='+',
help="Path of stream, e.g. /foo/bar", help="Path of stream, e.g. /foo/bar/*",
).completer = self.complete.path ).completer = self.complete.path
group.add_argument("-s", "--start", required=True, group.add_argument("-s", "--start", required=True,
metavar="TIME", type=self.arg_time, metavar="TIME", type=self.arg_time,
@@ -23,18 +25,31 @@ def setup(self, sub):
).completer = self.complete.time ).completer = self.complete.time
group = cmd.add_argument_group("Output format") group = cmd.add_argument_group("Output format")
group.add_argument("-q", "--quiet", action="store_true",
help="Don't display names when removing "
"from multiple paths")
group.add_argument("-c", "--count", action="store_true", group.add_argument("-c", "--count", action="store_true",
help="Output number of data points removed") help="Output number of data points removed")
return cmd return cmd
def cmd_remove(self): def cmd_remove(self):
streams = [ s[0] for s in self.client.stream_list() ]
paths = []
for path in self.args.path:
new = fnmatch.filter(streams, path)
if not new:
self.die("error: no stream matched path: %s", path)
paths.extend(new)
try: try:
count = self.client.stream_remove(self.args.path, for path in paths:
if not self.args.quiet and len(paths) > 1:
printf("Removing from %s\n", path)
count = self.client.stream_remove(path,
self.args.start, self.args.end) self.args.start, self.args.end)
if self.args.count:
printf("%d\n", count);
except nilmdb.client.ClientError as e: except nilmdb.client.ClientError as e:
self.die("error removing data: %s", str(e)) self.die("error removing data: %s", str(e))
if self.args.count:
printf("%d\n", count)
return 0 return 0

View File

@@ -300,38 +300,19 @@ class TestCmdline(object):
# Verify we got those 3 streams and they're returned in # Verify we got those 3 streams and they're returned in
# alphabetical order. # alphabetical order.
self.ok("list") self.ok("list -l")
self.match("/newton/prep float32_8\n" self.match("/newton/prep float32_8\n"
"/newton/raw uint16_6\n" "/newton/raw uint16_6\n"
"/newton/zzz/rawnotch uint16_9\n") "/newton/zzz/rawnotch uint16_9\n")
# Match just one type or one path. Also check # Match just one type or one path. Also check
# that --path is optional # that --path is optional
self.ok("list --path /newton/raw") self.ok("list --layout /newton/raw")
self.match("/newton/raw uint16_6\n")
self.ok("list /newton/raw")
self.match("/newton/raw uint16_6\n")
self.fail("list -p /newton/raw /newton/raw")
self.contain("too many paths")
self.ok("list --layout uint16_6")
self.match("/newton/raw uint16_6\n") self.match("/newton/raw uint16_6\n")
# Wildcard matches # Wildcard matches
self.ok("list --layout uint16*") self.ok("list *zzz*")
self.match("/newton/raw uint16_6\n" self.match("/newton/zzz/rawnotch\n")
"/newton/zzz/rawnotch uint16_9\n")
self.ok("list --path *zzz* --layout uint16*")
self.match("/newton/zzz/rawnotch uint16_9\n")
self.ok("list *zzz* --layout uint16*")
self.match("/newton/zzz/rawnotch uint16_9\n")
self.ok("list --path *zzz* --layout float32*")
self.match("")
# reversed range # reversed range
self.fail("list /newton/prep --start 2020-01-01 --end 2000-01-01") self.fail("list /newton/prep --start 2020-01-01 --end 2000-01-01")
@@ -497,28 +478,28 @@ class TestCmdline(object):
self.ok("list --detail") self.ok("list --detail")
lines_(self.captured, 8) lines_(self.captured, 8)
self.ok("list --detail --path *prep") self.ok("list --detail *prep")
lines_(self.captured, 4) lines_(self.captured, 4)
self.ok("list --detail --path *prep --start='23 Mar 2012 10:02'") self.ok("list --detail *prep --start='23 Mar 2012 10:02'")
lines_(self.captured, 3) lines_(self.captured, 3)
self.ok("list --detail --path *prep --start='23 Mar 2012 10:05'") self.ok("list --detail *prep --start='23 Mar 2012 10:05'")
lines_(self.captured, 2) lines_(self.captured, 2)
self.ok("list --detail --path *prep --start='23 Mar 2012 10:05:15'") self.ok("list --detail *prep --start='23 Mar 2012 10:05:15'")
lines_(self.captured, 2) lines_(self.captured, 2)
self.contain("10:05:15.000") self.contain("10:05:15.000")
self.ok("list --detail --path *prep --start='23 Mar 2012 10:05:15.50'") self.ok("list --detail *prep --start='23 Mar 2012 10:05:15.50'")
lines_(self.captured, 2) lines_(self.captured, 2)
self.contain("10:05:15.500") self.contain("10:05:15.500")
self.ok("list --detail --path *prep --start='23 Mar 2012 19:05:15.50'") self.ok("list --detail *prep --start='23 Mar 2012 19:05:15.50'")
lines_(self.captured, 2) lines_(self.captured, 2)
self.contain("no intervals") self.contain("no intervals")
self.ok("list --detail --path *prep --start='23 Mar 2012 10:05:15.50'" self.ok("list --detail *prep --start='23 Mar 2012 10:05:15.50'"
+ " --end='23 Mar 2012 10:05:15.51'") + " --end='23 Mar 2012 10:05:15.51'")
lines_(self.captured, 2) lines_(self.captured, 2)
self.contain("10:05:15.500") self.contain("10:05:15.500")
@@ -527,15 +508,15 @@ class TestCmdline(object):
lines_(self.captured, 8) lines_(self.captured, 8)
# Verify the "raw timestamp" output # Verify the "raw timestamp" output
self.ok("list --detail --path *prep --timestamp-raw " self.ok("list --detail *prep --timestamp-raw "
"--start='23 Mar 2012 10:05:15.50'") "--start='23 Mar 2012 10:05:15.50'")
lines_(self.captured, 2) lines_(self.captured, 2)
self.contain("[ 1332497115500000 -> 1332497160000000 ]") self.contain("[ 1332497115500000 -> 1332497160000000 ]")
# bad time # bad time
self.fail("list --detail --path *prep -T --start='9332497115.612'") self.fail("list --detail *prep -T --start='9332497115.612'")
# good time # good time
self.ok("list --detail --path *prep -T --start='1332497115.612'") self.ok("list --detail *prep -T --start='1332497115.612'")
lines_(self.captured, 2) lines_(self.captured, 2)
self.contain("[ 1332497115612000 -> 1332497160000000 ]") self.contain("[ 1332497115612000 -> 1332497160000000 ]")
@@ -639,7 +620,7 @@ class TestCmdline(object):
# Try nonexistent stream # Try nonexistent stream
self.fail("remove /no/such/foo --start 2000-01-01 --end 2020-01-01") self.fail("remove /no/such/foo --start 2000-01-01 --end 2020-01-01")
self.contain("No stream at path") self.contain("no stream matched path")
# empty or backward ranges return errors # empty or backward ranges return errors
self.fail("remove /newton/prep --start 2020-01-01 --end 2000-01-01") self.fail("remove /newton/prep --start 2020-01-01 --end 2000-01-01")
@@ -667,9 +648,14 @@ class TestCmdline(object):
"--start '23 Mar 2022 20:00:30' " + "--start '23 Mar 2022 20:00:30' " +
"--end '23 Mar 2022 20:00:31'") "--end '23 Mar 2022 20:00:31'")
self.match("0\n") self.match("0\n")
self.ok("remove -c /newton/prep /newton/pre* " +
"--start '23 Mar 2022 20:00:30' " +
"--end '23 Mar 2022 20:00:31'")
self.match("Removing from /newton/prep\n0\n" +
"Removing from /newton/prep\n0\n")
# Make sure we have the data we expect # Make sure we have the data we expect
self.ok("list --detail /newton/prep") self.ok("list -l --detail /newton/prep")
self.match("/newton/prep float32_8\n" + self.match("/newton/prep float32_8\n" +
" [ Fri, 23 Mar 2012 10:00:00.000000 +0000" " [ Fri, 23 Mar 2012 10:00:00.000000 +0000"
" -> Fri, 23 Mar 2012 10:01:59.991668 +0000 ]\n" " -> Fri, 23 Mar 2012 10:01:59.991668 +0000 ]\n"
@@ -704,7 +690,7 @@ class TestCmdline(object):
self.match("24000\n") self.match("24000\n")
# See the missing chunks in list output # See the missing chunks in list output
self.ok("list --detail /newton/prep") self.ok("list --layout --detail /newton/prep")
self.match("/newton/prep float32_8\n" + self.match("/newton/prep float32_8\n" +
" [ Fri, 23 Mar 2012 10:00:00.000000 +0000" " [ Fri, 23 Mar 2012 10:00:00.000000 +0000"
" -> Fri, 23 Mar 2012 10:00:05.000000 +0000 ]\n" " -> Fri, 23 Mar 2012 10:00:05.000000 +0000 ]\n"
@@ -718,7 +704,7 @@ class TestCmdline(object):
# Remove all data, verify it's missing # Remove all data, verify it's missing
self.ok("remove /newton/prep --start 2000-01-01 --end 2020-01-01") self.ok("remove /newton/prep --start 2000-01-01 --end 2020-01-01")
self.match("") # no count requested this time self.match("") # no count requested this time
self.ok("list --detail /newton/prep") self.ok("list -l --detail /newton/prep")
self.match("/newton/prep float32_8\n" + self.match("/newton/prep float32_8\n" +
" (no intervals)\n") " (no intervals)\n")
@@ -736,16 +722,16 @@ class TestCmdline(object):
self.contain("too few arguments") self.contain("too few arguments")
self.fail("destroy /no/such/stream") self.fail("destroy /no/such/stream")
self.contain("No stream at path") self.contain("no stream matched path")
self.fail("destroy -R /no/such/stream") self.fail("destroy -R /no/such/stream")
self.contain("No stream at path") self.contain("no stream matched path")
self.fail("destroy asdfasdf") self.fail("destroy asdfasdf")
self.contain("No stream at path") self.contain("no stream matched path")
# From previous tests, we have: # From previous tests, we have:
self.ok("list") self.ok("list -l")
self.match("/newton/prep float32_8\n" self.match("/newton/prep float32_8\n"
"/newton/raw uint16_6\n" "/newton/raw uint16_6\n"
"/newton/zzz/rawnotch uint16_9\n") "/newton/zzz/rawnotch uint16_9\n")
@@ -761,13 +747,13 @@ class TestCmdline(object):
lines_(self.captured, 7) lines_(self.captured, 7)
# Destroy for real # Destroy for real
self.ok("destroy -R /newton/prep") self.ok("destroy -R /n*/prep")
self.ok("list") self.ok("list -l")
self.match("/newton/raw uint16_6\n" self.match("/newton/raw uint16_6\n"
"/newton/zzz/rawnotch uint16_9\n") "/newton/zzz/rawnotch uint16_9\n")
self.ok("destroy /newton/zzz/rawnotch") self.ok("destroy /newton/zzz/rawnotch")
self.ok("list") self.ok("list -l")
self.match("/newton/raw uint16_6\n") self.match("/newton/raw uint16_6\n")
self.ok("destroy /newton/raw") self.ok("destroy /newton/raw")
@@ -786,18 +772,17 @@ class TestCmdline(object):
self.ok("list") self.ok("list")
self.contain(path) self.contain(path)
# Make sure it was created empty # Make sure it was created empty
self.ok("list --detail --path " + path) self.ok("list --detail " + path)
self.contain("(no intervals)") self.contain("(no intervals)")
def test_12_unicode(self): def test_12_unicode(self):
# Unicode paths. # Unicode paths.
self.ok("destroy /newton/asdf/qwer") self.ok("destroy /newton/asdf/qwer")
self.ok("destroy /newton/prep") self.ok("destroy /newton/prep /newton/raw")
self.ok("destroy /newton/raw")
self.ok("destroy /newton/zzz") self.ok("destroy /newton/zzz")
self.ok(u"create /düsseldorf/raw uint16_6") self.ok(u"create /düsseldorf/raw uint16_6")
self.ok("list --detail") self.ok("list -l --detail")
self.contain(u"/düsseldorf/raw uint16_6") self.contain(u"/düsseldorf/raw uint16_6")
self.contain("(no intervals)") self.contain("(no intervals)")
@@ -883,7 +868,7 @@ class TestCmdline(object):
du_before = nilmdb.utils.diskusage.du(testdb) du_before = nilmdb.utils.diskusage.du(testdb)
# Make sure we have the data we expect # Make sure we have the data we expect
self.ok("list --detail") self.ok("list -l --detail")
self.match("/newton/prep float32_8\n" + self.match("/newton/prep float32_8\n" +
" [ Fri, 23 Mar 2012 10:00:00.000000 +0000" " [ Fri, 23 Mar 2012 10:00:00.000000 +0000"
" -> Fri, 23 Mar 2012 10:01:59.991668 +0000 ]\n" " -> Fri, 23 Mar 2012 10:01:59.991668 +0000 ]\n"
@@ -919,7 +904,7 @@ class TestCmdline(object):
self.match("3600\n") self.match("3600\n")
# See the missing chunks in list output # See the missing chunks in list output
self.ok("list --detail") self.ok("list -l --detail")
self.match("/newton/prep float32_8\n" + self.match("/newton/prep float32_8\n" +
" [ Fri, 23 Mar 2012 10:00:00.000000 +0000" " [ Fri, 23 Mar 2012 10:00:00.000000 +0000"
" -> Fri, 23 Mar 2012 10:00:05.000000 +0000 ]\n" " -> Fri, 23 Mar 2012 10:00:05.000000 +0000 ]\n"
@@ -1043,7 +1028,7 @@ class TestCmdline(object):
else: else:
raise AssertionError("data not found at " + seek) raise AssertionError("data not found at " + seek)
# Verify "list" output # Verify "list" output
self.ok("list") self.ok("list -l")
self.match("/" + "/".join(components) + " float32_8\n") self.match("/" + "/".join(components) + " float32_8\n")
# Lots of renames # Lots of renames

View File

@@ -130,6 +130,15 @@ class TestNumpyClient(object):
[4, 5]]])) [4, 5]]]))
in_("wrong number of dimensions", str(e.exception)) in_("wrong number of dimensions", str(e.exception))
# Wrong number of fields
with assert_raises(ValueError) as e:
client.stream_insert_numpy("/test/1",
np.array([[0, 1, 2],
[1, 2, 3],
[3, 4, 5],
[4, 5, 6]]))
in_("wrong number of fields", str(e.exception))
# Unstructured # Unstructured
client.stream_create("/test/2", "float32_8") client.stream_create("/test/2", "float32_8")
client.stream_insert_numpy( client.stream_insert_numpy(