Add nilm-sinefit test, and update for Python 3

This commit is contained in:
Jim Paris 2020-08-05 14:24:51 -04:00
parent 3a8c04e04a
commit 7bed742957
3 changed files with 72 additions and 4 deletions

View File

@ -67,7 +67,7 @@ def sfit4(data, fs):
for idx in range(7):
D = c_[cos(w*t), sin(w*t), ones(N),
-s[0] * t * sin(w*t) + s[1] * t * cos(w*t) ] # eqn B.16
s = linalg.lstsq(D, data)[0] # eqn B.18
s = linalg.lstsq(D, data, rcond=None)[0] # eqn B.18
w = w + s[3] # update frequency estimate
## Extract results

View File

@ -83,7 +83,7 @@ class SuppressibleWarning(object):
if self.count <= self.maxcount:
self._write(seconds, msg)
if (self.count - self.maxcount) >= self.maxsuppress:
self.reset(seconds)
self.reset()
def reset(self, seconds = None):
if self.count > self.maxcount:
@ -98,7 +98,7 @@ def process(data, interval, args, insert_function, final):
# Estimate sampling frequency from timestamps
ts_min = timestamp_to_seconds(data[0][0])
ts_max = timestamp_to_seconds(data[-1][0])
if ts_min >= ts_max:
if ts_min >= ts_max: # pragma: no cover; process_numpy shouldn't send this
return 0
fs = (rows-1) / (ts_max - ts_min)
@ -162,7 +162,9 @@ def process(data, interval, args, insert_function, final):
insert_function([[seconds_to_timestamp(t), f0, A, C]])
last_inserted_timestamp = t
warn.reset(t)
else:
else: # pragma: no cover -- this is hard to trigger,
# if it's even possible at all; I think it would require
# some jitter in how the waves fit, across a window boundary.
warn.warn("timestamp overlap\n", t)
num_zc += 1
last_zc = zc_n

View File

@ -13,9 +13,12 @@ import nilmtools.prep
import nilmtools.sinefit
import nilmtools.trainola
from nilmdb.utils.interval import Interval
from nose.tools import assert_raises
import unittest
import math
from testutil.helpers import *
import multiprocessing
import traceback
@ -472,6 +475,69 @@ class TestAllCommands(CommandTester):
def test_06_sinefit(self):
self.main = nilmtools.sinefit.main
client = nilmdb.client.Client(url=self.url)
self.fail(f"")
self.ok(f"--help")
# generate raw data
data_sec = 50
client.stream_create("/sf/raw", "uint16_2")
with client.stream_insert_context("/sf/raw") as ctx:
fs = 8000
freq = 60.0
for n in range(fs * data_sec):
t = n / fs
v = math.sin(t * 2 * math.pi * freq)
i = 0.3 * math.sin(3*t) + math.sin(t)
line = b"%d %d %d\n" % (
(t + 1234567890) * 1e6,
v * 32767 + 32768,
i * 32768 + 32768)
ctx.insert(line)
if 0:
for (s, e) in client.stream_intervals("/sf/raw"):
print(Interval(s,e).human_string())
client.stream_create("/sf/out-bad", "float32_4")
self.fail(f"--column 1 /sf/raw /sf/out-bad")
self.contain("wrong number of fields")
self.fail(f"--column 1 /sf/raw /sf/out")
self.contain("/sf/out doesn't exist")
# basic run
client.stream_create("/sf/out", "float32_3")
self.ok(f"--column 1 /sf/raw /sf/out")
eq_(client.stream_count("/sf/out"), 60 * data_sec)
# parameter errors
self.fail(f"--column 0 /sf/raw /sf/out")
self.contain("need a column number")
self.fail(f"/sf/raw /sf/out")
self.contain("need a column number")
self.fail(f"-c 1 --frequency 0 /sf/raw /sf/out")
self.contain("frequency must be")
self.fail(f"-c 1 --min-freq 100 /sf/raw /sf/out")
self.contain("invalid min or max frequency")
self.fail(f"-c 1 --max-freq 5 /sf/raw /sf/out")
self.contain("invalid min or max frequency")
self.fail(f"-c 1 --min-amp -1 /sf/raw /sf/out")
self.contain("min amplitude must be")
# trigger some warnings
client.stream_create("/sf/out2", "float32_3")
self.ok(f"-c 1 -f 500 -e @1234567897000000 /sf/raw /sf/out2")
self.contain("outside valid range")
self.contain("1000 warnings suppressed")
eq_(client.stream_count("/sf/out2"), 0)
self.ok(f"-c 1 -a 40000 -e @1234567898000000 /sf/raw /sf/out2")
self.contain("below minimum threshold")
# get coverage for "advance = N/2" line near end of sinefit,
# where we found a fit but it was after the end of the window,
# so we didn't actually mark anything in this window.
self.ok(f"-c 1 -f 240 -m 50 -e @1234567898010000 /sf/raw /sf/out2")
def test_07_cleanup(self):
self.main = nilmtools.cleanup.main