Compare commits

...

10 Commits

15 changed files with 199 additions and 75 deletions

View File

@@ -11,18 +11,24 @@ endif
test: test_cleanup
test_cleanup:
src/cleanup.py -e extras/cleanup.cfg
src/cleanup.py extras/cleanup.cfg
nilmtools/cleanup.py -e extras/cleanup.cfg
nilmtools/cleanup.py extras/cleanup.cfg
test_insert:
@make install >/dev/null
src/insert.py --file --dry-run /test/foo </dev/null
nilmtools/insert.py --file --dry-run /test/foo </dev/null
test_copy:
@make install >/dev/null
src/copy_wildcard.py -U "http://nilmdb.com/bucket/" -D /lees*
nilmtools/copy_wildcard.py -U "http://nilmdb.com/bucket/" -D /lees*
test_prep:
/tmp/raw.dat:
octave --eval 'fs = 8000;' \
--eval 't = (0:fs*10)*2*pi*60/fs;' \
--eval 'raw = transpose([sin(t); 0.3*sin(3*t)+sin(t)]);' \
--eval 'save("-ascii","/tmp/raw.dat","raw");'
test_prep: /tmp/raw.dat
@make install >/dev/null
-nilmtool destroy -R /test/raw
-nilmtool destroy -R /test/sinefit
@@ -31,8 +37,8 @@ test_prep:
nilmtool create /test/sinefit float32_3
nilmtool create /test/prep float32_8
nilmtool insert -s '@0' -t -r 8000 /test/raw /tmp/raw.dat
src/sinefit.py -c 1 /test/raw /test/sinefit
src/prep.py -c 2 /test/raw /test/sinefit /test/prep
nilmtools/sinefit.py -a 0.5 -c 1 /test/raw /test/sinefit
nilmtools/prep.py -c 2 /test/raw /test/sinefit /test/prep
nilmtool extract -s min -e max /test/prep | head -20
test_decimate:
@@ -40,8 +46,8 @@ test_decimate:
-@nilmtool destroy /lees-compressor/no-leak/raw/16 || true
-@nilmtool create /lees-compressor/no-leak/raw/4 float32_18 || true
-@nilmtool create /lees-compressor/no-leak/raw/16 float32_18 || true
time python src/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/1 /lees-compressor/no-leak/raw/4
python src/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/4 /lees-compressor/no-leak/raw/16
time python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/1 /lees-compressor/no-leak/raw/4
python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/4 /lees-compressor/no-leak/raw/16
version:
python setup.py version

View File

@@ -5,10 +5,10 @@ by Jim Paris <jim@jtan.com>
Prerequisites:
# Runtime and build environments
sudo apt-get install python2.7 python2.7-dev python-setuptools
sudo apt-get install python-numpy python-scipy python-matplotlib
sudo apt-get install python2.7 python2.7-dev python-setuptools python-pip
sudo apt-get install python-numpy python-scipy
nilmdb (1.5.0+)
nilmdb (1.6.3+)
Install:

View File

View File

@@ -181,7 +181,7 @@ def versions_from_parentdir(parentdir_prefix, versionfile_source, verbose=False)
tag_prefix = "nilmtools-"
parentdir_prefix = "nilmtools-"
versionfile_source = "src/_version.py"
versionfile_source = "nilmtools/_version.py"
def get_versions(default={"version": "unknown", "full": ""}, verbose=False):
variables = { "refnames": git_refnames, "full": git_full }

View File

View File

View File

View File

