Compare commits

...

23 Commits

Author SHA1 Message Date
f530edd8a0 sfit4: if interpolated DFT fails, use peak 2013-08-16 15:36:39 -04:00
4d946bee79 Set shell and path in sample cron script 2013-08-16 15:36:20 -04:00
13ceb91999 Add test_sinefit makefile target 2013-08-16 15:36:11 -04:00
dab9625296 Run fsck at startup 2013-08-09 16:03:14 -04:00
3e7527ab57 Support -h and -v in nilm-trainola 2013-08-08 16:30:08 -04:00
31b6d82dfc Make 'make test' do nothing from command line 2013-08-07 20:19:39 -04:00
077010ba3a Store nshift in prep metadata 2013-08-07 20:19:28 -04:00
77751a8529 Fix typo in help text 2013-08-07 18:39:19 -04:00
9c711300a2 Add short form of --force-metadata, -F 2013-08-06 23:07:36 -04:00
74cf34e2ad Update sharon cleanup.cfg 2013-08-06 22:48:06 -04:00
120bf58b85 Support --nometa option for copy_one and copy_wildcard 2013-08-06 22:47:16 -04:00
c26daa9a3b Update crontab 2013-08-03 11:23:43 -04:00
6993f5c886 Fix process termination in nilm-pipewatch 2013-08-03 11:13:30 -04:00
Sharon NILM
dd69f3e51d Update process.sh 2013-08-02 23:19:14 -04:00
dc26e32b6e Make interhost, force_metadata private to Filter 2013-08-02 23:14:19 -04:00
981f23ff14 Better documentation for callback function 2013-08-02 23:14:19 -04:00
492445a469 Split off useful math functions to math.py 2013-08-02 17:27:39 -04:00
33c3586bea trainola: suppress peaks if larger ones are nearby
Might fix the problem Mark noticed where turn-off transients
are erroneously matching the drop that follows startup transients.
2013-07-31 19:12:16 -04:00
c1e0f8ffbc Fix bug in copy_one 2013-07-31 14:47:16 -04:00
d2853bdb0e Add test case for bad trainola detections 2013-07-30 20:35:54 -04:00
a4d4bc22fc Add --skip option to nilm-insert 2013-07-30 18:25:47 -04:00
6090dd6112 prep: only process intervals present in both raw & sinefit 2013-07-30 14:55:06 -04:00
Sharon NILM
9c0d9ad324 Sample scripts from Sharon 2013-07-29 18:37:55 -04:00
18 changed files with 393 additions and 170 deletions

View File

@@ -1,33 +1,40 @@
#URL="http://bucket.mit.edu:8080/nilmdb"
URL="http://localhost/nilmdb"
all:
ifeq ($(INSIDE_EMACS), t)
@make test
else
@echo "Try 'make install'"
endif
all: test
test: test_pipewatch
test:
ifeq ($(INSIDE_EMACS), t)
@make test_sinefit
else
@echo 'No test suite for nilmtools. Try "make install"'
endif
test_pipewatch:
nilmtools/pipewatch.py -t 3 "seq 10 20" "seq 20 30"
test_trainola:
-nilmtool -u http://bucket/nilmdb remove -s min -e max \
/sharon/prep-a-matches
nilmtools/trainola.py "$$(cat extras/trainola-test-param-2.js)"
-nilmtool -u http://bucket/nilmdb remove -s min -e max \
/sharon/prep-a-matches
nilmtools/trainola.py "$$(cat extras/trainola-test-param.js)"
test_trainola2:
-nilmtool -u http://bucket/nilmdb remove -s min -e max \
/sharon/prep-a-matches
nilmtools/trainola.py "$$(cat extras/trainola-test-param-2.js)"
test_trainola3:
-nilmtool -u "http://bucket/nilmdb" destroy -R /test/jim
nilmtool -u "http://bucket/nilmdb" create /test/jim uint8_3
nilmtools/trainola.py "$$(cat extras/trainola-test-param-3.js)"
nilmtool -u "http://bucket/nilmdb" extract /test/jim -s min -e max
test_cleanup:
nilmtools/cleanup.py -e extras/cleanup.cfg
nilmtools/cleanup.py extras/cleanup.cfg
test_insert:
nilmtools/insert.py --file --dry-run /test/foo </dev/null
nilmtools/insert.py --skip --file --dry-run /foo/bar ~/data/20130311T2100.prep1.gz ~/data/20130311T2100.prep1.gz ~/data/20130311T2200.prep1.gz
test_copy:
nilmtools/copy_wildcard.py -U "http://nilmdb.com/bucket/" -D /lees*
@@ -46,10 +53,19 @@ test_prep: /tmp/raw.dat
nilmtool create /test/sinefit float32_3
nilmtool create /test/prep float32_8
nilmtool insert -s '@0' -t -r 8000 /test/raw /tmp/raw.dat
nilmtools/sinefit.py -a 0.5 -c 1 /test/raw /test/sinefit
nilmtools/sinefit.py -a 0.5 -c 1 -s '@0' -e '@5000000' /test/raw /test/sinefit
nilmtools/prep.py -c 2 /test/raw /test/sinefit /test/prep
nilmtools/prep.py -c 2 /test/raw /test/sinefit /test/prep
nilmtool extract -s min -e max /test/prep | head -20
test_sinefit:
make install >/dev/null 2>&1
-nilmtool destroy -R /test/sinefit
nilmtool create /test/sinefit float32_3
nilmtools/sinefit.py -c 5 -s '2013/03/25 09:11:00' \
-e '2013/03/25 10:11:00' /sharon/raw /test/sinefit
nilmtool extract -s min -e max /test/sinefit | head -20
test_decimate:
-@nilmtool destroy /lees-compressor/no-leak/raw/4 || true
-@nilmtool destroy /lees-compressor/no-leak/raw/16 || true

