# Files
# nilmtools/tests/test.py
#
# 912 lines
# 34 KiB
# Python

# -*- coding: utf-8 -*-
import nilmtools.copy_one
import nilmtools.cleanup
import nilmtools.copy_one
import nilmtools.copy_wildcard
import nilmtools.decimate_auto
import nilmtools.decimate
import nilmtools.insert
import nilmtools.median
import nilmtools.pipewatch
import nilmtools.prep
import nilmtools.sinefit
import nilmtools.trainola
from nilmdb.utils.interval import Interval
from nose.tools import assert_raises
import unittest
import numpy
import math
import json
import random
from testutil.helpers import *
import subprocess
import traceback
import os
import atexit
import signal
import functools
from urllib.request import urlopen
from nilmtools.filter import ArgumentError
def run_cherrypy_server(path, port, event):
    """Serve the NilmDB database at *path* on 127.0.0.1:*port*.

    The database is opened through ``nilmdb.utils.serializer_proxy`` and
    handed to a stoppable ``nilmdb.server.Server`` that is started in
    blocking mode; *event* is passed straight through to
    ``Server.start``.  The database is closed once ``start`` returns,
    and also if creating or starting the server raises.
    """
    db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(path)
    try:
        server = nilmdb.server.Server(db, host="127.0.0.1",
                                      port=port, stoppable=True)
        server.start(blocking=True, event=event)
    finally:
        # Always release the database, even when the server fails.
        db.close()
