# -*- coding: utf-8 -*-
|
|
|
|
# Standard library
import atexit
import functools
import io
import itertools
import json
import math
import os
import random
import shlex
import signal
import subprocess
import sys
import time
import traceback
import unittest
from urllib.request import urlopen

# Third-party
import numpy
from nose.tools import assert_raises

# nilmdb client/server and utilities
import nilmdb.client
import nilmdb.client.numpyclient
import nilmdb.server
import nilmdb.utils
import nilmdb.utils.lock
import nilmdb.utils.time
import nilmdb.utils.timestamper
from nilmdb.utils.interval import Interval

# nilmtools programs under test
import nilmtools.cleanup
import nilmtools.copy_one
import nilmtools.copy_wildcard
import nilmtools.decimate
import nilmtools.decimate_auto
import nilmtools.filter
import nilmtools.insert
import nilmtools.math
import nilmtools.median
import nilmtools.pipewatch
import nilmtools.prep
import nilmtools.sinefit
import nilmtools.trainola
from nilmtools.filter import ArgumentError

# Test helpers (provides eq_, ne_, in_, nin_, printf, sprintf,
# recursive_unlink, ...)
from testutil.helpers import *
|
|
|
|
def run_cherrypy_server(path, port, event):
    """Serve the nilmdb database at *path* on 127.0.0.1:*port*.

    Blocks until *event* is signaled, then closes the database.
    (Kept for completeness; setup_class() launches real nilmdb-server
    subprocesses instead, since cherrypy can only run once per process.)
    """
    database = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(path)
    web_server = nilmdb.server.Server(
        database, host="127.0.0.1", port=port, stoppable=True)
    web_server.start(blocking=True, event=event)
    database.close()
