Compare commits
5 Commits
nilmtools-
...
nilmtools-
Author | SHA1 | Date | |
---|---|---|---|
5d83d93019 | |||
5f847a0513 | |||
29cd7eb6c7 | |||
62c8af41ea | |||
4f6bc48619 |
24
Makefile
24
Makefile
@@ -11,18 +11,24 @@ endif
|
||||
test: test_cleanup
|
||||
|
||||
test_cleanup:
|
||||
src/cleanup.py -e extras/cleanup.cfg
|
||||
src/cleanup.py extras/cleanup.cfg
|
||||
nilmtools/cleanup.py -e extras/cleanup.cfg
|
||||
nilmtools/cleanup.py extras/cleanup.cfg
|
||||
|
||||
test_insert:
|
||||
@make install >/dev/null
|
||||
src/insert.py --file --dry-run /test/foo </dev/null
|
||||
nilmtools/insert.py --file --dry-run /test/foo </dev/null
|
||||
|
||||
test_copy:
|
||||
@make install >/dev/null
|
||||
src/copy_wildcard.py -U "http://nilmdb.com/bucket/" -D /lees*
|
||||
nilmtools/copy_wildcard.py -U "http://nilmdb.com/bucket/" -D /lees*
|
||||
|
||||
test_prep:
|
||||
/tmp/raw.dat:
|
||||
octave --eval 'fs = 8000;' \
|
||||
--eval 't = (0:fs*10)*2*pi*60/fs;' \
|
||||
--eval 'raw = transpose([sin(t); 0.3*sin(3*t)+sin(t)]);' \
|
||||
--eval 'save("-ascii","/tmp/raw.dat","raw");'
|
||||
|
||||
test_prep: /tmp/raw.dat
|
||||
@make install >/dev/null
|
||||
-nilmtool destroy -R /test/raw
|
||||
-nilmtool destroy -R /test/sinefit
|
||||
@@ -31,8 +37,8 @@ test_prep:
|
||||
nilmtool create /test/sinefit float32_3
|
||||
nilmtool create /test/prep float32_8
|
||||
nilmtool insert -s '@0' -t -r 8000 /test/raw /tmp/raw.dat
|
||||
src/sinefit.py -c 1 /test/raw /test/sinefit
|
||||
src/prep.py -c 2 /test/raw /test/sinefit /test/prep
|
||||
nilmtools/sinefit.py -a 0.5 -c 1 /test/raw /test/sinefit
|
||||
nilmtools/prep.py -c 2 /test/raw /test/sinefit /test/prep
|
||||
nilmtool extract -s min -e max /test/prep | head -20
|
||||
|
||||
test_decimate:
|
||||
@@ -40,8 +46,8 @@ test_decimate:
|
||||
-@nilmtool destroy /lees-compressor/no-leak/raw/16 || true
|
||||
-@nilmtool create /lees-compressor/no-leak/raw/4 float32_18 || true
|
||||
-@nilmtool create /lees-compressor/no-leak/raw/16 float32_18 || true
|
||||
time python src/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/1 /lees-compressor/no-leak/raw/4
|
||||
python src/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/4 /lees-compressor/no-leak/raw/16
|
||||
time python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/1 /lees-compressor/no-leak/raw/4
|
||||
python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/4 /lees-compressor/no-leak/raw/16
|
||||
|
||||
version:
|
||||
python setup.py version
|
||||
|
@@ -181,7 +181,7 @@ def versions_from_parentdir(parentdir_prefix, versionfile_source, verbose=False)
|
||||
|
||||
tag_prefix = "nilmtools-"
|
||||
parentdir_prefix = "nilmtools-"
|
||||
versionfile_source = "src/_version.py"
|
||||
versionfile_source = "nilmtools/_version.py"
|
||||
|
||||
def get_versions(default={"version": "unknown", "full": ""}, verbose=False):
|
||||
variables = { "refnames": git_refnames, "full": git_full }
|
@@ -257,15 +257,75 @@ class Filter(object):
|
||||
# All good -- write the metadata in case it's not already there
|
||||
self._client_dest.stream_update_metadata(self.dest.path, data)
|
||||
|
||||
# Filter processing for a single interval of data.
|
||||
def process_numpy_interval(self, interval, extractor, insert_ctx,
|
||||
function, args = None, rows = 100000):
|
||||
"""For the given 'interval' of data, extract data, process it
|
||||
through 'function', and insert the result.
|
||||
|
||||
'extractor' should be a function like NumpyClient.stream_extract_numpy
|
||||
'insert_ctx' should be a class like StreamInserterNumpy, with member
|
||||
functions 'insert', 'send', and 'update_end'.
|
||||
|
||||
See process_numpy for details on 'function', 'args', and 'rows'.
|
||||
"""
|
||||
if args is None:
|
||||
args = []
|
||||
|
||||
insert_function = insert_ctx.insert
|
||||
old_array = np.array([])
|
||||
for new_array in extractor(self.src.path,
|
||||
interval.start, interval.end,
|
||||
layout = self.src.layout,
|
||||
maxrows = rows):
|
||||
# If we still had old data left, combine it
|
||||
if old_array.shape[0] != 0:
|
||||
array = np.vstack((old_array, new_array))
|
||||
else:
|
||||
array = new_array
|
||||
|
||||
# Pass it to the process function
|
||||
processed = function(array, interval, args,
|
||||
insert_function, False)
|
||||
|
||||
# Send any pending data
|
||||
insert_ctx.send()
|
||||
|
||||
# Save the unprocessed parts
|
||||
if processed >= 0:
|
||||
old_array = array[processed:]
|
||||
else:
|
||||
raise Exception(
|
||||
sprintf("%s return value %s must be >= 0",
|
||||
str(function), str(processed)))
|
||||
|
||||
# Warn if there's too much data remaining
|
||||
if old_array.shape[0] > 3 * rows:
|
||||
printf("warning: %d unprocessed rows in buffer\n",
|
||||
old_array.shape[0])
|
||||
|
||||
# Last call for this contiguous interval
|
||||
if old_array.shape[0] != 0:
|
||||
processed = function(old_array, interval, args,
|
||||
insert_function, True)
|
||||
if processed != old_array.shape[0]:
|
||||
# Truncate the interval we're inserting at the first
|
||||
# unprocessed data point. This ensures that
|
||||
# we'll not miss any data when we run again later.
|
||||
insert_ctx.update_end(old_array[processed][0])
|
||||
|
||||
# The main filter processing method.
|
||||
def process_numpy(self, function, args = None, rows = 100000):
|
||||
"""For all intervals that exist in self.src but don't exist in
|
||||
self.dest, call 'function' with a Numpy array corresponding to
|
||||
the data. The data is converted to a Numpy array in chunks of
|
||||
'rows' rows at a time.
|
||||
"""Calls process_numpy_interval for each interval that currently
|
||||
exists in self.src, but doesn't exist in self.dest. It will
|
||||
process the data in chunks as follows:
|
||||
|
||||
For each chunk of data, call 'function' with a Numpy array
|
||||
corresponding to the data. The data is converted to a Numpy
|
||||
array in chunks of 'rows' rows at a time.
|
||||
|
||||
'function' should be defined as:
|
||||
def function(data, interval, args, insert_func, final)
|
||||
# def function(data, interval, args, insert_func, final)
|
||||
|
||||
'data': array of data to process -- may be empty
|
||||
|
||||
@@ -289,8 +349,6 @@ class Filter(object):
|
||||
being inserted will be ended at the timestamp of the first
|
||||
unprocessed data point.
|
||||
"""
|
||||
if args is None:
|
||||
args = []
|
||||
extractor = NumpyClient(self.src.url).stream_extract_numpy
|
||||
inserter = NumpyClient(self.dest.url).stream_insert_numpy_context
|
||||
|
||||
@@ -298,47 +356,8 @@ class Filter(object):
|
||||
print "Processing", self.interval_string(interval)
|
||||
with inserter(self.dest.path,
|
||||
interval.start, interval.end) as insert_ctx:
|
||||
insert_function = insert_ctx.insert
|
||||
old_array = np.array([])
|
||||
for new_array in extractor(self.src.path,
|
||||
interval.start, interval.end,
|
||||
layout = self.src.layout,
|
||||
maxrows = rows):
|
||||
# If we still had old data left, combine it
|
||||
if old_array.shape[0] != 0:
|
||||
array = np.vstack((old_array, new_array))
|
||||
else:
|
||||
array = new_array
|
||||
|
||||
# Pass it to the process function
|
||||
processed = function(array, interval, args,
|
||||
insert_function, False)
|
||||
|
||||
# Send any pending data
|
||||
insert_ctx.send()
|
||||
|
||||
# Save the unprocessed parts
|
||||
if processed >= 0:
|
||||
old_array = array[processed:]
|
||||
else:
|
||||
raise Exception(
|
||||
sprintf("%s return value %s must be >= 0",
|
||||
str(function), str(processed)))
|
||||
|
||||
# Warn if there's too much data remaining
|
||||
if old_array.shape[0] > 3 * rows:
|
||||
printf("warning: %d unprocessed rows in buffer\n",
|
||||
old_array.shape[0])
|
||||
|
||||
# Last call for this contiguous interval
|
||||
if old_array.shape[0] != 0:
|
||||
processed = function(old_array, interval, args,
|
||||
insert_function, True)
|
||||
if processed != old_array.shape[0]:
|
||||
# Truncate the interval we're inserting at the first
|
||||
# unprocessed data point. This ensures that
|
||||
# we'll not miss any data when we run again later.
|
||||
insert_ctx.update_end(old_array[processed][0])
|
||||
self.process_numpy_interval(interval, extractor, insert_ctx,
|
||||
function, args, rows)
|
||||
|
||||
def main(argv = None):
|
||||
# This is just a dummy function; actual filters can use the other
|
@@ -1,7 +1,6 @@
|
||||
#!/usr/bin/python
|
||||
|
||||
# Sine wave fitting. This runs about 5x faster than realtime on raw data.
|
||||
|
||||
# Sine wave fitting.
|
||||
from nilmdb.utils.printf import *
|
||||
import nilmtools.filter
|
||||
import nilmdb.client
|
||||
@@ -74,7 +73,7 @@ class SuppressibleWarning(object):
|
||||
|
||||
def _write(self, sec, msg):
|
||||
if sec:
|
||||
now = "[" + timestamp_to_human(seconds_to_timestamp(sec)) + "] "
|
||||
now = timestamp_to_human(seconds_to_timestamp(sec)) + ": "
|
||||
else:
|
||||
now = ""
|
||||
sys.stderr.write(now + msg)
|
||||
@@ -180,7 +179,12 @@ def process(data, interval, args, insert_function, final):
|
||||
|
||||
# Return the number of rows we've processed
|
||||
warn.reset(last_inserted_timestamp)
|
||||
print "Marked", num_zc, "zero-crossings in", start, "rows"
|
||||
if last_inserted_timestamp:
|
||||
now = timestamp_to_human(seconds_to_timestamp(
|
||||
last_inserted_timestamp)) + ": "
|
||||
else:
|
||||
now = ""
|
||||
printf("%sMarked %d zero-crossings in %d rows\n", now, num_zc, start)
|
||||
return start
|
||||
|
||||
def sfit4(data, fs):
|
3
setup.py
3
setup.py
@@ -30,7 +30,7 @@ except ImportError:
|
||||
# Versioneer manages version numbers from git tags.
|
||||
# https://github.com/warner/python-versioneer
|
||||
import versioneer
|
||||
versioneer.versionfile_source = 'src/_version.py'
|
||||
versioneer.versionfile_source = 'nilmtools/_version.py'
|
||||
versioneer.versionfile_build = 'nilmtools/_version.py'
|
||||
versioneer.tag_prefix = 'nilmtools-'
|
||||
versioneer.parentdir_prefix = 'nilmtools-'
|
||||
@@ -68,7 +68,6 @@ setup(name='nilmtools',
|
||||
],
|
||||
packages = [ 'nilmtools',
|
||||
],
|
||||
package_dir = { 'nilmtools': 'src' },
|
||||
entry_points = {
|
||||
'console_scripts': [
|
||||
'nilm-decimate = nilmtools.decimate:main',
|
||||
|
Reference in New Issue
Block a user