View File

@@ -8,7 +8,7 @@ Prerequisites:
sudo apt-get install python2.7 python2.7-dev python-setuptools
sudo apt-get install python-numpy python-scipy python-daemon
nilmdb (1.8.1+)
nilmdb (1.8.5+)
Install:

View File

@@ -0,0 +1,10 @@
#!/bin/bash
# Start the ethstream capture using nilm-pipewatch
# Bail out on errors
set -e
nilm-pipewatch --daemon --lock "/tmp/nilmdb-capture.lock" --timeout 30 \
"ethstream -a 192.168.1.209 -n 9 -r 8000 -N" \
"nilm-insert -m 10 -r 8000 --live /sharon/raw"

View File

@@ -0,0 +1,9 @@
[/sharon/prep-*]
keep = 1y
[/sharon/raw]
keep = 2w
[/sharon/sinefit]
keep = 1y
decimated = false

View File

@@ -0,0 +1,15 @@
# Install this by running "crontab crontab" (will replace existing crontab)
SHELL=/bin/bash
PATH=/usr/local/bin:/usr/local/sbin:/usr/bin:/usr/sbin:/bin:/sbin
# m h dom mon dow cmd
# Run NilmDB processing every 5 minutes
*/5 * * * * chronic /home/nilm/data/process.sh
# Try frequently restarting the capture process in case it died
*/5 * * * * chronic /home/nilm/data/capture.sh
# Run fsck at startup
@reboot chronic nilmdb-fsck --fix --no-data /home/nilm/data/db/

View File

@@ -0,0 +1,32 @@
#!/bin/bash
# Run all necessary processing on NilmDB data.
# Bail out on errors
set -e
# Ensure only one copy of this code runs at a time:
LOCKFILE="/tmp/nilmdb-process.lock"
exec 99>"$LOCKFILE"
if ! flock -n -x 99 ; then
echo "NilmDB processing already running, giving up..."
exit 0
fi
trap 'rm -f "$LOCKFILE"' 0
# redirect stdout/stderr to log, but keep it on the console too
exec > >(tee /home/nilm/data/process.log)
exec 2> >(tee -a /home/nilm/data/process.log >&2)
echo "sinefit on phase A voltage"
nilm-sinefit -c 5 /sharon/raw /sharon/sinefit
echo "prep on A, B, C with appropriate rotations"
nilm-prep -c 1 -r 0 /sharon/raw /sharon/sinefit /sharon/prep-a
nilm-prep -c 2 -r 120 /sharon/raw /sharon/sinefit /sharon/prep-b
nilm-prep -c 3 -r 240 /sharon/raw /sharon/sinefit /sharon/prep-c
echo "decimate raw and prep data"
nilm-decimate-auto /sharon/raw /sharon/prep*
echo "run cleanup"
nilm-cleanup --yes /home/nilm/data/cleanup.cfg

