nilmdb/tests/test_cmdline.py

543 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import nilmdb
from nilmdb.utils.printf import *
import nilmdb.cmdline
from nose.tools import *
from nose.tools import assert_raises
import itertools
import datetime_tz
import os
import shutil
import sys
import threading
import urllib2
from urllib2 import urlopen, HTTPError
import Queue
import StringIO
import shlex
from test_helpers import *
# Filesystem path of the database used by this test module.  It is
# removed by setup_module() before the server is first started, and is
# opened by server_start().
testdb = "tests/cmdline-testdb"
def server_start(max_results = None):
    """Open the test database and start the web server on top of it.

    The database and server objects are published as the module globals
    test_db and test_server so that server_stop() and the tests can
    reach them.

    max_results is handed straight to nilmdb.NilmDB; None leaves the
    choice to NilmDB.
    """
    global test_server, test_db

    test_db = nilmdb.NilmDB(testdb, sync = False, max_results = max_results)

    # Serve on a custom local port, without blocking the test runner.
    server_options = dict(host = "127.0.0.1",
                          port = 12380,
                          stoppable = False,
                          fast_shutdown = True,
                          force_traceback = False)
    test_server = nilmdb.Server(test_db, **server_options)
    test_server.start(blocking = False)
def server_stop():
    """Stop the web server started by server_start() and close its database.

    The database is closed even if stopping the server raises, so that a
    failed shutdown does not leave the database open for whatever runs
    next.  Any exception from the shutdown still propagates to the caller.
    """
    global test_server, test_db
    try:
        # Close web app
        test_server.stop()
    finally:
        test_db.close()
def setup_module():
    """Module-level fixture: start from an empty database and a running server."""
    # Remove whatever an earlier run left behind before opening the DB.
    recursive_unlink(testdb)
    server_start()
def teardown_module():
    """Module-level fixture: shut down the server and close the database."""
    server_stop()