class CommandTester:
    """Harness that runs a command-line ``main`` function in-process.

    A test sets ``self.main`` to the entry point under test and then
    calls :meth:`run`, :meth:`ok` or :meth:`fail` with a shell-style
    argument string.  The captured output and exit code are stored on
    the instance (``captured``, ``captured_binary``, ``exitcode``) for
    the assertion helpers below.
    """

    url = "http://localhost:32182/"
    url2 = "http://localhost:32183/"

    class _StdioRedirect:
        """Context manager that temporarily swaps sys.stdin/stdout/stderr."""

        def __init__(self, stdin, stdout, stderr):
            self.io = (stdin, stdout, stderr)

        def __enter__(self):
            self.saved = (sys.stdin, sys.stdout, sys.stderr)
            (sys.stdin, sys.stdout, sys.stderr) = self.io

        def __exit__(self, exc_type, exc_value, exc_tb):
            (sys.stdin, sys.stdout, sys.stderr) = self.saved

    @classmethod
    def setup_class(cls):
        """Start one ``nilmdb-server`` subprocess per test database.

        Raises:
            Exception: if something is already answering on one of the
                ports, or a server does not answer within ~5 seconds.
        """
        # We need two servers running for "copy_multiple", but
        # cherrypy uses globals and can only run once per process.
        # Using multiprocessing with "spawn" method should work in
        # theory, but is hard to get working when the test suite is
        # spawned directly by nosetests (rather than ./run-tests.py).
        # Instead, just run the real nilmdb-server that got installed
        # along with our nilmdb dependency.
        cls.servers = []
        atexit.register(cls._stop_servers)

        for (path, port) in (("tests/testdb1", 32182),
                             ("tests/testdb2", 32183)):
            def listening():
                # Any failure at all just means "nothing usable is
                # answering yet", so the broad except is deliberate.
                try:
                    with urlopen(f"http://127.0.0.1:{port}/", timeout=0.1):
                        return True
                except Exception:
                    return False

            if listening():
                raise Exception(f"another server already running on {port}")

            recursive_unlink(path)
            p = subprocess.Popen(["nilmdb-server",
                                  "--address", "127.0.0.1",
                                  "--database", path,
                                  "--port", str(port),
                                  "--quiet",
                                  "--traceback"],
                                 stdin=subprocess.DEVNULL,
                                 stdout=subprocess.DEVNULL)
            # Register the process immediately so it gets terminated
            # even if it never starts listening.
            cls.servers.append(p)

            for _ in range(50):
                if listening():
                    break
                time.sleep(0.1)
            else:
                raise Exception(f"server didn't start on port {port}")

    @classmethod
    def _stop_servers(cls):
        """Terminate and reap every server started by setup_class.

        Safe to call more than once; it is used both by teardown_class
        and as an atexit hook.
        """
        servers = getattr(cls, "servers", [])
        for p in servers:
            p.terminate()
        for p in servers:
            try:
                p.wait(timeout=10)
            except subprocess.TimeoutExpired:
                p.kill()
                p.wait()
        cls.servers = []

    @classmethod
    def teardown_class(cls):
        """Stop the server subprocesses started by setup_class."""
        cls._stop_servers()

    def run(self, arg_string, infile=None, outfile=None):
        """Run a cmdline client with the specified argument string,
        passing the given input.  Save the output and exit code.

        Args:
            arg_string: arguments, split with ``shlex.split``.
            infile: object installed as ``sys.stdin``; empty input if None.
            outfile: object installed as ``sys.stdout``; if None, stdout
                is captured together with stderr.

        Sets ``last_args``, ``captured_binary``, ``captured`` (None when
        the output is not decodable) and ``exitcode`` (the SystemExit
        code, 0 on normal return, 1 on any other exception).
        """
        os.environ['NILMDB_URL'] = self.url
        self.last_args = arg_string

        # Empty input if none provided
        if infile is None:
            infile = io.TextIOWrapper(io.BytesIO(b""))

        # Capture stderr
        errfile = io.TextIOWrapper(io.BytesIO())
        if outfile is None:
            # If no output file, capture stdout with stderr
            outfile = errfile

        with self._StdioRedirect(infile, outfile, errfile):
            try:
                args = shlex.split(arg_string)
                sys.argv[0] = "test_runner"
                self.main(args)
                exitcode = 0
            except SystemExit as e:
                exitcode = e.code
            except Exception:
                # The traceback goes to the captured stderr.
                traceback.print_exc()
                exitcode = 1

        # Capture raw binary output, and also try to decode a Unicode
        # string copy.  Flush first: TextIOWrapper buffers writes, so
        # without it the underlying BytesIO can be missing data.
        outfile.flush()
        self.captured_binary = outfile.buffer.getvalue()
        try:
            outfile.seek(0)
            self.captured = outfile.read()
        except UnicodeDecodeError:
            self.captured = None

        self.exitcode = exitcode

    def ok(self, arg_string, infile=None):
        """Run the command and assert that it exited with code 0."""
        self.run(arg_string, infile)
        if self.exitcode != 0:
            self.dump()
            eq_(self.exitcode, 0)

    def fail(self, arg_string, infile=None, exitcode=None):
        """Run the command and assert that it failed.

        If *exitcode* is given, the exit code must match it exactly;
        in every case the exit code must be nonzero.
        """
        self.run(arg_string, infile)
        if exitcode is not None and self.exitcode != exitcode:
            # Wrong exit code
            self.dump()
            eq_(self.exitcode, exitcode)
        if self.exitcode == 0:
            # Success, when we wanted failure
            self.dump()
            ne_(self.exitcode, 0)

    def contain(self, checkstring, contain=True):
        """Assert that the captured output contains *checkstring*
        (or does not contain it, when *contain* is false)."""
        if contain:
            in_(checkstring, self.captured)
        else:
            nin_(checkstring, self.captured)

    def match(self, checkstring):
        """Assert that the captured output equals *checkstring*."""
        eq_(checkstring, self.captured)

    def matchfile(self, file):
        """Assert that the captured output equals the contents of *file*.

        On mismatch, prints the start of both texts and the offset of
        the first difference, then raises AssertionError.
        """
        # Imported locally: this module has no top-level
        # "import itertools" of its own.
        import itertools

        # Captured data should match file contents exactly
        with open(file) as f:
            contents = f.read()
        if contents != self.captured:
            print("--- reference file (first 1000 bytes):\n")
            print(contents[0:1000] + "\n")
            print("--- captured data (first 1000 bytes):\n")
            print(self.captured[0:1000] + "\n")
            zipped = itertools.zip_longest(contents, self.captured)
            for (n, (a, b)) in enumerate(zipped):
                if a != b:
                    print("--- first difference is at offset", n)
                    print("--- reference:", repr(a))
                    print("--- captured:", repr(b))
                    break
            raise AssertionError("captured data doesn't match " + file)

    def matchfilecount(self, file):
        """Assert that the last captured line equals the number of
        non-commented lines in *file*."""
        # Last line of captured data should match the number of
        # non-commented lines in file
        with open(file) as f:
            count = sum(1 for line in f if not line.startswith('#'))
        eq_(self.captured.splitlines()[-1], sprintf("%d", count))

    def dump(self):
        """Print the last argument string and the captured output."""
        printf("\n===args start===\n%s\n===args end===\n", self.last_args)
        printf("===dump start===\n%s===dump end===\n", self.captured)