View File

@@ -0,0 +1,40 @@
{
"url": "http://bucket/nilmdb",
"stream": "/sharon/prep-a",
"dest_stream": "/test/jim",
"start": 1364184839901599,
"end": 1364184942407610.2,
"columns": [ { "index": 0, "name": "P1" } ],
"exemplars": [
{
"name": "A - True DBL Freezer ON",
"dest_column": 0,
"url": "http://bucket/nilmdb",
"stream": "/sharon/prep-a",
"columns": [ { "index": 0, "name": "P1" } ],
"start": 1365277707649000,
"end": 1365277710705000
},
{
"name": "A - Boiler 1 Fan OFF",
"dest_column": 1,
"url": "http://bucket/nilmdb",
"stream": "/sharon/prep-a",
"columns": [ { "index": 0, "name": "P1" } ],
"start": 1364188370735000,
"end": 1364188373819000
},
{
"name": "A - True DBL Freezer OFF",
"dest_column": 2,
"url": "http://bucket/nilmdb",
"stream": "/sharon/prep-a",
"columns": [ { "index": 0, "name": "P1" } ],
"start": 1365278087982000,
"end": 1365278089340000
}
]
}

View File

@@ -12,6 +12,8 @@ import sys
def main(argv = None):
f = nilmtools.filter.Filter()
parser = f.setup_parser("Copy a stream")
parser.add_argument('-n', '--nometa', action='store_true',
help="Don't copy or check metadata")
# Parse arguments
try:
@@ -25,14 +27,15 @@ def main(argv = None):
raise SystemExit(1)
# Copy metadata
meta = f.client_src.stream_get_metadata(f.src.path)
f.check_dest_metadata(meta)
if not args.nometa:
meta = f.client_src.stream_get_metadata(f.src.path)
f.check_dest_metadata(meta)
# Copy all rows of data using the faster Numpy interfaces
extractor = NumpyClient(f.src.url).stream_extract_numpy
inserter = NumpyClient(f.dest.url).stream_insert_numpy_context
for i in f.intervals():
print "Processing", f.interval_string(i)
print "Processing", i.human_string()
with inserter(f.dest.path, i.start, i.end) as insert_ctx:
for data in extractor(f.src.path, i.start, i.end):
insert_ctx.insert(data)

View File

