Compare commits

...

29 Commits

Author SHA1 Message Date
62c8af41ea Cleanup comments 2013-06-06 15:34:23 -04:00
4f6bc48619 sinefit: include timestamps on marking output too 2013-05-11 11:00:31 -04:00
cf9eb0ed48 Improve sinefit resiliancy 2013-05-10 14:19:55 -04:00
32066fc260 Remove hard matplotlib dependency 2013-05-09 13:17:36 -04:00
739da3f973 Add median filter 2013-05-08 23:36:50 -04:00
83ad18ebf6 Fix non-string arguments to metadata_check 2013-05-08 12:49:38 -04:00
c76d527f95 Fix unicode handling in filter metadata match 2013-05-07 12:40:53 -04:00
b8a73278e7 Always store metadata rotation as a string 2013-04-29 14:25:11 -04:00
ce0691d6c4 sineefit: Change sfit4 to fit to \sin instead of \cos
And adjust the period locator accordingly.
Fitting \sin is the same mathematically, it's just conceptually more
straightforward since we're locating zero crossings anyway.
2013-04-27 18:12:20 -04:00
4da658e960 sinefit: move initial estimate into the main iteration loop
Just a little less code.  Same results.
2013-04-27 17:50:23 -04:00
8ab31eafc2 Allow shorthand method for creating an option-less parser.
This is mostly just intended to make a simple filter example shorter.
2013-04-21 16:53:28 -04:00
979ab13bff Force fs to be a float in sfit4 2013-04-17 17:58:15 -04:00
f4fda837ae Bump required nilmdb version to 1.6.0 2013-04-11 11:55:11 -04:00
5547d266d0 filter: Don't include trailing unprocessed data in the inserted intervals 2013-04-11 11:53:17 -04:00
372e977e4a Reverse cleanup order to handle interruptions better 2013-04-10 18:38:41 -04:00
640a680704 Increase default min amplitude in sinefit 2013-04-10 17:09:52 -04:00
2e74e6cd63 Skip over data if we aren't able to process any. Change output format 2013-04-10 17:01:07 -04:00
de2a794e00 Support wildcards in nilm-decimate-auto 2013-04-10 16:05:16 -04:00
065a40f265 sinefit: add minimum amplitude check 2013-04-10 15:33:51 -04:00
65fa43aff1 sinefit: catch all errors in sfit4 2013-04-10 14:36:50 -04:00
57c23c3792 sinefit: allow user to override min/max frequency detection 2013-04-10 14:36:40 -04:00
d4c8e4acb4 Include rotation in metadata 2013-04-10 14:36:05 -04:00
fd1b33401f Require a --yes argument before actually cleaning data 2013-04-09 20:13:38 -04:00
4c748ec00c Fix minor bugs 2013-04-09 20:08:25 -04:00
b72d6b6908 Warn if column count is wrong for this nharm value 2013-04-09 19:59:59 -04:00
80d642e52e Change nilm-cleanup config file format, tweak output 2013-04-09 19:43:41 -04:00
001b89b1d2 Support multiple shifted FFTs per period in nilm-prep.
New option --nshift controls how many shifted FFT windows to perform
per period.  "nilm-prep -N 2" is similar to old prep behavior.  Note
that this is redundant information and takes up extra storage space,
though.
2013-04-09 18:53:27 -04:00
f978823505 Fix prep scaling and fix comments 2013-04-09 17:44:13 -04:00
ffd6675979 Remove outdated code 2013-04-08 19:46:16 -04:00
10 changed files with 601 additions and 109 deletions

View File

@@ -8,8 +8,11 @@ else
@echo "Try 'make install'"
endif
test:
src/decimate.py
test: test_cleanup
test_cleanup:
src/cleanup.py -e extras/cleanup.cfg
src/cleanup.py extras/cleanup.cfg
test_insert:
@make install >/dev/null
@@ -21,12 +24,16 @@ test_copy:
test_prep:
@make install >/dev/null
src/prep.py -c 3 \
/lees-compressor/no-leak/raw \
/lees-compressor/no-leak/sinefit \
/lees-compressor/no-leak/prep \
-s '2013-02-19 18:00:00' \
-r 0
-nilmtool destroy -R /test/raw
-nilmtool destroy -R /test/sinefit
-nilmtool destroy -R /test/prep
nilmtool create /test/raw float32_2
nilmtool create /test/sinefit float32_3
nilmtool create /test/prep float32_8
nilmtool insert -s '@0' -t -r 8000 /test/raw /tmp/raw.dat
src/sinefit.py -c 1 /test/raw /test/sinefit
src/prep.py -c 2 /test/raw /test/sinefit /test/prep
nilmtool extract -s min -e max /test/prep | head -20
test_decimate:
-@nilmtool destroy /lees-compressor/no-leak/raw/4 || true

