|
- # -*- coding: utf-8 -*-
-
- import nilmtools.copy_one
- import nilmtools.cleanup
- import nilmtools.copy_one
- import nilmtools.copy_wildcard
- import nilmtools.decimate_auto
- import nilmtools.decimate
- import nilmtools.insert
- import nilmtools.median
- import nilmtools.pipewatch
- import nilmtools.prep
- import nilmtools.sinefit
- import nilmtools.trainola
-
- from nose.tools import assert_raises
- import unittest
-
- from testutil.helpers import *
- import multiprocessing
- import traceback
-
- from urllib.request import urlopen
- from nilmtools.filter import ArgumentError
-
- def run_cherrypy_server(path, port, event):
- db = nilmdb.utils.serializer_proxy(nilmdb.server.NilmDB)(path)
- server = nilmdb.server.Server(db, host="127.0.0.1",
- port=port, stoppable=True)
- server.start(blocking = True, event = event)
- db.close()
-
- class CommandTester():
-
- url = "http://localhost:32182/"
- url2 = "http://localhost:32183/"
-
- @classmethod
- def setup_class(cls):
- # Use multiprocessing with "spawn" method, so that we can
- # start two fully independent cherrypy instances
- # (needed for copy-wildcard)
- multiprocessing.set_start_method('spawn')
-
- events = []
- for (path, port) in (("tests/testdb1", 32182),
- ("tests/testdb2", 32183)):
- recursive_unlink(path)
- event = multiprocessing.Event()
- proc = multiprocessing.Process(target=run_cherrypy_server,
- args=(path, port, event))
- proc.start()
- events.append(event)
- for event in events:
- if not event.wait(timeout = 10):
- raise AssertionError("server didn't start")
-
- @classmethod
- def teardown_class(cls):
- urlopen("http://127.0.0.1:32182/exit/", timeout = 1)
- urlopen("http://127.0.0.1:32183/exit/", timeout = 1)
-
- def run(self, arg_string, infile=None, outfile=None):
- """Run a cmdline client with the specified argument string,
- passing the given input. Save the output and exit code."""
- os.environ['NILMDB_URL'] = self.url
- class stdio_wrapper:
- def __init__(self, stdin, stdout, stderr):
- self.io = (stdin, stdout, stderr)
- def __enter__(self):
- self.saved = ( sys.stdin, sys.stdout, sys.stderr )
- ( sys.stdin, sys.stdout, sys.stderr ) = self.io
- def __exit__(self, type, value, traceback):
- ( sys.stdin, sys.stdout, sys.stderr ) = self.saved
- # Empty input if none provided
- if infile is None:
- infile = io.TextIOWrapper(io.BytesIO(b""))
- # Capture stderr
- errfile = io.TextIOWrapper(io.BytesIO())
- if outfile is None:
- # If no output file, capture stdout with stderr
- outfile = errfile
- with stdio_wrapper(infile, outfile, errfile) as s:
- try:
- args = shlex.split(arg_string)
- sys.argv[0] = "test_runner"
- self.main(args)
- sys.exit(0)
- except SystemExit as e:
- exitcode = e.code
- except Exception as e:
- traceback.print_exc()
- exitcode = 1
-
- # Capture raw binary output, and also try to decode a Unicode
- # string copy.
- self.captured_binary = outfile.buffer.getvalue()
- try:
- outfile.seek(0)
- self.captured = outfile.read()
- except UnicodeDecodeError:
- self.captured = None
-
- self.exitcode = exitcode
-
- def ok(self, arg_string, infile = None):
- self.run(arg_string, infile)
- if self.exitcode != 0:
- self.dump()
- eq_(self.exitcode, 0)
-
- def fail(self, arg_string, infile=None, exitcode=None):
- self.run(arg_string, infile)
- if exitcode is not None and self.exitcode != exitcode:
- # Wrong exit code
- self.dump()
- eq_(self.exitcode, exitcode)
- if self.exitcode == 0:
- # Success, when we wanted failure
- self.dump()
- ne_(self.exitcode, 0)
-
- def contain(self, checkstring, contain=True):
- if contain:
- in_(checkstring, self.captured)
- else:
- nin_(checkstring, self.captured)
-
- def match(self, checkstring):
- eq_(checkstring, self.captured)
-
- def matchfile(self, file):
- # Captured data should match file contents exactly
- with open(file) as f:
- contents = f.read()
- if contents != self.captured:
- print("--- reference file (first 1000 bytes):\n")
- print(contents[0:1000] + "\n")
- print("--- captured data (first 1000 bytes):\n")
- print(self.captured[0:1000] + "\n")
- zipped = itertools.zip_longest(contents, self.captured)
- for (n, (a, b)) in enumerate(zipped):
- if a != b:
- print("--- first difference is at offset", n)
- print("--- reference:", repr(a))
- print("--- captured:", repr(b))
- break
- raise AssertionError("captured data doesn't match " + file)
-
- def matchfilecount(self, file):
- # Last line of captured data should match the number of
- # non-commented lines in file
- count = 0
- with open(file) as f:
- for line in f:
- if line[0] != '#':
- count += 1
- eq_(self.captured.splitlines()[-1], sprintf("%d", count))
-
- def dump(self):
- printf("-----dump start-----\n%s-----dump end-----\n", self.captured)
-
-
- class TestAllCommands(CommandTester):
-
- def load_data(self):
- client = nilmdb.client.Client(url=self.url)
- client.stream_create("/newton/prep", "float32_8")
- client.stream_set_metadata("/newton/prep",
- { "description": "newton" })
-
- for ts in ("20120323T1000", "20120323T1002", "20120323T1004"):
- start = nilmdb.utils.time.parse_time(ts)
- fn = f"tests/data/prep-{ts}"
- data = nilmdb.utils.timestamper.TimestamperRate(fn, start, 120)
- client.stream_insert("/newton/prep", data);
-
- def test_01_copy(self):
- self.main = nilmtools.copy_one.main
-
- client = nilmdb.client.Client(url=self.url)
-
- self.load_data()
-
- # basic arguments
- self.fail(f"")
- self.fail(f"no-such-src no-such-dest")
- self.contain("source path no-such-src not found")
- self.fail(f"-u {self.url} no-such-src no-such-dest")
-
- # nonexistent dest
- self.fail(f"/newton/prep /newton/prep-copy")
- self.contain("Destination /newton/prep-copy doesn't exist")
-
- # wrong type
- client.stream_create("/newton/prep-copy-wrongtype", "uint16_6")
- self.fail(f"/newton/prep /newton/prep-copy-wrongtype")
- self.contain("wrong number of fields")
-
- # copy with metadata, and compare
- client.stream_create("/newton/prep-copy", "float32_8")
- self.ok(f"/newton/prep /newton/prep-copy")
- a = list(client.stream_extract("/newton/prep"))
- b = list(client.stream_extract("/newton/prep-copy"))
- eq_(a, b)
- a = client.stream_get_metadata("/newton/prep")
- b = client.stream_get_metadata("/newton/prep-copy")
- eq_(a, b)
-
- # copy with no metadata
- client.stream_create("/newton/prep-copy-nometa", "float32_8")
- self.ok(f"--nometa /newton/prep /newton/prep-copy-nometa")
- a = list(client.stream_extract("/newton/prep"))
- b = list(client.stream_extract("/newton/prep-copy-nometa"))
- eq_(a, b)
- a = client.stream_get_metadata("/newton/prep")
- b = client.stream_get_metadata("/newton/prep-copy-nometa")
- ne_(a, b)
-
- def test_copy_wildcard(self):
- self.main = nilmtools.copy_wildcard.main
-
- def test_decimate(self):
- self.main = nilmtools.decimate.main
-
- def test_decimate_auto(self):
- self.main = nilmtools.decimate_auto.main
-
- def test_insert(self):
- self.main = nilmtools.insert.main
-
- def test_sinefit(self):
- self.main = nilmtools.sinefit.main
-
- def test_cleanup(self):
- self.main = nilmtools.cleanup.main
-
- def test_median(self):
- self.main = nilmtools.median.main
-
- def test_trainola(self):
- self.main = nilmtools.trainola.main
-
- def test_pipewatch(self):
- self.main = nilmtools.pipewatch.main
-
- def test_prep(self):
- self.main = nilmtools.prep.main
|