# -*- coding: utf-8 -*- import nilmtools.copy_one import nilmtools.cleanup import nilmtools.copy_one import nilmtools.copy_wildcard import nilmtools.decimate_auto import nilmtools.decimate import nilmtools.insert import nilmtools.median import nilmtools.pipewatch import nilmtools.prep import nilmtools.sinefit import nilmtools.trainola from nilmdb.utils.interval import Interval from nose.tools import assert_raises import unittest import numpy import math import json import random from testutil.helpers import * import subprocess import traceback import os import atexit import signal import functools from urllib.request import urlopen from nilmtools.filter import ArgumentError def run_cherrypy_server(path, port, event): db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(path) server = nilmdb.server.Server(db, host="127.0.0.1", port=port, stoppable=True) server.start(blocking = True, event = event) db.close() class CommandTester(): url = "http://localhost:32182/" url2 = "http://localhost:32183/" @classmethod def setup_class(cls): # We need two servers running for "copy_multiple", but # cherrypy uses globals and can only run once per process. # Using multiprocessing with "spawn" method should work in # theory, but is hard to get working when the test suite is # spawned directly by nosetests (rather than ./run-tests.py). # Instead, just run the real nilmdb-server that got installed # along with our nilmdb dependency. def terminate_servers(): for p in cls.servers: p.terminate() atexit.register(terminate_servers) cls.servers = [] for (path, port) in (("tests/testdb1", 32182), ("tests/testdb2", 32183)): def listening(): try: urlopen(f"http://127.0.0.1:{port}/", timeout=0.1) return True except Exception as e: return False if listening(): raise Exception(f"another server already running on {port}") recursive_unlink(path) p = subprocess.Popen(["nilmdb-server", "--address", "127.0.0.1", "--database", path, "--port", str(port), "--quiet", "--traceback"], stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL) for i in range(50): if listening(): break time.sleep(0.1) else: raise Exception(f"server didn't start on port {port}") @classmethod def teardown_class(cls): for p in cls.servers: p.terminate() def run(self, arg_string, infile=None, outfile=None): """Run a cmdline client with the specified argument string, passing the given input. Save the output and exit code.""" os.environ['NILMDB_URL'] = self.url self.last_args = arg_string class stdio_wrapper: def __init__(self, stdin, stdout, stderr): self.io = (stdin, stdout, stderr) def __enter__(self): self.saved = ( sys.stdin, sys.stdout, sys.stderr ) ( sys.stdin, sys.stdout, sys.stderr ) = self.io def __exit__(self, type, value, traceback): ( sys.stdin, sys.stdout, sys.stderr ) = self.saved # Empty input if none provided if infile is None: infile = io.TextIOWrapper(io.BytesIO(b"")) # Capture stderr errfile = io.TextIOWrapper(io.BytesIO()) if outfile is None: # If no output file, capture stdout with stderr outfile = errfile with stdio_wrapper(infile, outfile, errfile) as s: try: args = shlex.split(arg_string) sys.argv[0] = "test_runner" self.main(args) sys.exit(0) except SystemExit as e: exitcode = e.code except Exception as e: traceback.print_exc() exitcode = 1 # Capture raw binary output, and also try to decode a Unicode # string copy. self.captured_binary = outfile.buffer.getvalue() try: outfile.seek(0) self.captured = outfile.read() except UnicodeDecodeError: self.captured = None self.exitcode = exitcode def ok(self, arg_string, infile = None): self.run(arg_string, infile) if self.exitcode != 0: self.dump() eq_(self.exitcode, 0) def fail(self, arg_string, infile=None, exitcode=None): self.run(arg_string, infile) if exitcode is not None and self.exitcode != exitcode: # Wrong exit code self.dump() eq_(self.exitcode, exitcode) if self.exitcode == 0: # Success, when we wanted failure self.dump() ne_(self.exitcode, 0) def contain(self, checkstring, contain=True): if contain: in_(checkstring, self.captured) else: nin_(checkstring, self.captured) def match(self, checkstring): eq_(checkstring, self.captured) def matchfile(self, file): # Captured data should match file contents exactly with open(file) as f: contents = f.read() if contents != self.captured: print("--- reference file (first 1000 bytes):\n") print(contents[0:1000] + "\n") print("--- captured data (first 1000 bytes):\n") print(self.captured[0:1000] + "\n") zipped = itertools.zip_longest(contents, self.captured) for (n, (a, b)) in enumerate(zipped): if a != b: print("--- first difference is at offset", n) print("--- reference:", repr(a)) print("--- captured:", repr(b)) break raise AssertionError("captured data doesn't match " + file) def matchfilecount(self, file): # Last line of captured data should match the number of # non-commented lines in file count = 0 with open(file) as f: for line in f: if line[0] != '#': count += 1 eq_(self.captured.splitlines()[-1], sprintf("%d", count)) def dump(self): printf("\n===args start===\n%s\n===args end===\n", self.last_args) printf("===dump start===\n%s===dump end===\n", self.captured) class TestAllCommands(CommandTester): def test_00_load_data(self): client = nilmdb.client.Client(url=self.url) client.stream_create("/newton/prep", "float32_8") client.stream_set_metadata("/newton/prep", { "description": "newton" }) for ts in ("20120323T1000", "20120323T1002", "20120323T1004"): start = nilmdb.utils.time.parse_time(ts) fn = f"tests/data/prep-{ts}" data = nilmdb.utils.timestamper.TimestamperRate(fn, start, 120) client.stream_insert("/newton/prep", data); def test_01_copy(self): self.main = nilmtools.copy_one.main client = nilmdb.client.Client(url=self.url) # basic arguments self.fail(f"") self.fail(f"no-such-src no-such-dest") self.contain("source path no-such-src not found") self.fail(f"-u {self.url} no-such-src no-such-dest") # nonexistent dest self.fail(f"/newton/prep /newton/prep-copy") self.contain("Destination /newton/prep-copy doesn't exist") # wrong type client.stream_create("/newton/prep-copy-wrongtype", "uint16_6") self.fail(f"/newton/prep /newton/prep-copy-wrongtype") self.contain("wrong number of fields") # copy with metadata, and compare client.stream_create("/newton/prep-copy", "float32_8") self.ok(f"/newton/prep /newton/prep-copy") a = list(client.stream_extract("/newton/prep")) b = list(client.stream_extract("/newton/prep-copy")) eq_(a, b) a = client.stream_get_metadata("/newton/prep") b = client.stream_get_metadata("/newton/prep-copy") eq_(a, b) # copy with no metadata client.stream_create("/newton/prep-copy-nometa", "float32_8") self.ok(f"--nometa /newton/prep /newton/prep-copy-nometa") a = list(client.stream_extract("/newton/prep")) b = list(client.stream_extract("/newton/prep-copy-nometa")) eq_(a, b) a = client.stream_get_metadata("/newton/prep") b = client.stream_get_metadata("/newton/prep-copy-nometa") ne_(a, b) def test_02_copy_wildcard(self): self.main = nilmtools.copy_wildcard.main client1 = nilmdb.client.Client(url=self.url) client2 = nilmdb.client.Client(url=self.url2) # basic arguments self.fail(f"") self.fail(f"/newton") self.fail(f"-u {self.url} -U {self.url} /newton") self.contain("URL must be different") # no matches; silent self.ok(f"-u {self.url} -U {self.url2} /newton") self.ok(f"-u {self.url} -U {self.url2} /asdf*") self.ok(f"-u {self.url2} -U {self.url} /newton*") eq_(client2.stream_list(), []) # this won't actually copy, but will still create streams self.ok(f"-u {self.url} -U {self.url2} --dry-run /newton*") self.contain("Creating destination stream /newton/prep-copy") eq_(len(list(client2.stream_extract("/newton/prep"))), 0) # this should copy a bunch self.ok(f"-u {self.url} -U {self.url2} /*") self.contain("Creating destination stream /newton/prep-copy", False) eq_(client1.stream_list(), client2.stream_list()) eq_(list(client1.stream_extract("/newton/prep")), list(client2.stream_extract("/newton/prep"))) eq_(client1.stream_get_metadata("/newton/prep"), client2.stream_get_metadata("/newton/prep")) # repeating it is OK; it just won't recreate streams. # Let's try with --nometa too client2.stream_remove("/newton/prep") client2.stream_destroy("/newton/prep") self.ok(f"-u {self.url} -U {self.url2} --nometa /newton*") self.contain("Creating destination stream /newton/prep-copy", False) self.contain("Creating destination stream /newton/prep", True) eq_(client1.stream_list(), client2.stream_list()) eq_(list(client1.stream_extract("/newton/prep")), list(client2.stream_extract("/newton/prep"))) eq_(client2.stream_get_metadata("/newton/prep"), {}) # fill in test cases self.ok(f"-u {self.url} -U {self.url2} -s 2010 -e 2020 -F /newton*") def test_03_decimate(self): self.main = nilmtools.decimate.main client = nilmdb.client.Client(url=self.url) # basic arguments self.fail(f"") # no dest self.fail(f"/newton/prep /newton/prep-decimated-1") self.contain("doesn't exist") # wrong dest shape client.stream_create("/newton/prep-decimated-bad", "float32_8") self.fail(f"/newton/prep /newton/prep-decimated-bad") self.contain("wrong number of fields") # bad factor self.fail(f"/newton/prep -f 1 /newton/prep-decimated-bad") self.contain("needs to be 2 or more") # ok, default factor 4 client.stream_create("/newton/prep-decimated-4", "float32_24") self.ok(f"/newton/prep /newton/prep-decimated-4") a = client.stream_count("/newton/prep") b = client.stream_count("/newton/prep-decimated-4") eq_(a // 4, b) # factor 10 client.stream_create("/newton/prep-decimated-10", "float32_24") self.ok(f"/newton/prep -f 10 /newton/prep-decimated-10") self.contain("Processing") a = client.stream_count("/newton/prep") b = client.stream_count("/newton/prep-decimated-10") eq_(a // 10, b) # different factor, same target self.fail(f"/newton/prep -f 16 /newton/prep-decimated-10") self.contain("Metadata in destination stream") self.contain("decimate_factor = 10") self.contain("doesn't match desired data") self.contain("decimate_factor = 16") # unless we force it self.ok(f"/newton/prep -f 16 -F /newton/prep-decimated-10") a = client.stream_count("/newton/prep") b = client.stream_count("/newton/prep-decimated-10") # but all data was already converted, so no more eq_(a // 10, b) # if we try to decimate an already-decimated stream, the suggested # shape is different self.fail(f"/newton/prep-decimated-4 -f 4 /newton/prep-decimated-16") self.contain("create /newton/prep-decimated-16 float32_24") # decimate again client.stream_create("/newton/prep-decimated-16", "float32_24") self.ok(f"/newton/prep-decimated-4 -f 4 /newton/prep-decimated-16") self.contain("Processing") # check shape suggestion for different input types for (shape, expected) in (("int32_1", "float64_3"), ("uint32_1", "float64_3"), ("int64_1", "float64_3"), ("uint64_1", "float64_3"), ("float32_1", "float32_3"), ("float64_1", "float64_3")): client.stream_create(f"/test/{shape}", shape) self.fail(f"/test/{shape} /test/{shape}-decim") self.contain(f"create /test/{shape}-decim {expected}") def test_04_decimate_auto(self): self.main = nilmtools.decimate_auto.main client = nilmdb.client.Client(url=self.url) self.fail(f"") self.fail(f"--max -1 asdf") self.contain("bad max") self.fail(f"/no/such/stream") self.contain("no stream matched path") # normal run self.ok(f"/newton/prep") # can't auto decimate a decimated stream self.fail(f"/newton/prep-decimated-16") self.contain("need to pass the base stream instead") # decimate prep again, this time much more; also use -F self.ok(f"-m 10 --force-metadata /newton/pr??") self.contain("Level 4096 decimation has 9 rows") # decimate the different shapes self.ok(f"/test/*") self.contain("Level 1 decimation has 0 rows") def test_05_insert(self): self.main = nilmtools.insert.main client = nilmdb.client.Client(url=self.url) self.fail(f"") self.ok(f"--help") # mutually exclusive arguments self.fail(f"--delta --rate 123 /foo bar") self.fail(f"--live --filename /foo bar") # Insert from file client.stream_create("/insert/prep", "float32_8") t0 = "tests/data/prep-20120323T1000" t2 = "tests/data/prep-20120323T1002" t4 = "tests/data/prep-20120323T1004" self.ok(f"--file --dry-run --rate 120 /insert/prep {t0} {t2} {t4}") self.contain("Dry run") # wrong rate self.fail(f"--file --dry-run --rate 10 /insert/prep {t0} {t2} {t4}") self.contain("Data is coming in too fast") # skip forward in time self.ok(f"--file --dry-run --rate 120 /insert/prep {t0} {t4}") self.contain("data timestamp behind by 120") self.contain("Skipping data timestamp forward") # skip backwards in time self.fail(f"--file --dry-run --rate 120 /insert/prep {t0} {t2} {t0}") self.contain("data timestamp ahead by 240") # skip backwards in time is OK if --skip provided self.ok(f"--skip -f -D -r 120 insert/prep {t0} {t2} {t0} {t4}") self.contain("Skipping the remainder of this file") # Now insert for real self.ok(f"--skip --file --rate 120 /insert/prep {t0} {t2} {t4}") self.contain("Done") # Overlap self.fail(f"--skip --file --rate 120 /insert/prep {t0}") self.contain("new data overlaps existing data") # Not overlap if we change file offset self.ok(f"--skip --file --rate 120 -o 0 /insert/prep {t0}") # Data with no timestamp self.fail(f"-f -r 120 /insert/prep tests/data/prep-notime") self.contain("No idea what timestamp to use") # Check intervals so far eq_(list(client.stream_intervals("/insert/prep")), [[1332507600000000, 1332507959991668], [1332511200000000, 1332511319991668]]) # Delta supplied by file self.ok(f"--file --delta -o 0 /insert/prep {t4}-delta") eq_(list(client.stream_intervals("/insert/prep")), [[1332507600000000, 1332507959991668], [1332511200000000, 1332511319991668], [1332511440000000, 1332511499000001]]) # Now fake live timestamps by using the delta file, and a # fake clock that increments one second per call. def fake_time_now(): nonlocal fake_time_base ret = fake_time_base fake_time_base += 1000000 return ret real_time_now = nilmtools.insert.time_now nilmtools.insert.time_now = fake_time_now # Delta supplied by file. This data is too fast because delta # contains a 50 sec jump fake_time_base = 1332511560000000 self.fail(f"--live --delta -o 0 /insert/prep {t4}-delta") self.contain("Data is coming in too fast") self.contain("data time is Fri, 23 Mar 2012 10:06:55") self.contain("clock time is only Fri, 23 Mar 2012 10:06:06") # This data is OK, no jump fake_time_base = 1332511560000000 self.ok(f"--live --delta -o 0 /insert/prep {t4}-delta2") # This has unparseable delta fake_time_base = 1332511560000000 self.fail(f"--live --delta -o 0 /insert/prep {t4}-delta3") self.contain("can't parse delta") # Insert some gzipped data, with no timestamp in name bp1 = "tests/data/bpnilm-raw-1.gz" bp2 = "tests/data/bpnilm-raw-2.gz" client.stream_create("/insert/raw", "uint16_6") self.ok(f"--file /insert/raw {bp1} {bp2}") # Try truncated data tr = "tests/data/trunc" self.ok(f"--file /insert/raw {tr}1 {tr}2 {tr}3 {tr}4") nilmtools.insert.time_now = real_time_now def generate_sine_data(self, client, path, data_sec, fs, freq): # generate raw data client.stream_create(path, "uint16_2") with client.stream_insert_context(path) as ctx: for n in range(fs * data_sec): t = n / fs v = math.sin(t * 2 * math.pi * freq) i = 0.3 * math.sin(3*t) + math.sin(t) line = b"%d %d %d\n" % ( (t + 1234567890) * 1e6, v * 32767 + 32768, i * 32768 + 32768) ctx.insert(line) if 0: for (s, e) in client.stream_intervals(path): print(Interval(s,e).human_string()) def test_06_sinefit(self): self.main = nilmtools.sinefit.main client = nilmdb.client.Client(url=self.url) self.fail(f"") self.ok(f"--help") self.generate_sine_data(client, "/sf/raw", 50, 8000, 60) client.stream_create("/sf/out-bad", "float32_4") self.fail(f"--column 1 /sf/raw /sf/out-bad") self.contain("wrong number of fields") self.fail(f"--column 1 /sf/raw /sf/out") self.contain("/sf/out doesn't exist") # basic run client.stream_create("/sf/out", "float32_3") self.ok(f"--column 1 /sf/raw /sf/out") eq_(client.stream_count("/sf/out"), 3000) # parameter errors self.fail(f"--column 0 /sf/raw /sf/out") self.contain("need a column number") self.fail(f"/sf/raw /sf/out") self.contain("need a column number") self.fail(f"-c 1 --frequency 0 /sf/raw /sf/out") self.contain("frequency must be") self.fail(f"-c 1 --min-freq 100 /sf/raw /sf/out") self.contain("invalid min or max frequency") self.fail(f"-c 1 --max-freq 5 /sf/raw /sf/out") self.contain("invalid min or max frequency") self.fail(f"-c 1 --min-amp -1 /sf/raw /sf/out") self.contain("min amplitude must be") # trigger some warnings client.stream_create("/sf/out2", "float32_3") self.ok(f"-c 1 -f 500 -e @1234567897000000 /sf/raw /sf/out2") self.contain("outside valid range") self.contain("1000 warnings suppressed") eq_(client.stream_count("/sf/out2"), 0) self.ok(f"-c 1 -a 40000 -e @1234567898000000 /sf/raw /sf/out2") self.contain("below minimum threshold") # get coverage for "advance = N/2" line near end of sinefit, # where we found a fit but it was after the end of the window, # so we didn't actually mark anything in this window. self.ok(f"-c 1 -f 240 -m 50 -e @1234567898010000 /sf/raw /sf/out2") def test_07_median(self): self.main = nilmtools.median.main client = nilmdb.client.Client(url=self.url) self.fail(f"") self.ok(f"--help") client.stream_create("/median/1", "float32_8") client.stream_create("/median/2", "float32_8") self.fail("/newton/prep /median/0") self.contain("doesn't exist") self.ok("/newton/prep /median/1") self.ok("--difference /newton/prep /median/2") def test_08_prep(self): self.main = nilmtools.prep.main client = nilmdb.client.Client(url=self.url) self.fail(f"") self.ok(f"--help") self.fail(f"-c 2 /sf/raw /sf/out /prep/out") self.contain("/prep/out doesn't exist") # basic usage client.stream_create("/prep/out", "float32_8") self.ok(f"-c 2 /sf/raw /sf/out /prep/out") self.contain("processed 100000") # test arguments self.fail(f"/sf/raw /sf/out /prep/out") self.contain("need a column number") self.fail(f"-c 0 /sf/raw /sf/out /prep/out") self.contain("need a column number") self.fail(f"-c 2 -n 3 /sf/raw /sf/out /prep/out") self.contain("need 6 columns") self.fail(f"-c 2 -n 0 /sf/raw /sf/out /prep/out") self.contain("number of odd harmonics must be") self.fail(f"-c 2 -N 0 /sf/raw /sf/out /prep/out") self.contain("number of shifted FFTs must be") self.ok(f"-c 2 -r 0 /sf/raw /sf/out /prep/out") self.ok(f"-c 2 -R 0 /sf/raw /sf/out /prep/out") self.fail(f"-c 2 -r 0 -R 0 /sf/raw /sf/out /prep/out") self.fail(f"-c 2 /sf/raw /sf/no-sinefit-data /prep/out") self.contain("sinefit data not found") self.fail(f"-c 2 /sf/raw /prep/out /prep/out") self.contain("sinefit data type is float32_8; expected float32_3") # Limit time so only one row gets passed in client.stream_create("/prep/tmp", "float32_8") s = 1234567890000000 e = 1234567890000125 self.ok(f"-c 2 -s {s} -e {e} /sf/raw /sf/out /prep/tmp") # Lower sampling rate on everything, so that the FFT doesn't # return all the harmonics, and prep has to fill with zeros. # Tests the "if N < (nharm * 2):" condition in prep self.generate_sine_data(client, "/sf/raw-low", 5, 100, 60) self.main = nilmtools.sinefit.main client.stream_create("/sf/out-low", "float32_3") self.ok(f"--column 1 /sf/raw-low /sf/out-low") self.main = nilmtools.prep.main client.stream_create("/prep/out-low", "float32_8") self.ok(f"-c 2 /sf/raw-low /sf/out-low /prep/out-low") # Test prep with empty sinefit data client.stream_create("/sf/out-empty", "float32_3") with client.stream_insert_context("/sf/out-empty", 1034567890123456, 2034567890123456): pass client.stream_create("/prep/out-empty", "float32_8") self.ok(f"-c 2 /sf/raw /sf/out-empty /prep/out-empty") self.contain("warning: no periods found; skipping") def generate_trainola_data(self): # Build some fake data for trainola, which is just pulses of varying # length. client = nilmdb.client.Client(url=self.url) total_sec = 100 fs = 100 rg = numpy.random.Generator(numpy.random.MT19937(1234567)) path = "/train/data" # Just build up some random pulses. This uses seeded random numbers, # so any changes here will affect the success/failures of tests later. client.stream_create(path, "float32_1") with client.stream_insert_context(path) as ctx: remaining = 0 for n in range(fs * total_sec): t = n / fs data = rg.normal(100) / 100 - 1 if remaining > 0: remaining -= 1 data += 1 else: if rg.integers(fs * 10 * total_sec) < fs: if rg.integers(3) < 2: remaining = fs*2 else: remaining = fs/2 line = b"%d %f\n" % (t * 1e6, data) ctx.insert(line) # To view what was made, try: if 0: subprocess.call(f"nilmtool -u {self.url} extract -s min -e max " + f"{path} > /tmp/data", shell=True) # then in Octave: a=load("/tmp/data"); plot(a(:,2)); if 0: for (s, e) in client.stream_intervals(path): print(Interval(s,e).human_string()) # Also generate something with more than 100k data points client.stream_create("/train/big", "uint8_1") with client.stream_insert_context("/train/big") as ctx: for n in range(110000): ctx.insert(b"%d 0\n" % n) def test_09_trainola(self): self.main = nilmtools.trainola.main client = nilmdb.client.numpyclient.NumpyClient(url=self.url) self.fail(f"") self.ok(f"--help") self.ok(f"--version") self.generate_trainola_data() def get_json(path): with open(path) as f: js = f.read().replace('\n', ' ') return f"'{js}'" # pass a dict as argv[0] with assert_raises(KeyError): saved_stdout = sys.stdout try: with open(os.devnull, 'w') as sys.stdout: nilmtools.trainola.main([{ "url": self.url }]) finally: sys.stdout = saved_stdout # pass no args and they come from sys.argv saved_argv = sys.argv try: sys.argv = [ "prog", "bad-json," ] with assert_raises(json.decoder.JSONDecodeError): nilmtools.trainola.main() finally: sys.argv = saved_argv # catch a bunch of errors based on different json input client.stream_create("/train/matches", "uint8_1") for (num, error) in [ (1, "no columns"), (2, "duplicated columns"), (3, "bad column number"), (4, "source path '/c/d' does not exist"), (5, "destination path '/a/b' does not exist"), (6, "missing exemplars"), (7, "missing exemplars"), (8, "exemplar stream '/e/f' does not exist"), (9, "No data in this exemplar"), (10, "Too few data points"), (11, "Too many data points"), (12, "column FOO is not available in source") ]: self.fail(get_json(f"tests/data/trainola-bad{num}.js")) self.contain(error) # not enough columns in dest self.fail(get_json("tests/data/trainola1.js")) self.contain("bad destination column number") # run normally client.stream_destroy("/train/matches") client.stream_create("/train/matches", "uint8_2") self.ok(get_json("tests/data/trainola1.js")) self.contain("matched 10 exemplars") # check actual matches, since we made up the data matches = list(client.stream_extract_numpy("/train/matches")) eq_(matches[0].tolist(), [[34000000, 1, 0], [36000000, 0, 1], [40800000, 1, 0], [42800000, 0, 1], [60310000, 1, 0], [62310000, 0, 1], [69290000, 1, 0], [71290000, 0, 1], [91210000, 1, 0], [93210000, 0, 1]]) # another run using random noise as an exemplar, to get better coverage client.stream_create("/train/matches2", "uint8_1") self.ok(get_json("tests/data/trainola2.js")) def test_10_pipewatch(self): self.main = nilmtools.pipewatch.main self.fail(f"") self.ok(f"--help") lock = "tests/pipewatch.lock" lk = f"--lock {lock}" try: os.unlink(lock) except OSError: pass # try locking so pipewatch will exit (with code 0) lockfile = open(lock, "w") nilmdb.utils.lock.exclusive_lock(lockfile) self.ok(f"{lk} true true") self.contain("pipewatch process already running") os.unlink(lock) # have pipewatch remove its own lock to trigger error later self.ok(f"{lk} 'rm {lock}' true") # various cases to get coverage self.ok(f"{lk} true 'cat >/dev/null'") self.contain("generator returned 0, consumer returned 0") self.fail(f"{lk} false true") self.contain("generator returned 1, consumer returned 0") self.fail(f"{lk} false false") self.contain("generator returned 1, consumer returned 1") self.fail(f"{lk} true false") self.contain("generator returned 0, consumer returned 1") self.fail(f"{lk} 'kill -15 $$' true") self.ok(f"{lk} 'sleep 1 ; echo hi' 'cat >/dev/null'") self.ok(f"{lk} 'echo hi' 'cat >/dev/null'") self.fail(f"{lk} --timeout 0.5 'sleep 10 ; echo hi' 'cat >/dev/null'") self.fail(f"{lk} 'yes' 'head -1 >/dev/null'") self.fail(f"{lk} false 'exec 2>&-; trap \"sleep 10\" 0 15 ; sleep 10'") def test_11_cleanup(self): self.main = nilmtools.cleanup.main client = nilmdb.client.Client(url=self.url) # This mostly just gets coverage, doesn't carefully verify behavior self.fail(f"") self.ok(f"--help") self.fail(f"tests/data/cleanup-bad.cfg") self.contain("unknown units") client.stream_create("/empty/foo", "uint16_1") self.ok(f"tests/data/cleanup.cfg") self.contain("'/nonexistent/bar' did not match any existing streams") self.contain("no config for existing stream '/empty/foo'") self.contain("nothing to do (only 0.00 weeks of data present)") self.contain("specify --yes to actually perform") self.ok(f"--yes tests/data/cleanup.cfg") self.contain("removing data before") self.contain("removing from /sf/raw") self.ok(f"--estimate tests/data/cleanup.cfg") self.contain("Total estimated disk usage") self.contain("MiB") self.contain("GiB") self.ok(f"--yes tests/data/cleanup-nodecim.cfg") self.ok(f"--estimate tests/data/cleanup-nodecim.cfg") def test_12_misc(self): # Fill in test cases that were missed by earlier code: # math.py with assert_raises(ValueError): nilmtools.math.sfit4([1], 5) nilmtools.math.sfit4([1,2], 5) # filter.py client = nilmdb.client.numpyclient.NumpyClient(self.url) client.stream_create("/misc/a", "uint8_1") client.stream_create("/misc/b", "uint8_1") with client.stream_insert_context("/misc/a") as ctx: for n in range(10000): ctx.insert(b"%d 0\n" % n) pni = nilmtools.filter.process_numpy_interval src = nilmtools.filter.get_stream_info(client, "/misc/a") extractor = functools.partial( client.stream_extract_numpy, "/misc/a", layout=src.layout, maxrows=1000) inserter = functools.partial( client.stream_insert_numpy_context, "/misc/b") def func1(*args): return 0 def func2(*args): return -1 def func3(array, interval, args, insert_func, last): if last: return array.shape[0] return 0 saved = (sys.stdout, sys.stderr) try: with open(os.devnull, 'w') as sys.stdout: with open(os.devnull, 'w') as sys.stderr: pni(Interval(0, 10000), extractor, inserter, 100, func1) with assert_raises(SystemExit): f = nilmtools.filter.Filter("hello world") finally: (sys.stdout, sys.stderr) = saved with assert_raises(Exception): pni(Interval(0, 10000), extractor, inserter, 100, func2) pni(Interval(0, 10000), extractor, inserter, 100000, func3) with assert_raises(NotImplementedError): pni(Interval(0, 10000), extractor, inserter, 100000, nilmtools.filter.example_callback_function) self.main = nilmtools.filter.main self.fail(f"") self.ok(f"--help") self.fail(f"/misc/a /misc/a") self.contain("must be different") self.fail(f"--start HELLOWORLD /misc/a /misc/a") self.contain("not enough digits for a timestamp") client.stream_create("/misc/c", "uint8_1") self.ok(f"--quiet /misc/a /misc/c") self.contain("Source: /misc/a", False) self.contain("Generic filter: need to handle") f = nilmtools.filter.Filter() parser = f.setup_parser() args = f.parse_args(["--quiet", "/misc/a", "/misc/c"]) x = f.client_src x = f.client_dest for i in f.intervals(): with assert_raises(Exception) as e: x = f.client_src in_("client is in use", str(e.exception)) with assert_raises(Exception) as e: x = f.client_dest in_("client is in use", str(e.exception))