|
|
|
|
class CommandTester():
    """Helper base class for testing nilmtools command-line programs.

    setup_class() starts two real nilmdb-server subprocesses (one per
    test database); run()/ok()/fail() invoke ``self.main`` (set by each
    test) with a shell-style argument string, capturing stdout/stderr
    and the exit code for later assertions via contain()/match()/etc.
    """

    # URLs of the two servers started by setup_class()
    url = "http://localhost:32182/"
    url2 = "http://localhost:32183/"

    @classmethod
    def setup_class(cls):
        # We need two servers running for "copy_multiple", but
        # cherrypy uses globals and can only run once per process.
        # Using multiprocessing with "spawn" method should work in
        # theory, but is hard to get working when the test suite is
        # spawned directly by nosetests (rather than ./run-tests.py).
        # Instead, just run the real nilmdb-server that got installed
        # along with our nilmdb dependency.
        def terminate_servers():
            for p in cls.servers:
                p.terminate()
        atexit.register(terminate_servers)
        cls.servers = []
        for (path, port) in (("tests/testdb1", 32182),
                             ("tests/testdb2", 32183)):
            def listening():
                # True if something accepts HTTP connections on "port"
                try:
                    urlopen(f"http://127.0.0.1:{port}/", timeout=0.1)
                    return True
                except Exception as e:
                    return False

            if listening():
                raise Exception(f"another server already running on {port}")

            recursive_unlink(path)
            p = subprocess.Popen(["nilmdb-server",
                                  "--address", "127.0.0.1",
                                  "--database", path,
                                  "--port", str(port),
                                  "--quiet",
                                  "--traceback"],
                                 stdin=subprocess.DEVNULL,
                                 stdout=subprocess.DEVNULL)
            # BUG FIX: record the child process.  Previously "p" was
            # never appended, so teardown_class() and the atexit
            # handler iterated over an empty list and the spawned
            # servers were leaked at exit.
            cls.servers.append(p)
            # Wait up to ~5 seconds for the server to come up
            for i in range(50):
                if listening():
                    break
                time.sleep(0.1)
            else:
                raise Exception(f"server didn't start on port {port}")

    @classmethod
    def teardown_class(cls):
        # Stop every server started by setup_class()
        for p in cls.servers:
            p.terminate()

    def run(self, arg_string, infile=None, outfile=None):
        """Run a cmdline client with the specified argument string,
        passing the given input.  Save the output and exit code."""
        os.environ['NILMDB_URL'] = self.url
        self.last_args = arg_string

        # Temporarily swap out the process-wide stdio streams so we
        # can feed input to, and capture output from, self.main().
        class stdio_wrapper:
            def __init__(self, stdin, stdout, stderr):
                self.io = (stdin, stdout, stderr)
            def __enter__(self):
                self.saved = ( sys.stdin, sys.stdout, sys.stderr )
                ( sys.stdin, sys.stdout, sys.stderr ) = self.io
            def __exit__(self, type, value, traceback):
                ( sys.stdin, sys.stdout, sys.stderr ) = self.saved

        # Empty input if none provided
        if infile is None:
            infile = io.TextIOWrapper(io.BytesIO(b""))
        # Capture stderr
        errfile = io.TextIOWrapper(io.BytesIO())
        if outfile is None:
            # If no output file, capture stdout with stderr
            outfile = errfile
        with stdio_wrapper(infile, outfile, errfile) as s:
            try:
                args = shlex.split(arg_string)
                sys.argv[0] = "test_runner"
                self.main(args)
                # Treat a normal return from main() as exit code 0
                sys.exit(0)
            except SystemExit as e:
                exitcode = e.code
            except Exception as e:
                traceback.print_exc()
                exitcode = 1

        # Capture raw binary output, and also try to decode a Unicode
        # string copy.
        self.captured_binary = outfile.buffer.getvalue()
        try:
            outfile.seek(0)
            self.captured = outfile.read()
        except UnicodeDecodeError:
            self.captured = None

        self.exitcode = exitcode

    def ok(self, arg_string, infile = None):
        """Run the command; assert that it exited successfully."""
        self.run(arg_string, infile)
        if self.exitcode != 0:
            self.dump()
        eq_(self.exitcode, 0)

    def fail(self, arg_string, infile=None, exitcode=None):
        """Run the command; assert that it failed (optionally with a
        specific exit code)."""
        self.run(arg_string, infile)
        if exitcode is not None and self.exitcode != exitcode:
            # Wrong exit code
            self.dump()
            eq_(self.exitcode, exitcode)
        if self.exitcode == 0:
            # Success, when we wanted failure
            self.dump()
            ne_(self.exitcode, 0)

    def contain(self, checkstring, contain=True):
        """Assert that the captured output does (or, with
        contain=False, does not) contain "checkstring"."""
        if contain:
            in_(checkstring, self.captured)
        else:
            nin_(checkstring, self.captured)

    def match(self, checkstring):
        """Assert that the captured output equals "checkstring" exactly."""
        eq_(checkstring, self.captured)

    def matchfile(self, file):
        """Assert that the captured output matches the contents of
        "file" exactly, printing a diff summary on mismatch."""
        with open(file) as f:
            contents = f.read()
        if contents != self.captured:
            print("--- reference file (first 1000 bytes):\n")
            print(contents[0:1000] + "\n")
            print("--- captured data (first 1000 bytes):\n")
            print(self.captured[0:1000] + "\n")
            # Locate and report the first differing offset
            zipped = itertools.zip_longest(contents, self.captured)
            for (n, (a, b)) in enumerate(zipped):
                if a != b:
                    print("--- first difference is at offset", n)
                    print("--- reference:", repr(a))
                    print("--- captured:", repr(b))
                    break
            raise AssertionError("captured data doesn't match " + file)

    def matchfilecount(self, file):
        # Last line of captured data should match the number of
        # non-commented lines in file
        count = 0
        with open(file) as f:
            for line in f:
                if line[0] != '#':
                    count += 1
        eq_(self.captured.splitlines()[-1], sprintf("%d", count))

    def dump(self):
        """Print the last command's arguments and captured output, for
        diagnosing unexpected pass/fail results."""
        printf("\n===args start===\n%s\n===args end===\n", self.last_args)
        printf("===dump start===\n%s===dump end===\n", self.captured)