# Add an encoding property to StringIO so Python will convert Unicode
# properly when writing or reading.
class UTF8StringIO(StringIO.StringIO):
encoding = 'utf-8'
class TestCmdline(object):
def run(self, arg_string, infile=None, outfile=None):
"""Run a cmdline client with the specified argument string,
passing the given input. Returns a tuple with the output and
exit code"""
# printf("TZ=UTC ./nilmtool.py %s\n", arg_string)
class stdio_wrapper:
def __init__(self, stdin, stdout, stderr):
self.io = (stdin, stdout, stderr)
def __enter__(self):
self.saved = ( sys.stdin, sys.stdout, sys.stderr )
( sys.stdin, sys.stdout, sys.stderr ) = self.io
def __exit__(self, type, value, traceback):
( sys.stdin, sys.stdout, sys.stderr ) = self.saved
# Empty input if none provided
if infile is None:
infile = UTF8StringIO("")
# Capture stderr
errfile = UTF8StringIO()
if outfile is None:
# If no output file, capture stdout with stderr
outfile = errfile
with stdio_wrapper(infile, outfile, errfile) as s:
try:
# shlex doesn't support Unicode very well. Encode the
# string as UTF-8 explicitly before splitting.
args = shlex.split(arg_string.encode('utf-8'))
nilmdb.cmdline.Cmdline(args).run()
sys.exit(0)
except SystemExit as e:
exitcode = e.code
captured = outfile.getvalue()
self.captured = captured
self.exitcode = exitcode
def ok(self, arg_string, infile = None):
self.run(arg_string, infile)
if self.exitcode != 0:
self.dump()
eq_(self.exitcode, 0)
def fail(self, arg_string, infile = None, exitcode = None):
self.run(arg_string, infile)
if exitcode is not None and self.exitcode != exitcode:
self.dump()
eq_(self.exitcode, exitcode)
if self.exitcode == 0:
self.dump()
ne_(self.exitcode, 0)
def contain(self, checkstring):
in_(checkstring, self.captured)
def match(self, checkstring):
eq_(checkstring, self.captured)
def matchfile(self, file):
# Captured data should match file contents exactly
with open(file) as f:
contents = f.read()
if contents != self.captured:
#print contents[1:1000] + "\n"
#print self.captured[1:1000] + "\n"
raise AssertionError("captured data doesn't match " + file)
def matchfilecount(self, file):
# Last line of captured data should match the number of
# non-commented lines in file
count = 0
with open(file) as f:
for line in f:
if line[0] != '#':
count += 1
eq_(self.captured.splitlines()[-1], sprintf("%d", count))
def dump(self):
printf("-----dump start-----\n%s-----dump end-----\n", self.captured)
def test_cmdline_01_basic(self):
# help
self.ok("--help")
self.contain("usage:")
# fail for no args
self.fail("")
# fail for no such option
self.fail("--nosuchoption")
# fail for bad command
self.fail("badcommand")
# try some URL constructions
self.fail("--url http://nosuchurl/ info")
self.contain("Couldn't resolve host 'nosuchurl'")
self.fail("--url nosuchurl info")
self.contain("Couldn't resolve host 'nosuchurl'")
self.fail("-u nosuchurl/foo info")
self.contain("Couldn't resolve host 'nosuchurl'")
self.fail("-u localhost:0 info")
self.contain("couldn't connect to host")
self.ok("-u localhost:12380 info")
self.ok("info")
# Duplicated arguments should fail, but this isn't implemented
# due to it being kind of a pain with argparse.
if 0:
self.fail("-u url1 -u url2 info")
self.contain("duplicated argument")
self.fail("list --detail --detail")
self.contain("duplicated argument")
self.fail("list --detail --path path1 --path path2")
self.contain("duplicated argument")
self.fail("extract --start 2000-01-01 --start 2001-01-02")
self.contain("duplicated argument")
def test_cmdline_02_info(self):
self.ok("info")
self.contain("Server URL: http://localhost:12380/")
self.contain("Server version: " + test_server.version)
self.contain("Server database path")
self.contain("Server database size")
def test_cmdline_03_createlist(self):
# Basic stream tests, like those in test_client.
# No streams
self.ok("list")
self.match("")
# Bad paths
self.fail("create foo/bar/baz PrepData")
self.contain("paths must start with /")
self.fail("create /foo PrepData")
self.contain("invalid path")
# Bad layout type
self.fail("create /newton/prep NoSuchLayout")
self.contain("no such layout")
# Create a few streams
self.ok("create /newton/zzz/rawnotch RawNotchedData")
self.ok("create /newton/prep PrepData")
self.ok("create /newton/raw RawData")
# Should not be able to create a stream with another stream as
# its parent
self.fail("create /newton/prep/blah PrepData")
self.contain("path is subdir of existing node")
# Should not be able to create a stream at a location that
# has other nodes as children
self.fail("create /newton/zzz PrepData")
self.contain("subdirs of this path already exist")
# Verify we got those 3 streams and they're returned in
# alphabetical order.
self.ok("list")
self.match("/newton/prep PrepData\n"
"/newton/raw RawData\n"
"/newton/zzz/rawnotch RawNotchedData\n")
# Match just one type or one path
self.ok("list --path /newton/raw")
self.match("/newton/raw RawData\n")
self.ok("list --layout RawData")
self.match("/newton/raw RawData\n")
# Wildcard matches
self.ok("list --layout Raw*")
self.match("/newton/raw RawData\n"
"/newton/zzz/rawnotch RawNotchedData\n")
self.ok("list --path *zzz* --layout Raw*")
self.match("/newton/zzz/rawnotch RawNotchedData\n")
self.ok("list --path *zzz* --layout Prep*")
self.match("")
def test_cmdline_04_metadata(self):
# Set / get metadata
self.fail("metadata")
self.fail("metadata --get")
self.ok("metadata /newton/prep")
self.match("")
self.ok("metadata /newton/raw --get")
self.match("")
self.ok("metadata /newton/prep --set "
"'description=The Data' "
"v_scale=1.234")
self.ok("metadata /newton/raw --update "
"'description=The Data'")
self.ok("metadata /newton/raw --update "
"v_scale=1.234")
# various parsing tests
self.ok("metadata /newton/raw --update foo=")
self.fail("metadata /newton/raw --update =bar")
self.fail("metadata /newton/raw --update foo==bar")
self.fail("metadata /newton/raw --update foo;bar")
# errors
self.fail("metadata /newton/nosuchstream foo=bar")
self.contain("unrecognized arguments")
self.fail("metadata /newton/nosuchstream")
self.contain("No stream at path")
self.fail("metadata /newton/nosuchstream --set foo=bar")
self.contain("No stream at path")
self.ok("metadata /newton/prep")
self.match("description=The Data\nv_scale=1.234\n")
self.ok("metadata /newton/prep --get")
self.match("description=The Data\nv_scale=1.234\n")
self.ok("metadata /newton/prep --get descr")
self.match("descr=\n")
self.ok("metadata /newton/prep --get description")
self.match("description=The Data\n")
self.ok("metadata /newton/prep --get description v_scale")
self.match("description=The Data\nv_scale=1.234\n")
self.ok("metadata /newton/prep --set "
"'description=The Data'")
self.ok("metadata /newton/prep --get")
self.match("description=The Data\n")
self.fail("metadata /newton/nosuchpath")
self.contain("No stream at path /newton/nosuchpath")
def test_cmdline_05_parsetime(self):
os.environ['TZ'] = "America/New_York"
cmd = nilmdb.cmdline.Cmdline(None)
test = datetime_tz.datetime_tz.now()
eq_(cmd.parse_time(str(test)), test)
test = datetime_tz.datetime_tz.smartparse("20120405 1400-0400")
eq_(cmd.parse_time("hi there 20120405 1400-0400 testing! 123"), test)
eq_(cmd.parse_time("20120405 1800 UTC"), test)
eq_(cmd.parse_time("20120405 1400-0400 UTC"), test)
for badtime in [ "20120405 1400-9999", "hello", "-", "", "14:00" ]:
with assert_raises(ValueError):
x = cmd.parse_time(badtime)
eq_(cmd.parse_time("snapshot-20120405-140000.raw.gz"), test)
eq_(cmd.parse_time("prep-20120405T1400"), test)
def test_cmdline_06_insert(self):
self.ok("insert --help")
self.fail("insert /foo/bar baz qwer")
self.contain("Error getting stream info")
self.fail("insert /newton/prep baz qwer")
self.match("Error opening input file baz\n")
self.fail("insert /newton/prep")
self.contain("Error extracting time")
self.fail("insert --start 19801205 /newton/prep 1 2 3 4")
self.contain("--start can only be used with one input file")
self.fail("insert /newton/prep "
"tests/data/prep-20120323T1000")
# insert pre-timestamped data, from stdin
os.environ['TZ'] = "UTC"
with open("tests/data/prep-20120323T1004-timestamped") as input:
self.ok("insert --none /newton/prep", input)
# insert data with normal timestamper from filename
os.environ['TZ'] = "UTC"
self.ok("insert --rate 120 /newton/prep "
"tests/data/prep-20120323T1000 "
"tests/data/prep-20120323T1002")
# overlap
os.environ['TZ'] = "UTC"
self.fail("insert --rate 120 /newton/prep "
"tests/data/prep-20120323T1004")
self.contain("overlap")
# Just to help test more situations -- stop and restart
# the server now. This tests nilmdb's interval caching,
# at the very least.
server_stop()
server_start()
# still an overlap if we specify a different start
os.environ['TZ'] = "America/New_York"
self.fail("insert --rate 120 --start '03/23/2012 06:05:00' /newton/prep"
" tests/data/prep-20120323T1004")
self.contain("overlap")
# wrong format
os.environ['TZ'] = "UTC"
self.fail("insert --rate 120 /newton/raw "
"tests/data/prep-20120323T1004")
self.contain("Error parsing input data")
# empty data does nothing
self.ok("insert --rate 120 --start '03/23/2012 06:05:00' /newton/prep "
"/dev/null")
# bad start time
self.fail("insert --rate 120 --start 'whatever' /newton/prep /dev/null")
def test_cmdline_07_detail(self):
# Just count the number of lines, it's probably fine
self.ok("list --detail")
lines_(self.captured, 8)
self.ok("list --detail --path *prep")
lines_(self.captured, 4)
self.ok("list --detail --path *prep --start='23 Mar 2012 10:02'")
lines_(self.captured, 3)
self.ok("list --detail --path *prep --start='23 Mar 2012 10:05'")
lines_(self.captured, 2)
self.ok("list --detail --path *prep --start='23 Mar 2012 10:05:15'")
lines_(self.captured, 2)
self.contain("10:05:15.000")
self.ok("list --detail --path *prep --start='23 Mar 2012 10:05:15.50'")
lines_(self.captured, 2)
self.contain("10:05:15.500")
self.ok("list --detail --path *prep --start='23 Mar 2012 19:05:15.50'")
lines_(self.captured, 2)
self.contain("no intervals")
self.ok("list --detail --path *prep --start='23 Mar 2012 10:05:15.50'"
+ " --end='23 Mar 2012 10:05:15.50'")
lines_(self.captured, 2)
self.contain("10:05:15.500")
self.ok("list --detail")
lines_(self.captured, 8)
def test_cmdline_08_extract(self):
# nonexistent stream
self.fail("extract /no/such/foo --start 2000-01-01 --end 2020-01-01")
self.contain("Error getting stream info")
# empty ranges return an error
self.fail("extract -a /newton/prep " +
"--start '23 Mar 2012 10:00:30' " +
"--end '23 Mar 2012 10:00:30'", exitcode = 2)
self.contain("no data")
self.fail("extract -a /newton/prep " +
"--start '23 Mar 2012 10:00:30.000001' " +
"--end '23 Mar 2012 10:00:30.000001'", exitcode = 2)
self.contain("no data")
self.fail("extract -a /newton/prep " +
"--start '23 Mar 2022 10:00:30' " +
"--end '23 Mar 2022 10:00:30'", exitcode = 2)
self.contain("no data")
# but are ok if we're just counting results
self.ok("extract --count /newton/prep " +
"--start '23 Mar 2012 10:00:30' " +
"--end '23 Mar 2012 10:00:30'")
self.match("0\n")
self.ok("extract -c /newton/prep " +
"--start '23 Mar 2012 10:00:30.000001' " +
"--end '23 Mar 2012 10:00:30.000001'")
self.match("0\n")
# Check various dumps against stored copies of how they should appear
def test(file, start, end, extra=""):
self.ok("extract " + extra + " /newton/prep " +
"--start '23 Mar 2012 " + start + "' " +
"--end '23 Mar 2012 " + end + "'")
self.matchfile("tests/data/extract-" + str(file))
self.ok("extract --count " + extra + " /newton/prep " +
"--start '23 Mar 2012 " + start + "' " +
"--end '23 Mar 2012 " + end + "'")
self.matchfilecount("tests/data/extract-" + str(file))
test(1, "10:00:30", "10:00:31", extra="-a")
test(1, "10:00:30.000000", "10:00:31", extra="-a")
test(2, "10:00:30.000001", "10:00:31")
test(2, "10:00:30.008333", "10:00:31")
test(3, "10:00:30.008333", "10:00:30.008334")
test(3, "10:00:30.008333", "10:00:30.016667")
test(4, "10:00:30.008333", "10:00:30.025")
test(5, "10:00:30", "10:00:31", extra="--annotate --bare")
test(6, "10:00:30", "10:00:31", extra="-b")
# all data put in by tests
self.ok("extract -a /newton/prep --start 2000-01-01 --end 2020-01-01")
lines_(self.captured, 43204)
self.ok("extract -c /newton/prep --start 2000-01-01 --end 2020-01-01")
self.match("43200\n")
def test_cmdline_09_truncated(self):
# Test truncated responses by overriding the nilmdb max_results
server_stop()
server_start(max_results = 2)
self.ok("list --detail")
lines_(self.captured, 8)
server_stop()
server_start()
def test_cmdline_10_destroy(self):
# Delete records
self.ok("destroy --help")
self.fail("destroy")
self.contain("too few arguments")
self.fail("destroy /no/such/stream")
self.contain("No stream at path")
self.fail("destroy asdfasdf")
self.contain("No stream at path")
# From previous tests, we have:
self.ok("list")
self.match("/newton/prep PrepData\n"
"/newton/raw RawData\n"
"/newton/zzz/rawnotch RawNotchedData\n")
# Notice how they're not empty
self.ok("list --detail")
lines_(self.captured, 8)
# Delete some
self.ok("destroy /newton/prep")
self.ok("list")
self.match("/newton/raw RawData\n"
"/newton/zzz/rawnotch RawNotchedData\n")
self.ok("destroy /newton/zzz/rawnotch")
self.ok("list")
self.match("/newton/raw RawData\n")
self.ok("destroy /newton/raw")
self.ok("create /newton/raw RawData")
self.ok("destroy /newton/raw")
self.ok("list")
self.match("")
# Re-create a previously deleted location, and some new ones
rebuild = [ "/newton/prep", "/newton/zzz",
"/newton/raw", "/newton/asdf/qwer" ]
for path in rebuild:
# Create the path
self.ok("create " + path + " PrepData")
self.ok("list")
self.contain(path)
# Make sure it was created empty
self.ok("list --detail --path " + path)
self.contain("(no intervals)")
def test_cmdline_11_unicode(self):
# Unicode paths.
self.ok("destroy /newton/asdf/qwer")
self.ok("destroy /newton/prep")
self.ok("destroy /newton/raw")
self.ok("destroy /newton/zzz")
self.ok(u"create /düsseldorf/raw uint16_6")
self.ok("list --detail")
self.contain(u"/düsseldorf/raw uint16_6")
self.contain("(no intervals)")
# Unicode metadata
self.ok(u"metadata /düsseldorf/raw --set α=beta 'γ'")
self.ok(u"metadata /düsseldorf/raw --update 'α=β ε τ α'")
self.ok(u"metadata /düsseldorf/raw")
self.match(u"α=β ε τ α\nγ\n")