git-svn-id: https://bucket.mit.edu/svn/nilm/nilmdb@10682 ddd99763-3ecb-0310-9145-efcb8ce7c51ftags/bxinterval-last
@@ -103,7 +103,9 @@ class MyCurl(object): | |||
body_str = body.getvalue() | |||
self._check_error(body_str) | |||
return json.loads(body_str) | |||
def close(self): | |||
self.curl.close() | |||
def getjson(self, url, params = None): | |||
"""Simple GET that returns JSON string""" | |||
@@ -130,6 +132,9 @@ class Client(object): | |||
"""Return compact json-encoded version of parameter""" | |||
return json.dumps(data, separators=(',',':')) | |||
def close(self): | |||
self.curl.close() | |||
def geturl(self): | |||
"""Return the URL we're using""" | |||
return self.curl.baseurl | |||
@@ -4,6 +4,8 @@ so it can be more easily tested.""" | |||
from __future__ import absolute_import | |||
from nilmdb.printf import * | |||
import nilmdb.client | |||
import nilmdb.layout | |||
import nilmdb.timestamper | |||
import time | |||
import sys | |||
@@ -12,6 +14,9 @@ import os | |||
import urlparse | |||
import argparse | |||
import fnmatch | |||
import subprocess | |||
from argparse import ArgumentDefaultsHelpFormatter as def_form | |||
version = "0.1" | |||
@@ -20,57 +25,65 @@ class Cmdline(object): | |||
def __init__(self, argv): | |||
self.argv = argv | |||
def parse_opts(self): | |||
def parser_setup(self): | |||
version_string = sprintf("nilmtool %s, client library %s", | |||
version, nilmdb.Client.client_version) | |||
formatter = argparse.ArgumentDefaultsHelpFormatter | |||
parser = argparse.ArgumentParser(add_help = False, | |||
formatter_class = formatter) | |||
self.parser = argparse.ArgumentParser(add_help = False, | |||
formatter_class = def_form) | |||
group = parser.add_argument_group("General options") | |||
group = self.parser.add_argument_group("General options") | |||
group.add_argument("-q", "--quiet", action='store_true', | |||
help='suppress unnecessary messages') | |||
group.add_argument("-h", "--help", action='help', | |||
help='show this help message and exit') | |||
group.add_argument("-V", "--version", action="version", | |||
version=version_string) | |||
group = parser.add_argument_group("Server") | |||
group = self.parser.add_argument_group("Server") | |||
group.add_argument("-u", "--url", action="store", | |||
default="http://localhost:12380/", | |||
help="NilmDB server URL (default: %(default)s)") | |||
sub = parser.add_subparsers(title="Commands", | |||
dest="command", | |||
description="Specify --help after the " | |||
"command for command-specific options.") | |||
sub = self.parser.add_subparsers(title="Commands", | |||
dest="command", | |||
description="Specify --help after " | |||
"the command for command-specific " | |||
"options.") | |||
self.parser_setup_info(sub) | |||
self.parser_setup_list(sub) | |||
self.parser_setup_create(sub) | |||
self.parser_setup_metadata(sub) | |||
self.parser_setup_insert(sub) | |||
# info | |||
def parser_setup_info(self, sub): | |||
cmd = sub.add_parser("info", help="Server information", | |||
formatter_class = formatter, | |||
formatter_class = def_form, | |||
description=""" | |||
List information about the server, like | |||
version. | |||
""") | |||
cmd.set_defaults(handler = self.cmd_info) | |||
# list | |||
def parser_setup_list(self, sub): | |||
cmd = sub.add_parser("list", help="List streams", | |||
formatter_class = formatter, | |||
formatter_class = def_form, | |||
description=""" | |||
List streams available in the database, | |||
optionally filtering by type or path. Wildcards | |||
optionally filtering by layout or path. Wildcards | |||
are accepted. | |||
""") | |||
cmd.set_defaults(handler = self.cmd_list) | |||
group = cmd.add_argument_group("Stream filtering") | |||
group.add_argument("-t", "--type", default="*", | |||
help="Match only this stream type") | |||
group.add_argument("-l", "--layout", default="*", | |||
help="Match only this stream layout") | |||
group.add_argument("-p", "--path", default="*", | |||
help="Match only this path") | |||
# create | |||
def parser_setup_create(self, sub): | |||
cmd = sub.add_parser("create", help="Create a new stream", | |||
formatter_class = formatter, | |||
formatter_class = def_form, | |||
description=""" | |||
Create a new empty stream at the | |||
specified path and with the specifed | |||
@@ -80,10 +93,10 @@ class Cmdline(object): | |||
group = cmd.add_argument_group("Required arguments") | |||
group.add_argument("path", | |||
help="Path of new stream, e.g. /foo/bar") | |||
group.add_argument("type", | |||
group.add_argument("layout", | |||
help="Layout type for new stream, e.g. RawData") | |||
# metadata | |||
def parser_setup_metadata(self, sub): | |||
cmd = sub.add_parser("metadata", help="Get or set stream metadata", | |||
description=""" | |||
Get or set key=value metadata associated with | |||
@@ -108,21 +121,53 @@ class Cmdline(object): | |||
help="Update metadata using provided " | |||
"key=value pairs") | |||
# parse it | |||
self.args = parser.parse_args(self.argv) | |||
def parser_setup_insert(self, sub): | |||
cmd = sub.add_parser("insert", help="Insert data", | |||
description=""" | |||
Insert data into a stream. | |||
""") | |||
cmd.set_defaults(handler = self.cmd_insert) | |||
group = cmd.add_argument_group("Timestamping", | |||
description=""" | |||
If timestamps are already provided in the | |||
input date, use --none. Otherwise, | |||
provide --start, or use --filename to | |||
try to deduce timestamps from the file. | |||
""") | |||
group.add_argument("-r", "--rate", type=float, | |||
help=""" | |||
If needed, rate in Hz (default: based on | |||
stream layout) | |||
""") | |||
exc = group.add_mutually_exclusive_group() | |||
exc.add_argument("-s", "--start", metavar="TIME", | |||
help="Starting timestamp (free-form)") | |||
exc.add_argument("-f", "--filename", action="store_true", | |||
help=""" | |||
Use filenames to determine start time | |||
(default, if filenames are provided) | |||
""") | |||
exc.add_argument("-n", "--none", action="store_true", | |||
help="Timestamp is already present, don't add one") | |||
group = cmd.add_argument_group("Required parameters") | |||
group.add_argument("path", | |||
help="Path of stream, e.g. /foo/bar") | |||
group.add_argument("file", nargs="*", default=['-'], | |||
help="File(s) to insert (default: stdin)") | |||
def die(self, formatstr, *args): | |||
fprintf(sys.stderr, formatstr, *args) | |||
self.client.close() | |||
sys.exit(-1) | |||
def run(self): | |||
self.parse_opts() | |||
# Run parser | |||
self.parser_setup() | |||
self.args = self.parser.parse_args(self.argv) | |||
# Wrap this in a try so we can make sure to close the client | |||
# when we're done. | |||
import gc | |||
gc.collect() # hack - this cleans up/disconnects the old clients. | |||
self.client = nilmdb.Client(self.args.url) | |||
# Make a test connection to make sure things work | |||
@@ -136,6 +181,9 @@ class Cmdline(object): | |||
# here. | |||
self.args.handler() | |||
self.client.close() | |||
sys.exit(0) | |||
def cmd_info(self): | |||
"""Print info about the server""" | |||
printf("Client library version: %s\n", self.client.client_version) | |||
@@ -146,22 +194,22 @@ class Cmdline(object): | |||
def cmd_list(self): | |||
"""List available streams""" | |||
streams = self.client.stream_list() | |||
for (path, type) in streams: | |||
for (path, layout) in streams: | |||
if (fnmatch.fnmatch(path, self.args.path) and | |||
fnmatch.fnmatch(type, self.args.type)): | |||
printf("%s %s\n", path, type) | |||
fnmatch.fnmatch(layout, self.args.layout)): | |||
printf("%s %s\n", path, layout) | |||
def cmd_create(self): | |||
"""Create new stream""" | |||
try: | |||
self.client.stream_create(self.args.path, self.args.type) | |||
self.client.stream_create(self.args.path, self.args.layout) | |||
except nilmdb.client.ClientError as e: | |||
self.die("Error creating stream: %s\n", str(e)) | |||
def cmd_metadata(self): | |||
"""Manipulate metadata""" | |||
if self.args.set is not None or self.args.update is not None: | |||
# Either a set or an update | |||
# Either set, or update | |||
if self.args.set is not None: | |||
keyvals = self.args.set | |||
handler = self.client.stream_set_metadata | |||
@@ -190,6 +238,62 @@ class Cmdline(object): | |||
except nilmdb.client.ClientError as e: | |||
self.die("Error getting metadata: %s\n", str(e)) | |||
for key, value in sorted(data.items()): | |||
# Omit nonexistant keys | |||
if value is None: | |||
value = "" | |||
printf("%s=%s\n", key, value) | |||
def cmd_insert(self): | |||
# Find requested stream | |||
streams = self.client.stream_list(self.args.path) | |||
if len(streams) != 1: | |||
self.die("Error getting stream info for path %s\n", self.args.path) | |||
layout = streams[0][1] | |||
if self.args.start and len(self.args.file) != 1: | |||
self.die("--start can only be used with one input file, for now") | |||
for filename in self.args.file: | |||
if filename == '-': | |||
process = None | |||
infile = sys.stdin | |||
else: | |||
if not os.path.exists(filename): | |||
self.die("Error opening input file %s\n", filename) | |||
try: | |||
# zcat is _much_ faster than python's gzopen | |||
process = subprocess.Popen(["zcat", "-f", filename], | |||
bufsize = -1, | |||
stdin = None, | |||
stderr = None, | |||
stdout = PIPE) | |||
infile = process.stdout | |||
except OSError: # pragma: no cover | |||
self.die("Error spawning zcat process\n") | |||
# Build a timestamper for this file | |||
if self.args.none: | |||
ts = nilmdb.timestamper.TimestamperNull(infile) | |||
else: | |||
# If no rate, see if we can get it from nilmdb.layout | |||
if not self.args.rate: | |||
try: | |||
self.args.rate = nilmdb.layout.named[layout].rate_hz | |||
except KeyError: | |||
self.die("Need to specify --rate\n") | |||
# These will die if they can't parse | |||
if self.args.start: | |||
start = self.parse_time(self.args.start) | |||
else: | |||
start = self.parse_time(filename) | |||
ts = nilmdb.timestamper.TimestamperRate(infile, start, rate) | |||
print "Input file:", filename | |||
print "Timestamper:", ts | |||
print "Start:", start | |||
print "Rate:", rate | |||
self.die("not implemented") |
@@ -271,9 +271,9 @@ class NilmDB(object): | |||
""" | |||
stream_id = self._stream_id(path) | |||
with self.con as con: | |||
con.execute("DELETE FROM metadata " | |||
"WHERE stream_id=?", (stream_id,)) | |||
for key in data: | |||
con.execute("DELETE FROM metadata " | |||
"WHERE stream_id=? AND key=?", (stream_id, key)) | |||
if data[key] != '': | |||
con.execute("INSERT INTO metadata VALUES (?, ?, ?)", | |||
(stream_id, key, data[key])) | |||
@@ -58,7 +58,7 @@ class TimestamperRate(Timestamper): | |||
def __init__(self, file, start, rate, end = None): | |||
""" | |||
file: file name or object | |||
start: Unix timestamp for the first value | |||
rate: 1/rate is added to the timestamp for each line | |||
@@ -87,3 +87,11 @@ class TimestamperNow(Timestamper): | |||
now = datetime_tz.datetime_tz.utcnow().totimestamp() | |||
yield sprintf("%.6f ", now) | |||
Timestamper.__init__(self, file, iterator()) | |||
class TimestamperNull(Timestamper): | |||
"""Timestamper that adds nothing to each line""" | |||
def __init__(self, file): | |||
def iterator(): | |||
while True: | |||
yield "" | |||
Timestamper.__init__(self, file, iterator()) |
@@ -10,7 +10,7 @@ cover-erase= | |||
##cover-branches= # need nose 1.1.3 for this | |||
stop= | |||
verbosity=2 | |||
tests=tests/test_cmdline.py | |||
#tests=tests/test_cmdline.py | |||
#tests=tests/test_layout.py | |||
#tests=tests/test_interval.py | |||
#tests=tests/test_client.py | |||
@@ -148,7 +148,7 @@ class TestClient(object): | |||
result = client.stream_insert("/newton/prep", data) | |||
in_("400 Bad Request", str(e.exception)) | |||
in_("timestamp is not monotonically increasing", str(e.exception)) | |||
# Now try empty data (no server request made) | |||
empty = cStringIO.StringIO("") | |||
data = nilmdb.timestamper.TimestamperRate(empty, start, 120) | |||
@@ -159,7 +159,7 @@ class TestClient(object): | |||
with assert_raises(ClientError) as e: | |||
client.curl.putjson("stream/insert", "", { "path": "/newton/prep" }) | |||
in_("400 Bad Request", str(e.exception)) | |||
in_("no data provided", str(e.exception)) | |||
in_("no data provided", str(e.exception)) | |||
# Now do the real load | |||
data = nilmdb.timestamper.TimestamperRate(testfile, start, 120) | |||
@@ -5,7 +5,7 @@ import nilmdb.cmdline | |||
from nose.tools import * | |||
from nose.tools import assert_raises | |||
import json | |||
import itertools | |||
import itertools | |||
import os | |||
import shutil | |||
import sys | |||
@@ -41,7 +41,7 @@ def teardown_module(): | |||
class TestCmdline(object): | |||
def run(self, arg_string, input_string = "", capture_stderr=True): | |||
def run(self, arg_string, infile=None, outfile=None): | |||
"""Run a cmdline client with the specified argument string, | |||
passing the given input. Returns a tuple with the output and | |||
exit code""" | |||
@@ -53,48 +53,50 @@ class TestCmdline(object): | |||
( sys.stdin, sys.stdout, sys.stderr ) = self.io | |||
def __exit__(self, type, value, traceback): | |||
( sys.stdin, sys.stdout, sys.stderr ) = self.saved | |||
infile = cStringIO.StringIO(input_string) | |||
outfile = cStringIO.StringIO() | |||
if capture_stderr: | |||
errfile = outfile | |||
else: | |||
errfile = sys.stderr | |||
# Empty input if none provided | |||
if infile is None: | |||
infile = cStringIO.StringIO("") | |||
# Capture stderr | |||
errfile = cStringIO.StringIO() | |||
if outfile is None: | |||
# If no output file, capture stdout with stderr | |||
outfile = errfile | |||
with stdio_wrapper(infile, outfile, errfile) as s: | |||
try: | |||
nilmdb.cmdline.Cmdline(shlex.split(arg_string)).run() | |||
sys.exit(0) | |||
except SystemExit as e: | |||
exitcode = e.code | |||
output = outfile.getvalue() | |||
self.output = output | |||
captured = outfile.getvalue() | |||
self.captured = captured | |||
self.exitcode = exitcode | |||
def ok(self, arg_string, input_string = ""): | |||
self.run(arg_string, input_string) | |||
def ok(self, arg_string, infile = None): | |||
self.run(arg_string, infile) | |||
if self.exitcode != 0: | |||
self.dump() | |||
eq_(self.exitcode, 0) | |||
def fail(self, arg_string, input_string = ""): | |||
self.run(arg_string, input_string) | |||
def fail(self, arg_string, infile = None): | |||
self.run(arg_string, infile) | |||
if self.exitcode == 0: | |||
self.dump() | |||
ne_(self.exitcode, 0) | |||
def check(self, checkstring): | |||
in_(checkstring, self.output) | |||
def contain(self, checkstring): | |||
in_(checkstring, self.captured) | |||
def match(self, checkstring): | |||
eq_(checkstring, self.output) | |||
eq_(checkstring, self.captured) | |||
def dump(self): | |||
printf("-----dump start-----\n%s-----dump end-----\n", self.output) | |||
printf("-----dump start-----\n%s-----dump end-----\n", self.captured) | |||
def test_cmdline_1_basic(self): | |||
# help | |||
self.ok("--help") | |||
self.check("usage:") | |||
self.contain("usage:") | |||
# fail for no args | |||
self.fail("") | |||
@@ -107,48 +109,42 @@ class TestCmdline(object): | |||
# try some URL constructions | |||
self.fail("--url http://nosuchurl/ info") | |||
self.check("Couldn't resolve host 'nosuchurl'") | |||
self.contain("Couldn't resolve host 'nosuchurl'") | |||
self.fail("--url nosuchurl info") | |||
self.check("Couldn't resolve host 'nosuchurl'") | |||
self.contain("Couldn't resolve host 'nosuchurl'") | |||
self.fail("-u nosuchurl/foo info") | |||
self.check("Couldn't resolve host 'nosuchurl'") | |||
self.contain("Couldn't resolve host 'nosuchurl'") | |||
self.fail("-u localhost:0 info") | |||
self.check("couldn't connect to host") | |||
self.contain("couldn't connect to host") | |||
self.ok("-u localhost:12380 info") | |||
self.ok("info") | |||
def test_cmdline_2_info(self): | |||
self.ok("info") | |||
self.check("Server URL: http://localhost:12380/") | |||
self.check("Server version: " + test_server.version) | |||
self.contain("Server URL: http://localhost:12380/") | |||
self.contain("Server version: " + test_server.version) | |||
def test_cmdline_3_misc(self): | |||
def test_cmdline_3_createlist(self): | |||
# Basic stream tests, like those in test_client. | |||
# BUG: for some reason these start to hang up! what's going on? | |||
for i in range(100): | |||
print i | |||
self.ok("list") | |||
eq_(1,0) | |||
# No streams | |||
self.ok("list") | |||
self.match("") | |||
# Bad paths | |||
self.fail("create foo/bar/baz PrepData") | |||
self.check("paths must start with /") | |||
self.contain("paths must start with /") | |||
self.fail("create /foo PrepData") | |||
self.check("invalid path") | |||
self.contain("invalid path") | |||
# Bad layout type | |||
self.fail("create /newton/prep NoSuchLayout") | |||
self.check("no such layout") | |||
self.contain("no such layout") | |||
# Create a few streams | |||
self.ok("create /newton/prep PrepData") | |||
@@ -160,25 +156,26 @@ class TestCmdline(object): | |||
self.match("/newton/prep PrepData\n" | |||
"/newton/raw RawData\n" | |||
"/newton/zzz/rawnotch RawNotchedData\n") | |||
# Match just one type or one path | |||
self.ok("list --path /newton/raw") | |||
self.match("/newton/raw RawData\n") | |||
self.ok("list --type RawData") | |||
self.ok("list --layout RawData") | |||
self.match("/newton/raw RawData\n") | |||
# Wildcard matches | |||
self.ok("list --type Raw*") | |||
self.ok("list --layout Raw*") | |||
self.match("/newton/raw RawData\n" | |||
"/newton/zzz/rawnotch RawNotchedData\n") | |||
self.ok("list --path *zzz* --type Raw*") | |||
self.ok("list --path *zzz* --layout Raw*") | |||
self.match("/newton/zzz/rawnotch RawNotchedData\n") | |||
self.ok("list --path *zzz* --type Prep*") | |||
self.ok("list --path *zzz* --layout Prep*") | |||
self.match("") | |||
def test_cmdline_4_metadata(self): | |||
# Set / get metadata | |||
self.fail("metadata") | |||
self.fail("metadata --get") | |||
@@ -199,32 +196,39 @@ class TestCmdline(object): | |||
self.ok("metadata /newton/prep") | |||
self.match("description=The Data\nv_scale=1.234\n") | |||
self.ok("metadata /newton/prep --get") | |||
self.match("description=The Data\nv_scale=1.234\n") | |||
self.ok("metadata /newton/prep --get descr") | |||
self.match("") | |||
self.match("descr=\n") | |||
self.ok("metadata /newton/prep --get description") | |||
self.match("description=The Data\n") | |||
self.ok("metadata /newton/raw") | |||
self.dump() | |||
self.ok("metadata /newton/prep --get description v_scale") | |||
self.match("description=The Data\nv_scale=1.234\n") | |||
self.ok("metadata /newton/prep --set " | |||
"'description=The Data'") | |||
self.ok("metadata /newton/prep --get") | |||
self.match("description=The Data\n") | |||
# client.stream_set_metadata("/newton/prep", meta1) | |||
# client.stream_update_metadata("/newton/prep", {}) | |||
# client.stream_update_metadata("/newton/raw", meta2) | |||
# client.stream_update_metadata("/newton/raw", meta3) | |||
# eq_(client.stream_get_metadata("/newton/prep"), meta1) | |||
# eq_(client.stream_get_metadata("/newton/raw"), meta1) | |||
# eq_(client.stream_get_metadata("/newton/raw", [ "description" ] ), meta2) | |||
# eq_(client.stream_get_metadata("/newton/raw", [ "description", | |||
# "v_scale" ] ), meta1) | |||
self.fail("metadata /newton/nosuchpath") | |||
self.contain("No stream at path /newton/nosuchpath") | |||
# # test wrong types (list instead of dict) | |||
# with assert_raises(ClientError): | |||
# client.stream_set_metadata("/newton/prep", [1,2,3]) | |||
# with assert_raises(ClientError): | |||
# client.stream_update_metadata("/newton/prep", [1,2,3]) | |||
def test_cmdline_5_insert(self): | |||
self.ok("insert --help") | |||
self.fail("insert /foo/bar baz qwer") | |||
self.contain("Error getting stream info") | |||
self.fail("insert /newton/prep baz qwer") | |||
self.match("Error opening input file baz\n") | |||
#self.fail("insert /newton/nosuchpath") | |||
#self.contain("No stream at path /newton/nosuchpath") | |||
# def test_client_3_insert(self): | |||
# client = nilmdb.Client(url = "http://localhost:12380/") | |||
@@ -248,7 +252,7 @@ class TestCmdline(object): | |||
# result = client.stream_insert("/newton/prep", data) | |||
# in_("400 Bad Request", str(e.exception)) | |||
# in_("timestamp is not monotonically increasing", str(e.exception)) | |||
# # Now try empty data (no server request made) | |||
# empty = cStringIO.StringIO("") | |||
# data = nilmdb.timestamper.TimestamperRate(empty, start, 120) | |||
@@ -259,7 +263,7 @@ class TestCmdline(object): | |||
# with assert_raises(ClientError) as e: | |||
# client.curl.putjson("stream/insert", "", { "path": "/newton/prep" }) | |||
# in_("400 Bad Request", str(e.exception)) | |||
# in_("no data provided", str(e.exception)) | |||
# in_("no data provided", str(e.exception)) | |||
# # Now do the real load | |||
# data = nilmdb.timestamper.TimestamperRate(testfile, start, 120) | |||
@@ -54,7 +54,7 @@ class Test00Nilmdb(object): # named 00 so it runs first | |||
with assert_raises(ValueError): | |||
db.stream_create("/foo", "PrepData") | |||
# Bad layout type | |||
with assert_raises(KeyError): | |||
with assert_raises(ValueError): | |||
db.stream_create("/newton/prep", "NoSuchLayout") | |||
# Bad index columns | |||
with assert_raises(KeyError): | |||