Compare commits
7 Commits
nilmtools-
...
nilmtools-
Author | SHA1 | Date | |
---|---|---|---|
5b67b68fd2 | |||
97503b73b9 | |||
4e64c804bf | |||
189fb9df3a | |||
3323c997a7 | |||
e09153e34b | |||
5c56e9d075 |
@@ -8,7 +8,7 @@ Prerequisites:
|
|||||||
sudo apt-get install python2.7 python2.7-dev python-setuptools
|
sudo apt-get install python2.7 python2.7-dev python-setuptools
|
||||||
sudo apt-get install python-numpy python-scipy python-matplotlib
|
sudo apt-get install python-numpy python-scipy python-matplotlib
|
||||||
|
|
||||||
nilmdb (1.3.1+)
|
nilmdb (1.5.0+)
|
||||||
|
|
||||||
Install:
|
Install:
|
||||||
|
|
||||||
|
2
setup.py
2
setup.py
@@ -61,7 +61,7 @@ setup(name='nilmtools',
|
|||||||
long_description = "NILM Database Tools",
|
long_description = "NILM Database Tools",
|
||||||
license = "Proprietary",
|
license = "Proprietary",
|
||||||
author_email = 'jim@jtan.com',
|
author_email = 'jim@jtan.com',
|
||||||
install_requires = [ 'nilmdb >= 1.4.6',
|
install_requires = [ 'nilmdb >= 1.5.0',
|
||||||
'numpy',
|
'numpy',
|
||||||
'scipy',
|
'scipy',
|
||||||
'matplotlib',
|
'matplotlib',
|
||||||
|
@@ -5,6 +5,7 @@
|
|||||||
|
|
||||||
import nilmtools.filter
|
import nilmtools.filter
|
||||||
import nilmdb.client
|
import nilmdb.client
|
||||||
|
from nilmdb.client.numpyclient import NumpyClient
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
@@ -27,14 +28,14 @@ def main(argv = None):
|
|||||||
meta = f.client_src.stream_get_metadata(f.src.path)
|
meta = f.client_src.stream_get_metadata(f.src.path)
|
||||||
f.check_dest_metadata(meta)
|
f.check_dest_metadata(meta)
|
||||||
|
|
||||||
# Copy all rows of data as ASCII strings
|
# Copy all rows of data using the faster Numpy interfaces
|
||||||
extractor = nilmdb.client.Client(f.src.url).stream_extract
|
extractor = NumpyClient(f.src.url).stream_extract_numpy
|
||||||
inserter = nilmdb.client.Client(f.dest.url).stream_insert_context
|
inserter = NumpyClient(f.dest.url).stream_insert_numpy_context
|
||||||
for i in f.intervals():
|
for i in f.intervals():
|
||||||
print "Processing", f.interval_string(i)
|
print "Processing", f.interval_string(i)
|
||||||
with inserter(f.dest.path, i.start, i.end) as insert_ctx:
|
with inserter(f.dest.path, i.start, i.end) as insert_ctx:
|
||||||
for row in extractor(f.src.path, i.start, i.end):
|
for data in extractor(f.src.path, i.start, i.end):
|
||||||
insert_ctx.insert(row + "\n")
|
insert_ctx.insert(data)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
|
@@ -71,7 +71,7 @@ def decimate(data, interval, args, insert_function, final):
|
|||||||
data = data[:n,:]
|
data = data[:n,:]
|
||||||
|
|
||||||
# Reshape it into 3D so we can process 'factor' rows at a time
|
# Reshape it into 3D so we can process 'factor' rows at a time
|
||||||
data.shape = (n // factor, factor, m)
|
data = data.reshape(n // factor, factor, m)
|
||||||
|
|
||||||
# Fill the result
|
# Fill the result
|
||||||
out = np.c_[ np.mean(data[:,:,mean_col], axis=1),
|
out = np.c_[ np.mean(data[:,:,mean_col], axis=1),
|
||||||
|
@@ -4,6 +4,7 @@ from __future__ import absolute_import
|
|||||||
|
|
||||||
import nilmdb.client
|
import nilmdb.client
|
||||||
from nilmdb.client import Client
|
from nilmdb.client import Client
|
||||||
|
from nilmdb.client.numpyclient import NumpyClient
|
||||||
from nilmdb.utils.printf import *
|
from nilmdb.utils.printf import *
|
||||||
from nilmdb.utils.time import (parse_time, timestamp_to_human,
|
from nilmdb.utils.time import (parse_time, timestamp_to_human,
|
||||||
timestamp_to_seconds)
|
timestamp_to_seconds)
|
||||||
@@ -247,72 +248,7 @@ class Filter(object):
|
|||||||
# All good -- write the metadata in case it's not already there
|
# All good -- write the metadata in case it's not already there
|
||||||
self._client_dest.stream_update_metadata(self.dest.path, data)
|
self._client_dest.stream_update_metadata(self.dest.path, data)
|
||||||
|
|
||||||
# Main processing helper
|
# The main filter processing method.
|
||||||
def process_python(self, function, rows, args = None, partial = False):
|
|
||||||
"""Process data in chunks of 'rows' data at a time.
|
|
||||||
|
|
||||||
This provides data as nested Python lists and expects the same
|
|
||||||
back.
|
|
||||||
|
|
||||||
function: function to process the data
|
|
||||||
rows: maximum number of rows to pass to 'function' at once
|
|
||||||
args: tuple containing extra arguments to pass to 'function'
|
|
||||||
partial: if true, less than 'rows' may be passed to 'function'.
|
|
||||||
if false, partial data at the end of an interval will
|
|
||||||
be dropped.
|
|
||||||
|
|
||||||
'function' should be defined like:
|
|
||||||
function(data, *args)
|
|
||||||
It will be passed a list containing up to 'rows' rows of
|
|
||||||
data from the source stream, and any arguments passed in
|
|
||||||
'args'. It should transform the data as desired, and return a
|
|
||||||
new list of rdata, which will be inserted into the destination
|
|
||||||
stream.
|
|
||||||
"""
|
|
||||||
if args is None:
|
|
||||||
args = []
|
|
||||||
extractor = Client(self.src.url).stream_extract
|
|
||||||
inserter = Client(self.dest.url).stream_insert_context
|
|
||||||
|
|
||||||
# Parse input data. We use homogenous types for now, which
|
|
||||||
# means the timestamp type will be either float or int.
|
|
||||||
if "int" in self.src.layout_type:
|
|
||||||
parser = lambda line: [ int(x) for x in line.split() ]
|
|
||||||
else:
|
|
||||||
parser = lambda line: [ float(x) for x in line.split() ]
|
|
||||||
|
|
||||||
# Format output data.
|
|
||||||
formatter = lambda row: " ".join([repr(x) for x in row]) + "\n"
|
|
||||||
|
|
||||||
for interval in self.intervals():
|
|
||||||
print "Processing", self.interval_string(interval)
|
|
||||||
with inserter(self.dest.path,
|
|
||||||
interval.start, interval.end) as insert_ctx:
|
|
||||||
src_array = []
|
|
||||||
for line in extractor(self.src.path,
|
|
||||||
interval.start, interval.end):
|
|
||||||
# Read in data
|
|
||||||
src_array.append([ float(x) for x in line.split() ])
|
|
||||||
|
|
||||||
if len(src_array) == rows:
|
|
||||||
# Pass through filter function
|
|
||||||
dest_array = function(src_array, *args)
|
|
||||||
|
|
||||||
# Write result to destination
|
|
||||||
out = [ formatter(row) for row in dest_array ]
|
|
||||||
insert_ctx.insert("".join(out))
|
|
||||||
|
|
||||||
# Clear source array
|
|
||||||
src_array = []
|
|
||||||
|
|
||||||
# Take care of partial chunk
|
|
||||||
if len(src_array) and partial:
|
|
||||||
dest_array = function(src_array, *args)
|
|
||||||
out = [ formatter(row) for row in dest_array ]
|
|
||||||
insert_ctx.insert("".join(out))
|
|
||||||
|
|
||||||
# Like process_python, but provides Numpy arrays and allows for
|
|
||||||
# partial processing.
|
|
||||||
def process_numpy(self, function, args = None, rows = 100000):
|
def process_numpy(self, function, args = None, rows = 100000):
|
||||||
"""For all intervals that exist in self.src but don't exist in
|
"""For all intervals that exist in self.src but don't exist in
|
||||||
self.dest, call 'function' with a Numpy array corresponding to
|
self.dest, call 'function' with a Numpy array corresponding to
|
||||||
@@ -342,8 +278,8 @@ class Filter(object):
|
|||||||
"""
|
"""
|
||||||
if args is None:
|
if args is None:
|
||||||
args = []
|
args = []
|
||||||
extractor = Client(self.src.url).stream_extract
|
extractor = NumpyClient(self.src.url).stream_extract_numpy
|
||||||
inserter = Client(self.dest.url).stream_insert_context
|
inserter = NumpyClient(self.dest.url).stream_insert_numpy_context
|
||||||
|
|
||||||
# Format output data.
|
# Format output data.
|
||||||
formatter = lambda row: " ".join([repr(x) for x in row]) + "\n"
|
formatter = lambda row: " ".join([repr(x) for x in row]) + "\n"
|
||||||
@@ -357,22 +293,12 @@ class Filter(object):
|
|||||||
print "Processing", self.interval_string(interval)
|
print "Processing", self.interval_string(interval)
|
||||||
with inserter(self.dest.path,
|
with inserter(self.dest.path,
|
||||||
interval.start, interval.end) as insert_ctx:
|
interval.start, interval.end) as insert_ctx:
|
||||||
def insert_function(array):
|
insert_function = insert_ctx.insert
|
||||||
s = cStringIO.StringIO()
|
|
||||||
if len(np.shape(array)) != 2:
|
|
||||||
raise Exception("array must be 2-dimensional")
|
|
||||||
np.savetxt(s, array)
|
|
||||||
insert_ctx.insert(s.getvalue())
|
|
||||||
|
|
||||||
extract = extractor(self.src.path, interval.start, interval.end)
|
|
||||||
old_array = np.array([])
|
old_array = np.array([])
|
||||||
for batched in batch(extract, rows):
|
for new_array in extractor(self.src.path,
|
||||||
# Read in this batch of data. This turns out to
|
interval.start, interval.end,
|
||||||
# be a very fast way to read and convert it (order
|
layout = self.src.layout,
|
||||||
# of magnitude faster than numpy.loadtxt)
|
maxrows = rows):
|
||||||
new_array = np.fromstring("\n".join(batched), sep=' ')
|
|
||||||
new_array = new_array.reshape(-1, self.src.total_count)
|
|
||||||
|
|
||||||
# If we still had old data left, combine it
|
# If we still had old data left, combine it
|
||||||
if old_array.shape[0] != 0:
|
if old_array.shape[0] != 0:
|
||||||
array = np.vstack((old_array, new_array))
|
array = np.vstack((old_array, new_array))
|
||||||
|
@@ -8,7 +8,7 @@ import nilmdb.client
|
|||||||
from numpy import *
|
from numpy import *
|
||||||
import scipy.fftpack
|
import scipy.fftpack
|
||||||
import scipy.signal
|
import scipy.signal
|
||||||
from matplotlib import pyplot as p
|
#from matplotlib import pyplot as p
|
||||||
import bisect
|
import bisect
|
||||||
|
|
||||||
def main(argv = None):
|
def main(argv = None):
|
||||||
|
Reference in New Issue
Block a user