@@ -18,10 +18,16 @@ from nilmdb.utils.interval import Interval
 from nose.tools import assert_raises
 import unittest
 import numpy
 import math
 import json
 import random
 from testutil.helpers import *
 import multiprocessing
+import subprocess
+import traceback
+import os
+import atexit
+import signal
+
 from urllib.request import urlopen
 from nilmtools.filter import ArgumentError
@@ -40,28 +46,56 @@ class CommandTester():
 
     @classmethod
     def setup_class(cls):
-        # Use multiprocessing with "spawn" method, so that we can
-        # start two fully independent cherrypy instances
-        # (needed for copy-wildcard)
-        multiprocessing.set_start_method('spawn')
-
-        events = []
+        # We need two servers running for "copy_multiple", but
+        # cherrypy uses globals and can only run once per process.
+        # Using multiprocessing with "spawn" method should work in
+        # theory, but is hard to get working when the test suite is
+        # spawned directly by nosetests (rather than ./run-tests.py).
+        # Instead, just run the real nilmdb-server that got installed
+        # along with our nilmdb dependency.
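+        # Also arrange to kill the spawned servers at interpreter exit,
+        # in case teardown_class is never reached.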
+        def terminate_servers():
+            for p in cls.servers:
+                p.terminate()
+        atexit.register(terminate_servers)
+        cls.servers = []
         for (path, port) in (("tests/testdb1", 32182),
                              ("tests/testdb2", 32183)):
+            def listening():
+                try:
+                    urlopen(f"http://127.0.0.1:{port}/", timeout=0.1)
+                    return True
+                except Exception as e:
+                    return False
+
+            if listening():
+                raise Exception(f"another server already running on {port}")
+
             recursive_unlink(path)
-            event = multiprocessing.Event()
-            proc = multiprocessing.Process(target=run_cherrypy_server,
-                                           args=(path, port, event))
-            proc.start()
-            events.append(event)
-        for event in events:
-            if not event.wait(timeout = 10):
-                raise AssertionError("server didn't start")
+            p = subprocess.Popen(["nilmdb-server",
+                                  "--address", "127.0.0.1",
+                                  "--database", path,
+                                  "--port", str(port),
+                                  "--quiet",
+                                  "--traceback"],
+                                 stdin=subprocess.DEVNULL,
+                                 stdout=subprocess.DEVNULL)
+            cls.servers.append(p)
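+            # Poll until the new server answers, for up to ~5 seconds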
+            for i in range(50):
+                if listening():
+                    break
+                time.sleep(0.1)
+            else:
+                raise Exception(f"server didn't start on port {port}")
 
     @classmethod
     def teardown_class(cls):
-        urlopen("http://127.0.0.1:32182/exit/", timeout = 1)
-        urlopen("http://127.0.0.1:32183/exit/", timeout = 1)
+        for p in cls.servers:
+            p.terminate()
 
     def run(self, arg_string, infile=None, outfile=None):
         """Run a cmdline client with the specified argument string,
@@ -613,15 +647,132 @@ class TestAllCommands(CommandTester):
         self.ok(f"-c 2 /sf/raw /sf/out-empty /prep/out-empty")
         self.contain("warning: no periods found; skipping")
 
+    def generate_trainola_data(self):
+        # Build some fake data for trainola, which is just pulses of varying
+        # length.
+        client = nilmdb.client.Client(url=self.url)
+
+        total_sec = 100
+        fs = 100
+        rg = numpy.random.Generator(numpy.random.MT19937(1234567))
+        path = "/train/data"
+
+        # Just build up some random pulses.  This uses seeded random numbers,
+        # so any changes here will affect the success/failures of tests later.
+        client.stream_create(path, "float32_1")
+        with client.stream_insert_context(path) as ctx:
+            remaining = 0
+            for n in range(fs * total_sec):
+                t = n / fs
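+                # small Gaussian noise centered at 0 (sigma = 0.01)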
+                data = rg.normal(100) / 100 - 1
+                if remaining > 0:
+                    remaining -= 1
+                    data += 1
+                else:
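+                    # Start a pulse with probability 1/(10*total_sec) per
+                    # sample, i.e. about fs/10 = 10 pulses over the whole run.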
+                    if rg.integers(fs * 10 * total_sec) < fs:
+                        if rg.integers(3) < 2:
+                            remaining = fs*2
+                        else:
+                            remaining = fs/2
+                line = b"%d %f\n" % (t * 1e6, data)
+                ctx.insert(line)
+
+        # To view what was made, try:
+        if 0:
+            subprocess.call(f"nilmtool -u {self.url} extract -s min -e max " +
+                            f"{path} > /tmp/data", shell=True)
+            # then in Octave: a=load("/tmp/data"); plot(a(:,2));
+        if 0:
+            for (s, e) in client.stream_intervals(path):
+                print(Interval(s,e).human_string())
+
+        # Also generate something with more than 100k data points
+        client.stream_create("/train/big", "uint8_1")
+        with client.stream_insert_context("/train/big") as ctx:
+            for n in range(110000):
+                ctx.insert(b"%d 0\n" % n)
+
     def test_09_trainola(self):
         self.main = nilmtools.trainola.main
+        client = nilmdb.client.numpyclient.NumpyClient(url=self.url)
 
         self.fail(f"")
         self.ok(f"--help")
         self.ok(f"--version")
+        self.generate_trainola_data()
 
-        self.ok(f"-v")
+        def get_json(path):
+            with open(path) as f:
+                js = f.read().replace('\n', ' ')
+            return f"'{js}'"
+
         self.dump()
+        # pass a dict as argv[0]
+        with assert_raises(KeyError):
+            saved_stdout = sys.stdout
+            try:
+                with open(os.devnull, 'w') as sys.stdout:
+                    nilmtools.trainola.main([{ "url": self.url }])
+            finally:
+                sys.stdout = saved_stdout
+
+        # pass no args and they come from sys.argv
+        saved_argv = sys.argv
+        try:
+            sys.argv = [ "prog", "bad-json," ]
+            with assert_raises(json.decoder.JSONDecodeError):
+                nilmtools.trainola.main()
+        finally:
+            sys.argv = saved_argv
+
+        # catch a bunch of errors based on different json input
+        client.stream_create("/train/matches", "uint8_1")
+        for (num, error) in [ (1, "no columns"),
+                              (2, "duplicated columns"),
+                              (3, "bad column number"),
+                              (4, "source path '/c/d' does not exist"),
+                              (5, "destination path '/a/b' does not exist"),
+                              (6, "missing exemplars"),
+                              (7, "missing exemplars"),
+                              (8, "exemplar stream '/e/f' does not exist"),
+                              (9, "No data in this exemplar"),
+                              (10, "Too few data points"),
+                              (11, "Too many data points"),
+                              (12, "column FOO is not available in source") ]:
+            self.fail(get_json(f"tests/data/trainola-bad{num}.js"))
+            self.contain(error)
+
+        # not enough columns in dest
+        self.fail(get_json("tests/data/trainola1.js"))
+        self.contain("bad destination column number")
+
+        # run normally
+        client.stream_destroy("/train/matches")
+        client.stream_create("/train/matches", "uint8_2")
+        self.ok(get_json("tests/data/trainola1.js"))
+        self.contain("matched 10 exemplars")
+
+        # check actual matches, since we made up the data
+        matches = list(client.stream_extract_numpy("/train/matches"))
+        eq_(matches[0].tolist(), [[34000000, 1, 0],
+                                  [36000000, 0, 1],
+                                  [40800000, 1, 0],
+                                  [42800000, 0, 1],
+                                  [60310000, 1, 0],
+                                  [62310000, 0, 1],
+                                  [69290000, 1, 0],
+                                  [71290000, 0, 1],
+                                  [91210000, 1, 0],
+                                  [93210000, 0, 1]])
+
+        # another run using random noise as an exemplar, to get better coverage
+        client.stream_create("/train/matches2", "uint8_1")
+        self.ok(get_json("tests/data/trainola2.js"))
 
     def test_10_pipewatch(self):
         self.main = nilmtools.pipewatch.main
|
|