Compare commits

...

6 Commits

Author SHA1 Message Date
dc26e32b6e Make interhost, force_metadata private to Filter 2013-08-02 23:14:19 -04:00
981f23ff14 Better documentation for callback function 2013-08-02 23:14:19 -04:00
492445a469 Split off useful math functions to math.py 2013-08-02 17:27:39 -04:00
33c3586bea trainola: suppress peaks if larger ones are nearby
Might fix the problem Mark noticed where turn-off transients
are erroneously matching the drop that follows startup transients.
2013-07-31 19:12:16 -04:00
c1e0f8ffbc Fix bug in copy_one 2013-07-31 14:47:16 -04:00
d2853bdb0e Add test case for bad trainola detections 2013-07-30 20:35:54 -04:00
7 changed files with 234 additions and 142 deletions

View File

@@ -8,19 +8,26 @@ else
@echo "Try 'make install'"
endif
test: test_insert
test: test_trainola3
test_pipewatch:
nilmtools/pipewatch.py -t 3 "seq 10 20" "seq 20 30"
test_trainola:
-nilmtool -u http://bucket/nilmdb remove -s min -e max \
/sharon/prep-a-matches
nilmtools/trainola.py "$$(cat extras/trainola-test-param-2.js)"
-nilmtool -u http://bucket/nilmdb remove -s min -e max \
/sharon/prep-a-matches
nilmtools/trainola.py "$$(cat extras/trainola-test-param.js)"
test_trainola2:
-nilmtool -u http://bucket/nilmdb remove -s min -e max \
/sharon/prep-a-matches
nilmtools/trainola.py "$$(cat extras/trainola-test-param-2.js)"
test_trainola3:
-nilmtool -u "http://bucket/nilmdb" destroy -R /test/jim
nilmtool -u "http://bucket/nilmdb" create /test/jim uint8_3
nilmtools/trainola.py "$$(cat extras/trainola-test-param-3.js)"
nilmtool -u "http://bucket/nilmdb" extract /test/jim -s min -e max
test_cleanup:
nilmtools/cleanup.py -e extras/cleanup.cfg

View File

@@ -0,0 +1,40 @@
{
"url": "http://bucket/nilmdb",
"stream": "/sharon/prep-a",
"dest_stream": "/test/jim",
"start": 1364184839901599,
"end": 1364184942407610.2,
"columns": [ { "index": 0, "name": "P1" } ],
"exemplars": [
{
"name": "A - True DBL Freezer ON",
"dest_column": 0,
"url": "http://bucket/nilmdb",
"stream": "/sharon/prep-a",
"columns": [ { "index": 0, "name": "P1" } ],
"start": 1365277707649000,
"end": 1365277710705000
},
{
"name": "A - Boiler 1 Fan OFF",
"dest_column": 1,
"url": "http://bucket/nilmdb",
"stream": "/sharon/prep-a",
"columns": [ { "index": 0, "name": "P1" } ],
"start": 1364188370735000,
"end": 1364188373819000
},
{
"name": "A - True DBL Freezer OFF",
"dest_column": 2,
"url": "http://bucket/nilmdb",
"stream": "/sharon/prep-a",
"columns": [ { "index": 0, "name": "P1" } ],
"start": 1365278087982000,
"end": 1365278089340000
}
]
}

View File

@@ -32,7 +32,7 @@ def main(argv = None):
extractor = NumpyClient(f.src.url).stream_extract_numpy
inserter = NumpyClient(f.dest.url).stream_insert_numpy_context
for i in f.intervals():
print "Processing", f.interval_string(i)
print "Processing", i.human_string()
with inserter(f.dest.path, i.start, i.end) as insert_ctx:
for data in extractor(f.src.path, i.start, i.end):
insert_ctx.insert(data)

View File