|
|
|
|
|
|
class TestAllCommands(CommandTester):
    """End-to-end tests for each nilmtools command-line program.

    Tests are order-dependent (numbered 00..12): earlier tests load data
    and create streams that later tests read.  Each test sets self.main
    to the program's entry point before using ok()/fail()/contain().
    """

    def test_00_load_data(self):
        """Load reference prep data into /newton/prep on server 1."""
        client = nilmdb.client.Client(url=self.url)
        client.stream_create("/newton/prep", "float32_8")
        client.stream_set_metadata("/newton/prep",
                                   { "description": "newton" })

        # Timestamp each data file at 120 Hz starting at its filename time
        for ts in ("20120323T1000", "20120323T1002", "20120323T1004"):
            start = nilmdb.utils.time.parse_time(ts)
            fn = f"tests/data/prep-{ts}"
            data = nilmdb.utils.timestamper.TimestamperRate(fn, start, 120)
            client.stream_insert("/newton/prep", data);

    def test_01_copy(self):
        """Test nilm-copy (copy_one): argument errors, data and
        metadata copies."""
        self.main = nilmtools.copy_one.main

        client = nilmdb.client.Client(url=self.url)

        # basic arguments
        self.fail(f"")
        self.fail(f"no-such-src no-such-dest")
        self.contain("source path no-such-src not found")
        self.fail(f"-u {self.url} no-such-src no-such-dest")

        # nonexistent dest
        self.fail(f"/newton/prep /newton/prep-copy")
        self.contain("Destination /newton/prep-copy doesn't exist")

        # wrong type
        client.stream_create("/newton/prep-copy-wrongtype", "uint16_6")
        self.fail(f"/newton/prep /newton/prep-copy-wrongtype")
        self.contain("wrong number of fields")

        # copy with metadata, and compare
        client.stream_create("/newton/prep-copy", "float32_8")
        self.ok(f"/newton/prep /newton/prep-copy")
        a = list(client.stream_extract("/newton/prep"))
        b = list(client.stream_extract("/newton/prep-copy"))
        eq_(a, b)
        a = client.stream_get_metadata("/newton/prep")
        b = client.stream_get_metadata("/newton/prep-copy")
        eq_(a, b)

        # copy with no metadata
        client.stream_create("/newton/prep-copy-nometa", "float32_8")
        self.ok(f"--nometa /newton/prep /newton/prep-copy-nometa")
        a = list(client.stream_extract("/newton/prep"))
        b = list(client.stream_extract("/newton/prep-copy-nometa"))
        eq_(a, b)
        a = client.stream_get_metadata("/newton/prep")
        b = client.stream_get_metadata("/newton/prep-copy-nometa")
        ne_(a, b)

    def test_02_copy_wildcard(self):
        """Test nilm-copy-wildcard: copying matching streams between
        the two servers."""
        self.main = nilmtools.copy_wildcard.main

        client1 = nilmdb.client.Client(url=self.url)
        client2 = nilmdb.client.Client(url=self.url2)

        # basic arguments
        self.fail(f"")
        self.fail(f"/newton")

        self.fail(f"-u {self.url} -U {self.url} /newton")
        self.contain("URL must be different")

        # no matches; silent
        self.ok(f"-u {self.url} -U {self.url2} /newton")
        self.ok(f"-u {self.url} -U {self.url2} /asdf*")
        self.ok(f"-u {self.url2} -U {self.url} /newton*")
        eq_(client2.stream_list(), [])

        # this won't actually copy, but will still create streams
        self.ok(f"-u {self.url} -U {self.url2} --dry-run /newton*")
        self.contain("Creating destination stream /newton/prep-copy")
        eq_(len(list(client2.stream_extract("/newton/prep"))), 0)

        # this should copy a bunch
        self.ok(f"-u {self.url} -U {self.url2} /*")
        self.contain("Creating destination stream /newton/prep-copy", False)
        eq_(client1.stream_list(), client2.stream_list())
        eq_(list(client1.stream_extract("/newton/prep")),
            list(client2.stream_extract("/newton/prep")))
        eq_(client1.stream_get_metadata("/newton/prep"),
            client2.stream_get_metadata("/newton/prep"))

        # repeating it is OK; it just won't recreate streams.
        # Let's try with --nometa too
        client2.stream_remove("/newton/prep")
        client2.stream_destroy("/newton/prep")
        self.ok(f"-u {self.url} -U {self.url2} --nometa /newton*")
        self.contain("Creating destination stream /newton/prep-copy", False)
        self.contain("Creating destination stream /newton/prep", True)
        eq_(client1.stream_list(), client2.stream_list())
        eq_(list(client1.stream_extract("/newton/prep")),
            list(client2.stream_extract("/newton/prep")))
        eq_(client2.stream_get_metadata("/newton/prep"), {})

        # fill in test cases
        self.ok(f"-u {self.url} -U {self.url2} -s 2010 -e 2020 -F /newton*")

    def test_03_decimate(self):
        """Test nilm-decimate: errors, factors, metadata conflicts,
        and suggested output layouts."""
        self.main = nilmtools.decimate.main

        client = nilmdb.client.Client(url=self.url)

        # basic arguments
        self.fail(f"")

        # no dest
        self.fail(f"/newton/prep /newton/prep-decimated-1")
        self.contain("doesn't exist")

        # wrong dest shape
        client.stream_create("/newton/prep-decimated-bad", "float32_8")
        self.fail(f"/newton/prep /newton/prep-decimated-bad")
        self.contain("wrong number of fields")

        # bad factor
        self.fail(f"/newton/prep -f 1 /newton/prep-decimated-bad")
        self.contain("needs to be 2 or more")

        # ok, default factor 4
        client.stream_create("/newton/prep-decimated-4", "float32_24")
        self.ok(f"/newton/prep /newton/prep-decimated-4")
        a = client.stream_count("/newton/prep")
        b = client.stream_count("/newton/prep-decimated-4")
        eq_(a // 4, b)

        # factor 10
        client.stream_create("/newton/prep-decimated-10", "float32_24")
        self.ok(f"/newton/prep -f 10 /newton/prep-decimated-10")
        self.contain("Processing")
        a = client.stream_count("/newton/prep")
        b = client.stream_count("/newton/prep-decimated-10")
        eq_(a // 10, b)

        # different factor, same target
        self.fail(f"/newton/prep -f 16 /newton/prep-decimated-10")
        self.contain("Metadata in destination stream")
        self.contain("decimate_factor = 10")
        self.contain("doesn't match desired data")
        self.contain("decimate_factor = 16")

        # unless we force it
        self.ok(f"/newton/prep -f 16 -F /newton/prep-decimated-10")
        a = client.stream_count("/newton/prep")
        b = client.stream_count("/newton/prep-decimated-10")
        # but all data was already converted, so no more
        eq_(a // 10, b)

        # if we try to decimate an already-decimated stream, the suggested
        # shape is different
        self.fail(f"/newton/prep-decimated-4 -f 4 /newton/prep-decimated-16")
        self.contain("create /newton/prep-decimated-16 float32_24")

        # decimate again
        client.stream_create("/newton/prep-decimated-16", "float32_24")
        self.ok(f"/newton/prep-decimated-4 -f 4 /newton/prep-decimated-16")
        self.contain("Processing")

        # check shape suggestion for different input types
        for (shape, expected) in (("int32_1", "float64_3"),
                                  ("uint32_1", "float64_3"),
                                  ("int64_1", "float64_3"),
                                  ("uint64_1", "float64_3"),
                                  ("float32_1", "float32_3"),
                                  ("float64_1", "float64_3")):
            client.stream_create(f"/test/{shape}", shape)
            self.fail(f"/test/{shape} /test/{shape}-decim")
            self.contain(f"create /test/{shape}-decim {expected}")

    def test_04_decimate_auto(self):
        """Test nilm-decimate-auto: automatic multi-level decimation."""
        self.main = nilmtools.decimate_auto.main

        client = nilmdb.client.Client(url=self.url)

        self.fail(f"")

        self.fail(f"--max -1 asdf")
        self.contain("bad max")

        self.fail(f"/no/such/stream")
        self.contain("no stream matched path")

        # normal run
        self.ok(f"/newton/prep")

        # can't auto decimate a decimated stream
        self.fail(f"/newton/prep-decimated-16")
        self.contain("need to pass the base stream instead")

        # decimate prep again, this time much more; also use -F
        self.ok(f"-m 10 --force-metadata /newton/pr??")
        self.contain("Level 4096 decimation has 9 rows")

        # decimate the different shapes
        self.ok(f"/test/*")
        self.contain("Level 1 decimation has 0 rows")

    def test_05_insert(self):
        """Test nilm-insert: file/live insertion, rates, deltas,
        timestamp skips, overlap handling."""
        self.main = nilmtools.insert.main

        client = nilmdb.client.Client(url=self.url)

        self.fail(f"")
        self.ok(f"--help")

        # mutually exclusive arguments
        self.fail(f"--delta --rate 123 /foo bar")
        self.fail(f"--live --filename /foo bar")

        # Insert from file
        client.stream_create("/insert/prep", "float32_8")

        t0 = "tests/data/prep-20120323T1000"
        t2 = "tests/data/prep-20120323T1002"
        t4 = "tests/data/prep-20120323T1004"
        self.ok(f"--file --dry-run --rate 120 /insert/prep {t0} {t2} {t4}")
        self.contain("Dry run")

        # wrong rate
        self.fail(f"--file --dry-run --rate 10 /insert/prep {t0} {t2} {t4}")
        self.contain("Data is coming in too fast")

        # skip forward in time
        self.ok(f"--file --dry-run --rate 120 /insert/prep {t0} {t4}")
        self.contain("data timestamp behind by 120")
        self.contain("Skipping data timestamp forward")

        # skip backwards in time
        self.fail(f"--file --dry-run --rate 120 /insert/prep {t0} {t2} {t0}")
        self.contain("data timestamp ahead by 240")

        # skip backwards in time is OK if --skip provided
        self.ok(f"--skip -f -D -r 120 insert/prep {t0} {t2} {t0} {t4}")
        self.contain("Skipping the remainder of this file")

        # Now insert for real
        self.ok(f"--skip --file --rate 120 /insert/prep {t0} {t2} {t4}")
        self.contain("Done")

        # Overlap
        self.fail(f"--skip --file --rate 120 /insert/prep {t0}")
        self.contain("new data overlaps existing data")

        # Not overlap if we change file offset
        self.ok(f"--skip --file --rate 120 -o 0 /insert/prep {t0}")

        # Data with no timestamp
        self.fail(f"-f -r 120 /insert/prep tests/data/prep-notime")
        self.contain("No idea what timestamp to use")

        # Check intervals so far
        eq_(list(client.stream_intervals("/insert/prep")),
            [[1332507600000000, 1332507959991668],
             [1332511200000000, 1332511319991668]])

        # Delta supplied by file
        self.ok(f"--file --delta -o 0 /insert/prep {t4}-delta")
        eq_(list(client.stream_intervals("/insert/prep")),
            [[1332507600000000, 1332507959991668],
             [1332511200000000, 1332511319991668],
             [1332511440000000, 1332511499000001]])

        # Now fake live timestamps by using the delta file, and a
        # fake clock that increments one second per call.
        def fake_time_now():
            nonlocal fake_time_base
            ret = fake_time_base
            fake_time_base += 1000000
            return ret
        real_time_now = nilmtools.insert.time_now
        nilmtools.insert.time_now = fake_time_now

        # Delta supplied by file.  This data is too fast because delta
        # contains a 50 sec jump
        fake_time_base = 1332511560000000
        self.fail(f"--live --delta -o 0 /insert/prep {t4}-delta")
        self.contain("Data is coming in too fast")
        self.contain("data time is Fri, 23 Mar 2012 10:06:55")
        self.contain("clock time is only Fri, 23 Mar 2012 10:06:06")

        # This data is OK, no jump
        fake_time_base = 1332511560000000
        self.ok(f"--live --delta -o 0 /insert/prep {t4}-delta2")

        # This has unparseable delta
        fake_time_base = 1332511560000000
        self.fail(f"--live --delta -o 0 /insert/prep {t4}-delta3")
        self.contain("can't parse delta")

        # Insert some gzipped data, with no timestamp in name
        bp1 = "tests/data/bpnilm-raw-1.gz"
        bp2 = "tests/data/bpnilm-raw-2.gz"
        client.stream_create("/insert/raw", "uint16_6")
        self.ok(f"--file /insert/raw {bp1} {bp2}")

        # Try truncated data
        tr = "tests/data/trunc"
        self.ok(f"--file /insert/raw {tr}1 {tr}2 {tr}3 {tr}4")

        # Restore the real clock for subsequent tests
        nilmtools.insert.time_now = real_time_now

    def generate_sine_data(self, client, path, data_sec, fs, freq):
        """Create stream "path" and fill it with data_sec seconds of a
        freq-Hz sine (and a mixed-sine current column) sampled at fs Hz."""
        # generate raw data
        client.stream_create(path, "uint16_2")
        with client.stream_insert_context(path) as ctx:
            for n in range(fs * data_sec):
                t = n / fs
                v = math.sin(t * 2 * math.pi * freq)
                i = 0.3 * math.sin(3*t) + math.sin(t)
                # scale [-1, 1] into the uint16 range, offset from epoch
                line = b"%d %d %d\n" % (
                    (t + 1234567890) * 1e6,
                    v * 32767 + 32768,
                    i * 32768 + 32768)
                ctx.insert(line)
        # debugging aid: print the resulting intervals
        if 0:
            for (s, e) in client.stream_intervals(path):
                print(Interval(s,e).human_string())

    def test_06_sinefit(self):
        """Test nilm-sinefit: parameter validation, fits, and warning
        paths."""
        self.main = nilmtools.sinefit.main
        client = nilmdb.client.Client(url=self.url)

        self.fail(f"")
        self.ok(f"--help")

        self.generate_sine_data(client, "/sf/raw", 50, 8000, 60)

        client.stream_create("/sf/out-bad", "float32_4")
        self.fail(f"--column 1 /sf/raw /sf/out-bad")
        self.contain("wrong number of fields")
        self.fail(f"--column 1 /sf/raw /sf/out")
        self.contain("/sf/out doesn't exist")

        # basic run
        client.stream_create("/sf/out", "float32_3")
        self.ok(f"--column 1 /sf/raw /sf/out")
        eq_(client.stream_count("/sf/out"), 3000)

        # parameter errors
        self.fail(f"--column 0 /sf/raw /sf/out")
        self.contain("need a column number")
        self.fail(f"/sf/raw /sf/out")
        self.contain("need a column number")
        self.fail(f"-c 1 --frequency 0 /sf/raw /sf/out")
        self.contain("frequency must be")
        self.fail(f"-c 1 --min-freq 100 /sf/raw /sf/out")
        self.contain("invalid min or max frequency")
        self.fail(f"-c 1 --max-freq 5 /sf/raw /sf/out")
        self.contain("invalid min or max frequency")
        self.fail(f"-c 1 --min-amp -1 /sf/raw /sf/out")
        self.contain("min amplitude must be")

        # trigger some warnings
        client.stream_create("/sf/out2", "float32_3")
        self.ok(f"-c 1 -f 500 -e @1234567897000000 /sf/raw /sf/out2")
        self.contain("outside valid range")
        self.contain("1000 warnings suppressed")
        eq_(client.stream_count("/sf/out2"), 0)

        self.ok(f"-c 1 -a 40000 -e @1234567898000000 /sf/raw /sf/out2")
        self.contain("below minimum threshold")

        # get coverage for "advance = N/2" line near end of sinefit,
        # where we found a fit but it was after the end of the window,
        # so we didn't actually mark anything in this window.
        self.ok(f"-c 1 -f 240 -m 50 -e @1234567898010000 /sf/raw /sf/out2")

    def test_07_median(self):
        """Test nilm-median: basic and --difference runs."""
        self.main = nilmtools.median.main
        client = nilmdb.client.Client(url=self.url)

        self.fail(f"")
        self.ok(f"--help")

        client.stream_create("/median/1", "float32_8")
        client.stream_create("/median/2", "float32_8")
        self.fail("/newton/prep /median/0")
        self.contain("doesn't exist")
        self.ok("/newton/prep /median/1")
        self.ok("--difference /newton/prep /median/2")

    def test_08_prep(self):
        """Test nilm-prep: argument validation, normal runs, and edge
        cases (low sample rate, empty sinefit data)."""
        self.main = nilmtools.prep.main
        client = nilmdb.client.Client(url=self.url)

        self.fail(f"")
        self.ok(f"--help")

        self.fail(f"-c 2 /sf/raw /sf/out /prep/out")
        self.contain("/prep/out doesn't exist")

        # basic usage
        client.stream_create("/prep/out", "float32_8")
        self.ok(f"-c 2 /sf/raw /sf/out /prep/out")
        self.contain("processed 100000")

        # test arguments
        self.fail(f"/sf/raw /sf/out /prep/out")
        self.contain("need a column number")
        self.fail(f"-c 0 /sf/raw /sf/out /prep/out")
        self.contain("need a column number")
        self.fail(f"-c 2 -n 3 /sf/raw /sf/out /prep/out")
        self.contain("need 6 columns")
        self.fail(f"-c 2 -n 0 /sf/raw /sf/out /prep/out")
        self.contain("number of odd harmonics must be")
        self.fail(f"-c 2 -N 0 /sf/raw /sf/out /prep/out")
        self.contain("number of shifted FFTs must be")
        self.ok(f"-c 2 -r 0 /sf/raw /sf/out /prep/out")
        self.ok(f"-c 2 -R 0 /sf/raw /sf/out /prep/out")
        self.fail(f"-c 2 -r 0 -R 0 /sf/raw /sf/out /prep/out")
        self.fail(f"-c 2 /sf/raw /sf/no-sinefit-data /prep/out")
        self.contain("sinefit data not found")
        self.fail(f"-c 2 /sf/raw /prep/out /prep/out")
        self.contain("sinefit data type is float32_8; expected float32_3")

        # Limit time so only one row gets passed in
        client.stream_create("/prep/tmp", "float32_8")
        s = 1234567890000000
        e = 1234567890000125
        self.ok(f"-c 2 -s {s} -e {e} /sf/raw /sf/out /prep/tmp")

        # Lower sampling rate on everything, so that the FFT doesn't
        # return all the harmonics, and prep has to fill with zeros.
        # Tests the "if N < (nharm * 2):" condition in prep
        self.generate_sine_data(client, "/sf/raw-low", 5, 100, 60)
        self.main = nilmtools.sinefit.main
        client.stream_create("/sf/out-low", "float32_3")
        self.ok(f"--column 1 /sf/raw-low /sf/out-low")
        self.main = nilmtools.prep.main
        client.stream_create("/prep/out-low", "float32_8")
        self.ok(f"-c 2 /sf/raw-low /sf/out-low /prep/out-low")

        # Test prep with empty sinefit data
        client.stream_create("/sf/out-empty", "float32_3")
        with client.stream_insert_context("/sf/out-empty",
                                          1034567890123456,
                                          2034567890123456):
            pass
        client.stream_create("/prep/out-empty", "float32_8")
        self.ok(f"-c 2 /sf/raw /sf/out-empty /prep/out-empty")
        self.contain("warning: no periods found; skipping")

    def generate_trainola_data(self):
        # Build some fake data for trainola, which is just pulses of varying
        # length.
        client = nilmdb.client.Client(url=self.url)

        total_sec = 100
        fs = 100
        rg = numpy.random.Generator(numpy.random.MT19937(1234567))
        path = "/train/data"

        # Just build up some random pulses.  This uses seeded random numbers,
        # so any changes here will affect the success/failures of tests later.
        client.stream_create(path, "float32_1")
        with client.stream_insert_context(path) as ctx:
            remaining = 0
            for n in range(fs * total_sec):
                t = n / fs
                data = rg.normal(100) / 100 - 1
                if remaining > 0:
                    remaining -= 1
                    data += 1
                else:
                    # occasionally start a new pulse of one of two widths
                    if rg.integers(fs * 10 * total_sec) < fs:
                        if rg.integers(3) < 2:
                            remaining = fs*2
                        else:
                            remaining = fs/2
                line = b"%d %f\n" % (t * 1e6, data)
                ctx.insert(line)

        # To view what was made, try:
        if 0:
            subprocess.call(f"nilmtool -u {self.url} extract -s min -e max " +
                            f"{path} > /tmp/data", shell=True)
            # then in Octave: a=load("/tmp/data"); plot(a(:,2));
        if 0:
            for (s, e) in client.stream_intervals(path):
                print(Interval(s,e).human_string())

        # Also generate something with more than 100k data points
        client.stream_create("/train/big", "uint8_1")
        with client.stream_insert_context("/train/big") as ctx:
            for n in range(110000):
                ctx.insert(b"%d 0\n" % n)

    def test_09_trainola(self):
        """Test nilm-trainola: JSON argument handling, error cases, and
        exemplar matching against the seeded random data."""
        self.main = nilmtools.trainola.main
        client = nilmdb.client.numpyclient.NumpyClient(url=self.url)

        self.fail(f"")
        self.ok(f"--help")
        self.ok(f"--version")

        self.generate_trainola_data()

        def get_json(path):
            # Return a file's JSON contents, quoted as one shell argument
            with open(path) as f:
                js = f.read().replace('\n', ' ')
            return f"'{js}'"

        # pass a dict as argv[0]
        with assert_raises(KeyError):
            saved_stdout = sys.stdout
            try:
                with open(os.devnull, 'w') as sys.stdout:
                    nilmtools.trainola.main([{ "url": self.url }])
            finally:
                sys.stdout = saved_stdout

        # pass no args and they come from sys.argv
        saved_argv = sys.argv
        try:
            sys.argv = [ "prog", "bad-json," ]
            with assert_raises(json.decoder.JSONDecodeError):
                nilmtools.trainola.main()
        finally:
            sys.argv = saved_argv

        # catch a bunch of errors based on different json input
        client.stream_create("/train/matches", "uint8_1")
        for (num, error) in [ (1, "no columns"),
                              (2, "duplicated columns"),
                              (3, "bad column number"),
                              (4, "source path '/c/d' does not exist"),
                              (5, "destination path '/a/b' does not exist"),
                              (6, "missing exemplars"),
                              (7, "missing exemplars"),
                              (8, "exemplar stream '/e/f' does not exist"),
                              (9, "No data in this exemplar"),
                              (10, "Too few data points"),
                              (11, "Too many data points"),
                              (12, "column FOO is not available in source") ]:
            self.fail(get_json(f"tests/data/trainola-bad{num}.js"))
            self.contain(error)

        # not enough columns in dest
        self.fail(get_json("tests/data/trainola1.js"))
        self.contain("bad destination column number")

        # run normally
        client.stream_destroy("/train/matches")
        client.stream_create("/train/matches", "uint8_2")
        self.ok(get_json("tests/data/trainola1.js"))
        self.contain("matched 10 exemplars")

        # check actual matches, since we made up the data
        matches = list(client.stream_extract_numpy("/train/matches"))
        eq_(matches[0].tolist(), [[34000000, 1, 0],
                                  [36000000, 0, 1],
                                  [40800000, 1, 0],
                                  [42800000, 0, 1],
                                  [60310000, 1, 0],
                                  [62310000, 0, 1],
                                  [69290000, 1, 0],
                                  [71290000, 0, 1],
                                  [91210000, 1, 0],
                                  [93210000, 0, 1]])

        # another run using random noise as an exemplar, to get better coverage
        client.stream_create("/train/matches2", "uint8_1")
        self.ok(get_json("tests/data/trainola2.js"))

    def test_10_pipewatch(self):
        """Test nilm-pipewatch: locking, exit-code combinations,
        timeouts, and signal handling."""
        self.main = nilmtools.pipewatch.main

        self.fail(f"")
        self.ok(f"--help")

        lock = "tests/pipewatch.lock"
        lk = f"--lock {lock}"

        try:
            os.unlink(lock)
        except OSError:
            pass

        # try locking so pipewatch will exit (with code 0)
        lockfile = open(lock, "w")
        nilmdb.utils.lock.exclusive_lock(lockfile)
        self.ok(f"{lk} true true")
        self.contain("pipewatch process already running")
        os.unlink(lock)

        # have pipewatch remove its own lock to trigger error later
        self.ok(f"{lk} 'rm {lock}' true")

        # various cases to get coverage
        self.ok(f"{lk} true 'cat >/dev/null'")
        self.contain("generator returned 0, consumer returned 0")
        self.fail(f"{lk} false true")
        self.contain("generator returned 1, consumer returned 0")
        self.fail(f"{lk} false false")
        self.contain("generator returned 1, consumer returned 1")
        self.fail(f"{lk} true false")
        self.contain("generator returned 0, consumer returned 1")
        self.fail(f"{lk} 'kill -15 $$' true")
        self.ok(f"{lk} 'sleep 1 ; echo hi' 'cat >/dev/null'")
        self.ok(f"{lk} 'echo hi' 'cat >/dev/null'")
        self.fail(f"{lk} --timeout 0.5 'sleep 10 ; echo hi' 'cat >/dev/null'")
        self.fail(f"{lk} 'yes' 'head -1 >/dev/null'")
        self.fail(f"{lk} false 'exec 2>&-; trap \"sleep 10\" 0 15 ; sleep 10'")

    def test_11_cleanup(self):
        """Test nilm-cleanup: config errors, dry run, --yes, and
        --estimate."""
        self.main = nilmtools.cleanup.main
        client = nilmdb.client.Client(url=self.url)

        # This mostly just gets coverage, doesn't carefully verify behavior
        self.fail(f"")
        self.ok(f"--help")

        self.fail(f"tests/data/cleanup-bad.cfg")
        self.contain("unknown units")

        client.stream_create("/empty/foo", "uint16_1")
        self.ok(f"tests/data/cleanup.cfg")
        self.contain("'/nonexistent/bar' did not match any existing streams")
        self.contain("no config for existing stream '/empty/foo'")
        self.contain("nothing to do (only 0.00 weeks of data present)")
        self.contain("specify --yes to actually perform")

        self.ok(f"--yes tests/data/cleanup.cfg")
        self.contain("removing data before")
        self.contain("removing from /sf/raw")

        self.ok(f"--estimate tests/data/cleanup.cfg")
        self.contain("Total estimated disk usage")
        self.contain("MiB")
        self.contain("GiB")

        self.ok(f"--yes tests/data/cleanup-nodecim.cfg")
        self.ok(f"--estimate tests/data/cleanup-nodecim.cfg")

    def test_12_misc(self):
        """Cover remaining paths in nilmtools.math and nilmtools.filter
        missed by the earlier tests."""
        # Fill in test cases that were missed by earlier code:

        # math.py
        with assert_raises(ValueError):
            nilmtools.math.sfit4([1], 5)
        nilmtools.math.sfit4([1,2], 5)

        # filter.py
        client = nilmdb.client.numpyclient.NumpyClient(self.url)
        client.stream_create("/misc/a", "uint8_1")
        client.stream_create("/misc/b", "uint8_1")
        with client.stream_insert_context("/misc/a") as ctx:
            for n in range(10000):
                ctx.insert(b"%d 0\n" % n)
        pni = nilmtools.filter.process_numpy_interval
        src = nilmtools.filter.get_stream_info(client, "/misc/a")
        extractor = functools.partial(
            client.stream_extract_numpy, "/misc/a",
            layout=src.layout, maxrows=1000)
        inserter = functools.partial(
            client.stream_insert_numpy_context, "/misc/b")
        # callbacks exercising different process_numpy_interval branches
        def func1(*args):
            return 0
        def func2(*args):
            return -1
        def func3(array, interval, args, insert_func, last):
            if last:
                return array.shape[0]
            return 0
        # silence output from the filter internals while exercising them
        saved = (sys.stdout, sys.stderr)
        try:
            with open(os.devnull, 'w') as sys.stdout:
                with open(os.devnull, 'w') as sys.stderr:
                    pni(Interval(0, 10000), extractor, inserter, 100, func1)
                    with assert_raises(SystemExit):
                        f = nilmtools.filter.Filter("hello world")
        finally:
            (sys.stdout, sys.stderr) = saved

        with assert_raises(Exception):
            pni(Interval(0, 10000), extractor, inserter, 100, func2)
        pni(Interval(0, 10000), extractor, inserter, 100000, func3)

        with assert_raises(NotImplementedError):
            pni(Interval(0, 10000), extractor, inserter, 100000,
                nilmtools.filter.example_callback_function)

        self.main = nilmtools.filter.main
        self.fail(f"")
        self.ok(f"--help")

        self.fail(f"/misc/a /misc/a")
        self.contain("must be different")

        self.fail(f"--start HELLOWORLD /misc/a /misc/a")
        self.contain("not enough digits for a timestamp")

        client.stream_create("/misc/c", "uint8_1")
        self.ok(f"--quiet /misc/a /misc/c")
        self.contain("Source: /misc/a", False)
        self.contain("Generic filter: need to handle")

        f = nilmtools.filter.Filter()
        parser = f.setup_parser()
        args = f.parse_args(["--quiet", "/misc/a", "/misc/c"])
        x = f.client_src
        x = f.client_dest
        for i in f.intervals():
            with assert_raises(Exception) as e:
                x = f.client_src
            in_("client is in use", str(e.exception))
            with assert_raises(Exception) as e:
                x = f.client_dest
            in_("client is in use", str(e.exception))
|