@@ -16,6 +16,8 @@ def main(argv = None):
Example: %(prog)s -u http://host1/nilmdb -U http://host2/nilmdb /sharon/*
""", skip_paths = True)
parser.add_argument('-n', '--nometa', action='store_true',
help="Don't copy or check metadata")
parser.add_argument("path", action="store", nargs="+",
help='Wildcard paths to copy')
args = parser.parse_args(argv)
@@ -56,6 +58,8 @@ def main(argv = None):
new_argv.extend(["--end", "@" + repr(args.end)])
if args.dry_run:
new_argv.extend(["--dry-run"])
if args.nometa:
new_argv.extend(["--nometa"])
if args.force_metadata:
new_argv.extend(["--force-metadata"])
new_argv.extend([stream[0], stream[0]])

View File

@@ -21,9 +21,9 @@ def main(argv = None):
parser.add_argument("-u", "--url", action="store",
default="http://localhost/nilmdb/",
help="NilmDB server URL (default: %(default)s)")
parser.add_argument('-f', '--factor', action='store', default=4, type=int,
parser.add_argument("-f", "--factor", action="store", default=4, type=int,
help='Decimation factor (default: %(default)s)')
parser.add_argument("--force-metadata", action="store_true",
parser.add_argument("-F", "--force-metadata", action="store_true",
default = False,
help="Force metadata changes if the dest "
"doesn't match")

View File

@@ -133,6 +133,34 @@ def process_numpy_interval(interval, extractor, inserter, warn_rows,
# we'll not miss any data when we run again later.
insert_ctx.update_end(old_array[processed][0])
def example_callback_function(data, interval, args, insert_func, final):
"""Example of the signature for the function that gets passed
to process_numpy_interval.
'data': array of data to process -- may be empty
'interval': overall interval we're processing (but not necessarily
the interval of this particular chunk of data)
'args': opaque arguments passed to process_numpy
'insert_func': function to call in order to insert array of data.
Should be passed a 2-dimensional array of data to insert.
Data timestamps must be within the provided interval.
'final': True if this is the last bit of data for this
contiguous interval, False otherwise.
Return value of 'function' is the number of data rows processed.
Unprocessed data will be provided again in a subsequent call
(unless 'final' is True).
If unprocessed data remains after 'final' is True, the interval
being inserted will be ended at the timestamp of the first
unprocessed data point.
"""
raise NotImplementedError("example_callback_function does nothing")
class Filter(object):
def __init__(self, parser_description = None):
@@ -144,8 +172,8 @@ class Filter(object):
self.dest = None
self.start = None
self.end = None
self.interhost = False
self.force_metadata = False
self._interhost = False
self._force_metadata = False
if parser_description is not None:
self.setup_parser(parser_description)
self.parse_args()
@@ -178,7 +206,7 @@ class Filter(object):
default = False,
help="Just print intervals that would be "
"processed")
group.add_argument("--force-metadata", action="store_true",
group.add_argument("-F", "--force-metadata", action="store_true",
default = False,
help="Force metadata changes if the dest "
"doesn't match")
@@ -208,12 +236,12 @@ class Filter(object):
if dest_url is None:
dest_url = url
if url != dest_url:
self.interhost = True
self._interhost = True
self._client_src = Client(url)
self._client_dest = Client(dest_url)
if (not self.interhost) and (srcpath == destpath):
if (not self._interhost) and (srcpath == destpath):
raise ArgumentError("source and destination path must be different")
# Open the streams
@@ -231,8 +259,8 @@ class Filter(object):
# Print info
if not quiet:
print "Source:", self.src.string(self.interhost)
print " Dest:", self.dest.string(self.interhost)
print "Source:", self.src.string(self._interhost)
print " Dest:", self.dest.string(self._interhost)
def parse_args(self, argv = None):
"""Parse arguments from a command line"""
@@ -241,7 +269,7 @@ class Filter(object):
self.set_args(args.url, args.dest_url, args.srcpath, args.destpath,
args.start, args.end, quiet = False, parsed_args = args)
self.force_metadata = args.force_metadata
self._force_metadata = args.force_metadata
if args.dry_run:
for interval in self.intervals():
print interval.human_string()
@@ -252,7 +280,7 @@ class Filter(object):
"""Generate all the intervals that this filter should process"""
self._using_client = True
if self.interhost:
if self._interhost:
# Do the difference ourselves
s_intervals = ( Interval(start, end)
for (start, end) in
@@ -289,10 +317,11 @@ class Filter(object):
str(e), toparse))
def check_dest_metadata(self, data):
"""See if the metadata jives, and complain if it doesn't. If
there's no conflict, update the metadata to match 'data'."""
"""See if the metadata jives, and complain if it doesn't. For
each key in data, if the stream contains the key, it must match
values. If the stream does not contain the key, it is created."""
metadata = self._client_dest.stream_get_metadata(self.dest.path)
if not self.force_metadata:
if not self._force_metadata:
for key in data:
wanted = data[key]
if not isinstance(wanted, basestring):
@@ -316,7 +345,8 @@ class Filter(object):
self._client_dest.stream_update_metadata(self.dest.path, data)
# The main filter processing method.
def process_numpy(self, function, args = None, rows = 100000):
def process_numpy(self, function, args = None, rows = 100000,
intervals = None):
"""Calls process_numpy_interval for each interval that currently
exists in self.src, but doesn't exist in self.dest. It will
process the data in chunks as follows:
@@ -325,30 +355,13 @@ class Filter(object):
corresponding to the data. The data is converted to a Numpy
array in chunks of 'rows' rows at a time.
'function' should be defined as:
# def function(data, interval, args, insert_func, final)
If 'intervals' is not None, process those intervals instead of
the default list.
'data': array of data to process -- may be empty
'interval': overall interval we're processing (but not necessarily
the interval of this particular chunk of data)
'args': opaque arguments passed to process_numpy
'insert_func': function to call in order to insert array of data.
Should be passed a 2-dimensional array of data to insert.
Data timestamps must be within the provided interval.
'final': True if this is the last bit of data for this
contiguous interval, False otherwise.
Return value of 'function' is the number of data rows processed.
Unprocessed data will be provided again in a subsequent call
(unless 'final' is True).
If unprocessed data remains after 'final' is True, the interval
being inserted will be ended at the timestamp of the first
unprocessed data point.
'function' should be defined with the same interface as
nilmtools.filter.example_callback_function. See the
documentation of that for details. 'args' are passed to
'function'.
"""
extractor = NumpyClient(self.src.url).stream_extract_numpy
inserter = NumpyClient(self.dest.url).stream_insert_numpy_context
@@ -358,7 +371,7 @@ class Filter(object):
maxrows = rows)
inserter_func = functools.partial(inserter, self.dest.path)
for interval in self.intervals():
for interval in (intervals or self.intervals()):
print "Processing", interval.human_string()
process_numpy_interval(interval, extractor_func, inserter_func,
rows * 3, function, args)

