Compare commits

..

9 Commits

8 changed files with 91 additions and 56 deletions

View File

@@ -6,6 +6,7 @@ import nilmdb.utils
import nilmdb.client.httpclient import nilmdb.client.httpclient
from nilmdb.client.errors import ClientError from nilmdb.client.errors import ClientError
import re
import time import time
import simplejson as json import simplejson as json
import contextlib import contextlib
@@ -65,7 +66,12 @@ class Client(object):
params["layout"] = layout params["layout"] = layout
if extended: if extended:
params["extended"] = 1 params["extended"] = 1
return self.http.get("stream/list", params) def sort_streams_nicely(x):
"""Human-friendly sort (/stream/2 before /stream/10)"""
num = lambda t: int(t) if t.isdigit() else t
key = lambda k: [ num(c) for c in re.split('([0-9]+)', k[0]) ]
return sorted(x, key = key)
return sort_streams_nicely(self.http.get("stream/list", params))
def stream_get_metadata(self, path, keys = None): def stream_get_metadata(self, path, keys = None):
params = { "path": path } params = { "path": path }
@@ -313,6 +319,11 @@ class StreamInserter(object):
part of a new interval and there may be a gap left in-between.""" part of a new interval and there may be a gap left in-between."""
self._send_block(final = True) self._send_block(final = True)
def send(self):
"""Send any data that we might have buffered up. Does not affect
any other treatment of timestamps or endpoints."""
self._send_block(final = False)
def _get_first_noncomment(self, block): def _get_first_noncomment(self, block):
"""Return the (start, end) indices of the first full line in """Return the (start, end) indices of the first full line in
block that isn't a comment, or raise IndexError if block that isn't a comment, or raise IndexError if

View File

@@ -81,7 +81,7 @@ class Cmdline(object):
def __init__(self, argv = None): def __init__(self, argv = None):
self.argv = argv or sys.argv[1:] self.argv = argv or sys.argv[1:]
self.client = None self.client = None
self.def_url = os.environ.get("NILMDB_URL", "http://localhost:12380") self.def_url = os.environ.get("NILMDB_URL", "http://localhost/nilmdb/")
self.subcmd = {} self.subcmd = {}
self.complete = Complete() self.complete = Complete()

View File

@@ -9,7 +9,8 @@ def setup(self, sub):
a stream. a stream.
""", """,
usage="%(prog)s path [-g [key ...] | " usage="%(prog)s path [-g [key ...] | "
"-s key=value [...] | -u key=value [...]]") "-s key=value [...] | -u key=value [...]] | "
"-d [key ...]")
cmd.set_defaults(handler = cmd_metadata) cmd.set_defaults(handler = cmd_metadata)
group = cmd.add_argument_group("Required arguments") group = cmd.add_argument_group("Required arguments")
@@ -30,6 +31,9 @@ def setup(self, sub):
help="Update metadata using provided " help="Update metadata using provided "
"key=value pairs", "key=value pairs",
).completer = self.complete.meta_keyval ).completer = self.complete.meta_keyval
exc.add_argument("-d", "--delete", nargs="*", metavar="key",
help="Delete metadata for specified keys (default all)",
).completer = self.complete.meta_key
return cmd return cmd
def cmd_metadata(self): def cmd_metadata(self):
@@ -56,6 +60,16 @@ def cmd_metadata(self):
handler(self.args.path, data) handler(self.args.path, data)
except nilmdb.client.ClientError as e: except nilmdb.client.ClientError as e:
self.die("error setting/updating metadata: %s", str(e)) self.die("error setting/updating metadata: %s", str(e))
elif self.args.delete is not None:
# Delete (by setting values to empty strings)
keys = self.args.delete or None
try:
data = self.client.stream_get_metadata(self.args.path, keys)
for key in data:
data[key] = ""
self.client.stream_update_metadata(self.args.path, data)
except nilmdb.client.ClientError as e:
self.die("error deleting metadata: %s", str(e))
else: else:
# Get (or unspecified) # Get (or unspecified)
keys = self.args.get or None keys = self.args.get or None
@@ -64,7 +78,7 @@ def cmd_metadata(self):
except nilmdb.client.ClientError as e: except nilmdb.client.ClientError as e:
self.die("error getting metadata: %s", str(e)) self.die("error getting metadata: %s", str(e))
for key, value in sorted(data.items()): for key, value in sorted(data.items()):
# Omit nonexistant keys # Print nonexistant keys as having empty value
if value is None: if value is None:
value = "" value = ""
printf("%s=%s\n", key, value) printf("%s=%s\n", key, value)

View File

