Compare commits

...

11 Commits

SHA1 Message Date
6dce8c5296 More output 2013-07-11 18:56:53 -04:00
25c35a56f6 Trainola inserts into the destination stream now 2013-07-10 12:59:39 -04:00
d610deaef0 More trainola work 2013-07-10 11:38:32 -04:00
d7d5ccc9a7 More filter cleanup 2013-07-09 19:27:20 -04:00
f28753ff5c Move process_numpy_interval outside the class 2013-07-09 18:40:49 -04:00
c9c2e0d5a8 Improve split between process_numpy and process_numpy_interval 2013-07-09 18:09:05 -04:00
5a2a32bec5 WIP on trainola improvements 2013-07-09 17:56:26 -04:00
706c3933f9 Add trainola from nilmrun 2013-07-09 17:55:57 -04:00
cfd1719152 Use nilmdb.utils.interval.optimize; bump nilmdb min version 2013-07-09 17:53:04 -04:00
c62fb45980 Makefile cleanup; add nilm-trainola binary 2013-07-09 16:53:47 -04:00
57d856f2fa Split filter.py internals up a little more
This makes it easier to use the filter stuff from other code, but it's
also turning it into more of a spaghetti nightmare.  Might not be
worth continuing down this path.
2013-07-09 16:52:00 -04:00
6 changed files with 448 additions and 116 deletions

Makefile
View File

@@ -8,18 +8,21 @@ else
@echo "Try 'make install'"
endif
test: test_cleanup
test: test_trainola
test_trainola:
-nilmtool -u http://bucket/nilmdb remove -s min -e max \
/sharon/prep-a-matches
nilmtools/trainola.py "$$(cat extras/trainola-test-param.js)"
test_cleanup:
nilmtools/cleanup.py -e extras/cleanup.cfg
nilmtools/cleanup.py extras/cleanup.cfg
test_insert:
@make install >/dev/null
nilmtools/insert.py --file --dry-run /test/foo </dev/null
test_copy:
@make install >/dev/null
nilmtools/copy_wildcard.py -U "http://nilmdb.com/bucket/" -D /lees*
/tmp/raw.dat:
@@ -29,7 +32,6 @@ test_copy:
--eval 'save("-ascii","/tmp/raw.dat","raw");'
test_prep: /tmp/raw.dat
@make install >/dev/null
-nilmtool destroy -R /test/raw
-nilmtool destroy -R /test/sinefit
-nilmtool destroy -R /test/prep
@@ -69,4 +71,4 @@ clean::
gitclean::
git clean -dXf
.PHONY: all version dist sdist install clean gitclean
.PHONY: all version dist sdist install clean gitclean test

View File

@@ -8,7 +8,7 @@ Prerequisites:
sudo apt-get install python2.7 python2.7-dev python-setuptools python-pip
sudo apt-get install python-numpy python-scipy
nilmdb (1.6.3+)
nilmdb (1.8.1+)
Install:

extras/trainola-test-param.js
View File

@@ -0,0 +1,31 @@
{ "url": "http://bucket.mit.edu/nilmdb",
"dest_stream": "/sharon/prep-a-matches",
"stream": "/sharon/prep-a",
"start": 1366111383280463,
"end": 1366126163457797,
"columns": [ { "name": "P1", "index": 0 },
{ "name": "Q1", "index": 1 },
{ "name": "P3", "index": 2 } ],
"exemplars": [
{ "name": "Boiler Pump ON",
"url": "http://bucket.mit.edu/nilmdb",
"stream": "/sharon/prep-a",
"start": 1366260494269078,
"end": 1366260608185031,
"dest_column": 0,
"columns": [ { "name": "P1", "index": 0 },
{ "name": "Q1", "index": 1 }
]
},
{ "name": "Boiler Pump OFF",
"url": "http://bucket.mit.edu/nilmdb",
"stream": "/sharon/prep-a",
"start": 1366260864215764,
"end": 1366260870882998,
"dest_column": 1,
"columns": [ { "name": "P1", "index": 0 },
{ "name": "Q1", "index": 1 }
]
}
]
}
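
For reference, a minimal sketch of feeding these parameters to the new trainola code. It assumes the file is saved as extras/trainola-test-param.js, matching the Makefile's test_trainola target; trainola() and its config format are defined in nilmtools/trainola.py further down in this change.

import simplejson as json
import nilmtools.trainola

# Load the test parameters shown above and run the matcher directly.
with open("extras/trainola-test-param.js") as f:
    conf = json.load(f)
nilmtools.trainola.trainola(conf)

The command-line wrapper does the same thing with a single JSON-string argument, e.g. nilmtools/trainola.py "$(cat extras/trainola-test-param.js)".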

nilmtools/filter.py
View File

@@ -19,6 +19,10 @@ import re
import argparse
import numpy as np
import cStringIO
import functools
class ArgumentError(Exception):
pass
class MissingDestination(Exception):
def __init__(self, args, src, dest):
@@ -65,6 +69,70 @@ def get_stream_info(client, path):
return None
return StreamInfo(client.geturl(), streams[0])
# Filter processing for a single interval of data.
def process_numpy_interval(interval, extractor, inserter, warn_rows,
function, args = None):
"""For the given 'interval' of data, extract data, process it
through 'function', and insert the result.
'extractor' should be a function like NumpyClient.stream_extract_numpy
but with the interval 'start' and 'end' as the only parameters,
e.g.:
extractor = functools.partial(NumpyClient.stream_extract_numpy,
src_path, layout = l, maxrows = m)
'inserter' should be a function like NumpyClient.stream_insert_context
but with the interval 'start' and 'end' as the only parameters, e.g.:
inserter = functools.partial(NumpyClient.stream_insert_context,
dest_path)
If 'warn_rows' is not None, print a warning to stdout when the
number of unprocessed rows exceeds this amount.
See process_numpy for details on 'function' and 'args'.
"""
if args is None:
args = []
with inserter(interval.start, interval.end) as insert_ctx:
insert_func = insert_ctx.insert
old_array = np.array([])
for new_array in extractor(interval.start, interval.end):
# If we still had old data left, combine it
if old_array.shape[0] != 0:
array = np.vstack((old_array, new_array))
else:
array = new_array
# Pass the data to the user provided function
processed = function(array, interval, args, insert_func, False)
# Send any pending data that the user function inserted
insert_ctx.send()
# Save the unprocessed parts
if processed >= 0:
old_array = array[processed:]
else:
raise Exception(
sprintf("%s return value %s must be >= 0",
str(function), str(processed)))
# Warn if there's too much data remaining
if warn_rows is not None and old_array.shape[0] > warn_rows:
printf("warning: %d unprocessed rows in buffer\n",
old_array.shape[0])
# Last call for this contiguous interval
if old_array.shape[0] != 0:
processed = function(old_array, interval, args,
insert_func, True)
if processed != old_array.shape[0]:
# Truncate the interval we're inserting at the first
# unprocessed data point. This ensures that
# we'll not miss any data when we run again later.
insert_ctx.update_end(old_array[processed][0])
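
A usage sketch for the relocated function, with a hypothetical server URL, stream paths, layout, and row counts (the real callers are Filter.process_numpy below and trainola.py later in this change):

import functools
import nilmdb.client.numpyclient
import nilmtools.filter
from nilmdb.utils.interval import Interval

client = nilmdb.client.numpyclient.NumpyClient("http://localhost/nilmdb")
extractor = functools.partial(client.stream_extract_numpy,
                              "/test/raw", layout = "float32_8",
                              maxrows = 100000)
inserter = functools.partial(client.stream_insert_numpy_context, "/test/out")

def passthrough(array, interval, args, insert_func, final):
    # Insert every extracted row unchanged and report them all as processed.
    insert_func(array)
    return array.shape[0]

interval = Interval(0, 1000000)   # arbitrary one-second interval
nilmtools.filter.process_numpy_interval(interval, extractor, inserter,
                                        300000, passthrough)
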
class Filter(object):
def __init__(self, parser_description = None):
@@ -134,63 +202,52 @@ class Filter(object):
self._parser = parser
return parser
def interval_string(self, interval):
return sprintf("[ %s -> %s ]",
timestamp_to_human(interval.start),
timestamp_to_human(interval.end))
def parse_args(self, argv = None):
args = self._parser.parse_args(argv)
if args.dest_url is None:
args.dest_url = args.url
if args.url != args.dest_url:
def set_args(self, url, dest_url, srcpath, destpath, start, end,
parsed_args = None, quiet = True):
"""Set arguments directly from parameters"""
if dest_url is None:
dest_url = url
if url != dest_url:
self.interhost = True
self._client_src = Client(args.url)
self._client_dest = Client(args.dest_url)
self._client_src = Client(url)
self._client_dest = Client(dest_url)
if (not self.interhost) and (args.srcpath == args.destpath):
self._parser.error("source and destination path must be different")
if (not self.interhost) and (srcpath == destpath):
raise ArgumentError("source and destination path must be different")
# Open and print info about the streams
self.src = get_stream_info(self._client_src, args.srcpath)
# Open the streams
self.src = get_stream_info(self._client_src, srcpath)
if not self.src:
self._parser.error("source path " + args.srcpath + " not found")
raise ArgumentError("source path " + srcpath + " not found")
self.dest = get_stream_info(self._client_dest, args.destpath)
self.dest = get_stream_info(self._client_dest, destpath)
if not self.dest:
raise MissingDestination(args, self.src,
StreamInfo(args.dest_url, [args.destpath]))
raise MissingDestination(parsed_args, self.src,
StreamInfo(dest_url, [destpath]))
print "Source:", self.src.string(self.interhost)
print " Dest:", self.dest.string(self.interhost)
self.start = start
self.end = end
if args.dry_run:
for interval in self.intervals():
print self.interval_string(interval)
raise SystemExit(0)
# Print info
if not quiet:
print "Source:", self.src.string(self.interhost)
print " Dest:", self.dest.string(self.interhost)
def parse_args(self, argv = None):
"""Parse arguments from a command line"""
args = self._parser.parse_args(argv)
self.set_args(args.url, args.dest_url, args.srcpath, args.destpath,
args.start, args.end, quiet = False, parsed_args = args)
self.force_metadata = args.force_metadata
self.start = args.start
self.end = args.end
if args.dry_run:
for interval in self.intervals():
print interval.human_string()
raise SystemExit(0)
return args
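
The new set_args() is what lets other code drive a Filter without argparse (cf. the "Split filter.py internals" commit message). A rough sketch with a made-up URL and paths, assuming set_args() initializes everything a read-only walk of the pending intervals needs:

import nilmtools.filter

f = nilmtools.filter.Filter()
f.set_args("http://localhost/nilmdb", None,   # dest_url = None: same server
           "/test/raw", "/test/out",
           start = None, end = None, quiet = False)
for interval in f.intervals():
    print "would process", interval.human_string()
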
def _optimize_int(self, it):
"""Join and yield adjacent intervals from the iterator 'it'"""
saved_int = None
for interval in it:
if saved_int is not None:
if saved_int.end == interval.start:
interval.start = saved_int.start
else:
yield saved_int
saved_int = interval
if saved_int is not None:
yield saved_int
def intervals(self):
"""Generate all the intervals that this filter should process"""
self._using_client = True
@@ -217,12 +274,13 @@ class Filter(object):
self.src.path, diffpath = self.dest.path,
start = self.start, end = self.end) )
# Optimize intervals: join intervals that are adjacent
for interval in self._optimize_int(intervals):
for interval in nilmdb.utils.interval.optimize(intervals):
yield interval
self._using_client = False
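
The nilmdb.utils.interval.optimize() call above replaces the removed _optimize_int helper (the same commit bumps the nilmdb minimum version). A small illustration of the intended behavior, assuming optimize() joins adjacent intervals just as _optimize_int did; the bounds are arbitrary small timestamps:

import nilmdb.utils.interval
from nilmdb.utils.interval import Interval

ivals = [ Interval(0, 10), Interval(10, 20), Interval(30, 40) ]
for i in nilmdb.utils.interval.optimize(ivals):
    print i.start, "->", i.end    # expect 0 -> 20, then 30 -> 40
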
# Misc helpers
def arg_time(self, toparse):
@staticmethod
def arg_time(toparse):
"""Parse a time string argument"""
try:
return nilmdb.utils.time.parse_time(toparse)
@@ -257,63 +315,6 @@ class Filter(object):
# All good -- write the metadata in case it's not already there
self._client_dest.stream_update_metadata(self.dest.path, data)
# Filter processing for a single interval of data.
def process_numpy_interval(self, interval, extractor, insert_ctx,
function, args = None, rows = 100000):
"""For the given 'interval' of data, extract data, process it
through 'function', and insert the result.
'extractor' should be a function like NumpyClient.stream_extract_numpy
'insert_ctx' should be a class like StreamInserterNumpy, with member
functions 'insert', 'send', and 'update_end'.
See process_numpy for details on 'function', 'args', and 'rows'.
"""
if args is None:
args = []
insert_function = insert_ctx.insert
old_array = np.array([])
for new_array in extractor(self.src.path,
interval.start, interval.end,
layout = self.src.layout,
maxrows = rows):
# If we still had old data left, combine it
if old_array.shape[0] != 0:
array = np.vstack((old_array, new_array))
else:
array = new_array
# Pass it to the process function
processed = function(array, interval, args,
insert_function, False)
# Send any pending data
insert_ctx.send()
# Save the unprocessed parts
if processed >= 0:
old_array = array[processed:]
else:
raise Exception(
sprintf("%s return value %s must be >= 0",
str(function), str(processed)))
# Warn if there's too much data remaining
if old_array.shape[0] > 3 * rows:
printf("warning: %d unprocessed rows in buffer\n",
old_array.shape[0])
# Last call for this contiguous interval
if old_array.shape[0] != 0:
processed = function(old_array, interval, args,
insert_function, True)
if processed != old_array.shape[0]:
# Truncate the interval we're inserting at the first
# unprocessed data point. This ensures that
# we'll not miss any data when we run again later.
insert_ctx.update_end(old_array[processed][0])
# The main filter processing method.
def process_numpy(self, function, args = None, rows = 100000):
"""Calls process_numpy_interval for each interval that currently
@@ -352,12 +353,15 @@ class Filter(object):
extractor = NumpyClient(self.src.url).stream_extract_numpy
inserter = NumpyClient(self.dest.url).stream_insert_numpy_context
extractor_func = functools.partial(extractor, self.src.path,
layout = self.src.layout,
maxrows = rows)
inserter_func = functools.partial(inserter, self.dest.path)
for interval in self.intervals():
print "Processing", self.interval_string(interval)
with inserter(self.dest.path,
interval.start, interval.end) as insert_ctx:
self.process_numpy_interval(interval, extractor, insert_ctx,
function, args, rows)
print "Processing", interval.human_string()
process_numpy_interval(interval, extractor_func, inserter_func,
rows * 3, function, args)
def main(argv = None):
# This is just a dummy function; actual filters can use the other
@@ -366,7 +370,7 @@ def main(argv = None):
parser = f.setup_parser()
args = f.parse_args(argv)
for i in f.intervals():
print "Generic filter: need to handle", f.interval_string(i)
print "Generic filter: need to handle", i.human_string()
if __name__ == "__main__":
main()

294
nilmtools/trainola.py Executable file
View File

@@ -0,0 +1,294 @@
#!/usr/bin/python
from nilmdb.utils.printf import *
import nilmdb.client
import nilmtools.filter
from nilmdb.utils.time import (timestamp_to_human,
timestamp_to_seconds,
seconds_to_timestamp)
from nilmdb.utils import datetime_tz
from nilmdb.utils.interval import Interval
import numpy as np
import scipy
import scipy.signal
from numpy.core.umath_tests import inner1d
import nilmrun
from collections import OrderedDict
import sys
import time
import functools
import collections
class DataError(ValueError):
pass
def build_column_mapping(colinfo, streaminfo):
"""Given the 'columns' list from the JSON data, verify and
pull out a dictionary mapping for the column names/numbers."""
columns = OrderedDict()
for c in colinfo:
if (c['name'] in columns.keys() or
c['index'] in columns.values()):
raise DataError("duplicated columns")
if (c['index'] < 0 or c['index'] >= streaminfo.layout_count):
raise DataError("bad column number")
columns[c['name']] = c['index']
if not len(columns):
raise DataError("no columns")
return columns
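
For instance, the "columns" list from the test parameters maps to an ordered name-to-index dictionary; 'info' here is a hypothetical StreamInfo (e.g. from nilmtools.filter.get_stream_info) whose layout_count is at least 3:

cols = build_column_mapping([ { "name": "P1", "index": 0 },
                              { "name": "Q1", "index": 1 },
                              { "name": "P3", "index": 2 } ], info)
# cols == OrderedDict([("P1", 0), ("Q1", 1), ("P3", 2)])
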
class Exemplar(object):
def __init__(self, exinfo, min_rows = 10, max_rows = 100000):
"""Given a dictionary entry from the 'exemplars' input JSON,
verify the stream, columns, etc. Then, fetch all the data
into self.data."""
self.name = exinfo['name']
self.url = exinfo['url']
self.stream = exinfo['stream']
self.start = exinfo['start']
self.end = exinfo['end']
self.dest_column = exinfo['dest_column']
# Get stream info
self.client = nilmdb.client.numpyclient.NumpyClient(self.url)
self.info = nilmtools.filter.get_stream_info(self.client, self.stream)
# Build up name => index mapping for the columns
self.columns = build_column_mapping(exinfo['columns'], self.info)
# Count points
self.count = self.client.stream_count(self.stream, self.start, self.end)
# Verify count
if self.count == 0:
raise DataError("No data in this exemplar!")
if self.count < min_rows:
raise DataError("Too few data points: " + str(self.count))
if self.count > max_rows:
raise DataError("Too many data points: " + str(self.count))
# Extract the data
datagen = self.client.stream_extract_numpy(self.stream,
self.start, self.end,
self.info.layout,
maxrows = self.count)
self.data = list(datagen)[0]
# Discard timestamp
self.data = self.data[:,1:]
# Subtract the mean from each column
self.data = self.data - self.data.mean(axis=0)
# Get scale factors for each column by computing dot product
# of each column with itself.
self.scale = inner1d(self.data.T, self.data.T)
# Ensure a minimum (nonzero) scale and convert to list
self.scale = np.maximum(self.scale, [1e-9]).tolist()
def __str__(self):
return sprintf("\"%s\" %s [%s] %s rows",
self.name, self.stream, ",".join(self.columns.keys()),
self.count)
def peak_detect(data, delta):
"""Simple min/max peak detection algorithm, taken from my code
in the disagg.m from the 10-8-5 paper"""
mins = [];
maxs = [];
cur_min = (None, np.inf)
cur_max = (None, -np.inf)
lookformax = False
for (n, p) in enumerate(data):
if p > cur_max[1]:
cur_max = (n, p)
if p < cur_min[1]:
cur_min = (n, p)
if lookformax:
if p < (cur_max[1] - delta):
maxs.append(cur_max)
cur_min = (n, p)
lookformax = False
else:
if p > (cur_min[1] + delta):
mins.append(cur_min)
cur_max = (n, p)
lookformax = True
return (mins, maxs)
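
A quick worked example of the return value, which is a pair of lists of (index, value) tuples, using made-up sample data:

import numpy as np
mins, maxs = peak_detect(np.array([0.0, 1.0, 0.0, 2.0, 0.0]), 0.5)
# mins: [(0, 0.0), (2, 0.0)]  -- minima that precede a rise of more than delta
# maxs: [(1, 1.0), (3, 2.0)]  -- maxima that precede a drop of more than delta
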
def timestamp_to_short_human(timestamp):
dt = datetime_tz.datetime_tz.fromtimestamp(timestamp_to_seconds(timestamp))
return dt.strftime("%H:%M:%S")
def trainola_matcher(data, interval, args, insert_func, final_chunk):
"""Perform cross-correlation match"""
( src_columns, dest_count, exemplars ) = args
nrows = data.shape[0]
# We want at least 10% more points than the widest exemplar.
widest = max([ x.count for x in exemplars ])
if (widest * 1.1) > nrows:
return 0
# This is how many points we'll consider valid in the
# cross-correlation.
valid = nrows + 1 - widest
matches = collections.defaultdict(list)
# Try matching against each of the exemplars
for e in exemplars:
corrs = []
# Compute cross-correlation for each column
for col_name in e.columns:
a = data[:, src_columns[col_name] + 1]
b = e.data[:, e.columns[col_name]]
corr = scipy.signal.fftconvolve(a, np.flipud(b), 'valid')[0:valid]
# Scale by the norm of the exemplar
corr = corr / e.scale[e.columns[col_name]]
corrs.append(corr)
# Find the peaks using the column with the largest amplitude
biggest = e.scale.index(max(e.scale))
peaks_minmax = peak_detect(corrs[biggest], 0.1)
peaks = [ p[0] for p in peaks_minmax[1] ]
# Now look at every peak
for row in peaks:
# Correlation for each column must be close enough to 1.
for (corr, scale) in zip(corrs, e.scale):
# The accepted distance from 1 is based on the relative
# amplitude of the column. Use a linear mapping:
# scale 1.0 -> distance 0.1
# scale 0.0 -> distance 1.0
distance = 1 - 0.9 * (scale / e.scale[biggest])
if abs(corr[row] - 1) > distance:
# No match
break
else:
# Successful match
matches[row].append(e)
# Insert matches into destination stream.
matched_rows = sorted(matches.keys())
out = np.zeros((len(matched_rows), dest_count + 1))
for n, row in enumerate(matched_rows):
# Fill timestamp
out[n][0] = data[row, 0]
# Mark matched exemplars
for exemplar in matches[row]:
out[n, exemplar.dest_column + 1] = 1.0
# Insert it
insert_func(out)
# Return how many rows we processed
valid = max(valid, 0)
printf(" [%s] matched %d exemplars in %d rows\n",
timestamp_to_short_human(data[0][0]), np.sum(out[:,1:]), valid)
return valid
def trainola(conf):
print "Trainola", nilmtools.__version__
# Load main stream data
url = conf['url']
src_path = conf['stream']
dest_path = conf['dest_stream']
start = conf['start']
end = conf['end']
# Get info for the src and dest streams
src_client = nilmdb.client.numpyclient.NumpyClient(url)
src = nilmtools.filter.get_stream_info(src_client, src_path)
if not src:
raise DataError("source path '" + src_path + "' does not exist")
src_columns = build_column_mapping(conf['columns'], src)
dest_client = nilmdb.client.numpyclient.NumpyClient(url)
dest = nilmtools.filter.get_stream_info(dest_client, dest_path)
if not dest:
raise DataError("destination path '" + dest_path + "' does not exist")
printf("Source:\n")
printf(" %s [%s]\n", src.path, ",".join(src_columns.keys()))
printf("Destination:\n")
printf(" %s (%s columns)\n", dest.path, dest.layout_count)
# Pull in the exemplar data
exemplars = []
for n, exinfo in enumerate(conf['exemplars']):
printf("Loading exemplar %d:\n", n)
e = Exemplar(exinfo)
col = e.dest_column
if col < 0 or col >= dest.layout_count:
raise DataError(sprintf("bad destination column number %d\n" +
"dest stream only has 0 through %d",
col, dest.layout_count - 1))
printf(" %s, output column %d\n", str(e), col)
exemplars.append(e)
if len(exemplars) == 0:
raise DataError("missing exemplars")
# Verify that the exemplar columns are all represented in the main data
for n, ex in enumerate(exemplars):
for col in ex.columns:
if col not in src_columns:
raise DataError(sprintf("Exemplar %d column %s is not "
"available in source data", n, col))
# Figure out which intervals we should process
intervals = ( Interval(s, e) for (s, e) in
src_client.stream_intervals(src_path,
diffpath = dest_path,
start = start, end = end) )
intervals = nilmdb.utils.interval.optimize(intervals)
# Do the processing
rows = 100000
extractor = functools.partial(src_client.stream_extract_numpy,
src.path, layout = src.layout, maxrows = rows)
inserter = functools.partial(dest_client.stream_insert_numpy_context,
dest.path)
start = time.time()
processed_time = 0
printf("Processing intervals:\n")
for interval in intervals:
printf("%s\n", interval.human_string())
nilmtools.filter.process_numpy_interval(
interval, extractor, inserter, rows * 3,
trainola_matcher, (src_columns, dest.layout_count, exemplars))
processed_time += (timestamp_to_seconds(interval.end) -
timestamp_to_seconds(interval.start))
elapsed = max(time.time() - start, 1e-3)
printf("Done. Processed %.2f seconds per second.\n",
processed_time / elapsed)
def main(argv = None):
import simplejson as json
import sys
if argv is None:
argv = sys.argv[1:]
if len(argv) != 1:
raise DataError("need one argument, either a dictionary or JSON string")
try:
# Passed in a JSON string (e.g. on the command line)
conf = json.loads(argv[0])
except TypeError as e:
# Passed in the config dictionary (e.g. from NilmRun)
conf = argv[0]
return trainola(conf)
if __name__ == "__main__":
main()

setup.py
View File

@@ -61,7 +61,7 @@ setup(name='nilmtools',
long_description = "NILM Database Tools",
license = "Proprietary",
author_email = 'jim@jtan.com',
install_requires = [ 'nilmdb >= 1.6.3',
install_requires = [ 'nilmdb >= 1.8.1',
'numpy',
'scipy',
#'matplotlib',
@@ -79,6 +79,7 @@ setup(name='nilmtools',
'nilm-sinefit = nilmtools.sinefit:main',
'nilm-cleanup = nilmtools.cleanup:main',
'nilm-median = nilmtools.median:main',
'nilm-trainola = nilmtools.trainola:main',
],
},
zip_safe = False,