class TestAllCommands(CommandTester):
    """End-to-end tests for the nilmtools command-line tools.

    The tests are numbered because they build on each other: later
    tests use streams created by earlier ones (for example,
    ``test_08_prep`` reads ``/sf/raw`` and ``/sf/out`` written by
    ``test_06_sinefit``).
    """

    def test_00_load_data(self):
        """Create /newton/prep and load the three prep data files."""
        client = nilmdb.client.Client(url=self.url)
        client.stream_create("/newton/prep", "float32_8")
        client.stream_set_metadata("/newton/prep",
                                   {"description": "newton"})

        for ts in ("20120323T1000", "20120323T1002", "20120323T1004"):
            start = nilmdb.utils.time.parse_time(ts)
            fn = f"tests/data/prep-{ts}"
            data = nilmdb.utils.timestamper.TimestamperRate(fn, start, 120)
            client.stream_insert("/newton/prep", data)

    def test_01_copy(self):
        """copy_one: argument errors, bad destinations, copies with and
        without metadata."""
        self.main = nilmtools.copy_one.main
        client = nilmdb.client.Client(url=self.url)

        # basic arguments
        self.fail("")
        self.fail("no-such-src no-such-dest")
        self.contain("source path no-such-src not found")
        self.fail(f"-u {self.url} no-such-src no-such-dest")

        # nonexistent dest
        self.fail("/newton/prep /newton/prep-copy")
        self.contain("Destination /newton/prep-copy doesn't exist")

        # wrong type
        client.stream_create("/newton/prep-copy-wrongtype", "uint16_6")
        self.fail("/newton/prep /newton/prep-copy-wrongtype")
        self.contain("wrong number of fields")

        # copy with metadata, and compare
        client.stream_create("/newton/prep-copy", "float32_8")
        self.ok("/newton/prep /newton/prep-copy")
        a = list(client.stream_extract("/newton/prep"))
        b = list(client.stream_extract("/newton/prep-copy"))
        eq_(a, b)
        a = client.stream_get_metadata("/newton/prep")
        b = client.stream_get_metadata("/newton/prep-copy")
        eq_(a, b)

        # copy with no metadata
        client.stream_create("/newton/prep-copy-nometa", "float32_8")
        self.ok("--nometa /newton/prep /newton/prep-copy-nometa")
        a = list(client.stream_extract("/newton/prep"))
        b = list(client.stream_extract("/newton/prep-copy-nometa"))
        eq_(a, b)
        a = client.stream_get_metadata("/newton/prep")
        b = client.stream_get_metadata("/newton/prep-copy-nometa")
        ne_(a, b)

    def test_02_copy_wildcard(self):
        """copy_wildcard: copy matching streams from one server to the
        other, including --dry-run and --nometa."""
        self.main = nilmtools.copy_wildcard.main
        client1 = nilmdb.client.Client(url=self.url)
        client2 = nilmdb.client.Client(url=self.url2)

        # basic arguments
        self.fail("")
        self.fail("/newton")
        self.fail(f"-u {self.url} -U {self.url} /newton")
        self.contain("URL must be different")

        # no matches; silent
        self.ok(f"-u {self.url} -U {self.url2} /newton")
        self.ok(f"-u {self.url} -U {self.url2} /asdf*")
        self.ok(f"-u {self.url2} -U {self.url} /newton*")
        eq_(client2.stream_list(), [])

        # this won't actually copy, but will still create streams
        self.ok(f"-u {self.url} -U {self.url2} --dry-run /newton*")
        self.contain("Creating destination stream /newton/prep-copy")
        eq_(len(list(client2.stream_extract("/newton/prep"))), 0)

        # this should copy a bunch
        self.ok(f"-u {self.url} -U {self.url2} /*")
        self.contain("Creating destination stream /newton/prep-copy", False)
        eq_(client1.stream_list(), client2.stream_list())
        eq_(list(client1.stream_extract("/newton/prep")),
            list(client2.stream_extract("/newton/prep")))
        eq_(client1.stream_get_metadata("/newton/prep"),
            client2.stream_get_metadata("/newton/prep"))

        # repeating it is OK; it just won't recreate streams.
        # Let's try with --nometa too
        client2.stream_remove("/newton/prep")
        client2.stream_destroy("/newton/prep")
        self.ok(f"-u {self.url} -U {self.url2} --nometa /newton*")
        self.contain("Creating destination stream /newton/prep-copy", False)
        self.contain("Creating destination stream /newton/prep", True)
        eq_(client1.stream_list(), client2.stream_list())
        eq_(list(client1.stream_extract("/newton/prep")),
            list(client2.stream_extract("/newton/prep")))
        eq_(client2.stream_get_metadata("/newton/prep"), {})

        # fill in test cases
        self.ok(f"-u {self.url} -U {self.url2} -s 2010 -e 2020 -F /newton*")

    def test_03_decimate(self):
        """decimate: argument checks, several factors, metadata
        mismatch handling and suggested destination shapes."""
        self.main = nilmtools.decimate.main
        client = nilmdb.client.Client(url=self.url)

        # basic arguments
        self.fail("")

        # no dest
        self.fail("/newton/prep /newton/prep-decimated-1")
        self.contain("doesn't exist")

        # wrong dest shape
        client.stream_create("/newton/prep-decimated-bad", "float32_8")
        self.fail("/newton/prep /newton/prep-decimated-bad")
        self.contain("wrong number of fields")

        # bad factor
        self.fail("/newton/prep -f 1 /newton/prep-decimated-bad")
        self.contain("needs to be 2 or more")

        # ok, default factor 4
        client.stream_create("/newton/prep-decimated-4", "float32_24")
        self.ok("/newton/prep /newton/prep-decimated-4")
        a = client.stream_count("/newton/prep")
        b = client.stream_count("/newton/prep-decimated-4")
        eq_(a // 4, b)

        # factor 10
        client.stream_create("/newton/prep-decimated-10", "float32_24")
        self.ok("/newton/prep -f 10 /newton/prep-decimated-10")
        self.contain("Processing")
        a = client.stream_count("/newton/prep")
        b = client.stream_count("/newton/prep-decimated-10")
        eq_(a // 10, b)

        # different factor, same target
        self.fail("/newton/prep -f 16 /newton/prep-decimated-10")
        self.contain("Metadata in destination stream")
        self.contain("decimate_factor = 10")
        self.contain("doesn't match desired data")
        self.contain("decimate_factor = 16")

        # unless we force it
        self.ok("/newton/prep -f 16 -F /newton/prep-decimated-10")
        a = client.stream_count("/newton/prep")
        b = client.stream_count("/newton/prep-decimated-10")
        # but all data was already converted, so no more
        eq_(a // 10, b)

        # if we try to decimate an already-decimated stream, the suggested
        # shape is different
        self.fail("/newton/prep-decimated-4 -f 4 /newton/prep-decimated-16")
        self.contain("create /newton/prep-decimated-16 float32_24")

        # decimate again
        client.stream_create("/newton/prep-decimated-16", "float32_24")
        self.ok("/newton/prep-decimated-4 -f 4 /newton/prep-decimated-16")
        self.contain("Processing")

        # check shape suggestion for different input types
        for (shape, expected) in (("int32_1", "float64_3"),
                                  ("uint32_1", "float64_3"),
                                  ("int64_1", "float64_3"),
                                  ("uint64_1", "float64_3"),
                                  ("float32_1", "float32_3"),
                                  ("float64_1", "float64_3")):
            client.stream_create(f"/test/{shape}", shape)
            self.fail(f"/test/{shape} /test/{shape}-decim")
            self.contain(f"create /test/{shape}-decim {expected}")

    def test_04_decimate_auto(self):
        """decimate_auto: argument errors, a normal run, and wildcard
        paths."""
        self.main = nilmtools.decimate_auto.main
        client = nilmdb.client.Client(url=self.url)

        self.fail("")

        self.fail("--max -1 asdf")
        self.contain("bad max")

        self.fail("/no/such/stream")
        self.contain("no stream matched path")

        # normal run
        self.ok("/newton/prep")

        # can't auto decimate a decimated stream
        self.fail("/newton/prep-decimated-16")
        self.contain("need to pass the base stream instead")

        # decimate prep again, this time much more; also use -F
        self.ok("-m 10 --force-metadata /newton/pr??")
        self.contain("Level 4096 decimation has 9 rows")

        # decimate the different shapes
        self.ok("/test/*")
        self.contain("Level 1 decimation has 0 rows")

    def test_05_insert(self):
        """insert: file, delta and (fake-clock) live timestamp modes."""
        self.main = nilmtools.insert.main
        client = nilmdb.client.Client(url=self.url)

        self.fail("")
        self.ok("--help")

        # mutually exclusive arguments
        self.fail("--delta --rate 123 /foo bar")
        self.fail("--live --filename /foo bar")

        # Insert from file
        client.stream_create("/insert/prep", "float32_8")

        t0 = "tests/data/prep-20120323T1000"
        t2 = "tests/data/prep-20120323T1002"
        t4 = "tests/data/prep-20120323T1004"
        self.ok(f"--file --dry-run --rate 120 /insert/prep {t0} {t2} {t4}")
        self.contain("Dry run")

        # wrong rate
        self.fail(f"--file --dry-run --rate 10 /insert/prep {t0} {t2} {t4}")
        self.contain("Data is coming in too fast")

        # skip forward in time
        self.ok(f"--file --dry-run --rate 120 /insert/prep {t0} {t4}")
        self.contain("data timestamp behind by 120")
        self.contain("Skipping data timestamp forward")

        # skip backwards in time
        self.fail(f"--file --dry-run --rate 120 /insert/prep {t0} {t2} {t0}")
        self.contain("data timestamp ahead by 240")

        # skip backwards in time is OK if --skip provided
        self.ok(f"--skip -f -D -r 120 insert/prep {t0} {t2} {t0} {t4}")
        self.contain("Skipping the remainder of this file")

        # Now insert for real
        self.ok(f"--skip --file --rate 120 /insert/prep {t0} {t2} {t4}")
        self.contain("Done")

        # Overlap
        self.fail(f"--skip --file --rate 120 /insert/prep {t0}")
        self.contain("new data overlaps existing data")

        # Not overlap if we change file offset
        self.ok(f"--skip --file --rate 120 -o 0 /insert/prep {t0}")

        # Data with no timestamp
        self.fail("-f -r 120 /insert/prep tests/data/prep-notime")
        self.contain("No idea what timestamp to use")

        # Check intervals so far
        eq_(list(client.stream_intervals("/insert/prep")),
            [[1332507600000000, 1332507959991668],
             [1332511200000000, 1332511319991668]])

        # Delta supplied by file
        self.ok(f"--file --delta -o 0 /insert/prep {t4}-delta")
        eq_(list(client.stream_intervals("/insert/prep")),
            [[1332507600000000, 1332507959991668],
             [1332511200000000, 1332511319991668],
             [1332511440000000, 1332511499000001]])

        # Now fake live timestamps by using the delta file, and a
        # fake clock that increments one second per call.
        def fake_time_now():
            nonlocal fake_time_base
            ret = fake_time_base
            fake_time_base += 1000000
            return ret

        real_time_now = nilmtools.insert.time_now
        nilmtools.insert.time_now = fake_time_now
        try:
            # Delta supplied by file.  This data is too fast because delta
            # contains a 50 sec jump
            fake_time_base = 1332511560000000
            self.fail(f"--live --delta -o 0 /insert/prep {t4}-delta")
            self.contain("Data is coming in too fast")
            self.contain("data time is Fri, 23 Mar 2012 10:06:55")
            self.contain("clock time is only Fri, 23 Mar 2012 10:06:06")

            # This data is OK, no jump
            fake_time_base = 1332511560000000
            self.ok(f"--live --delta -o 0 /insert/prep {t4}-delta2")

            # This has unparseable delta
            fake_time_base = 1332511560000000
            self.fail(f"--live --delta -o 0 /insert/prep {t4}-delta3")
            self.contain("can't parse delta")

            # Insert some gzipped data, with no timestamp in name
            bp1 = "tests/data/bpnilm-raw-1.gz"
            bp2 = "tests/data/bpnilm-raw-2.gz"
            client.stream_create("/insert/raw", "uint16_6")
            self.ok(f"--file /insert/raw {bp1} {bp2}")

            # Try truncated data
            tr = "tests/data/trunc"
            self.ok(f"--file /insert/raw {tr}1 {tr}2 {tr}3 {tr}4")
        finally:
            # Always undo the monkeypatch, so a failure above cannot
            # leave the fake clock installed for later tests.
            nilmtools.insert.time_now = real_time_now

    def generate_sine_data(self, client, path, data_sec, fs, freq):
        """Create a uint16_2 stream at *path* holding synthetic waveforms.

        Writes ``fs * data_sec`` rows.  The first data column is a sine
        at *freq*; the second is ``0.3*sin(3t) + sin(t)``.  Each row's
        timestamp is ``(t + 1234567890) * 1e6`` with ``t = n / fs``.
        """
        # generate raw data
        client.stream_create(path, "uint16_2")
        with client.stream_insert_context(path) as ctx:
            for n in range(fs * data_sec):
                t = n / fs
                v = math.sin(t * 2 * math.pi * freq)
                i = 0.3 * math.sin(3*t) + math.sin(t)
                line = b"%d %d %d\n" % (
                    (t + 1234567890) * 1e6,
                    v * 32767 + 32768,
                    i * 32768 + 32768)
                ctx.insert(line)
        # Debugging aid, disabled on purpose.
        if 0:
            for (s, e) in client.stream_intervals(path):
                print(Interval(s, e).human_string())

    def test_06_sinefit(self):
        """sinefit: basic run, parameter validation and warning paths."""
        self.main = nilmtools.sinefit.main
        client = nilmdb.client.Client(url=self.url)

        self.fail("")
        self.ok("--help")

        self.generate_sine_data(client, "/sf/raw", 50, 8000, 60)

        client.stream_create("/sf/out-bad", "float32_4")
        self.fail("--column 1 /sf/raw /sf/out-bad")
        self.contain("wrong number of fields")
        self.fail("--column 1 /sf/raw /sf/out")
        self.contain("/sf/out doesn't exist")

        # basic run
        client.stream_create("/sf/out", "float32_3")
        self.ok("--column 1 /sf/raw /sf/out")
        eq_(client.stream_count("/sf/out"), 3000)

        # parameter errors
        self.fail("--column 0 /sf/raw /sf/out")
        self.contain("need a column number")
        self.fail("/sf/raw /sf/out")
        self.contain("need a column number")
        self.fail("-c 1 --frequency 0 /sf/raw /sf/out")
        self.contain("frequency must be")
        self.fail("-c 1 --min-freq 100 /sf/raw /sf/out")
        self.contain("invalid min or max frequency")
        self.fail("-c 1 --max-freq 5 /sf/raw /sf/out")
        self.contain("invalid min or max frequency")
        self.fail("-c 1 --min-amp -1 /sf/raw /sf/out")
        self.contain("min amplitude must be")

        # trigger some warnings
        client.stream_create("/sf/out2", "float32_3")
        self.ok("-c 1 -f 500 -e @1234567897000000 /sf/raw /sf/out2")
        self.contain("outside valid range")
        self.contain("1000 warnings suppressed")
        eq_(client.stream_count("/sf/out2"), 0)

        self.ok("-c 1 -a 40000 -e @1234567898000000 /sf/raw /sf/out2")
        self.contain("below minimum threshold")

        # get coverage for "advance = N/2" line near end of sinefit,
        # where we found a fit but it was after the end of the window,
        # so we didn't actually mark anything in this window.
        self.ok("-c 1 -f 240 -m 50 -e @1234567898010000 /sf/raw /sf/out2")

    def test_07_median(self):
        """median: missing destination, plain run and --difference."""
        self.main = nilmtools.median.main
        client = nilmdb.client.Client(url=self.url)

        self.fail("")
        self.ok("--help")

        client.stream_create("/median/1", "float32_8")
        client.stream_create("/median/2", "float32_8")
        self.fail("/newton/prep /median/0")
        self.contain("doesn't exist")
        self.ok("/newton/prep /median/1")
        self.ok("--difference /newton/prep /median/2")

    def test_08_prep(self):
        """prep: basic usage, argument validation, low sampling rate and
        empty sinefit data."""
        self.main = nilmtools.prep.main
        client = nilmdb.client.Client(url=self.url)

        self.fail("")
        self.ok("--help")

        self.fail("-c 2 /sf/raw /sf/out /prep/out")
        self.contain("/prep/out doesn't exist")

        # basic usage
        client.stream_create("/prep/out", "float32_8")
        self.ok("-c 2 /sf/raw /sf/out /prep/out")
        self.contain("processed 100000")

        # test arguments
        self.fail("/sf/raw /sf/out /prep/out")
        self.contain("need a column number")
        self.fail("-c 0 /sf/raw /sf/out /prep/out")
        self.contain("need a column number")
        self.fail("-c 2 -n 3 /sf/raw /sf/out /prep/out")
        self.contain("need 6 columns")
        self.fail("-c 2 -n 0 /sf/raw /sf/out /prep/out")
        self.contain("number of odd harmonics must be")
        self.fail("-c 2 -N 0 /sf/raw /sf/out /prep/out")
        self.contain("number of shifted FFTs must be")
        self.ok("-c 2 -r 0 /sf/raw /sf/out /prep/out")
        self.ok("-c 2 -R 0 /sf/raw /sf/out /prep/out")
        self.fail("-c 2 -r 0 -R 0 /sf/raw /sf/out /prep/out")
        self.fail("-c 2 /sf/raw /sf/no-sinefit-data /prep/out")
        self.contain("sinefit data not found")
        self.fail("-c 2 /sf/raw /prep/out /prep/out")
        self.contain("sinefit data type is float32_8; expected float32_3")

        # Limit time so only one row gets passed in
        client.stream_create("/prep/tmp", "float32_8")
        s = 1234567890000000
        e = 1234567890000125
        self.ok(f"-c 2 -s {s} -e {e} /sf/raw /sf/out /prep/tmp")

        # Lower sampling rate on everything, so that the FFT doesn't
        # return all the harmonics, and prep has to fill with zeros.
        # Tests the "if N < (nharm * 2):" condition in prep
        self.generate_sine_data(client, "/sf/raw-low", 5, 100, 60)
        self.main = nilmtools.sinefit.main
        client.stream_create("/sf/out-low", "float32_3")
        self.ok("--column 1 /sf/raw-low /sf/out-low")
        self.main = nilmtools.prep.main
        client.stream_create("/prep/out-low", "float32_8")
        self.ok("-c 2 /sf/raw-low /sf/out-low /prep/out-low")

        # Test prep with empty sinefit data
        client.stream_create("/sf/out-empty", "float32_3")
        with client.stream_insert_context("/sf/out-empty",
                                          1034567890123456,
                                          2034567890123456):
            pass
        client.stream_create("/prep/out-empty", "float32_8")
        self.ok("-c 2 /sf/raw /sf/out-empty /prep/out-empty")
        self.contain("warning: no periods found; skipping")

    def generate_trainola_data(self):
        """Create /train/data (seeded random pulses) and /train/big.

        The order of the random-generator calls must not change: the
        match positions checked in ``test_09_trainola`` depend on it.
        """
        # Build some fake data for trainola, which is just pulses of varying
        # length.
        client = nilmdb.client.Client(url=self.url)

        total_sec = 100
        fs = 100
        rg = numpy.random.Generator(numpy.random.MT19937(1234567))
        path = "/train/data"

        # Just build up some random pulses.  This uses seeded random numbers,
        # so any changes here will affect the success/failures of tests later.
        client.stream_create(path, "float32_1")
        with client.stream_insert_context(path) as ctx:
            remaining = 0
            for n in range(fs * total_sec):
                t = n / fs
                data = rg.normal(100) / 100 - 1
                if remaining > 0:
                    remaining -= 1
                    data += 1
                else:
                    if rg.integers(fs * 10 * total_sec) < fs:
                        if rg.integers(3) < 2:
                            remaining = fs*2
                        else:
                            remaining = fs/2
                line = b"%d %f\n" % (t * 1e6, data)
                ctx.insert(line)

        # To view what was made, try:
        if 0:
            subprocess.call(f"nilmtool -u {self.url} extract -s min -e max " +
                            f"{path} > /tmp/data", shell=True)
            # then in Octave: a=load("/tmp/data"); plot(a(:,2));
        if 0:
            for (s, e) in client.stream_intervals(path):
                print(Interval(s, e).human_string())

        # Also generate something with more than 100k data points
        client.stream_create("/train/big", "uint8_1")
        with client.stream_insert_context("/train/big") as ctx:
            for n in range(110000):
                ctx.insert(b"%d 0\n" % n)

    def test_09_trainola(self):
        """trainola: bad-input errors and a real match run whose results
        are compared against known positions."""
        self.main = nilmtools.trainola.main
        client = nilmdb.client.numpyclient.NumpyClient(url=self.url)

        self.fail("")
        self.ok("--help")
        self.ok("--version")

        self.generate_trainola_data()

        def get_json(path):
            # Collapse the file onto one line and single-quote it, so it
            # reaches main() as a single argument after shlex.split.
            with open(path) as f:
                js = f.read().replace('\n', ' ')
            return f"'{js}'"

        # pass a dict as argv[0]
        with assert_raises(KeyError):
            saved_stdout = sys.stdout
            try:
                with open(os.devnull, 'w') as sys.stdout:
                    nilmtools.trainola.main([{"url": self.url}])
            finally:
                sys.stdout = saved_stdout

        # pass no args and they come from sys.argv
        saved_argv = sys.argv
        try:
            sys.argv = ["prog", "bad-json,"]
            with assert_raises(json.decoder.JSONDecodeError):
                nilmtools.trainola.main()
        finally:
            sys.argv = saved_argv

        # catch a bunch of errors based on different json input
        client.stream_create("/train/matches", "uint8_1")
        for (num, error) in [(1, "no columns"),
                             (2, "duplicated columns"),
                             (3, "bad column number"),
                             (4, "source path '/c/d' does not exist"),
                             (5, "destination path '/a/b' does not exist"),
                             (6, "missing exemplars"),
                             (7, "missing exemplars"),
                             (8, "exemplar stream '/e/f' does not exist"),
                             (9, "No data in this exemplar"),
                             (10, "Too few data points"),
                             (11, "Too many data points"),
                             (12, "column FOO is not available in source")]:
            self.fail(get_json(f"tests/data/trainola-bad{num}.js"))
            self.contain(error)

        # not enough columns in dest
        self.fail(get_json("tests/data/trainola1.js"))
        self.contain("bad destination column number")

        # run normally
        client.stream_destroy("/train/matches")
        client.stream_create("/train/matches", "uint8_2")
        self.ok(get_json("tests/data/trainola1.js"))
        self.contain("matched 10 exemplars")

        # check actual matches, since we made up the data
        matches = list(client.stream_extract_numpy("/train/matches"))
        eq_(matches[0].tolist(), [[34000000, 1, 0],
                                  [36000000, 0, 1],
                                  [40800000, 1, 0],
                                  [42800000, 0, 1],
                                  [60310000, 1, 0],
                                  [62310000, 0, 1],
                                  [69290000, 1, 0],
                                  [71290000, 0, 1],
                                  [91210000, 1, 0],
                                  [93210000, 0, 1]])

        # another run using random noise as an exemplar, to get better coverage
        client.stream_create("/train/matches2", "uint8_1")
        self.ok(get_json("tests/data/trainola2.js"))

    def test_10_pipewatch(self):
        """pipewatch: locking plus generator/consumer exit-code cases."""
        self.main = nilmtools.pipewatch.main

        self.fail("")
        self.ok("--help")

        lock = "tests/pipewatch.lock"
        lk = f"--lock {lock}"

        # Best-effort removal of a lock left over from an earlier run.
        try:
            os.unlink(lock)
        except OSError:
            pass

        # try locking so pipewatch will exit (with code 0)
        with open(lock, "w") as lockfile:
            nilmdb.utils.lock.exclusive_lock(lockfile)
            self.ok(f"{lk} true true")
            self.contain("pipewatch process already running")
            # The file is removed while still open, as before; leaving
            # the with block then closes the handle instead of leaking it.
            os.unlink(lock)

        # have pipewatch remove its own lock to trigger error later
        self.ok(f"{lk} 'rm {lock}' true")

        # various cases to get coverage
        self.ok(f"{lk} true 'cat >/dev/null'")
        self.contain("generator returned 0, consumer returned 0")
        self.fail(f"{lk} false true")
        self.contain("generator returned 1, consumer returned 0")
        self.fail(f"{lk} false false")
        self.contain("generator returned 1, consumer returned 1")
        self.fail(f"{lk} true false")
        self.contain("generator returned 0, consumer returned 1")
        self.fail(f"{lk} 'kill -15 $$' true")
        self.ok(f"{lk} 'sleep 1 ; echo hi' 'cat >/dev/null'")
        self.ok(f"{lk} 'echo hi' 'cat >/dev/null'")
        self.fail(f"{lk} --timeout 0.5 'sleep 10 ; echo hi' 'cat >/dev/null'")
        self.fail(f"{lk} 'yes' 'head -1 >/dev/null'")
        self.fail(f"{lk} false 'exec 2>&-; trap \"sleep 10\" 0 15 ; sleep 10'")

    def test_11_cleanup(self):
        """cleanup: config errors, dry run, --yes and --estimate."""
        self.main = nilmtools.cleanup.main
        client = nilmdb.client.Client(url=self.url)

        # This mostly just gets coverage, doesn't carefully verify behavior
        self.fail("")
        self.ok("--help")

        self.fail("tests/data/cleanup-bad.cfg")
        self.contain("unknown units")

        client.stream_create("/empty/foo", "uint16_1")

        self.ok("tests/data/cleanup.cfg")
        self.contain("'/nonexistent/bar' did not match any existing streams")
        self.contain("no config for existing stream '/empty/foo'")
        self.contain("nothing to do (only 0.00 weeks of data present)")
        self.contain("specify --yes to actually perform")

        self.ok("--yes tests/data/cleanup.cfg")
        self.contain("removing data before")
        self.contain("removing from /sf/raw")

        self.ok("--estimate tests/data/cleanup.cfg")
        self.contain("Total estimated disk usage")
        self.contain("MiB")
        self.contain("GiB")

        self.ok("--yes tests/data/cleanup-nodecim.cfg")

        self.ok("--estimate tests/data/cleanup-nodecim.cfg")

    def test_12_misc(self):
        """Extra cases for nilmtools.math and nilmtools.filter that the
        earlier tests did not reach."""
        # Fill in test cases that were missed by earlier code:

        # math.py
        with assert_raises(ValueError):
            nilmtools.math.sfit4([1], 5)
        # NOTE(review): placed outside the assert_raises block, since it
        # would be unreachable after the raising call above -- confirm
        # against the original indentation.
        nilmtools.math.sfit4([1, 2], 5)

        # filter.py
        client = nilmdb.client.numpyclient.NumpyClient(self.url)
        client.stream_create("/misc/a", "uint8_1")
        client.stream_create("/misc/b", "uint8_1")
        with client.stream_insert_context("/misc/a") as ctx:
            for n in range(10000):
                ctx.insert(b"%d 0\n" % n)
        pni = nilmtools.filter.process_numpy_interval
        src = nilmtools.filter.get_stream_info(client, "/misc/a")
        extractor = functools.partial(
            client.stream_extract_numpy, "/misc/a",
            layout=src.layout, maxrows=1000)
        inserter = functools.partial(
            client.stream_insert_numpy_context, "/misc/b")

        def func1(*args):
            return 0

        def func2(*args):
            return -1

        def func3(array, interval, args, insert_func, last):
            if last:
                return array.shape[0]
            return 0

        saved = (sys.stdout, sys.stderr)
        try:
            with open(os.devnull, 'w') as sys.stdout:
                with open(os.devnull, 'w') as sys.stderr:
                    pni(Interval(0, 10000), extractor, inserter, 100, func1)
                    with assert_raises(SystemExit):
                        f = nilmtools.filter.Filter("hello world")
        finally:
            (sys.stdout, sys.stderr) = saved

        with assert_raises(Exception):
            pni(Interval(0, 10000), extractor, inserter, 100, func2)
        pni(Interval(0, 10000), extractor, inserter, 100000, func3)
        with assert_raises(NotImplementedError):
            pni(Interval(0, 10000), extractor, inserter, 100000,
                nilmtools.filter.example_callback_function)

        self.main = nilmtools.filter.main
        self.fail("")
        self.ok("--help")

        self.fail("/misc/a /misc/a")
        self.contain("must be different")

        self.fail("--start HELLOWORLD /misc/a /misc/a")
        self.contain("not enough digits for a timestamp")

        client.stream_create("/misc/c", "uint8_1")
        self.ok("--quiet /misc/a /misc/c")
        self.contain("Source: /misc/a", False)
        self.contain("Generic filter: need to handle")

        f = nilmtools.filter.Filter()
        parser = f.setup_parser()
        args = f.parse_args(["--quiet", "/misc/a", "/misc/c"])
        # The values are unused; these lines only exercise the
        # attribute access itself.
        _ = f.client_src
        _ = f.client_dest
        for i in f.intervals():
            with assert_raises(Exception) as e:
                _ = f.client_src
            in_("client is in use", str(e.exception))
            with assert_raises(Exception) as e:
                _ = f.client_dest
            in_("client is in use", str(e.exception))