View File

@@ -5,10 +5,10 @@ by Jim Paris <jim@jtan.com>
Prerequisites:
# Runtime and build environments
sudo apt-get install python2.7 python2.7-dev python-setuptools
sudo apt-get install python-numpy python-scipy python-matplotlib
sudo apt-get install python2.7 python2.7-dev python-setuptools python-pip
sudo apt-get install python-numpy python-scipy
nilmdb (1.5.0+)
nilmdb (1.6.3+)
Install:

22
extras/cleanup.cfg Normal file
View File

@@ -0,0 +1,22 @@
[/lees-compressor/no-leak/prep]
keep = 2d
rate = 60
[*/raw]
keep = 2d
[*/something]
rate = 10
[*/sinefit]
keep = 1w
decimated = False
[/test/raw]
keep = 0.01d
[/test/sinefit]
keep = 0.01d
[/test/prep]
keep = 0.01d

View File

@@ -61,10 +61,10 @@ setup(name='nilmtools',
long_description = "NILM Database Tools",
license = "Proprietary",
author_email = 'jim@jtan.com',
install_requires = [ 'nilmdb >= 1.5.0',
install_requires = [ 'nilmdb >= 1.6.3',
'numpy',
'scipy',
'matplotlib',
#'matplotlib',
],
packages = [ 'nilmtools',
],
@@ -78,6 +78,8 @@ setup(name='nilmtools',
'nilm-prep = nilmtools.prep:main',
'nilm-copy-wildcard = nilmtools.copy_wildcard:main',
'nilm-sinefit = nilmtools.sinefit:main',
'nilm-cleanup = nilmtools.cleanup:main',
'nilm-median = nilmtools.median:main',
],
},
zip_safe = False,

257
src/cleanup.py Executable file
View File

@@ -0,0 +1,257 @@
#!/usr/bin/python
from nilmdb.utils.printf import *
from nilmdb.utils.time import (parse_time, timestamp_to_human,
timestamp_to_seconds, seconds_to_timestamp)
from nilmdb.utils.diskusage import human_size
from nilmdb.utils.interval import Interval
import nilmdb.client
import nilmdb.client.numpyclient
import nilmtools
import argparse
import ConfigParser
import sys
import collections
import fnmatch
import re
def warn(msg, *args):
fprintf(sys.stderr, "warning: " + msg + "\n", *args)
class TimePeriod(object):
_units = { 'h': ('hour', 60*60),
'd': ('day', 60*60*24),
'w': ('week', 60*60*24*7),
'm': ('month', 60*60*24*30),
'y': ('year', 60*60*24*365) }
def __init__(self, val):
for u in self._units:
if val.endswith(u):
self.unit = self._units[u][0]
self.scale = self._units[u][1]
self.count = float(val[:-len(u)])
break
else:
raise ValueError("unknown units: " + units)
def seconds(self):
return self.count * self.scale
def describe_seconds(self, seconds):
count = seconds / self.scale
units = self.unit if count == 1 else (self.unit + "s")
if count == int(count):
return sprintf("%d %s", count, units)
else:
return sprintf("%.2f %s", count, units)
def __str__(self):
return self.describe_seconds(self.seconds())
class StreamCleanupConfig(object):
def __init__(self, info):
self.path = info[0]
self.layout = info[1]
if info[4] != 0 and info[5] != 0:
self.rate = info[4] / timestamp_to_seconds(info[5])
else:
self.rate = None
self.keep = None
self.clean_decimated = True
self.decimated_from = None
self.also_clean_paths = []
def main(argv = None):
parser = argparse.ArgumentParser(
formatter_class = argparse.RawDescriptionHelpFormatter,
version = nilmtools.__version__,
description = """\
Clean up old data from streams using a configuration file to specify
which data to remove.
The format of the config file is as follows:
[/stream/path]
keep = 3w # keep up to 3 weeks of data
rate = 8000 # optional, used for the --estimate option
decimated = false # whether to delete decimated data too (default true)
[*/prep]
keep = 3.5m # or 2520h or 105d or 15w or 0.29y
The suffix for 'keep' is 'h' for hours, 'd' for days, 'w' for weeks,
'm' for months, or 'y' for years.
Streams paths may include wildcards. If a path is matched by more than
one config section, data from the last config section counts.
Decimated streams (paths containing '~decim-') are treated specially:
- They don't match wildcards
- When deleting data from a parent stream, data is also deleted
from its decimated streams, unless decimated=false
Rate is optional and is only used for the --estimate option.
""")
parser.add_argument("-u", "--url", action="store",
default="http://localhost/nilmdb/",
help="NilmDB server URL (default: %(default)s)")
parser.add_argument("-y", "--yes", action="store_true",
default = False,
help="Actually remove the data (default: no)")
parser.add_argument("-e", "--estimate", action="store_true",
default = False,
help="Estimate how much disk space will be used")
parser.add_argument("configfile", type=argparse.FileType('r'),
help="Configuration file")
args = parser.parse_args(argv)
# Parse config file
config = ConfigParser.RawConfigParser()
config.readfp(args.configfile)
# List all streams
client = nilmdb.client.Client(args.url)
streamlist = client.stream_list(extended = True)
# Create config objects
streams = collections.OrderedDict()
for s in streamlist:
streams[s[0]] = StreamCleanupConfig(s)
m = re.search(r"^(.*)~decim-[0-9]+$", s[0])
if m:
streams[s[0]].decimated_from = m.group(1)
# Build up configuration
for section in config.sections():
matched = False
for path in streams.iterkeys():
# Decimated streams only allow exact matches
if streams[path].decimated_from and path != section:
continue
if not fnmatch.fnmatch(path, section):
continue
matched = True
options = config.options(section)
# Keep period (days, weeks, months, years)
if 'keep' in options:
streams[path].keep = TimePeriod(config.get(section, 'keep'))
options.remove('keep')
# Rate
if 'rate' in options:
streams[path].rate = config.getfloat(section, 'rate')
options.remove('rate')
# Decimated
if 'decimated' in options:
val = config.getboolean(section, 'decimated')
streams[path].clean_decimated = val
options.remove('decimated')
for leftover in options:
warn("option '%s' for '%s' is unknown", leftover, section)
if not matched:
warn("config for '%s' did not match any existing streams", section)
# List all decimated streams in the parent stream's info
for path in streams.keys():
src = streams[path].decimated_from
if src and src in streams:
if streams[src].clean_decimated:
streams[src].also_clean_paths.append(path)
del streams[path]
# Warn about streams that aren't getting cleaned up
for path in streams.keys():
if streams[path].keep is None or streams[path].keep.seconds() < 0:
warn("no config for existing stream '%s'", path)
del streams[path]
if args.estimate:
# Estimate disk usage
total = 0
for path in streams.keys():
rate = streams[path].rate
if not rate or rate < 0:
warn("unable to estimate disk usage for stream '%s' because "
"the data rate is unknown", path)
continue
printf("%s:\n", path)
layout = streams[path].layout
dtype = nilmdb.client.numpyclient.layout_to_dtype(layout)
per_row = dtype.itemsize
per_sec = per_row * rate
printf("%17s: %s per row, %s rows per second\n",
"base rate",
human_size(per_row),
round(rate,1))
printf("%17s: %s per hour, %s per day\n",
"base size",
human_size(per_sec * 3600),
human_size(per_sec * 3600 * 24))
# If we'll be cleaning up decimated data, add an
# estimation for how much room decimated data takes up.
if streams[path].clean_decimated:
d_layout = "float32_" + str(3*(int(layout.split('_')[1])))
d_dtype = nilmdb.client.numpyclient.layout_to_dtype(d_layout)
# Assume the decimations will be a factor of 4
# sum_{k=0..inf} (rate / (n^k)) * d_dtype.itemsize
d_per_row = d_dtype.itemsize
factor = 4.0
d_per_sec = d_per_row * (rate / factor) * (1 / (1 - (1/factor)))
per_sec += d_per_sec
printf("%17s: %s per hour, %s per day\n",
"with decimation",
human_size(per_sec * 3600),
human_size(per_sec * 3600 * 24))
keep = per_sec * streams[path].keep.seconds()
printf("%17s: %s\n\n",
"keep " + str(streams[path].keep), human_size(keep))
total += keep
printf("Total estimated disk usage for these streams:\n")
printf(" %s\n", human_size(total))
raise SystemExit(0)
# Do the cleanup
for path in streams:
printf("%s: keep %s\n", path, streams[path].keep)
# Figure out the earliest timestamp we should keep.
intervals = [ Interval(start, end) for (start, end) in
reversed(list(client.stream_intervals(path))) ]
total = 0
keep = seconds_to_timestamp(streams[path].keep.seconds())
for i in intervals:
total += i.end - i.start
if total <= keep:
continue
remove_before = i.start + (total - keep)
break
else:
printf(" nothing to do (only %s of data present)\n",
streams[path].keep.describe_seconds(
timestamp_to_seconds(total)))
continue
printf(" removing data before %s\n", timestamp_to_human(remove_before))
# Clean in reverse order. Since we only use the primary stream and not
# the decimated streams to figure out which data to remove, removing
# the primary stream last means that we might recover more nicely if
# we are interrupted and restarted.
clean_paths = list(reversed(streams[path].also_clean_paths)) + [ path ]
for p in clean_paths:
printf(" removing from %s\n", p)
if args.yes:
client.stream_remove(p, None, remove_before)
# All done
if not args.yes:
printf("Note: specify --yes to actually perform removals\n")
return
if __name__ == "__main__":
main()

View File

@@ -4,15 +4,19 @@ import nilmtools.filter
import nilmtools.decimate
import nilmdb.client
import argparse
import fnmatch
def main(argv = None):
parser = argparse.ArgumentParser(
formatter_class = argparse.RawDescriptionHelpFormatter,
version = "1.0",
version = nilmtools.__version__,
description = """\
Automatically create multiple decimations from a single source
stream, continuing until the last decimated level contains fewer
than 500 points total.
Wildcards and multiple paths are accepted. Decimated paths are
ignored when matching wildcards.
""")
parser.add_argument("-u", "--url", action="store",
default="http://localhost/nilmdb/",
@@ -23,20 +27,36 @@ def main(argv = None):
default = False,
help="Force metadata changes if the dest "
"doesn't match")
parser.add_argument("path", action="store",
parser.add_argument("path", action="store", nargs='+',
help='Path of base stream')
args = parser.parse_args(argv)
# Pull out info about the base stream
client = nilmdb.client.Client(args.url)
info = nilmtools.filter.get_stream_info(client, args.path)
if not info:
raise Exception("path " + args.path + " not found")
# Find list of paths to process
streams = [ unicode(s[0]) for s in client.stream_list() ]
streams = [ s for s in streams if "~decim-" not in s ]
paths = []
for path in args.path:
new = fnmatch.filter(streams, unicode(path))
if not new:
print "error: no stream matched path:", path
raise SystemExit(1)
paths.extend(new)
meta = client.stream_get_metadata(args.path)
for path in paths:
do_decimation(client, args, path)
def do_decimation(client, args, path):
print "Decimating", path
info = nilmtools.filter.get_stream_info(client, path)
if not info:
raise Exception("path " + path + " not found")
meta = client.stream_get_metadata(path)
if "decimate_source" in meta:
print "Stream", args.path, "was decimated from", meta["decimate_source"]
print "Stream", path, "was decimated from", meta["decimate_source"]
print "You need to pass the base stream instead"
raise SystemExit(1)
@@ -53,7 +73,7 @@ def main(argv = None):
if info.rows <= 500:
break
factor *= args.factor
new_path = "%s~decim-%d" % (args.path, factor)
new_path = "%s~decim-%d" % (path, factor)
# Create the stream if needed
new_info = nilmtools.filter.get_stream_info(client, new_path)
@@ -72,5 +92,7 @@ def main(argv = None):
# Update info using the newly decimated stream
info = nilmtools.filter.get_stream_info(client, new_path)
return
if __name__ == "__main__":
main()

View File

@@ -67,7 +67,7 @@ def get_stream_info(client, path):
class Filter(object):
def __init__(self):
def __init__(self, parser_description = None):
self._parser = None
self._client_src = None
self._client_dest = None
@@ -78,6 +78,9 @@ class Filter(object):
self.end = None
self.interhost = False
self.force_metadata = False
if parser_description is not None:
self.setup_parser(parser_description)
self.parse_args()
@property
def client_src(self):
@@ -233,8 +236,14 @@ class Filter(object):
metadata = self._client_dest.stream_get_metadata(self.dest.path)
if not self.force_metadata:
for key in data:
wanted = str(data[key])
wanted = data[key]
if not isinstance(wanted, basestring):
wanted = str(wanted)
val = metadata.get(key, wanted)
# Force UTF-8 encoding for comparison and display
wanted = wanted.encode('utf-8')
val = val.encode('utf-8')
key = key.encode('utf-8')
if val != wanted and self.dest.rows > 0:
m = "Metadata in destination stream:\n"
m += " %s = %s\n" % (key, val)
@@ -275,20 +284,16 @@ class Filter(object):
Return value of 'function' is the number of data rows processed.
Unprocessed data will be provided again in a subsequent call
(unless 'final' is True).
If unprocessed data remains after 'final' is True, the interval
being inserted will be ended at the timestamp of the first
unprocessed data point.
"""
if args is None:
args = []
extractor = NumpyClient(self.src.url).stream_extract_numpy
inserter = NumpyClient(self.dest.url).stream_insert_numpy_context
# Format output data.
formatter = lambda row: " ".join([repr(x) for x in row]) + "\n"
def batch(iterable, size):
c = itertools.count()
for k, g in itertools.groupby(iterable, lambda x: c.next() // size):
yield g
for interval in self.intervals():
print "Processing", self.interval_string(interval)
with inserter(self.dest.path,
@@ -327,7 +332,13 @@ class Filter(object):
# Last call for this contiguous interval
if old_array.shape[0] != 0:
function(old_array, interval, args, insert_function, True)
processed = function(old_array, interval, args,
insert_function, True)
if processed != old_array.shape[0]:
# Truncate the interval we're inserting at the first
# unprocessed data point. This ensures that
# we'll not miss any data when we run again later.
insert_ctx.update_end(old_array[processed][0])
def main(argv = None):
# This is just a dummy function; actual filters can use the other

43
src/median.py Executable file
View File

@@ -0,0 +1,43 @@
#!/usr/bin/python
import nilmtools.filter, scipy.signal
def main(argv = None):
f = nilmtools.filter.Filter()
parser = f.setup_parser("Median Filter")
group = parser.add_argument_group("Median filter options")
group.add_argument("-z", "--size", action="store", type=int, default=25,
help = "median filter size (default %(default)s)")
group.add_argument("-d", "--difference", action="store_true",
help = "store difference rather than filtered values")
try:
args = f.parse_args(argv)
except nilmtools.filter.MissingDestination as e:
print "Source is %s (%s)" % (e.src.path, e.src.layout)
print "Destination %s doesn't exist" % (e.dest.path)
print "You could make it with a command like:"
print " nilmtool -u %s create %s %s" % (e.dest.url,
e.dest.path, e.src.layout)
raise SystemExit(1)
meta = f.client_src.stream_get_metadata(f.src.path)
f.check_dest_metadata({ "median_filter_source": f.src.path,
"median_filter_size": args.size,
"median_filter_difference": repr(args.difference) })
f.process_numpy(median_filter, args = (args.size, args.difference))
def median_filter(data, interval, args, insert, final):
(size, diff) = args
(rows, cols) = data.shape
for i in range(cols - 1):
filtered = scipy.signal.medfilt(data[:, i+1], size)
if diff:
data[:, i+1] -= filtered
else:
data[:, i+1] = filtered
insert(data)
return rows
if __name__ == "__main__":
main()

View File

@@ -3,6 +3,8 @@
# Spectral envelope preprocessor.
# Requires two streams as input: the original raw data, and sinefit data.
from nilmdb.utils.printf import *
from nilmdb.utils.time import timestamp_to_human
import nilmtools.filter
import nilmdb.client
from numpy import *
@@ -19,12 +21,14 @@ def main(argv = None):
group.add_argument("-c", "--column", action="store", type=int,
help="Column number (first data column is 1)")
group.add_argument("-n", "--nharm", action="store", type=int, default=4,
help="number of odd harmonics to compute")
help="number of odd harmonics to compute (default 4)")
group.add_argument("-N", "--nshift", action="store", type=int, default=1,
help="number of shifted FFTs per period (default 1)")
exc = group.add_mutually_exclusive_group()
exc.add_argument("-r", "--rotate", action="store", type=float,
help="rotate FFT output by this many degrees")
help="rotate FFT output by this many degrees (default 0)")
exc.add_argument("-R", "--rotate-rad", action="store", type=float,
help="rotate FFT output by this many radians")
help="rotate FFT output by this many radians (default 0)")
group.add_argument("srcpath", action="store",
help="Path of raw input, e.g. /foo/raw")
@@ -44,6 +48,10 @@ def main(argv = None):
print " nilmtool -u %s create %s %s" % (e.dest.url, e.dest.path, rec)
raise SystemExit(1)
if f.dest.layout_count != args.nharm * 2:
print "error: need", args.nharm*2, "columns in destination stream"
raise SystemExit(1)
# Check arguments
if args.column is None or args.column < 1:
parser.error("need a column number >= 1")
@@ -51,6 +59,9 @@ def main(argv = None):
if args.nharm < 1 or args.nharm > 32:
parser.error("number of odd harmonics must be 1-32")
if args.nshift < 1:
parser.error("number of shifted FFTs must be >= 1")
if args.rotate is not None:
rotation = args.rotate * 2.0 * pi / 360.0
else:
@@ -68,58 +79,100 @@ def main(argv = None):
# Check and set metadata in prep stream
f.check_dest_metadata({ "prep_raw_source": f.src.path,
"prep_sinefit_source": sinefit.path,
"prep_column": args.column })
"prep_column": args.column,
"prep_rotation": repr(rotation) })
# Run the processing function on all data
f.process_numpy(process, args = (client_sinefit, sinefit.path, args.column,
args.nharm, rotation))
args.nharm, rotation, args.nshift))
def process(data, interval, args, insert_function, final):
(client, sinefit_path, column, nharm, rotation) = args
(client, sinefit_path, column, nharm, rotation, nshift) = args
rows = data.shape[0]
data_timestamps = data[:,0]
if rows < 2:
return 0
last_inserted = [nilmdb.utils.time.min_timestamp]
def insert_if_nonoverlapping(data):
"""Call insert_function to insert data, but only if this
data doesn't overlap with other data that we inserted."""
if data[0][0] <= last_inserted[0]:
return
last_inserted[0] = data[-1][0]
insert_function(data)
processed = 0
out = zeros((1, nharm * 2 + 1))
# Pull out sinefit data for the entire time range of this block
for sinefit_line in client.stream_extract(sinefit_path,
data[0, 0], data[rows-1, 0]):
# Extract sinefit data to get zero crossing timestamps
def prep_period(t_min, t_max, rot):
"""
Compute prep coefficients from time t_min to t_max, which
are the timestamps of the start and end of one period.
Results are rotated by an additional extra_rot before
being inserted into the database. Returns the maximum
index processed, or None if the period couldn't be
processed.
"""
# Find the indices of data that correspond to (t_min, t_max)
idx_min = bisect.bisect_left(data_timestamps, t_min)
idx_max = bisect.bisect_left(data_timestamps, t_max)
if idx_min >= idx_max or idx_max >= len(data_timestamps):
return None
# Perform FFT over those indices
N = idx_max - idx_min
d = data[idx_min:idx_max, column]
F = scipy.fftpack.fft(d) * 2.0 / N
# If we wanted more harmonics than the FFT gave us, pad with zeros
if N < (nharm * 2):
F = r_[F, zeros(nharm * 2 - N)]
# Fill output data.
out[0, 0] = round(t_min)
for k in range(nharm):
Fk = F[2 * k + 1] * e**(rot * 1j * (k+1))
out[0, 2 * k + 1] = -imag(Fk) # Pk
out[0, 2 * k + 2] = real(Fk) # Qk
insert_if_nonoverlapping(out)
return idx_max
# Extract sinefit data to get zero crossing timestamps.
# t_min = beginning of period
# t_max = end of period
(t_min, f0, A, C) = [ float(x) for x in sinefit_line.split() ]
t_max = t_min + 1e6 / f0
# Find the indices of data that correspond to (t_min, t_max)
idx_min = bisect.bisect_left(data_timestamps, t_min)
idx_max = bisect.bisect_left(data_timestamps, t_max)
if idx_min >= idx_max:
# something's wonky; ignore this period
continue
if idx_max >= len(data_timestamps):
# max is likely past the end of our chunk, so stop
# processing this chunk now.
break
# Compute prep over shifted windows of the period
# (nshift is typically 1)
for n in range(nshift):
# Compute timestamps and rotations for shifted window
time_shift = n * (t_max - t_min) / nshift
shifted_min = t_min + time_shift
shifted_max = t_max + time_shift
angle_shift = n * 2 * pi / nshift
shifted_rot = rotation - angle_shift
# Perform FFT over those indices
N = idx_max - idx_min
d = data[idx_min:idx_max, column]
F = scipy.fftpack.fft(d) / N
# Run prep computation
idx_max = prep_period(shifted_min, shifted_max, shifted_rot)
if not idx_max:
break
processed = idx_max
# If we wanted more harmonics than we have, pad with zeros
if N < (nharm * 2):
F = r_[F, zeros(nharm * 2 - N)]
# Fill output data
out[0, 0] = t_min
for k in range(nharm):
Fk = F[2 * k + 1] * e**(rotation * 1j * (k+1))
out[0, 2 * k + 1] = -imag(Fk) # Pk
out[0, 2 * k + 2] = real(Fk) # Qk
# Insert it and continue
insert_function(out)
processed = idx_max
print "Processed", processed, "of", rows, "rows"
# If we processed no data but there's lots in here, pretend we
# processed half of it.
if processed == 0 and rows > 10000:
processed = rows / 2
printf("%s: warning: no periods found; skipping %d rows\n",
timestamp_to_human(data[0][0]), processed)
else:
printf("%s: processed %d of %d rows\n",
timestamp_to_human(data[0][0]), processed, rows)
return processed
if __name__ == "__main__":

View File

@@ -1,13 +1,18 @@
#!/usr/bin/python
# Sine wave fitting. This runs about 5x faster than realtime on raw data.
# Sine wave fitting.
from nilmdb.utils.printf import *
import nilmtools.filter
import nilmdb.client
from nilmdb.utils.time import (timestamp_to_human,
timestamp_to_seconds,
seconds_to_timestamp)
from numpy import *
from scipy import *
#import pylab as p
import operator
import sys
def main(argv = None):
f = nilmtools.filter.Filter()
@@ -18,6 +23,15 @@ def main(argv = None):
group.add_argument('-f', '--frequency', action='store', type=float,
default=60.0,
help='Approximate frequency (default: %(default)s)')
group.add_argument('-m', '--min-freq', action='store', type=float,
help='Minimum valid frequency '
'(default: approximate frequency / 2))')
group.add_argument('-M', '--max-freq', action='store', type=float,
help='Maximum valid frequency '
'(default: approximate frequency * 2))')
group.add_argument('-a', '--min-amp', action='store', type=float,
default=20.0,
help='Minimum signal amplitude (default: %(default)s)')
# Parse arguments
try:
@@ -34,17 +48,56 @@ def main(argv = None):
parser.error("need a column number >= 1")
if args.frequency < 0.1:
parser.error("frequency must be >= 0.1")
if args.min_freq is None:
args.min_freq = args.frequency / 2
if args.max_freq is None:
args.max_freq = args.frequency * 2
if (args.min_freq > args.max_freq or
args.min_freq > args.frequency or
args.max_freq < args.frequency):
parser.error("invalid min or max frequency")
if args.min_amp < 0:
parser.error("min amplitude must be >= 0")
f.check_dest_metadata({ "sinefit_source": f.src.path,
"sinefit_column": args.column })
f.process_numpy(process, args = (args.column, args.frequency))
f.process_numpy(process, args = (args.column, args.frequency, args.min_amp,
args.min_freq, args.max_freq))
class SuppressibleWarning(object):
def __init__(self, maxcount = 10, maxsuppress = 100):
self.maxcount = maxcount
self.maxsuppress = maxsuppress
self.count = 0
self.last_msg = ""
def _write(self, sec, msg):
if sec:
now = timestamp_to_human(seconds_to_timestamp(sec)) + ": "
else:
now = ""
sys.stderr.write(now + msg)
def warn(self, msg, seconds = None):
self.count += 1
if self.count <= self.maxcount:
self._write(seconds, msg)
if (self.count - self.maxcount) >= self.maxsuppress:
self.reset(seconds)
def reset(self, seconds = None):
if self.count > self.maxcount:
self._write(seconds, sprintf("(%d warnings suppressed)\n",
self.count - self.maxcount))
self.count = 0
def process(data, interval, args, insert_function, final):
(column, f_expected) = args
(column, f_expected, a_min, f_min, f_max) = args
rows = data.shape[0]
# Estimate sampling frequency from timestamps
fs = 1e6 * (rows-1) / (data[-1][0] - data[0][0])
fs = (rows-1) / (timestamp_to_seconds(data[-1][0]) -
timestamp_to_seconds(data[0][0]))
# Pull out about 3.5 periods of data at once;
# we'll expect to match 3 zero crossings in each window
@@ -54,30 +107,41 @@ def process(data, interval, args, insert_function, final):
if rows < N:
return 0
warn = SuppressibleWarning(3, 1000)
# Process overlapping windows
start = 0
num_zc = 0
last_inserted_timestamp = None
while start < (rows - N):
this = data[start:start+N, column]
t_min = data[start, 0]/1e6
t_max = data[start+N-1, 0]/1e6
t_min = timestamp_to_seconds(data[start, 0])
t_max = timestamp_to_seconds(data[start+N-1, 0])
# Do 4-parameter sine wave fit
(A, f0, phi, C) = sfit4(this, fs)
# Check bounds. If frequency is too crazy, ignore this window
if f0 < (f_expected/2) or f0 > (f_expected*2):
print "frequency", f0, "too far from expected value", f_expected
if f0 < f_min or f0 > f_max:
warn.warn(sprintf("frequency %s outside valid range %s - %s\n",
str(f0), str(f_min), str(f_max)), t_min)
start += N
continue
# If amplitude is too low, results are probably just noise
if A < a_min:
warn.warn(sprintf("amplitude %s below minimum threshold %s\n",
str(A), str(a_min)), t_min)
start += N
continue
#p.plot(arange(N), this)
#p.plot(arange(N), A * cos(f0/fs * 2 * pi * arange(N) + phi) + C, 'g')
#p.plot(arange(N), A * sin(f0/fs * 2 * pi * arange(N) + phi) + C, 'g')
# Period starts when the argument of cosine is 3*pi/2 degrees,
# Period starts when the argument of sine is 0 degrees,
# so we're looking for sample number:
# n = (3 * pi / 2 - phi) / (f0/fs * 2 * pi)
zc_n = (3 * pi / 2 - phi) / (f0 / fs * 2 * pi)
# n = (0 - phi) / (f0/fs * 2 * pi)
zc_n = (0 - phi) / (f0 / fs * 2 * pi)
period_n = fs/f0
# Add periods to make N positive
@@ -90,7 +154,13 @@ def process(data, interval, args, insert_function, final):
while zc_n < (N - period_n/2):
#p.plot(zc_n, C, 'ro')
t = t_min + zc_n / fs
insert_function([[t * 1e6, f0, A, C]])
if (last_inserted_timestamp is None or
t > last_inserted_timestamp):
insert_function([[seconds_to_timestamp(t), f0, A, C]])
last_inserted_timestamp = t
warn.reset(t)
else:
warn.warn("timestamp overlap\n", t)
num_zc += 1
last_zc = zc_n
zc_n += period_n
@@ -108,7 +178,13 @@ def process(data, interval, args, insert_function, final):
start = int(round(start + advance))
# Return the number of rows we've processed
print "Marked", num_zc, "zero-crossings in", start, "rows"
warn.reset(last_inserted_timestamp)
if last_inserted_timestamp:
now = timestamp_to_human(seconds_to_timestamp(
last_inserted_timestamp)) + ": "
else:
now = ""
printf("%sMarked %d zero-crossings in %d rows\n", now, num_zc, start)
return start
def sfit4(data, fs):
@@ -123,15 +199,15 @@ def sfit4(data, fs):
Output:
Parameters [A, f0, phi, C] to fit the equation
x[n] = A * cos(f0/fs * 2 * pi * n + phi) + C
x[n] = A * sin(f0/fs * 2 * pi * n + phi) + C
where n is sample number. Or, as a function of time:
x(t) = A * cos(f0 * 2 * pi * t + phi) + C
x(t) = A * sin(f0 * 2 * pi * t + phi) + C
by Jim Paris
(Verified to match sfit4.m)
"""
N = len(data)
t = linspace(0, (N-1) / fs, N)
t = linspace(0, (N-1) / float(fs), N)
## Estimate frequency using FFT (step b)
Fc = fft(data)
@@ -156,32 +232,31 @@ def sfit4(data, fs):
i = arccos((Z2*cos(ni2) - Z1*cos(ni1)) / (Z2-Z1)) / n
# Convert to Hz
f0 = i * fs / N
f0 = i * float(fs) / N
## Fit it
# first guess for A0, B0 using 3-parameter fit (step c)
w = 2*pi*f0
D = c_[cos(w*t), sin(w*t), ones(N)]
s = linalg.lstsq(D, data)[0]
# Now iterate 6 times (step i)
for idx in range(6):
D = c_[cos(w*t), sin(w*t), ones(N),
-s[0] * t * sin(w*t) + s[1] * t * cos(w*t) ] # eqn B.16
s = linalg.lstsq(D, data)[0] # eqn B.18
w = w + s[3] # update frequency estimate
## Extract results
A = sqrt(s[0]*s[0] + s[1]*s[1]) # eqn B.21
f0 = w / (2*pi)
# Fit it. We'll catch exceptions here and just returns zeros
# if something fails with the least squares fit, etc.
try:
phi = -arctan2(s[1], s[0]) # eqn B.22
except TypeError:
# first guess for A0, B0 using 3-parameter fit (step c)
s = zeros(3)
w = 2*pi*f0
# Now iterate 7 times (step b, plus 6 iterations of step i)
for idx in range(7):
D = c_[cos(w*t), sin(w*t), ones(N),
-s[0] * t * sin(w*t) + s[1] * t * cos(w*t) ] # eqn B.16
s = linalg.lstsq(D, data)[0] # eqn B.18
w = w + s[3] # update frequency estimate
## Extract results
A = sqrt(s[0]*s[0] + s[1]*s[1]) # eqn B.21
f0 = w / (2*pi)
phi = arctan2(s[0], s[1]) # eqn B.22 (flipped for sin instead of cos)
C = s[2]
return (A, f0, phi, C)
except Exception as e:
# something broke down, just return zeros
return (0, 0, 0, 0)
C = s[2]
return (A, f0, phi, C)
if __name__ == "__main__":
main()