@@ -133,6 +133,34 @@ def process_numpy_interval(interval, extractor, inserter, warn_rows,
# we'll not miss any data when we run again later.
insert_ctx.update_end(old_array[processed][0])
def example_callback_function(data, interval, args, insert_func, final):
"""Example of the signature for the function that gets passed
to process_numpy_interval.
'data': array of data to process -- may be empty
'interval': overall interval we're processing (but not necessarily
the interval of this particular chunk of data)
'args': opaque arguments passed to process_numpy
'insert_func': function to call in order to insert array of data.
Should be passed a 2-dimensional array of data to insert.
Data timestamps must be within the provided interval.
'final': True if this is the last bit of data for this
contiguous interval, False otherwise.
Return value of 'function' is the number of data rows processed.
Unprocessed data will be provided again in a subsequent call
(unless 'final' is True).
If unprocessed data remains after 'final' is True, the interval
being inserted will be ended at the timestamp of the first
unprocessed data point.
"""
raise NotImplementedError("example_callback_function does nothing")
class Filter(object):
def __init__(self, parser_description = None):
@@ -144,8 +172,8 @@ class Filter(object):
self.dest = None
self.start = None
self.end = None
self.interhost = False
self.force_metadata = False
self._interhost = False
self._force_metadata = False
if parser_description is not None:
self.setup_parser(parser_description)
self.parse_args()
@@ -208,12 +236,12 @@ class Filter(object):
if dest_url is None:
dest_url = url
if url != dest_url:
self.interhost = True
self._interhost = True
self._client_src = Client(url)
self._client_dest = Client(dest_url)
if (not self.interhost) and (srcpath == destpath):
if (not self._interhost) and (srcpath == destpath):
raise ArgumentError("source and destination path must be different")
# Open the streams
@@ -231,8 +259,8 @@ class Filter(object):
# Print info
if not quiet:
print "Source:", self.src.string(self.interhost)
print " Dest:", self.dest.string(self.interhost)
print "Source:", self.src.string(self._interhost)
print " Dest:", self.dest.string(self._interhost)
def parse_args(self, argv = None):
"""Parse arguments from a command line"""
@@ -241,7 +269,7 @@ class Filter(object):
self.set_args(args.url, args.dest_url, args.srcpath, args.destpath,
args.start, args.end, quiet = False, parsed_args = args)
self.force_metadata = args.force_metadata
self._force_metadata = args.force_metadata
if args.dry_run:
for interval in self.intervals():
print interval.human_string()
@@ -252,7 +280,7 @@ class Filter(object):
"""Generate all the intervals that this filter should process"""
self._using_client = True
if self.interhost:
if self._interhost:
# Do the difference ourselves
s_intervals = ( Interval(start, end)
for (start, end) in
@@ -289,10 +317,11 @@ class Filter(object):
str(e), toparse))
def check_dest_metadata(self, data):
"""See if the metadata jives, and complain if it doesn't. If
there's no conflict, update the metadata to match 'data'."""
"""See if the metadata jives, and complain if it doesn't. For
each key in data, if the stream contains the key, it must match
values. If the stream does not contain the key, it is created."""
metadata = self._client_dest.stream_get_metadata(self.dest.path)
if not self.force_metadata:
if not self._force_metadata:
for key in data:
wanted = data[key]
if not isinstance(wanted, basestring):
@@ -329,30 +358,10 @@ class Filter(object):
If 'intervals' is not None, process those intervals instead of
the default list.
'function' should be defined as:
# def function(data, interval, args, insert_func, final)
'data': array of data to process -- may be empty
'interval': overall interval we're processing (but not necessarily
the interval of this particular chunk of data)
'args': opaque arguments passed to process_numpy
'insert_func': function to call in order to insert array of data.
Should be passed a 2-dimensional array of data to insert.
Data timestamps must be within the provided interval.
'final': True if this is the last bit of data for this
contiguous interval, False otherwise.
Return value of 'function' is the number of data rows processed.
Unprocessed data will be provided again in a subsequent call
(unless 'final' is True).
If unprocessed data remains after 'final' is True, the interval
being inserted will be ended at the timestamp of the first
unprocessed data point.
'function' should be defined with the same interface as
nilmtools.filter.example_callback_function. See the
documentation of that for details. 'args' are passed to
'function'.
"""
extractor = NumpyClient(self.src.url).stream_extract_numpy
inserter = NumpyClient(self.dest.url).stream_insert_numpy_context

107
nilmtools/math.py Normal file
View File

@@ -0,0 +1,107 @@
#!/usr/bin/python
# Miscellaenous useful mathematical functions
from nilmdb.utils.printf import *
from numpy import *
from scipy import *
def sfit4(data, fs):
"""(A, f0, phi, C) = sfit4(data, fs)
Compute 4-parameter (unknown-frequency) least-squares fit to
sine-wave data, according to IEEE Std 1241-2010 Annex B
Input:
data vector of input samples
fs sampling rate (Hz)
Output:
Parameters [A, f0, phi, C] to fit the equation
x[n] = A * sin(f0/fs * 2 * pi * n + phi) + C
where n is sample number. Or, as a function of time:
x(t) = A * sin(f0 * 2 * pi * t + phi) + C
by Jim Paris
(Verified to match sfit4.m)
"""
N = len(data)
t = linspace(0, (N-1) / float(fs), N)
## Estimate frequency using FFT (step b)
Fc = fft(data)
F = abs(Fc)
F[0] = 0 # eliminate DC
# Find pair of spectral lines with largest amplitude:
# resulting values are in F(i) and F(i+1)
i = argmax(F[0:int(N/2)] + F[1:int(N/2+1)])
# Interpolate FFT to get a better result (from Markus [B37])
U1 = real(Fc[i])
U2 = real(Fc[i+1])
V1 = imag(Fc[i])
V2 = imag(Fc[i+1])
n = 2 * pi / N
ni1 = n * i
ni2 = n * (i+1)
K = ((V2-V1)*sin(ni1) + (U2-U1)*cos(ni1)) / (U2-U1)
Z1 = V1 * (K - cos(ni1)) / sin(ni1) + U1
Z2 = V2 * (K - cos(ni2)) / sin(ni2) + U2
i = arccos((Z2*cos(ni2) - Z1*cos(ni1)) / (Z2-Z1)) / n
# Convert to Hz
f0 = i * float(fs) / N
# Fit it. We'll catch exceptions here and just returns zeros
# if something fails with the least squares fit, etc.
try:
# first guess for A0, B0 using 3-parameter fit (step c)
s = zeros(3)
w = 2*pi*f0
# Now iterate 7 times (step b, plus 6 iterations of step i)
for idx in range(7):
D = c_[cos(w*t), sin(w*t), ones(N),
-s[0] * t * sin(w*t) + s[1] * t * cos(w*t) ] # eqn B.16
s = linalg.lstsq(D, data)[0] # eqn B.18
w = w + s[3] # update frequency estimate
## Extract results
A = sqrt(s[0]*s[0] + s[1]*s[1]) # eqn B.21
f0 = w / (2*pi)
phi = arctan2(s[0], s[1]) # eqn B.22 (flipped for sin instead of cos)
C = s[2]
return (A, f0, phi, C)
except Exception as e:
# something broke down; just return zeros
return (0, 0, 0, 0)
def peak_detect(data, delta = 0.1):
"""Simple min/max peak detection algorithm, taken from my code
in the disagg.m from the 10-8-5 paper.
Returns an array of peaks: each peak is a tuple
(n, p, is_max)
where n is the row number in 'data', and p is 'data[n]',
and is_max is True if this is a maximum, False if it's a minimum,
"""
peaks = [];
cur_min = (None, inf)
cur_max = (None, -inf)
lookformax = False
for (n, p) in enumerate(data):
if p > cur_max[1]:
cur_max = (n, p)
if p < cur_min[1]:
cur_min = (n, p)
if lookformax:
if p < (cur_max[1] - delta):
peaks.append((cur_max[0], cur_max[1], True))
cur_min = (n, p)
lookformax = False
else:
if p > (cur_min[1] + delta):
peaks.append((cur_min[0], cur_min[1], False))
cur_max = (n, p)
lookformax = True
return peaks

View File

@@ -3,6 +3,7 @@
# Sine wave fitting.
from nilmdb.utils.printf import *
import nilmtools.filter
import nilmtools.math
import nilmdb.client
from nilmdb.utils.time import (timestamp_to_human,
timestamp_to_seconds,
@@ -11,7 +12,6 @@ from nilmdb.utils.time import (timestamp_to_human,
from numpy import *
from scipy import *
#import pylab as p
import operator
import sys
def main(argv = None):
@@ -119,7 +119,7 @@ def process(data, interval, args, insert_function, final):
t_max = timestamp_to_seconds(data[start+N-1, 0])
# Do 4-parameter sine wave fit
(A, f0, phi, C) = sfit4(this, fs)
(A, f0, phi, C) = nilmtools.math.sfit4(this, fs)
# Check bounds. If frequency is too crazy, ignore this window
if f0 < f_min or f0 > f_max:
@@ -187,76 +187,5 @@ def process(data, interval, args, insert_function, final):
printf("%sMarked %d zero-crossings in %d rows\n", now, num_zc, start)
return start
def sfit4(data, fs):
"""(A, f0, phi, C) = sfit4(data, fs)
Compute 4-parameter (unknown-frequency) least-squares fit to
sine-wave data, according to IEEE Std 1241-2010 Annex B
Input:
data vector of input samples
fs sampling rate (Hz)
Output:
Parameters [A, f0, phi, C] to fit the equation
x[n] = A * sin(f0/fs * 2 * pi * n + phi) + C
where n is sample number. Or, as a function of time:
x(t) = A * sin(f0 * 2 * pi * t + phi) + C
by Jim Paris
(Verified to match sfit4.m)
"""
N = len(data)
t = linspace(0, (N-1) / float(fs), N)
## Estimate frequency using FFT (step b)
Fc = fft(data)
F = abs(Fc)
F[0] = 0 # eliminate DC
# Find pair of spectral lines with largest amplitude:
# resulting values are in F(i) and F(i+1)
i = argmax(F[0:int(N/2)] + F[1:int(N/2+1)])
# Interpolate FFT to get a better result (from Markus [B37])
U1 = real(Fc[i])
U2 = real(Fc[i+1])
V1 = imag(Fc[i])
V2 = imag(Fc[i+1])
n = 2 * pi / N
ni1 = n * i
ni2 = n * (i+1)
K = ((V2-V1)*sin(ni1) + (U2-U1)*cos(ni1)) / (U2-U1)
Z1 = V1 * (K - cos(ni1)) / sin(ni1) + U1
Z2 = V2 * (K - cos(ni2)) / sin(ni2) + U2
i = arccos((Z2*cos(ni2) - Z1*cos(ni1)) / (Z2-Z1)) / n
# Convert to Hz
f0 = i * float(fs) / N
# Fit it. We'll catch exceptions here and just returns zeros
# if something fails with the least squares fit, etc.
try:
# first guess for A0, B0 using 3-parameter fit (step c)
s = zeros(3)
w = 2*pi*f0
# Now iterate 7 times (step b, plus 6 iterations of step i)
for idx in range(7):
D = c_[cos(w*t), sin(w*t), ones(N),
-s[0] * t * sin(w*t) + s[1] * t * cos(w*t) ] # eqn B.16
s = linalg.lstsq(D, data)[0] # eqn B.18
w = w + s[3] # update frequency estimate
## Extract results
A = sqrt(s[0]*s[0] + s[1]*s[1]) # eqn B.21
f0 = w / (2*pi)
phi = arctan2(s[0], s[1]) # eqn B.22 (flipped for sin instead of cos)
C = s[2]
return (A, f0, phi, C)
except Exception as e:
# something broke down, just return zeros
return (0, 0, 0, 0)
if __name__ == "__main__":
main()

View File

@@ -3,6 +3,7 @@
from nilmdb.utils.printf import *
import nilmdb.client
import nilmtools.filter
import nilmtools.math
from nilmdb.utils.time import (timestamp_to_human,
timestamp_to_seconds,
seconds_to_timestamp)
@@ -104,31 +105,6 @@ class Exemplar(object):
self.name, self.stream, ",".join(self.columns.keys()),
self.count)
def peak_detect(data, delta):
"""Simple min/max peak detection algorithm, taken from my code
in the disagg.m from the 10-8-5 paper"""
mins = [];
maxs = [];
cur_min = (None, np.inf)
cur_max = (None, -np.inf)
lookformax = False
for (n, p) in enumerate(data):
if p > cur_max[1]:
cur_max = (n, p)
if p < cur_min[1]:
cur_min = (n, p)
if lookformax:
if p < (cur_max[1] - delta):
maxs.append(cur_max)
cur_min = (n, p)
lookformax = False
else:
if p > (cur_min[1] + delta):
mins.append(cur_min)
cur_max = (n, p)
lookformax = True
return (mins, maxs)
def timestamp_to_short_human(timestamp):
dt = datetime_tz.datetime_tz.fromtimestamp(timestamp_to_seconds(timestamp))
return dt.strftime("%H:%M:%S")
@@ -164,11 +140,35 @@ def trainola_matcher(data, interval, args, insert_func, final_chunk):
# Find the peaks using the column with the largest amplitude
biggest = e.scale.index(max(e.scale))
peaks_minmax = peak_detect(corrs[biggest], 0.1)
peaks = [ p[0] for p in peaks_minmax[1] ]
peaks = nilmtools.math.peak_detect(corrs[biggest], 0.1)
# Now look at every peak
for row in peaks:
# To try to reduce false positives, discard peaks where
# there's a higher-magnitude peak (either min or max) within
# one exemplar width nearby.
good_peak_locations = []
for (i, (n, p, is_max)) in enumerate(peaks):
if not is_max:
continue
ok = True
# check up to 'e.count' rows before this one
j = i-1
while ok and j >= 0 and peaks[j][0] > (n - e.count):
if abs(peaks[j][1]) > abs(p):
ok = False
j -= 1
# check up to 'e.count' rows after this one
j = i+1
while ok and j < len(peaks) and peaks[j][0] < (n + e.count):
if abs(peaks[j][1]) > abs(p):
ok = False
j += 1
if ok:
good_peak_locations.append(n)
# Now look at all good peaks
for row in good_peak_locations:
# Correlation for each column must be close enough to 1.
for (corr, scale) in zip(corrs, e.scale):
# The accepted distance from 1 is based on the relative