Compare commits


10 Commits

7 changed files with 62 additions and 124 deletions

View File

@@ -9,6 +9,9 @@ else
 endif
 
 test:
 	src/decimate.py
 
+test_insert:
+	@make install >/dev/null
+	src/insert.py --file --dry-run /test/foo </dev/null

View File

@@ -8,7 +8,7 @@ Prerequisites:
 	sudo apt-get install python2.7 python2.7-dev python-setuptools
 	sudo apt-get install python-numpy python-scipy python-matplotlib
-	nilmdb (1.3.1+)
+	nilmdb (1.5.0+)
 
 Install:

View File

@@ -61,7 +61,7 @@ setup(name='nilmtools',
       long_description = "NILM Database Tools",
       license = "Proprietary",
       author_email = 'jim@jtan.com',
-      install_requires = [ 'nilmdb >= 1.4.6',
+      install_requires = [ 'nilmdb >= 1.5.0',
                            'numpy',
                            'scipy',
                            'matplotlib',

View File

@@ -5,6 +5,7 @@
 import nilmtools.filter
 import nilmdb.client
+from nilmdb.client.numpyclient import NumpyClient
 import numpy as np
 import sys
@@ -27,14 +28,14 @@ def main(argv = None):
     meta = f.client_src.stream_get_metadata(f.src.path)
     f.check_dest_metadata(meta)
 
-    # Copy all rows of data as ASCII strings
-    extractor = nilmdb.client.Client(f.src.url).stream_extract
-    inserter = nilmdb.client.Client(f.dest.url).stream_insert_context
+    # Copy all rows of data using the faster Numpy interfaces
+    extractor = NumpyClient(f.src.url).stream_extract_numpy
+    inserter = NumpyClient(f.dest.url).stream_insert_numpy_context
 
     for i in f.intervals():
         print "Processing", f.interval_string(i)
         with inserter(f.dest.path, i.start, i.end) as insert_ctx:
-            for row in extractor(f.src.path, i.start, i.end):
-                insert_ctx.insert(row + "\n")
+            for data in extractor(f.src.path, i.start, i.end):
+                insert_ctx.insert(data)
 
 if __name__ == "__main__":
     main()
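
Note on the change above: the copy loop now moves data as Numpy arrays end-to-end instead of newline-terminated ASCII rows, using the stream_extract_numpy / stream_insert_numpy_context pair added in nilmdb 1.5.0. A minimal standalone sketch of the new data path; the server URL and stream paths here are placeholders, not part of this change:

    # Sketch only: the URL and stream paths are assumptions.
    # Requires nilmdb >= 1.5.0 for the Numpy client interfaces.
    from nilmdb.client.numpyclient import NumpyClient

    src = NumpyClient("http://localhost/nilmdb")
    dest = NumpyClient("http://localhost/nilmdb")
    start, end = 0, 1000000000   # interval bounds, NilmDB timestamp units

    with dest.stream_insert_numpy_context("/test/copy", start, end) as ctx:
        for chunk in src.stream_extract_numpy("/test/source", start, end):
            ctx.insert(chunk)   # each chunk is a 2D array; no string round-trip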

View File

@@ -41,41 +41,45 @@ def main(argv = None):
     # If source is decimated, we have to decimate a bit differently
     if "decimate_source" in f.client_src.stream_get_metadata(args.srcpath):
-        n = f.src.layout_count // 3
-        f.process_python(function = decimate_again, rows = args.factor,
-                         args = (n,))
+        again = True
     else:
-        n = f.src.layout_count
-        f.process_python(function = decimate_first, rows = args.factor,
-                         args = (n,))
+        again = False
+    f.process_numpy(decimate, args = (args.factor, again))
 
-def decimate_first(data, n):
-    """Decimate original data -- result has 3 times as many columns"""
-    # For this simple calculation, converting to a Numpy array
-    # and doing the math is slower than just doing it directly.
-    rows = iter(data)
-    r_sum = r_min = r_max = rows.next()
-    for row in rows:
-        r_sum = map(operator.add, r_sum, row)
-        r_min = map(min, r_min, row)
-        r_max = map(max, r_max, row)
-    r_mean = [ x / len(data) for x in r_sum ]
-    return [ [ r_mean[0] ] + r_mean[1:] + r_min[1:] + r_max[1:] ]
+def decimate(data, interval, args, insert_function, final):
+    """Decimate data"""
+    (factor, again) = args
+    (n, m) = data.shape
 
-def decimate_again(data, n):
-    """Decimate already-decimated data -- result has the same number
-    of columns"""
-    rows = iter(data)
-    r = rows.next()
-    r_sum = r[0:(n+1)]
-    r_min = r[(n+1):(2*n+1)]
-    r_max = r[(2*n+1):(3*n+1)]
-    for r in rows:
-        r_sum = map(operator.add, r_sum, r[0:(n+1)])
-        r_min = map(min, r_min, r[(n+1):(2*n+1)])
-        r_max = map(max, r_max, r[(2*n+1):(3*n+1)])
-    r_mean = [ x / len(data) for x in r_sum ]
-    return [ r_mean + r_min + r_max ]
+    # Figure out which columns to use as the source for mean, min, and max,
+    # depending on whether this is the first decimation or we're decimating
+    # again.  Note that we include the timestamp in the means.
+    if again:
+        c = (m - 1) // 3
+        # e.g. c = 3
+        # ts mean1 mean2 mean3 min1 min2 min3 max1 max2 max3
+        mean_col = slice(0, c + 1)
+        min_col = slice(c + 1, 2 * c + 1)
+        max_col = slice(2 * c + 1, 3 * c + 1)
+    else:
+        mean_col = slice(0, m)
+        min_col = slice(1, m)
+        max_col = slice(1, m)
+
+    # Discard extra rows that aren't a multiple of factor
+    n = n // factor * factor
+    data = data[:n,:]
+
+    # Reshape it into 3D so we can process 'factor' rows at a time
+    data = data.reshape(n // factor, factor, m)
+
+    # Fill the result
+    out = np.c_[ np.mean(data[:,:,mean_col], axis=1),
+                 np.min(data[:,:,min_col], axis=1),
+                 np.max(data[:,:,max_col], axis=1) ]
+
+    insert_function(out)
+    return n
 
 if __name__ == "__main__":
     main()
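
Note on the change above: the rewritten decimate() replaces the row-by-row map()/operator.add accumulation with one vectorized pass: trim the block to a multiple of 'factor', reshape to (n // factor, factor, m) so each group of 'factor' rows lies along axis 1, then reduce with np.mean/np.min/np.max. A standalone sketch of just that step, with invented array contents:

    import numpy as np

    factor = 4
    data = np.arange(22 * 3, dtype=float).reshape(22, 3)  # 22 rows, 3 columns (ts + 2 values)

    # Discard extra rows that aren't a multiple of factor (22 -> 20)
    n = data.shape[0] // factor * factor
    data = data[:n, :]

    # Each group of 'factor' rows now lies along axis 1
    grouped = data.reshape(n // factor, factor, data.shape[1])

    # First-decimation column choices from the diff: mean over everything
    # (timestamp included), min/max over the non-timestamp columns
    out = np.c_[np.mean(grouped[:, :, 0:3], axis=1),
                np.min(grouped[:, :, 1:3], axis=1),
                np.max(grouped[:, :, 1:3], axis=1)]
    print(out.shape)   # (5, 7): ts + 2 means, 2 mins, 2 maxes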

View File

@@ -4,6 +4,7 @@ from __future__ import absolute_import
 import nilmdb.client
 from nilmdb.client import Client
+from nilmdb.client.numpyclient import NumpyClient
 from nilmdb.utils.printf import *
 from nilmdb.utils.time import (parse_time, timestamp_to_human,
                                timestamp_to_seconds)
@@ -247,72 +248,7 @@ class Filter(object):
         # All good -- write the metadata in case it's not already there
         self._client_dest.stream_update_metadata(self.dest.path, data)
 
-    # Main processing helper
-    def process_python(self, function, rows, args = None, partial = False):
-        """Process data in chunks of 'rows' data at a time.
-
-        This provides data as nested Python lists and expects the same
-        back.
-
-        function: function to process the data
-        rows: maximum number of rows to pass to 'function' at once
-        args: tuple containing extra arguments to pass to 'function'
-        partial: if true, less than 'rows' may be passed to 'function'.
-                 if false, partial data at the end of an interval will
-                 be dropped.
-
-        'function' should be defined like:
-            function(data, *args)
-
-        It will be passed a list containing up to 'rows' rows of
-        data from the source stream, and any arguments passed in
-        'args'.  It should transform the data as desired, and return a
-        new list of rdata, which will be inserted into the destination
-        stream.
-        """
-        if args is None:
-            args = []
-
-        extractor = Client(self.src.url).stream_extract
-        inserter = Client(self.dest.url).stream_insert_context
-
-        # Parse input data.  We use homogenous types for now, which
-        # means the timestamp type will be either float or int.
-        if "int" in self.src.layout_type:
-            parser = lambda line: [ int(x) for x in line.split() ]
-        else:
-            parser = lambda line: [ float(x) for x in line.split() ]
-
-        # Format output data.
-        formatter = lambda row: " ".join([repr(x) for x in row]) + "\n"
-
-        for interval in self.intervals():
-            print "Processing", self.interval_string(interval)
-            with inserter(self.dest.path,
-                          interval.start, interval.end) as insert_ctx:
-                src_array = []
-                for line in extractor(self.src.path,
-                                      interval.start, interval.end):
-                    # Read in data
-                    src_array.append([ float(x) for x in line.split() ])
-                    if len(src_array) == rows:
-                        # Pass through filter function
-                        dest_array = function(src_array, *args)
-                        # Write result to destination
-                        out = [ formatter(row) for row in dest_array ]
-                        insert_ctx.insert("".join(out))
-                        # Clear source array
-                        src_array = []
-                # Take care of partial chunk
-                if len(src_array) and partial:
-                    dest_array = function(src_array, *args)
-                    out = [ formatter(row) for row in dest_array ]
-                    insert_ctx.insert("".join(out))
-
-    # Like process_python, but provides Numpy arrays and allows for
-    # partial processing.
+    # The main filter processing method.
     def process_numpy(self, function, args = None, rows = 100000):
         """For all intervals that exist in self.src but don't exist in
         self.dest, call 'function' with a Numpy array corresponding to
@@ -342,8 +278,8 @@ class Filter(object):
"""
if args is None:
args = []
extractor = Client(self.src.url).stream_extract
inserter = Client(self.dest.url).stream_insert_context
extractor = NumpyClient(self.src.url).stream_extract_numpy
inserter = NumpyClient(self.dest.url).stream_insert_numpy_context
# Format output data.
formatter = lambda row: " ".join([repr(x) for x in row]) + "\n"
@@ -357,19 +293,12 @@ class Filter(object):
print "Processing", self.interval_string(interval)
with inserter(self.dest.path,
interval.start, interval.end) as insert_ctx:
def insert_function(array):
s = cStringIO.StringIO()
if len(np.shape(array)) != 2:
raise Exception("array must be 2-dimensional")
np.savetxt(s, array)
insert_ctx.insert(s.getvalue())
extract = extractor(self.src.path, interval.start, interval.end)
insert_function = insert_ctx.insert
old_array = np.array([])
for batched in batch(extract, rows):
# Read in this batch of data
new_array = np.loadtxt(batched)
for new_array in extractor(self.src.path,
interval.start, interval.end,
layout = self.src.layout,
maxrows = rows):
# If we still had old data left, combine it
if old_array.shape[0] != 0:
array = np.vstack((old_array, new_array))
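
Note on the change above: process_numpy now streams typed arrays directly (the layout and maxrows arguments go to stream_extract_numpy) and keeps an old_array buffer so that rows a filter function does not consume carry over into the next chunk -- which is why decimate() above returns n, the number of rows it processed. A toy model of that carry-over pattern (names here are illustrative, not from filter.py):

    import numpy as np

    def run_chunks(chunks, function):
        # Feed 2D chunks to 'function'; it returns how many rows it consumed,
        # and any unconsumed tail is stacked onto the front of the next chunk.
        old_array = np.array([])
        for new_array in chunks:
            # If we still had old data left, combine it
            if old_array.shape[0] != 0:
                array = np.vstack((old_array, new_array))
            else:
                array = new_array
            processed = function(array)
            old_array = array[processed:]

    # Consume rows only in groups of 4, as a decimation by factor 4 would
    chunks = [np.ones((5, 3)), np.ones((7, 3))]
    run_chunks(chunks, lambda a: a.shape[0] // 4 * 4)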

View File

@@ -132,9 +132,10 @@ def main(argv = None):
     data_ts_base = 0
     data_ts_inc = 0
     data_ts_rate = args.rate
+    data_ts_delta = 0
     def get_data_ts():
         if args.delta:
-            return data_ts_base
+            return data_ts_base + data_ts_delta
         else:
             return data_ts_base + rate_to_period(data_ts_rate,
                                                  data_ts_inc)
@@ -207,12 +208,12 @@ def main(argv = None):
                     pass
                 continue
 
-            # If --delta mode, increment data_ts_base by the delta
-            # from the file.
+            # If --delta mode, increment data_ts_delta by the
+            # delta from the file.
             if args.delta:
                 try:
                     (delta, line) = line.split(None, 1)
-                    data_ts_base += float(delta)
+                    data_ts_delta += float(delta)
                 except ValueError:
                     raise ParseError(filename, "can't parse delta")
@@ -247,7 +248,7 @@ def main(argv = None):
                        timestamp_to_human(clock_ts))
                 stream.finalize()
                 data_ts_base = data_ts = clock_ts
-                data_ts_inc = 0
+                data_ts_inc = data_ts_delta = 0
             # Don't use this clock time anymore until we update it
             clock_ts = None
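
Note on the change above: --delta offsets previously accumulated directly into data_ts_base, so they could not be cleared when a real clock timestamp arrived. Tracking them in a separate data_ts_delta lets the final hunk reset the delta together with data_ts_inc on resynchronization. A standalone model of the corrected bookkeeping (parsing and stream handling omitted):

    data_ts_base = 0
    data_ts_delta = 0

    def get_data_ts():
        # --delta mode: base plus the per-line deltas accumulated so far
        return data_ts_base + data_ts_delta

    for delta in ["0.5", "0.5", "1.0"]:   # deltas as read from the file
        data_ts_delta += float(delta)

    clock_ts = 100.0          # a fresh clock timestamp arrives
    data_ts_base = clock_ts   # resync the base...
    data_ts_delta = 0         # ...and reset the accumulated delta with it
    assert get_data_ts() == 100.0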