Compare commits

...

25 Commits

SHA1 Message Date
62c8af41ea Cleanup comments 2013-06-06 15:34:23 -04:00
4f6bc48619 sinefit: include timestamps on marking output too 2013-05-11 11:00:31 -04:00
cf9eb0ed48 Improve sinefit resiliency 2013-05-10 14:19:55 -04:00
32066fc260 Remove hard matplotlib dependency 2013-05-09 13:17:36 -04:00
739da3f973 Add median filter 2013-05-08 23:36:50 -04:00
83ad18ebf6 Fix non-string arguments to metadata_check 2013-05-08 12:49:38 -04:00
c76d527f95 Fix unicode handling in filter metadata match 2013-05-07 12:40:53 -04:00
b8a73278e7 Always store metadata rotation as a string 2013-04-29 14:25:11 -04:00
ce0691d6c4 sinefit: Change sfit4 to fit to \sin instead of \cos
And adjust the period locator accordingly.
Fitting \sin is the same mathematically, it's just conceptually more
straightforward since we're locating zero crossings anyway.
2013-04-27 18:12:20 -04:00
4da658e960 sinefit: move initial estimate into the main iteration loop
Just a little less code.  Same results.
2013-04-27 17:50:23 -04:00
8ab31eafc2 Allow shorthand method for creating an option-less parser.
This is mostly just intended to make a simple filter example shorter.
2013-04-21 16:53:28 -04:00
979ab13bff Force fs to be a float in sfit4 2013-04-17 17:58:15 -04:00
f4fda837ae Bump required nilmdb version to 1.6.0 2013-04-11 11:55:11 -04:00
5547d266d0 filter: Don't include trailing unprocessed data in the inserted intervals 2013-04-11 11:53:17 -04:00
372e977e4a Reverse cleanup order to handle interruptions better 2013-04-10 18:38:41 -04:00
640a680704 Increase default min amplitude in sinefit 2013-04-10 17:09:52 -04:00
2e74e6cd63 Skip over data if we aren't able to process any. Change output format 2013-04-10 17:01:07 -04:00
de2a794e00 Support wildcards in nilm-decimate-auto 2013-04-10 16:05:16 -04:00
065a40f265 sinefit: add minimum amplitude check 2013-04-10 15:33:51 -04:00
65fa43aff1 sinefit: catch all errors in sfit4 2013-04-10 14:36:50 -04:00
57c23c3792 sinefit: allow user to override min/max frequency detection 2013-04-10 14:36:40 -04:00
d4c8e4acb4 Include rotation in metadata 2013-04-10 14:36:05 -04:00
fd1b33401f Require a --yes argument before actually cleaning data 2013-04-09 20:13:38 -04:00
4c748ec00c Fix minor bugs 2013-04-09 20:08:25 -04:00
b72d6b6908 Warn if column count is wrong for this nharm value 2013-04-09 19:59:59 -04:00
9 changed files with 249 additions and 70 deletions


@@ -12,7 +12,7 @@ test: test_cleanup
test_cleanup:
src/cleanup.py -e extras/cleanup.cfg
src/cleanup.py -D extras/cleanup.cfg
src/cleanup.py extras/cleanup.cfg
test_insert:
@make install >/dev/null


@@ -5,10 +5,10 @@ by Jim Paris <jim@jtan.com>
Prerequisites:
# Runtime and build environments
sudo apt-get install python2.7 python2.7-dev python-setuptools
sudo apt-get install python-numpy python-scipy python-matplotlib
sudo apt-get install python2.7 python2.7-dev python-setuptools python-pip
sudo apt-get install python-numpy python-scipy
nilmdb (1.5.0+)
nilmdb (1.6.3+)
Install:


@@ -61,10 +61,10 @@ setup(name='nilmtools',
long_description = "NILM Database Tools",
license = "Proprietary",
author_email = 'jim@jtan.com',
install_requires = [ 'nilmdb >= 1.5.0',
install_requires = [ 'nilmdb >= 1.6.3',
'numpy',
'scipy',
'matplotlib',
#'matplotlib',
],
packages = [ 'nilmtools',
],
@@ -79,6 +79,7 @@ setup(name='nilmtools',
'nilm-copy-wildcard = nilmtools.copy_wildcard:main',
'nilm-sinefit = nilmtools.sinefit:main',
'nilm-cleanup = nilmtools.cleanup:main',
'nilm-median = nilmtools.median:main',
],
},
zip_safe = False,
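
With matplotlib commented out of install_requires (per the "Remove hard matplotlib dependency" commit), any plotting code presumably has to import it defensively. A minimal sketch of such an optional-import guard, as an assumption about how callers might cope rather than code taken from this changeset:

    # Hypothetical guard; nilmtools itself may handle the optional dependency
    # differently.  Import matplotlib only if it is installed.
    try:
        import matplotlib
        matplotlib.use('Agg')              # non-interactive backend, safe on servers
        import matplotlib.pyplot as plt
    except ImportError:
        plt = None

    def maybe_plot(x, y, filename):
        """Save a plot if matplotlib is available; silently skip otherwise."""
        if plt is None:
            return
        plt.figure()
        plt.plot(x, y)
        plt.savefig(filename)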


@@ -19,7 +19,7 @@ def warn(msg, *args):
fprintf(sys.stderr, "warning: " + msg + "\n", *args)
class TimePeriod(object):
_units = { 'h': ('hour', 60*60*24),
_units = { 'h': ('hour', 60*60),
'd': ('day', 60*60*24),
'w': ('week', 60*60*24*7),
'm': ('month', 60*60*24*30),
@@ -96,9 +96,9 @@ def main(argv = None):
parser.add_argument("-u", "--url", action="store",
default="http://localhost/nilmdb/",
help="NilmDB server URL (default: %(default)s)")
parser.add_argument("-D", "--dry-run", action="store_true",
parser.add_argument("-y", "--yes", action="store_true",
default = False,
help="Don't actually remove any data")
help="Actually remove the data (default: no)")
parser.add_argument("-e", "--estimate", action="store_true",
default = False,
help="Estimate how much disk space will be used")
@@ -228,7 +228,7 @@ def main(argv = None):
keep = seconds_to_timestamp(streams[path].keep.seconds())
for i in intervals:
total += i.end - i.start
if total < keep:
if total <= keep:
continue
remove_before = i.start + (total - keep)
break
@@ -238,14 +238,19 @@ def main(argv = None):
timestamp_to_seconds(total)))
continue
printf(" removing data before %s\n", timestamp_to_human(remove_before))
if not args.dry_run:
client.stream_remove(path, None, remove_before)
for ap in streams[path].also_clean_paths:
printf(" also removing from %s\n", ap)
if not args.dry_run:
client.stream_remove(ap, None, remove_before)
# Clean in reverse order. Since we only use the primary stream and not
# the decimated streams to figure out which data to remove, removing
# the primary stream last means that we might recover more nicely if
# we are interrupted and restarted.
clean_paths = list(reversed(streams[path].also_clean_paths)) + [ path ]
for p in clean_paths:
printf(" removing from %s\n", p)
if args.yes:
client.stream_remove(p, None, remove_before)
# All done
if not args.yes:
printf("Note: specify --yes to actually perform removals\n")
return
if __name__ == "__main__":
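
The hunk above accumulates interval durations and, once the configured keep window is exceeded, computes a cutoff inside the interval that crosses the budget (remove_before = i.start + (total - keep)). A standalone sketch of that arithmetic, assuming intervals arrive newest-first as plain (start, end) pairs; the function name and example numbers are illustrative only:

    def find_remove_before(intervals, keep):
        """intervals: (start, end) timestamp pairs, newest first.
        keep: how much of the most recent data to retain (same units).
        Returns the cutoff timestamp, or None if everything already fits."""
        total = 0
        for (start, end) in intervals:
            total += end - start
            if total <= keep:
                continue
            # This interval crosses the keep budget: retain only its newest
            # portion and remove everything before the computed cutoff.
            return start + (total - keep)
        return None

    # Example: three 100-second intervals, keep the newest 150 seconds.
    print(find_remove_before([(200, 300), (100, 200), (0, 100)], 150))  # -> 150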


@@ -4,15 +4,19 @@ import nilmtools.filter
import nilmtools.decimate
import nilmdb.client
import argparse
import fnmatch
def main(argv = None):
parser = argparse.ArgumentParser(
formatter_class = argparse.RawDescriptionHelpFormatter,
version = "1.0",
version = nilmtools.__version__,
description = """\
Automatically create multiple decimations from a single source
stream, continuing until the last decimated level contains fewer
than 500 points total.
Wildcards and multiple paths are accepted. Decimated paths are
ignored when matching wildcards.
""")
parser.add_argument("-u", "--url", action="store",
default="http://localhost/nilmdb/",
@@ -23,20 +27,36 @@ def main(argv = None):
default = False,
help="Force metadata changes if the dest "
"doesn't match")
parser.add_argument("path", action="store",
parser.add_argument("path", action="store", nargs='+',
help='Path of base stream')
args = parser.parse_args(argv)
# Pull out info about the base stream
client = nilmdb.client.Client(args.url)
info = nilmtools.filter.get_stream_info(client, args.path)
if not info:
raise Exception("path " + args.path + " not found")
# Find list of paths to process
streams = [ unicode(s[0]) for s in client.stream_list() ]
streams = [ s for s in streams if "~decim-" not in s ]
paths = []
for path in args.path:
new = fnmatch.filter(streams, unicode(path))
if not new:
print "error: no stream matched path:", path
raise SystemExit(1)
paths.extend(new)
meta = client.stream_get_metadata(args.path)
for path in paths:
do_decimation(client, args, path)
def do_decimation(client, args, path):
print "Decimating", path
info = nilmtools.filter.get_stream_info(client, path)
if not info:
raise Exception("path " + path + " not found")
meta = client.stream_get_metadata(path)
if "decimate_source" in meta:
print "Stream", args.path, "was decimated from", meta["decimate_source"]
print "Stream", path, "was decimated from", meta["decimate_source"]
print "You need to pass the base stream instead"
raise SystemExit(1)
@@ -53,7 +73,7 @@ def main(argv = None):
if info.rows <= 500:
break
factor *= args.factor
new_path = "%s~decim-%d" % (args.path, factor)
new_path = "%s~decim-%d" % (path, factor)
# Create the stream if needed
new_info = nilmtools.filter.get_stream_info(client, new_path)
@@ -72,5 +92,7 @@ def main(argv = None):
# Update info using the newly decimated stream
info = nilmtools.filter.get_stream_info(client, new_path)
return
if __name__ == "__main__":
main()
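
The decimation loop keeps multiplying the factor and appending ~decim-<factor> to the base path until the newest level holds 500 rows or fewer. A standalone sketch of that path/row progression; the stream path, starting row count, and factor of 4 below are illustrative, not taken from this diff:

    def decimation_levels(path, rows, factor=4, limit=500):
        """Yield (decimated_path, approximate_rows) for each level until the
        last level holds no more than `limit` rows."""
        level = factor
        while rows > limit:
            rows = rows // factor          # each level keeps 1/factor of the rows
            yield ("%s~decim-%d" % (path, level), rows)
            level *= factor

    for p, n in decimation_levels("/example/raw", 1000000):
        print((p, n))
    # ('/example/raw~decim-4', 250000)
    # ('/example/raw~decim-16', 62500)
    #  ... down to ...
    # ('/example/raw~decim-4096', 244)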


@@ -67,7 +67,7 @@ def get_stream_info(client, path):
class Filter(object):
def __init__(self):
def __init__(self, parser_description = None):
self._parser = None
self._client_src = None
self._client_dest = None
@@ -78,6 +78,9 @@ class Filter(object):
self.end = None
self.interhost = False
self.force_metadata = False
if parser_description is not None:
self.setup_parser(parser_description)
self.parse_args()
@property
def client_src(self):
@@ -233,8 +236,14 @@ class Filter(object):
metadata = self._client_dest.stream_get_metadata(self.dest.path)
if not self.force_metadata:
for key in data:
wanted = str(data[key])
wanted = data[key]
if not isinstance(wanted, basestring):
wanted = str(wanted)
val = metadata.get(key, wanted)
# Force UTF-8 encoding for comparison and display
wanted = wanted.encode('utf-8')
val = val.encode('utf-8')
key = key.encode('utf-8')
if val != wanted and self.dest.rows > 0:
m = "Metadata in destination stream:\n"
m += " %s = %s\n" % (key, val)
@@ -275,6 +284,10 @@ class Filter(object):
Return value of 'function' is the number of data rows processed.
Unprocessed data will be provided again in a subsequent call
(unless 'final' is True).
If unprocessed data remains after 'final' is True, the interval
being inserted will be ended at the timestamp of the first
unprocessed data point.
"""
if args is None:
args = []
@@ -319,7 +332,13 @@ class Filter(object):
# Last call for this contiguous interval
if old_array.shape[0] != 0:
function(old_array, interval, args, insert_function, True)
processed = function(old_array, interval, args,
insert_function, True)
if processed != old_array.shape[0]:
# Truncate the interval we're inserting at the first
# unprocessed data point. This ensures that
# we'll not miss any data when we run again later.
insert_ctx.update_end(old_array[processed][0])
def main(argv = None):
# This is just a dummy function; actual filters can use the other
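
The added docstring and the insert_ctx.update_end() call spell out the process_numpy contract: the callback returns how many rows it consumed, leftover rows are re-presented on the next call, and on the final call any leftover simply truncates the inserted interval at the first unprocessed timestamp. A minimal callback sketch that follows that contract; the window size and the doubling operation are placeholders, not code from this changeset:

    def example_callback(data, interval, args, insert_function, final):
        """`data` is a NumPy array with timestamps in column 0.  Consume only
        complete 100-row windows; anything left over is either re-presented on
        the next call or, on the final call, excluded from the inserted
        interval by the framework."""
        window = 100
        rows = data.shape[0]
        full = (rows // window) * window   # largest whole number of windows
        if full == 0:
            return 0                       # consumed nothing; try again later
        out = data[:full].copy()
        out[:, 1:] *= 2.0                  # placeholder computation on data columns
        insert_function(out)
        return full                        # rows actually consumed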

src/median.py (new executable file, 43 lines added)

@@ -0,0 +1,43 @@
#!/usr/bin/python
import nilmtools.filter, scipy.signal
def main(argv = None):
f = nilmtools.filter.Filter()
parser = f.setup_parser("Median Filter")
group = parser.add_argument_group("Median filter options")
group.add_argument("-z", "--size", action="store", type=int, default=25,
help = "median filter size (default %(default)s)")
group.add_argument("-d", "--difference", action="store_true",
help = "store difference rather than filtered values")
try:
args = f.parse_args(argv)
except nilmtools.filter.MissingDestination as e:
print "Source is %s (%s)" % (e.src.path, e.src.layout)
print "Destination %s doesn't exist" % (e.dest.path)
print "You could make it with a command like:"
print " nilmtool -u %s create %s %s" % (e.dest.url,
e.dest.path, e.src.layout)
raise SystemExit(1)
meta = f.client_src.stream_get_metadata(f.src.path)
f.check_dest_metadata({ "median_filter_source": f.src.path,
"median_filter_size": args.size,
"median_filter_difference": repr(args.difference) })
f.process_numpy(median_filter, args = (args.size, args.difference))
def median_filter(data, interval, args, insert, final):
(size, diff) = args
(rows, cols) = data.shape
for i in range(cols - 1):
filtered = scipy.signal.medfilt(data[:, i+1], size)
if diff:
data[:, i+1] -= filtered
else:
data[:, i+1] = filtered
insert(data)
return rows
if __name__ == "__main__":
main()
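
The new nilm-median tool runs scipy.signal.medfilt over each data column and, with --difference, stores the residual instead of the filtered values. A quick standalone illustration of both modes on made-up data:

    import numpy as np
    import scipy.signal

    x = np.array([10.0, 10.0, 50.0, 10.0, 10.0, 10.0, 10.0])  # one spike
    filtered = scipy.signal.medfilt(x, 3)    # kernel size 3 removes the spike
    difference = x - filtered                # what --difference would store
    print(filtered)     # [10. 10. 10. 10. 10. 10. 10.]
    print(difference)   # [ 0.  0. 40.  0.  0.  0.  0.]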


@@ -3,6 +3,8 @@
# Spectral envelope preprocessor.
# Requires two streams as input: the original raw data, and sinefit data.
from nilmdb.utils.printf import *
from nilmdb.utils.time import timestamp_to_human
import nilmtools.filter
import nilmdb.client
from numpy import *
@@ -46,6 +48,10 @@ def main(argv = None):
print " nilmtool -u %s create %s %s" % (e.dest.url, e.dest.path, rec)
raise SystemExit(1)
if f.dest.layout_count != args.nharm * 2:
print "error: need", args.nharm*2, "columns in destination stream"
raise SystemExit(1)
# Check arguments
if args.column is None or args.column < 1:
parser.error("need a column number >= 1")
@@ -73,7 +79,8 @@ def main(argv = None):
# Check and set metadata in prep stream
f.check_dest_metadata({ "prep_raw_source": f.src.path,
"prep_sinefit_source": sinefit.path,
"prep_column": args.column })
"prep_column": args.column,
"prep_rotation": repr(rotation) })
# Run the processing function on all data
f.process_numpy(process, args = (client_sinefit, sinefit.path, args.column,
@@ -101,7 +108,6 @@ def process(data, interval, args, insert_function, final):
# Pull out sinefit data for the entire time range of this block
for sinefit_line in client.stream_extract(sinefit_path,
data[0, 0], data[rows-1, 0]):
def prep_period(t_min, t_max, rot):
"""
Compute prep coefficients from time t_min to t_max, which
@@ -158,7 +164,15 @@ def process(data, interval, args, insert_function, final):
break
processed = idx_max
print "Processed", processed, "of", rows, "rows"
# If we processed no data but there's lots in here, pretend we
# processed half of it.
if processed == 0 and rows > 10000:
processed = rows / 2
printf("%s: warning: no periods found; skipping %d rows\n",
timestamp_to_human(data[0][0]), processed)
else:
printf("%s: processed %d of %d rows\n",
timestamp_to_human(data[0][0]), processed, rows)
return processed
if __name__ == "__main__":
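
The new check above refuses to run unless the destination stream has exactly nharm * 2 columns (a pair of coefficients per harmonic). A tiny helper sketch for picking a matching layout, assuming nilmdb's usual float32_N layout naming; the helper itself is illustrative, not part of nilmtools:

    def prep_layout(nharm):
        """Layout string for a prep destination with `nharm` harmonics:
        two coefficient columns per harmonic."""
        return "float32_%d" % (nharm * 2)

    print(prep_layout(4))   # float32_8 -> 8 data columns for 4 harmonics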


@@ -1,13 +1,18 @@
#!/usr/bin/python
# Sine wave fitting. This runs about 5x faster than realtime on raw data.
# Sine wave fitting.
from nilmdb.utils.printf import *
import nilmtools.filter
import nilmdb.client
from nilmdb.utils.time import (timestamp_to_human,
timestamp_to_seconds,
seconds_to_timestamp)
from numpy import *
from scipy import *
#import pylab as p
import operator
import sys
def main(argv = None):
f = nilmtools.filter.Filter()
@@ -18,6 +23,15 @@ def main(argv = None):
group.add_argument('-f', '--frequency', action='store', type=float,
default=60.0,
help='Approximate frequency (default: %(default)s)')
group.add_argument('-m', '--min-freq', action='store', type=float,
help='Minimum valid frequency '
'(default: approximate frequency / 2))')
group.add_argument('-M', '--max-freq', action='store', type=float,
help='Maximum valid frequency '
'(default: approximate frequency * 2))')
group.add_argument('-a', '--min-amp', action='store', type=float,
default=20.0,
help='Minimum signal amplitude (default: %(default)s)')
# Parse arguments
try:
@@ -34,17 +48,56 @@ def main(argv = None):
parser.error("need a column number >= 1")
if args.frequency < 0.1:
parser.error("frequency must be >= 0.1")
if args.min_freq is None:
args.min_freq = args.frequency / 2
if args.max_freq is None:
args.max_freq = args.frequency * 2
if (args.min_freq > args.max_freq or
args.min_freq > args.frequency or
args.max_freq < args.frequency):
parser.error("invalid min or max frequency")
if args.min_amp < 0:
parser.error("min amplitude must be >= 0")
f.check_dest_metadata({ "sinefit_source": f.src.path,
"sinefit_column": args.column })
f.process_numpy(process, args = (args.column, args.frequency))
f.process_numpy(process, args = (args.column, args.frequency, args.min_amp,
args.min_freq, args.max_freq))
class SuppressibleWarning(object):
def __init__(self, maxcount = 10, maxsuppress = 100):
self.maxcount = maxcount
self.maxsuppress = maxsuppress
self.count = 0
self.last_msg = ""
def _write(self, sec, msg):
if sec:
now = timestamp_to_human(seconds_to_timestamp(sec)) + ": "
else:
now = ""
sys.stderr.write(now + msg)
def warn(self, msg, seconds = None):
self.count += 1
if self.count <= self.maxcount:
self._write(seconds, msg)
if (self.count - self.maxcount) >= self.maxsuppress:
self.reset(seconds)
def reset(self, seconds = None):
if self.count > self.maxcount:
self._write(seconds, sprintf("(%d warnings suppressed)\n",
self.count - self.maxcount))
self.count = 0
def process(data, interval, args, insert_function, final):
(column, f_expected) = args
(column, f_expected, a_min, f_min, f_max) = args
rows = data.shape[0]
# Estimate sampling frequency from timestamps
fs = 1e6 * (rows-1) / (data[-1][0] - data[0][0])
fs = (rows-1) / (timestamp_to_seconds(data[-1][0]) -
timestamp_to_seconds(data[0][0]))
# Pull out about 3.5 periods of data at once;
# we'll expect to match 3 zero crossings in each window
@@ -54,30 +107,41 @@ def process(data, interval, args, insert_function, final):
if rows < N:
return 0
warn = SuppressibleWarning(3, 1000)
# Process overlapping windows
start = 0
num_zc = 0
last_inserted_timestamp = None
while start < (rows - N):
this = data[start:start+N, column]
t_min = data[start, 0]/1e6
t_max = data[start+N-1, 0]/1e6
t_min = timestamp_to_seconds(data[start, 0])
t_max = timestamp_to_seconds(data[start+N-1, 0])
# Do 4-parameter sine wave fit
(A, f0, phi, C) = sfit4(this, fs)
# Check bounds. If frequency is too crazy, ignore this window
if f0 < (f_expected/2) or f0 > (f_expected*2):
print "frequency", f0, "too far from expected value", f_expected
if f0 < f_min or f0 > f_max:
warn.warn(sprintf("frequency %s outside valid range %s - %s\n",
str(f0), str(f_min), str(f_max)), t_min)
start += N
continue
# If amplitude is too low, results are probably just noise
if A < a_min:
warn.warn(sprintf("amplitude %s below minimum threshold %s\n",
str(A), str(a_min)), t_min)
start += N
continue
#p.plot(arange(N), this)
#p.plot(arange(N), A * cos(f0/fs * 2 * pi * arange(N) + phi) + C, 'g')
#p.plot(arange(N), A * sin(f0/fs * 2 * pi * arange(N) + phi) + C, 'g')
# Period starts when the argument of cosine is 3*pi/2 degrees,
# Period starts when the argument of sine is 0 degrees,
# so we're looking for sample number:
# n = (3 * pi / 2 - phi) / (f0/fs * 2 * pi)
zc_n = (3 * pi / 2 - phi) / (f0 / fs * 2 * pi)
# n = (0 - phi) / (f0/fs * 2 * pi)
zc_n = (0 - phi) / (f0 / fs * 2 * pi)
period_n = fs/f0
# Add periods to make N positive
@@ -90,7 +154,13 @@ def process(data, interval, args, insert_function, final):
while zc_n < (N - period_n/2):
#p.plot(zc_n, C, 'ro')
t = t_min + zc_n / fs
insert_function([[t * 1e6, f0, A, C]])
if (last_inserted_timestamp is None or
t > last_inserted_timestamp):
insert_function([[seconds_to_timestamp(t), f0, A, C]])
last_inserted_timestamp = t
warn.reset(t)
else:
warn.warn("timestamp overlap\n", t)
num_zc += 1
last_zc = zc_n
zc_n += period_n
@@ -108,7 +178,13 @@ def process(data, interval, args, insert_function, final):
start = int(round(start + advance))
# Return the number of rows we've processed
print "Marked", num_zc, "zero-crossings in", start, "rows"
warn.reset(last_inserted_timestamp)
if last_inserted_timestamp:
now = timestamp_to_human(seconds_to_timestamp(
last_inserted_timestamp)) + ": "
else:
now = ""
printf("%sMarked %d zero-crossings in %d rows\n", now, num_zc, start)
return start
def sfit4(data, fs):
@@ -123,15 +199,15 @@ def sfit4(data, fs):
Output:
Parameters [A, f0, phi, C] to fit the equation
x[n] = A * cos(f0/fs * 2 * pi * n + phi) + C
x[n] = A * sin(f0/fs * 2 * pi * n + phi) + C
where n is sample number. Or, as a function of time:
x(t) = A * cos(f0 * 2 * pi * t + phi) + C
x(t) = A * sin(f0 * 2 * pi * t + phi) + C
by Jim Paris
(Verified to match sfit4.m)
"""
N = len(data)
t = linspace(0, (N-1) / fs, N)
t = linspace(0, (N-1) / float(fs), N)
## Estimate frequency using FFT (step b)
Fc = fft(data)
@@ -156,32 +232,31 @@ def sfit4(data, fs):
i = arccos((Z2*cos(ni2) - Z1*cos(ni1)) / (Z2-Z1)) / n
# Convert to Hz
f0 = i * fs / N
f0 = i * float(fs) / N
## Fit it
# first guess for A0, B0 using 3-parameter fit (step c)
w = 2*pi*f0
D = c_[cos(w*t), sin(w*t), ones(N)]
s = linalg.lstsq(D, data)[0]
# Now iterate 6 times (step i)
for idx in range(6):
D = c_[cos(w*t), sin(w*t), ones(N),
-s[0] * t * sin(w*t) + s[1] * t * cos(w*t) ] # eqn B.16
s = linalg.lstsq(D, data)[0] # eqn B.18
w = w + s[3] # update frequency estimate
## Extract results
A = sqrt(s[0]*s[0] + s[1]*s[1]) # eqn B.21
f0 = w / (2*pi)
# Fit it. We'll catch exceptions here and just returns zeros
# if something fails with the least squares fit, etc.
try:
phi = -arctan2(s[1], s[0]) # eqn B.22
except TypeError:
# first guess for A0, B0 using 3-parameter fit (step c)
s = zeros(3)
w = 2*pi*f0
# Now iterate 7 times (step b, plus 6 iterations of step i)
for idx in range(7):
D = c_[cos(w*t), sin(w*t), ones(N),
-s[0] * t * sin(w*t) + s[1] * t * cos(w*t) ] # eqn B.16
s = linalg.lstsq(D, data)[0] # eqn B.18
w = w + s[3] # update frequency estimate
## Extract results
A = sqrt(s[0]*s[0] + s[1]*s[1]) # eqn B.21
f0 = w / (2*pi)
phi = arctan2(s[0], s[1]) # eqn B.22 (flipped for sin instead of cos)
C = s[2]
return (A, f0, phi, C)
except Exception as e:
# something broke down, just return zeros
return (0, 0, 0, 0)
C = s[2]
return (A, f0, phi, C)
if __name__ == "__main__":
main()
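
Commit ce0691d6c4 and the flipped arctan2 above are just a reparameterization: for the same least-squares coefficients s[0] (cos term) and s[1] (sin term), the old cos form and the new sin form describe the identical waveform, only the reported phase changes. A small numerical check of that equivalence; the coefficient values are made up:

    import numpy as np

    s0, s1 = 2.0, -1.5                       # example least-squares coefficients
    A = np.hypot(s0, s1)                     # amplitude is the same either way
    phi_cos = -np.arctan2(s1, s0)            # old extraction: x = A*cos(wt + phi)
    phi_sin = np.arctan2(s0, s1)             # new extraction: x = A*sin(wt + phi)
    wt = np.linspace(0, 2 * np.pi, 9)
    print(np.allclose(s0 * np.cos(wt) + s1 * np.sin(wt),
                      A * np.cos(wt + phi_cos)))                 # True
    print(np.allclose(A * np.cos(wt + phi_cos),
                      A * np.sin(wt + phi_sin)))                 # True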