If the exemplar's max scale belonged to a column we weren't using, the matcher would bail out when looking for that correlation later. Change things around so that exemplars in RAM only keep the columns we care about.
#!/usr/bin/python

from nilmdb.utils.printf import *
import nilmdb.client
import nilmtools.filter
from nilmdb.utils.time import (timestamp_to_human,
                               timestamp_to_seconds,
                               seconds_to_timestamp)
from nilmdb.utils import datetime_tz
from nilmdb.utils.interval import Interval

import numpy as np
import scipy
import scipy.signal
from numpy.core.umath_tests import inner1d
import nilmrun
from collections import OrderedDict
import sys
import time
import functools
import collections

class DataError(ValueError):
    pass

def build_column_mapping(colinfo, streaminfo):
    """Given the 'columns' list from the JSON data, verify and
    pull out a dictionary mapping for the column names/numbers."""
    columns = OrderedDict()
    for c in colinfo:
        col_num = c['index'] + 1  # skip timestamp
        if (c['name'] in columns.keys() or col_num in columns.values()):
            raise DataError("duplicated columns")
        if (c['index'] < 0 or c['index'] >= streaminfo.layout_count):
            raise DataError("bad column number")
        columns[c['name']] = col_num
    if not len(columns):
        raise DataError("no columns")
    return columns
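
# Example (hypothetical): colinfo = [{"name": "P1", "index": 0}] maps to
# OrderedDict([("P1", 1)]) -- stream column 0 becomes array column 1,
# because extracted arrays carry the timestamp in column 0.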

class Exemplar(object):
    def __init__(self, exinfo, min_rows = 10, max_rows = 100000):
        """Given a dictionary entry from the 'exemplars' input JSON,
        verify the stream, columns, etc.  Then, fetch all the data
        into self.data."""

        self.name = exinfo['name']
        self.url = exinfo['url']
        self.stream = exinfo['stream']
        self.start = exinfo['start']
        self.end = exinfo['end']
        self.dest_column = exinfo['dest_column']

        # Get stream info
        self.client = nilmdb.client.numpyclient.NumpyClient(self.url)
        self.info = nilmtools.filter.get_stream_info(self.client, self.stream)
        if not self.info:
            raise DataError(sprintf("exemplar stream '%s' does not exist " +
                                    "on server '%s'", self.stream, self.url))

        # Build up name => index mapping for the columns
        self.columns = build_column_mapping(exinfo['columns'], self.info)

        # Count points
        self.count = self.client.stream_count(self.stream, self.start, self.end)

        # Verify count
        if self.count == 0:
            raise DataError("No data in this exemplar!")
        if self.count < min_rows:
            raise DataError("Too few data points: " + str(self.count))
        if self.count > max_rows:
            raise DataError("Too many data points: " + str(self.count))

        # Extract the data
        datagen = self.client.stream_extract_numpy(self.stream,
                                                   self.start, self.end,
                                                   self.info.layout,
                                                   maxrows = self.count)
        self.data = list(datagen)[0]

        # Extract just the columns that were specified in self.columns,
        # skipping the timestamp.
        extract_columns = [ value for (key, value) in self.columns.items() ]
        self.data = self.data[:, extract_columns]

        # Fix the column indices in self.columns, since we removed and
        # reordered columns in self.data.
        for n, k in enumerate(self.columns):
            self.columns[k] = n

        # Subtract the means from each column
        self.data = self.data - self.data.mean(axis=0)

        # Get scale factors for each column by computing the dot product
        # of each column with itself.
        self.scale = inner1d(self.data.T, self.data.T)

        # Ensure a minimum (nonzero) scale and convert to list
        self.scale = np.maximum(self.scale, [1e-9]).tolist()
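        # self.data now holds only the requested columns, so these scale
        # factors line up one-to-one with self.columns; the matcher can
        # pick its strongest column without tripping over unused ones.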

    def __str__(self):
        return sprintf("\"%s\" %s [%s] %s rows",
                       self.name, self.stream, ",".join(self.columns.keys()),
                       self.count)

def peak_detect(data, delta):
    """Simple min/max peak detection algorithm, taken from my code
    in the disagg.m from the 10-8-5 paper"""
    mins = []
    maxs = []
    cur_min = (None, np.inf)
    cur_max = (None, -np.inf)
    lookformax = False
    for (n, p) in enumerate(data):
        if p > cur_max[1]:
            cur_max = (n, p)
        if p < cur_min[1]:
            cur_min = (n, p)
        if lookformax:
            if p < (cur_max[1] - delta):
                maxs.append(cur_max)
                cur_min = (n, p)
                lookformax = False
        else:
            if p > (cur_min[1] + delta):
                mins.append(cur_min)
                cur_max = (n, p)
                lookformax = True
    return (mins, maxs)
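
# Example: peak_detect([0, 1, 0, 2, 0], delta=0.5) returns
# ([(0, 0), (2, 0)], [(1, 1), (3, 2)]), i.e. (index, value) pairs for the
# mins and maxs; a peak is only emitted once the signal has moved more than
# delta away from it, which suppresses wiggles smaller than delta.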

def timestamp_to_short_human(timestamp):
    dt = datetime_tz.datetime_tz.fromtimestamp(timestamp_to_seconds(timestamp))
    return dt.strftime("%H:%M:%S")

def trainola_matcher(data, interval, args, insert_func, final_chunk):
    """Perform cross-correlation match"""
    ( src_columns, dest_count, exemplars ) = args
    nrows = data.shape[0]

    # We want at least 10% more points than the widest exemplar.
    widest = max([ x.count for x in exemplars ])
    if (widest * 1.1) > nrows:
        return 0

    # This is how many points we'll consider valid in the
    # cross-correlation.
    valid = nrows + 1 - widest
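    # (For an exemplar of length m, fftconvolve's 'valid' mode below yields
    # nrows - m + 1 full-overlap lags; truncating every correlation to the
    # widest exemplar keeps all of them the same length.)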
    matches = collections.defaultdict(list)

    # Try matching against each of the exemplars
    for e in exemplars:
        corrs = []

        # Compute cross-correlation for each column
        for col_name in e.columns:
            a = data[:, src_columns[col_name]]
            b = e.data[:, e.columns[col_name]]
            corr = scipy.signal.fftconvolve(a, np.flipud(b), 'valid')[0:valid]

            # Scale by the norm of the exemplar
            corr = corr / e.scale[e.columns[col_name]]
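            # b is zero-mean and e.scale[...] is b.b, so this quotient is
            # exactly 1.0 where the data window matches the exemplar's
            # shape and amplitude, even if offset by a constant.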
            corrs.append(corr)

        # Find the peaks using the column with the largest amplitude
        biggest = e.scale.index(max(e.scale))
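        # Exemplars were trimmed to just the requested columns when they
        # were loaded, so e.scale lines up with corrs and the biggest-scale
        # column is always one we actually correlated.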
        peaks_minmax = peak_detect(corrs[biggest], 0.1)
        peaks = [ p[0] for p in peaks_minmax[1] ]

        # Now look at every peak
        for row in peaks:
            # Correlation for each column must be close enough to 1.
            for (corr, scale) in zip(corrs, e.scale):
                # The accepted distance from 1 is based on the relative
                # amplitude of the column.  Use a linear mapping:
                # scale 1.0 -> distance 0.1
                # scale 0.0 -> distance 1.0
                distance = 1 - 0.9 * (scale / e.scale[biggest])
                if abs(corr[row] - 1) > distance:
                    # No match
                    break
            else:
                # Successful match
                matches[row].append(e)

    # Insert matches into destination stream.
    matched_rows = sorted(matches.keys())
    out = np.zeros((len(matched_rows), dest_count + 1))
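    # out layout: column 0 is the timestamp; columns 1..dest_count hold a
    # 0/1 flag per destination column.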

    for n, row in enumerate(matched_rows):
        # Fill timestamp
        out[n][0] = data[row, 0]

        # Mark matched exemplars
        for exemplar in matches[row]:
            out[n, exemplar.dest_column + 1] = 1.0

    # Insert it
    insert_func(out)

    # Return how many rows we processed
    valid = max(valid, 0)
    printf(" [%s] matched %d exemplars in %d rows\n",
           timestamp_to_short_human(data[0][0]), np.sum(out[:,1:]), valid)
    return valid
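
# Example configuration for trainola() (hypothetical values; the keys are
# the ones read below):
#
# {
#   "url": "http://localhost/nilmdb/",
#   "stream": "/data/raw",
#   "dest_stream": "/data/matches",
#   "start": 0,
#   "end": 1400000000000000,
#   "columns": [ { "name": "P1", "index": 0 } ],
#   "exemplars": [
#     { "name": "pump on",
#       "url": "http://localhost/nilmdb/",
#       "stream": "/data/raw",
#       "start": 0,
#       "end": 100000000,
#       "dest_column": 0,
#       "columns": [ { "name": "P1", "index": 0 } ] }
#   ]
# }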

def trainola(conf):
    print "Trainola", nilmtools.__version__

    # Load main stream data
    url = conf['url']
    src_path = conf['stream']
    dest_path = conf['dest_stream']
    start = conf['start']
    end = conf['end']

    # Get info for the src and dest streams
    src_client = nilmdb.client.numpyclient.NumpyClient(url)
    src = nilmtools.filter.get_stream_info(src_client, src_path)
    if not src:
        raise DataError("source path '" + src_path + "' does not exist")
    src_columns = build_column_mapping(conf['columns'], src)

    dest_client = nilmdb.client.numpyclient.NumpyClient(url)
    dest = nilmtools.filter.get_stream_info(dest_client, dest_path)
    if not dest:
        raise DataError("destination path '" + dest_path + "' does not exist")

    printf("Source:\n")
    printf(" %s [%s]\n", src.path, ",".join(src_columns.keys()))
    printf("Destination:\n")
    printf(" %s (%s columns)\n", dest.path, dest.layout_count)

    # Pull in the exemplar data
    exemplars = []
    for n, exinfo in enumerate(conf['exemplars']):
        printf("Loading exemplar %d:\n", n)
        e = Exemplar(exinfo)
        col = e.dest_column
        if col < 0 or col >= dest.layout_count:
            raise DataError(sprintf("bad destination column number %d\n" +
                                    "dest stream only has 0 through %d",
                                    col, dest.layout_count - 1))
        printf(" %s, output column %d\n", str(e), col)
        exemplars.append(e)
    if len(exemplars) == 0:
        raise DataError("missing exemplars")

    # Verify that the exemplar columns are all represented in the main data
    for n, ex in enumerate(exemplars):
        for col in ex.columns:
            if col not in src_columns:
                raise DataError(sprintf("Exemplar %d column %s is not "
                                        "available in source data", n, col))

    # Figure out which intervals we should process
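    # (Passing diffpath asks the server for intervals present in the source
    # but not yet in the destination, so a re-run resumes where the last
    # one left off instead of reprocessing old data.)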
    intervals = ( Interval(s, e) for (s, e) in
                  src_client.stream_intervals(src_path,
                                              diffpath = dest_path,
                                              start = start, end = end) )
    intervals = nilmdb.utils.interval.optimize(intervals)

    # Do the processing
    rows = 100000
    extractor = functools.partial(src_client.stream_extract_numpy,
                                  src.path, layout = src.layout,
                                  maxrows = rows)
    inserter = functools.partial(dest_client.stream_insert_numpy_context,
                                 dest.path)
    start = time.time()
    processed_time = 0
    printf("Processing intervals:\n")
    for interval in intervals:
        printf("%s\n", interval.human_string())
        nilmtools.filter.process_numpy_interval(
            interval, extractor, inserter, rows * 3,
            trainola_matcher, (src_columns, dest.layout_count, exemplars))
        processed_time += (timestamp_to_seconds(interval.end) -
                           timestamp_to_seconds(interval.start))
    elapsed = max(time.time() - start, 1e-3)

    printf("Done. Processed %.2f seconds per second.\n",
           processed_time / elapsed)

def main(argv = None):
    import simplejson as json
    import sys

    if argv is None:
        argv = sys.argv[1:]
    if len(argv) != 1:
        raise DataError("need one argument, either a dictionary or JSON string")

    try:
        # Passed in a JSON string (e.g. on the command line)
        conf = json.loads(argv[0])
    except TypeError:
        # Passed in the config dictionary (e.g. from NilmRun)
        conf = argv[0]

    return trainola(conf)
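
# Typical invocation (hypothetical values): pass the whole configuration as
# a single JSON argument, e.g.
#   ./trainola.py '{"url": "http://localhost/nilmdb/", ...}'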

if __name__ == "__main__":
    main()