@@ -79,7 +79,12 @@ class BulkData(object):
if Table.exists(ospath): if Table.exists(ospath):
raise ValueError("stream already exists at this path") raise ValueError("stream already exists at this path")
if os.path.isdir(ospath): if os.path.isdir(ospath):
raise ValueError("subdirs of this path already exist") # Look for any files in subdirectories. Fully empty subdirectories
# are OK; they might be there during a rename
for (root, dirs, files) in os.walk(ospath):
if len(files):
raise ValueError(
"non-empty subdirs of this path already exist")
def _create_parents(self, unicodepath): def _create_parents(self, unicodepath):
"""Verify the path name, and create parent directories if they """Verify the path name, and create parent directories if they
@@ -188,7 +193,6 @@ class BulkData(object):
# Basic checks # Basic checks
if oldospath == newospath: if oldospath == newospath:
raise ValueError("old and new paths are the same") raise ValueError("old and new paths are the same")
self._create_check_ospath(newospath)
# Move the table to a temporary location # Move the table to a temporary location
tmpdir = tempfile.mkdtemp(prefix = "rename-", dir = self.root) tmpdir = tempfile.mkdtemp(prefix = "rename-", dir = self.root)
@@ -196,6 +200,9 @@ class BulkData(object):
os.rename(oldospath, tmppath) os.rename(oldospath, tmppath)
try: try:
# Check destination path
self._create_check_ospath(newospath)
# Create parent dirs for new location # Create parent dirs for new location
self._create_parents(newunicodepath) self._create_parents(newunicodepath)

View File

@@ -174,6 +174,21 @@ class Root(NilmApp):
class Stream(NilmApp): class Stream(NilmApp):
"""Stream-specific operations""" """Stream-specific operations"""
# Helpers
def _get_times(self, start_param, end_param):
(start, end) = (None, None)
if start_param is not None:
start = string_to_timestamp(start_param)
if end_param is not None:
end = string_to_timestamp(end_param)
if start is not None and end is not None:
if start >= end:
raise cherrypy.HTTPError(
"400 Bad Request",
sprintf("start must precede end (%s >= %s)",
start_param, end_param))
return (start, end)
# /stream/list # /stream/list
# /stream/list?layout=float32_8 # /stream/list?layout=float32_8
# /stream/list?path=/newton/prep&extended=1 # /stream/list?path=/newton/prep&extended=1
@@ -302,16 +317,11 @@ class Stream(NilmApp):
body = cherrypy.request.body.read() body = cherrypy.request.body.read()
# Check path and get layout # Check path and get layout
streams = self.db.stream_list(path = path) if len(self.db.stream_list(path = path)) != 1:
if len(streams) != 1: raise cherrypy.HTTPError("404", "No such stream: " + path)
raise cherrypy.HTTPError("404 Not Found", "No such stream")
# Check limits # Check limits
start = string_to_timestamp(start) (start, end) = self._get_times(start, end)
end = string_to_timestamp(end)
if start >= end:
raise cherrypy.HTTPError("400 Bad Request",
"start must precede end")
# Pass the data directly to nilmdb, which will parse it and # Pass the data directly to nilmdb, which will parse it and
# raise a ValueError if there are any problems. # raise a ValueError if there are any problems.
@@ -333,14 +343,7 @@ class Stream(NilmApp):
the interval [start, end). Returns the number of data points the interval [start, end). Returns the number of data points
removed. removed.
""" """
if start is not None: (start, end) = self._get_times(start, end)
start = string_to_timestamp(start)
if end is not None:
end = string_to_timestamp(end)
if start is not None and end is not None:
if start >= end:
raise cherrypy.HTTPError("400 Bad Request",
"start must precede end")
total_removed = 0 total_removed = 0
while True: while True:
(removed, restart) = self.db.stream_remove(path, start, end) (removed, restart) = self.db.stream_remove(path, start, end)
@@ -371,15 +374,7 @@ class Stream(NilmApp):
Note that the response type is the non-standard Note that the response type is the non-standard
'application/x-json-stream' for lack of a better option. 'application/x-json-stream' for lack of a better option.
""" """
if start is not None: (start, end) = self._get_times(start, end)
start = string_to_timestamp(start)
if end is not None:
end = string_to_timestamp(end)
if start is not None and end is not None:
if start >= end:
raise cherrypy.HTTPError("400 Bad Request",
"start must precede end")
if len(self.db.stream_list(path = path)) != 1: if len(self.db.stream_list(path = path)) != 1:
raise cherrypy.HTTPError("404", "No such stream: " + path) raise cherrypy.HTTPError("404", "No such stream: " + path)
@@ -417,21 +412,11 @@ class Stream(NilmApp):
If 'markup' is True, adds comments to the stream denoting each If 'markup' is True, adds comments to the stream denoting each
interval's start and end timestamp. interval's start and end timestamp.
""" """
if start is not None: (start, end) = self._get_times(start, end)
start = string_to_timestamp(start)
if end is not None:
end = string_to_timestamp(end)
# Check parameters
if start is not None and end is not None:
if start >= end:
raise cherrypy.HTTPError("400 Bad Request",
"start must precede end")
# Check path and get layout # Check path and get layout
streams = self.db.stream_list(path = path) if len(self.db.stream_list(path = path)) != 1:
if len(streams) != 1: raise cherrypy.HTTPError("404", "No such stream: " + path)
raise cherrypy.HTTPError("404 Not Found", "No such stream")
@workaround_cp_bug_1200 @workaround_cp_bug_1200
def content(start, end): def content(start, end):