@@ -236,8 +236,14 @@ class Filter(object):
metadata = self._client_dest.stream_get_metadata(self.dest.path)
if not self.force_metadata:
for key in data:
wanted = str(data[key])
wanted = data[key]
if not isinstance(wanted, basestring):
wanted = str(wanted)
val = metadata.get(key, wanted)
# Force UTF-8 encoding for comparison and display
wanted = wanted.encode('utf-8')
val = val.encode('utf-8')
key = key.encode('utf-8')
if val != wanted and self.dest.rows > 0:
m = "Metadata in destination stream:\n"
m += " %s = %s\n" % (key, val)
@@ -251,47 +257,21 @@ class Filter(object):
# All good -- write the metadata in case it's not already there
self._client_dest.stream_update_metadata(self.dest.path, data)
# The main filter processing method.
def process_numpy(self, function, args = None, rows = 100000):
"""For all intervals that exist in self.src but don't exist in
self.dest, call 'function' with a Numpy array corresponding to
the data. The data is converted to a Numpy array in chunks of
'rows' rows at a time.
# Filter processing for a single interval of data.
def process_numpy_interval(self, interval, extractor, insert_ctx,
function, args = None, rows = 100000):
"""For the given 'interval' of data, extract data, process it
through 'function', and insert the result.
'function' should be defined as:
def function(data, interval, args, insert_func, final)
'extractor' should be a function like NumpyClient.stream_extract_numpy
'insert_ctx' should be a class like StreamInserterNumpy, with member
functions 'insert', 'send', and 'update_end'.
'data': array of data to process -- may be empty
'interval': overall interval we're processing (but not necessarily
the interval of this particular chunk of data)
'args': opaque arguments passed to process_numpy
'insert_func': function to call in order to insert array of data.
Should be passed a 2-dimensional array of data to insert.
Data timestamps must be within the provided interval.
'final': True if this is the last bit of data for this
contiguous interval, False otherwise.
Return value of 'function' is the number of data rows processed.
Unprocessed data will be provided again in a subsequent call
(unless 'final' is True).
If unprocessed data remains after 'final' is True, the interval
being inserted will be ended at the timestamp of the first
unprocessed data point.
See process_numpy for details on 'function', 'args', and 'rows'.
"""
if args is None:
args = []
extractor = NumpyClient(self.src.url).stream_extract_numpy
inserter = NumpyClient(self.dest.url).stream_insert_numpy_context
for interval in self.intervals():
print "Processing", self.interval_string(interval)
with inserter(self.dest.path,
interval.start, interval.end) as insert_ctx:
insert_function = insert_ctx.insert
old_array = np.array([])
for new_array in extractor(self.src.path,
@@ -334,6 +314,51 @@ class Filter(object):
# we'll not miss any data when we run again later.
insert_ctx.update_end(old_array[processed][0])
# The main filter processing method.
def process_numpy(self, function, args = None, rows = 100000):
"""Calls process_numpy_interval for each interval that currently
exists in self.src, but doesn't exist in self.dest. It will
process the data in chunks as follows:
For each chunk of data, call 'function' with a Numpy array
corresponding to the data. The data is converted to a Numpy
array in chunks of 'rows' rows at a time.
'function' should be defined as:
# def function(data, interval, args, insert_func, final)
'data': array of data to process -- may be empty
'interval': overall interval we're processing (but not necessarily
the interval of this particular chunk of data)
'args': opaque arguments passed to process_numpy
'insert_func': function to call in order to insert array of data.
Should be passed a 2-dimensional array of data to insert.
Data timestamps must be within the provided interval.
'final': True if this is the last bit of data for this
contiguous interval, False otherwise.
Return value of 'function' is the number of data rows processed.
Unprocessed data will be provided again in a subsequent call
(unless 'final' is True).
If unprocessed data remains after 'final' is True, the interval
being inserted will be ended at the timestamp of the first
unprocessed data point.
"""
extractor = NumpyClient(self.src.url).stream_extract_numpy
inserter = NumpyClient(self.dest.url).stream_insert_numpy_context
for interval in self.intervals():
print "Processing", self.interval_string(interval)
with inserter(self.dest.path,
interval.start, interval.end) as insert_ctx:
self.process_numpy_interval(interval, extractor, insert_ctx,
function, args, rows)
def main(argv = None):
# This is just a dummy function; actual filters can use the other
# functions to prepare stuff, and then do something with the data.

View File

43
nilmtools/median.py Executable file
View File

@@ -0,0 +1,43 @@
#!/usr/bin/python
import nilmtools.filter, scipy.signal
def main(argv = None):
f = nilmtools.filter.Filter()
parser = f.setup_parser("Median Filter")
group = parser.add_argument_group("Median filter options")
group.add_argument("-z", "--size", action="store", type=int, default=25,
help = "median filter size (default %(default)s)")
group.add_argument("-d", "--difference", action="store_true",
help = "store difference rather than filtered values")
try:
args = f.parse_args(argv)
except nilmtools.filter.MissingDestination as e:
print "Source is %s (%s)" % (e.src.path, e.src.layout)
print "Destination %s doesn't exist" % (e.dest.path)
print "You could make it with a command like:"
print " nilmtool -u %s create %s %s" % (e.dest.url,
e.dest.path, e.src.layout)
raise SystemExit(1)
meta = f.client_src.stream_get_metadata(f.src.path)
f.check_dest_metadata({ "median_filter_source": f.src.path,
"median_filter_size": args.size,
"median_filter_difference": repr(args.difference) })
f.process_numpy(median_filter, args = (args.size, args.difference))
def median_filter(data, interval, args, insert, final):
(size, diff) = args
(rows, cols) = data.shape
for i in range(cols - 1):
filtered = scipy.signal.medfilt(data[:, i+1], size)
if diff:
data[:, i+1] -= filtered
else:
data[:, i+1] = filtered
insert(data)
return rows
if __name__ == "__main__":
main()

View File

View File

@@ -1,13 +1,18 @@
#!/usr/bin/python
# Sine wave fitting. This runs about 5x faster than realtime on raw data.
# Sine wave fitting.
from nilmdb.utils.printf import *
import nilmtools.filter
import nilmdb.client
from nilmdb.utils.time import (timestamp_to_human,
timestamp_to_seconds,
seconds_to_timestamp)
from numpy import *
from scipy import *
#import pylab as p
import operator
import sys
def main(argv = None):
f = nilmtools.filter.Filter()
@@ -59,12 +64,40 @@ def main(argv = None):
f.process_numpy(process, args = (args.column, args.frequency, args.min_amp,
args.min_freq, args.max_freq))
class SuppressibleWarning(object):
def __init__(self, maxcount = 10, maxsuppress = 100):
self.maxcount = maxcount
self.maxsuppress = maxsuppress
self.count = 0
self.last_msg = ""
def _write(self, sec, msg):
if sec:
now = timestamp_to_human(seconds_to_timestamp(sec)) + ": "
else:
now = ""
sys.stderr.write(now + msg)
def warn(self, msg, seconds = None):
self.count += 1
if self.count <= self.maxcount:
self._write(seconds, msg)
if (self.count - self.maxcount) >= self.maxsuppress:
self.reset(seconds)
def reset(self, seconds = None):
if self.count > self.maxcount:
self._write(seconds, sprintf("(%d warnings suppressed)\n",
self.count - self.maxcount))
self.count = 0
def process(data, interval, args, insert_function, final):
(column, f_expected, a_min, f_min, f_max) = args
rows = data.shape[0]
# Estimate sampling frequency from timestamps
fs = 1e6 * (rows-1) / (data[-1][0] - data[0][0])
fs = (rows-1) / (timestamp_to_seconds(data[-1][0]) -
timestamp_to_seconds(data[0][0]))
# Pull out about 3.5 periods of data at once;
# we'll expect to match 3 zero crossings in each window
@@ -74,26 +107,31 @@ def process(data, interval, args, insert_function, final):
if rows < N:
return 0
warn = SuppressibleWarning(3, 1000)
# Process overlapping windows
start = 0
num_zc = 0
last_inserted_timestamp = None
while start < (rows - N):
this = data[start:start+N, column]
t_min = data[start, 0]/1e6
t_max = data[start+N-1, 0]/1e6
t_min = timestamp_to_seconds(data[start, 0])
t_max = timestamp_to_seconds(data[start+N-1, 0])
# Do 4-parameter sine wave fit
(A, f0, phi, C) = sfit4(this, fs)
# Check bounds. If frequency is too crazy, ignore this window
if f0 < f_min or f0 > f_max:
print "frequency", f0, "outside valid range", f_min, "-", f_max
warn.warn(sprintf("frequency %s outside valid range %s - %s\n",
str(f0), str(f_min), str(f_max)), t_min)
start += N
continue
# If amplitude is too low, results are probably just noise
if A < a_min:
print "amplitude", A, "below minimum threshold", a_min
warn.warn(sprintf("amplitude %s below minimum threshold %s\n",
str(A), str(a_min)), t_min)
start += N
continue
@@ -116,7 +154,13 @@ def process(data, interval, args, insert_function, final):
while zc_n < (N - period_n/2):
#p.plot(zc_n, C, 'ro')
t = t_min + zc_n / fs
insert_function([[t * 1e6, f0, A, C]])
if (last_inserted_timestamp is None or
t > last_inserted_timestamp):
insert_function([[seconds_to_timestamp(t), f0, A, C]])
last_inserted_timestamp = t
warn.reset(t)
else:
warn.warn("timestamp overlap\n", t)
num_zc += 1
last_zc = zc_n
zc_n += period_n
@@ -134,7 +178,13 @@ def process(data, interval, args, insert_function, final):
start = int(round(start + advance))
# Return the number of rows we've processed
print "Marked", num_zc, "zero-crossings in", start, "rows"
warn.reset(last_inserted_timestamp)
if last_inserted_timestamp:
now = timestamp_to_human(seconds_to_timestamp(
last_inserted_timestamp)) + ": "
else:
now = ""
printf("%sMarked %d zero-crossings in %d rows\n", now, num_zc, start)
return start
def sfit4(data, fs):

View File

@@ -30,7 +30,7 @@ except ImportError:
# Versioneer manages version numbers from git tags.
# https://github.com/warner/python-versioneer
import versioneer
versioneer.versionfile_source = 'src/_version.py'
versioneer.versionfile_source = 'nilmtools/_version.py'
versioneer.versionfile_build = 'nilmtools/_version.py'
versioneer.tag_prefix = 'nilmtools-'
versioneer.parentdir_prefix = 'nilmtools-'
@@ -61,14 +61,13 @@ setup(name='nilmtools',
long_description = "NILM Database Tools",
license = "Proprietary",
author_email = 'jim@jtan.com',
install_requires = [ 'nilmdb >= 1.6.0',
install_requires = [ 'nilmdb >= 1.6.3',
'numpy',
'scipy',
'matplotlib',
#'matplotlib',
],
packages = [ 'nilmtools',
],
package_dir = { 'nilmtools': 'src' },
entry_points = {
'console_scripts': [
'nilm-decimate = nilmtools.decimate:main',
@@ -79,6 +78,7 @@ setup(name='nilmtools',
'nilm-copy-wildcard = nilmtools.copy_wildcard:main',
'nilm-sinefit = nilmtools.sinefit:main',
'nilm-cleanup = nilmtools.cleanup:main',
'nilm-median = nilmtools.median:main',
],
},
zip_safe = False,