View File

@@ -53,7 +53,8 @@ def parse_args(argv = None):
is stepped forward to match 'clock'.
- If 'data' is running ahead, there is overlap in the data, and an
error is raised.
error is raised. If '--skip' is specified, the current file
is skipped instead of raising an error.
"""))
parser.add_argument("-u", "--url", action="store",
default="http://localhost/nilmdb/",
@@ -61,6 +62,8 @@ def parse_args(argv = None):
group = parser.add_argument_group("Misc options")
group.add_argument("-D", "--dry-run", action="store_true",
help="Parse files, but don't insert any data")
group.add_argument("-s", "--skip", action="store_true",
help="Skip files if the data would overlap")
group.add_argument("-m", "--max-gap", action="store", default=10.0,
metavar="SEC", type=float,
help="Max discrepency between clock and data "
@@ -235,6 +238,10 @@ def main(argv = None):
"is %s but clock time is only %s",
timestamp_to_human(data_ts),
timestamp_to_human(clock_ts))
if args.skip:
printf("%s\n", err)
printf("Skipping the remainder of this file\n")
break
raise ParseError(filename, err)
if (data_ts + max_gap) < clock_ts:

111
nilmtools/math.py Normal file
View File

@@ -0,0 +1,111 @@
#!/usr/bin/python
# Miscellaenous useful mathematical functions
from nilmdb.utils.printf import *
from numpy import *
from scipy import *
def sfit4(data, fs):
"""(A, f0, phi, C) = sfit4(data, fs)
Compute 4-parameter (unknown-frequency) least-squares fit to
sine-wave data, according to IEEE Std 1241-2010 Annex B
Input:
data vector of input samples
fs sampling rate (Hz)
Output:
Parameters [A, f0, phi, C] to fit the equation
x[n] = A * sin(f0/fs * 2 * pi * n + phi) + C
where n is sample number. Or, as a function of time:
x(t) = A * sin(f0 * 2 * pi * t + phi) + C
by Jim Paris
(Verified to match sfit4.m)
"""
N = len(data)
t = linspace(0, (N-1) / float(fs), N)
## Estimate frequency using FFT (step b)
Fc = fft(data)
F = abs(Fc)
F[0] = 0 # eliminate DC
# Find pair of spectral lines with largest amplitude:
# resulting values are in F(i) and F(i+1)
i = argmax(F[0:int(N/2)] + F[1:int(N/2+1)])
# Interpolate FFT to get a better result (from Markus [B37])
try:
U1 = real(Fc[i])
U2 = real(Fc[i+1])
V1 = imag(Fc[i])
V2 = imag(Fc[i+1])
n = 2 * pi / N
ni1 = n * i
ni2 = n * (i+1)
K = ((V2-V1)*sin(ni1) + (U2-U1)*cos(ni1)) / (U2-U1)
Z1 = V1 * (K - cos(ni1)) / sin(ni1) + U1
Z2 = V2 * (K - cos(ni2)) / sin(ni2) + U2
i = arccos((Z2*cos(ni2) - Z1*cos(ni1)) / (Z2-Z1)) / n
except Exception:
# Just go with the biggest FFT peak
i = argmax(F[0:int(N/2)])
# Convert to Hz
f0 = i * float(fs) / N
# Fit it. We'll catch exceptions here and just returns zeros
# if something fails with the least squares fit, etc.
try:
# first guess for A0, B0 using 3-parameter fit (step c)
s = zeros(3)
w = 2*pi*f0
# Now iterate 7 times (step b, plus 6 iterations of step i)
for idx in range(7):
D = c_[cos(w*t), sin(w*t), ones(N),
-s[0] * t * sin(w*t) + s[1] * t * cos(w*t) ] # eqn B.16
s = linalg.lstsq(D, data)[0] # eqn B.18
w = w + s[3] # update frequency estimate
## Extract results
A = sqrt(s[0]*s[0] + s[1]*s[1]) # eqn B.21
f0 = w / (2*pi)
phi = arctan2(s[0], s[1]) # eqn B.22 (flipped for sin instead of cos)
C = s[2]
return (A, f0, phi, C)
except Exception as e:
# something broke down; just return zeros
return (0, 0, 0, 0)
def peak_detect(data, delta = 0.1):
"""Simple min/max peak detection algorithm, taken from my code
in the disagg.m from the 10-8-5 paper.
Returns an array of peaks: each peak is a tuple
(n, p, is_max)
where n is the row number in 'data', and p is 'data[n]',
and is_max is True if this is a maximum, False if it's a minimum,
"""
peaks = [];
cur_min = (None, inf)
cur_max = (None, -inf)
lookformax = False
for (n, p) in enumerate(data):
if p > cur_max[1]:
cur_max = (n, p)
if p < cur_min[1]:
cur_min = (n, p)
if lookformax:
if p < (cur_max[1] - delta):
peaks.append((cur_max[0], cur_max[1], True))
cur_min = (n, p)
lookformax = False
else:
if p > (cur_min[1] + delta):
peaks.append((cur_min[0], cur_min[1], False))
cur_max = (n, p)
lookformax = True
return peaks

View File

@@ -84,13 +84,16 @@ def pipewatch(args):
bufsize = -1, close_fds = True,
stdin = devnull,
stdout = subprocess.PIPE,
stderr = None)
stderr = None,
preexec_fn = os.setpgrp)
consumer = subprocess.Popen(args.consumer, shell = True,
bufsize = -11, close_fds = True,
stdin = subprocess.PIPE,
stdout = None, stderr = None)
stdout = None,
stderr = None,
preexec_fn = os.setpgrp)
queue = Queue.Queue(maxsize = 32)
queue = Queue.Queue(maxsize = 4)
reader = threading.Thread(target = reader_thread,
args = (queue, generator.stdout.fileno()))
reader.start()
@@ -125,16 +128,21 @@ def pipewatch(args):
return proc.poll()
try:
if poll_timeout(proc, 0.5) is None:
proc.terminate()
os.killpg(proc.pid, signal.SIGTERM)
if poll_timeout(proc, 0.5) is None:
proc.kill()
os.killpg(proc.pid, signal.SIGKILL)
except OSError:
pass
return poll_timeout(proc, 0.5)
# Wait for them to die, or kill them
gret = kill(generator)
cret = kill(consumer)
gret = kill(generator)
# Consume all remaining data in the queue until the reader
# and watcher threads are done
while reader.is_alive() or watcher.is_alive():
queue.get(True, 0.1)
fprintf(sys.stderr, "pipewatch: generator returned %d, " +
"consumer returned %d\n", gret, cret)

View File

@@ -12,6 +12,7 @@ import scipy.fftpack
import scipy.signal
#from matplotlib import pyplot as p
import bisect
from nilmdb.utils.interval import Interval
def main(argv = None):
# Set up argument parser
@@ -80,11 +81,23 @@ def main(argv = None):
f.check_dest_metadata({ "prep_raw_source": f.src.path,
"prep_sinefit_source": sinefit.path,
"prep_column": args.column,
"prep_rotation": repr(rotation) })
"prep_rotation": repr(rotation),
"prep_nshift": args.nshift })
# Run the processing function on all data
# Find the intersection of the usual set of intervals we'd filter,
# and the intervals actually present in sinefit data. This is
# what we will process.
filter_int = f.intervals()
sinefit_int = ( Interval(start, end) for (start, end) in
client_sinefit.stream_intervals(
args.sinepath, start = f.start, end = f.end) )
intervals = nilmdb.utils.interval.intersection(filter_int, sinefit_int)
# Run the process (using the helper in the filter module)
f.process_numpy(process, args = (client_sinefit, sinefit.path, args.column,
args.nharm, rotation, args.nshift))
args.nharm, rotation, args.nshift),
intervals = intervals)
def process(data, interval, args, insert_function, final):
(client, sinefit_path, column, nharm, rotation, nshift) = args

View File

@@ -3,6 +3,7 @@
# Sine wave fitting.
from nilmdb.utils.printf import *
import nilmtools.filter
import nilmtools.math
import nilmdb.client
from nilmdb.utils.time import (timestamp_to_human,
timestamp_to_seconds,
@@ -11,7 +12,6 @@ from nilmdb.utils.time import (timestamp_to_human,
from numpy import *
from scipy import *
#import pylab as p
import operator
import sys
def main(argv = None):
@@ -119,7 +119,7 @@ def process(data, interval, args, insert_function, final):
t_max = timestamp_to_seconds(data[start+N-1, 0])
# Do 4-parameter sine wave fit
(A, f0, phi, C) = sfit4(this, fs)
(A, f0, phi, C) = nilmtools.math.sfit4(this, fs)
# Check bounds. If frequency is too crazy, ignore this window
if f0 < f_min or f0 > f_max:
@@ -187,76 +187,5 @@ def process(data, interval, args, insert_function, final):
printf("%sMarked %d zero-crossings in %d rows\n", now, num_zc, start)
return start
def sfit4(data, fs):
"""(A, f0, phi, C) = sfit4(data, fs)
Compute 4-parameter (unknown-frequency) least-squares fit to
sine-wave data, according to IEEE Std 1241-2010 Annex B
Input:
data vector of input samples
fs sampling rate (Hz)
Output:
Parameters [A, f0, phi, C] to fit the equation
x[n] = A * sin(f0/fs * 2 * pi * n + phi) + C
where n is sample number. Or, as a function of time:
x(t) = A * sin(f0 * 2 * pi * t + phi) + C
by Jim Paris
(Verified to match sfit4.m)
"""
N = len(data)
t = linspace(0, (N-1) / float(fs), N)
## Estimate frequency using FFT (step b)
Fc = fft(data)
F = abs(Fc)
F[0] = 0 # eliminate DC
# Find pair of spectral lines with largest amplitude:
# resulting values are in F(i) and F(i+1)
i = argmax(F[0:int(N/2)] + F[1:int(N/2+1)])
# Interpolate FFT to get a better result (from Markus [B37])
U1 = real(Fc[i])
U2 = real(Fc[i+1])
V1 = imag(Fc[i])
V2 = imag(Fc[i+1])
n = 2 * pi / N
ni1 = n * i
ni2 = n * (i+1)
K = ((V2-V1)*sin(ni1) + (U2-U1)*cos(ni1)) / (U2-U1)
Z1 = V1 * (K - cos(ni1)) / sin(ni1) + U1
Z2 = V2 * (K - cos(ni2)) / sin(ni2) + U2
i = arccos((Z2*cos(ni2) - Z1*cos(ni1)) / (Z2-Z1)) / n
# Convert to Hz
f0 = i * float(fs) / N
# Fit it. We'll catch exceptions here and just returns zeros
# if something fails with the least squares fit, etc.
try:
# first guess for A0, B0 using 3-parameter fit (step c)
s = zeros(3)
w = 2*pi*f0
# Now iterate 7 times (step b, plus 6 iterations of step i)
for idx in range(7):
D = c_[cos(w*t), sin(w*t), ones(N),
-s[0] * t * sin(w*t) + s[1] * t * cos(w*t) ] # eqn B.16
s = linalg.lstsq(D, data)[0] # eqn B.18
w = w + s[3] # update frequency estimate
## Extract results
A = sqrt(s[0]*s[0] + s[1]*s[1]) # eqn B.21
f0 = w / (2*pi)
phi = arctan2(s[0], s[1]) # eqn B.22 (flipped for sin instead of cos)
C = s[2]
return (A, f0, phi, C)
except Exception as e:
# something broke down, just return zeros
return (0, 0, 0, 0)
if __name__ == "__main__":
main()

View File

@@ -3,6 +3,7 @@
from nilmdb.utils.printf import *
import nilmdb.client
import nilmtools.filter
import nilmtools.math
from nilmdb.utils.time import (timestamp_to_human,
timestamp_to_seconds,
seconds_to_timestamp)
@@ -104,31 +105,6 @@ class Exemplar(object):
self.name, self.stream, ",".join(self.columns.keys()),
self.count)
def peak_detect(data, delta):
"""Simple min/max peak detection algorithm, taken from my code
in the disagg.m from the 10-8-5 paper"""
mins = [];
maxs = [];
cur_min = (None, np.inf)
cur_max = (None, -np.inf)
lookformax = False
for (n, p) in enumerate(data):
if p > cur_max[1]:
cur_max = (n, p)
if p < cur_min[1]:
cur_min = (n, p)
if lookformax:
if p < (cur_max[1] - delta):
maxs.append(cur_max)
cur_min = (n, p)
lookformax = False
else:
if p > (cur_min[1] + delta):
mins.append(cur_min)
cur_max = (n, p)
lookformax = True
return (mins, maxs)
def timestamp_to_short_human(timestamp):
dt = datetime_tz.datetime_tz.fromtimestamp(timestamp_to_seconds(timestamp))
return dt.strftime("%H:%M:%S")
@@ -164,11 +140,35 @@ def trainola_matcher(data, interval, args, insert_func, final_chunk):
# Find the peaks using the column with the largest amplitude
biggest = e.scale.index(max(e.scale))
peaks_minmax = peak_detect(corrs[biggest], 0.1)
peaks = [ p[0] for p in peaks_minmax[1] ]
peaks = nilmtools.math.peak_detect(corrs[biggest], 0.1)
# Now look at every peak
for row in peaks:
# To try to reduce false positives, discard peaks where
# there's a higher-magnitude peak (either min or max) within
# one exemplar width nearby.
good_peak_locations = []
for (i, (n, p, is_max)) in enumerate(peaks):
if not is_max:
continue
ok = True
# check up to 'e.count' rows before this one
j = i-1
while ok and j >= 0 and peaks[j][0] > (n - e.count):
if abs(peaks[j][1]) > abs(p):
ok = False
j -= 1
# check up to 'e.count' rows after this one
j = i+1
while ok and j < len(peaks) and peaks[j][0] < (n + e.count):
if abs(peaks[j][1]) > abs(p):
ok = False
j += 1
if ok:
good_peak_locations.append(n)
# Now look at all good peaks
for row in good_peak_locations:
# Correlation for each column must be close enough to 1.
for (corr, scale) in zip(corrs, e.scale):
# The accepted distance from 1 is based on the relative
@@ -287,8 +287,21 @@ def main(argv = None):
if argv is None:
argv = sys.argv[1:]
if len(argv) != 1:
raise DataError("need one argument, either a dictionary or JSON string")
if len(argv) != 1 or argv[0] == '-h' or argv[0] == '--help':
printf("usage: %s [-h] [-v] <json-config-dictionary>\n\n", sys.argv[0])
printf(" Where <json-config-dictionary> is a JSON-encoded " +
"dictionary string\n")
printf(" with exemplar and stream data.\n\n")
printf(" See extras/trainola-test-param*.js in the nilmtools " +
"repository\n")
printf(" for examples.\n")
if len(argv) != 1:
raise SystemExit(1)
raise SystemExit(0)
if argv[0] == '-v' or argv[0] == '--version':
printf("%s\n", nilmtools.__version__)
raise SystemExit(0)
try:
# Passed in a JSON string (e.g. on the command line)

View File

@@ -61,7 +61,7 @@ setup(name='nilmtools',
long_description = "NILM Database Tools",
license = "Proprietary",
author_email = 'jim@jtan.com',
install_requires = [ 'nilmdb >= 1.8.1',
install_requires = [ 'nilmdb >= 1.8.5',
'numpy',
'scipy',
'python-daemon >= 1.5',