View File

@@ -65,6 +65,14 @@ def parse_time(toparse):
if toparse == "max": if toparse == "max":
return max_timestamp return max_timestamp
# If it starts with @, treat it as a NILM timestamp
# (integer microseconds since epoch)
try:
if toparse[0] == '@':
return int(toparse[1:])
except (ValueError, KeyError):
pass
# If string isn't "now" and doesn't contain at least 4 digits, # If string isn't "now" and doesn't contain at least 4 digits,
# consider it invalid. smartparse might otherwise accept # consider it invalid. smartparse might otherwise accept
# empty strings and strings with just separators. # empty strings and strings with just separators.
@@ -78,14 +86,6 @@ def parse_time(toparse):
except (ValueError, OverflowError): except (ValueError, OverflowError):
pass pass
# If it starts with @, treat it as a NILM timestamp
# (integer microseconds since epoch)
try:
if toparse[0] == '@':
return int(toparse[1:])
except (ValueError, KeyError):
pass
# If it's parseable as a float, treat it as a Unix or NILM # If it's parseable as a float, treat it as a Unix or NILM
# timestamp based on its range. # timestamp based on its range.
try: try:

View File

@@ -311,11 +311,11 @@ class TestClient(object):
# Trigger a curl error in generator # Trigger a curl error in generator
with assert_raises(ServerError) as e: with assert_raises(ServerError) as e:
client.http.get_gen("http://nosuchurl/").next() client.http.get_gen("http://nosuchurl.example.com./").next()
# Trigger a curl error in generator # Trigger a curl error in generator
with assert_raises(ServerError) as e: with assert_raises(ServerError) as e:
client.http.get_gen("http://nosuchurl/").next() client.http.get_gen("http://nosuchurl.example.com./").next()
# Check 404 for missing streams # Check 404 for missing streams
for function in [ client.stream_intervals, client.stream_extract ]: for function in [ client.stream_intervals, client.stream_extract ]:
@@ -460,6 +460,7 @@ class TestClient(object):
ctx.update_start(109) ctx.update_start(109)
ctx.insert("110 1\n") ctx.insert("110 1\n")
ctx.insert("111 1\n") ctx.insert("111 1\n")
ctx.send()
ctx.insert("112 1\n") ctx.insert("112 1\n")
ctx.insert("113 1\n") ctx.insert("113 1\n")
ctx.insert("114 1\n") ctx.insert("114 1\n")

View File

@@ -369,6 +369,8 @@ class TestCmdline(object):
self.contain("No stream at path") self.contain("No stream at path")
self.fail("metadata /newton/nosuchstream --set foo=bar") self.fail("metadata /newton/nosuchstream --set foo=bar")
self.contain("No stream at path") self.contain("No stream at path")
self.fail("metadata /newton/nosuchstream --delete")
self.contain("No stream at path")
self.ok("metadata /newton/prep") self.ok("metadata /newton/prep")
self.match("description=The Data\nv_scale=1.234\n") self.match("description=The Data\nv_scale=1.234\n")
@@ -394,6 +396,19 @@ class TestCmdline(object):
self.fail("metadata /newton/nosuchpath") self.fail("metadata /newton/nosuchpath")
self.contain("No stream at path /newton/nosuchpath") self.contain("No stream at path /newton/nosuchpath")
self.ok("metadata /newton/prep --delete")
self.ok("metadata /newton/prep --get")
self.match("")
self.ok("metadata /newton/prep --set "
"'description=The Data' "
"v_scale=1.234")
self.ok("metadata /newton/prep --delete v_scale")
self.ok("metadata /newton/prep --get")
self.match("description=The Data\n")
self.ok("metadata /newton/prep --set description=")
self.ok("metadata /newton/prep --get")
self.match("")
def test_06_insert(self): def test_06_insert(self):
self.ok("insert --help") self.ok("insert --help")
@@ -1038,10 +1053,12 @@ class TestCmdline(object):
self.contain("old and new paths are the same") self.contain("old and new paths are the same")
check_path("newton", "prep") check_path("newton", "prep")
self.fail("rename /newton/prep /newton") self.fail("rename /newton/prep /newton")
self.contain("subdirs of this path already exist") self.contain("path must contain at least one folder")
self.fail("rename /newton/prep /newton/prep/") self.fail("rename /newton/prep /newton/prep/")
self.contain("invalid path") self.contain("invalid path")
self.ok("rename /newton/prep /newton/foo") self.ok("rename /newton/prep /newton/foo/1")
check_path("newton", "foo", "1")
self.ok("rename /newton/foo/1 /newton/foo")
check_path("newton", "foo") check_path("newton", "foo")
self.ok("rename /newton/foo /totally/different/thing") self.ok("rename /newton/foo /totally/different/thing")
check_path("totally", "different", "thing") check_path("totally", "different", "thing")