Compare commits

...

5 Commits

Author SHA1 Message Date
d23fa9ee78 Remove unnecessary option group 2013-03-12 19:02:29 -04:00
2b9ecc6697 Add nilm-copy tool 2013-03-12 18:52:39 -04:00
54f8c34f8e Decimate seems to work pretty well right now 2013-03-12 18:09:39 -04:00
9d38d6c21b work in progress 2013-03-11 20:38:33 -04:00
4243301434 Lots of updates to generic filter, specific decimate module 2013-03-11 19:48:09 -04:00
6 changed files with 300 additions and 28 deletions

View File

@@ -1,3 +1,9 @@
test:
nilmtool remove /lees-compressor/noleak/raw~4 -s 2000 -e 2020
nilmtool remove /lees-compressor/noleak/raw~16 -s 2000 -e 2020
python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/noleak/raw /lees-compressor/noleak/raw~4
python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/noleak/raw~4 /lees-compressor/noleak/raw~16
all:
@echo "Try 'make install'"

36
nilmtools/copy.py Normal file
View File

@@ -0,0 +1,36 @@
#!/usr/bin/python
import nilmtools.filter
import nilmdb.client
import numpy as np
import sys
def main():
f = nilmtools.filter.Filter()
parser = f.setup_parser("Copy a stream")
# Parse arguments
try:
args = f.parse_args()
except nilmtools.filter.MissingDestination as e:
print "Source is %s (%s)" % (e.src, e.layout)
print "Destination %s doesn't exist" % (e.dest)
print "You could make it with a command like:"
print " nilmtool create", e.dest, e.layout
raise SystemExit(1)
# Copy metadata
meta = f.client.stream_get_metadata(args.srcpath)
f.check_dest_metadata(meta)
# Copy all rows of data as ASCII strings
extractor = nilmdb.client.Client(args.url).stream_extract
inserter = nilmdb.client.Client(args.url).stream_insert_context
for (start, end) in f.intervals():
print "Processing", f.interval_string((start, end))
with inserter(args.destpath, start, end) as insert_ctx:
for row in extractor(args.srcpath, start, end):
insert_ctx.insert(row + "\n")
if __name__ == "__main__":
main()

70
nilmtools/decimate.py Executable file
View File

@@ -0,0 +1,70 @@
#!/usr/bin/python
import nilmtools.filter
import nilmdb.client
import numpy as np
def main():
f = nilmtools.filter.Filter()
parser = f.setup_parser("Decimate a stream")
group = parser.add_argument_group("Decimate options")
group.add_argument('-f', '--factor', action='store', default=4, type=int,
help='Decimation factor (default: %(default)s)')
# Parse arguments
try:
args = f.parse_args()
except nilmtools.filter.MissingDestination as e:
# If no destination, suggest how to create it by figuring out
# a recommended layout.
print "Source is %s (%s)" % (e.src, e.layout)
print "Destination %s doesn't exist" % (e.dest)
if "decimate_source" in f.client.stream_get_metadata(e.src):
rec = e.layout
elif 'int32' in e.layout_type or 'float64' in e.layout_type:
rec = 'float64_' + str(e.layout_count * 3)
else:
rec = 'float32_' + str(e.layout_count * 3)
print "You could make it with a command like:"
print " nilmtool create", e.dest, rec
raise SystemExit(1)
f.check_dest_metadata({ "decimate_source": args.srcpath,
"decimate_factor": args.factor })
# If source is decimated, we have to decimate a bit differently
if "decimate_source" in f.client.stream_get_metadata(args.srcpath):
f.process(function = decimate_again, rows = args.factor)
else:
f.process(function = decimate_first, rows = args.factor)
def decimate_first(data):
"""Decimate original data -- result has 3 times as many columns"""
data = np.array(data)
rows, cols = data.shape
n = cols - 1
out = np.zeros(1 + 3 * n)
out[0] = np.mean(data[:, 0], 0)
out[ 1 : n+1 ] = np.mean(data[:, 1 : n+1], 0)
out[ n+1 : 2*n+1] = np.min( data[:, 1 : n+1], 0)
out[2*n+1 : 3*n+1] = np.max( data[:, 1 : n+1], 0)
return [out]
def decimate_again(data):
"""Decimate already-decimated data -- result has the same number
of columns"""
data = np.array(data)
rows, cols = data.shape
n = (cols - 1) // 3
out = np.zeros(1 + 3 * n)
out[0] = np.mean(data[:, 0], 0)
out[ 1 : n+1 ] = np.mean(data[:, 1 : n+1], 0)
out[ n+1 : 2*n+1] = np.min( data[:, n+1 : 2*n+1], 0)
out[2*n+1 : 3*n+1] = np.max( data[:, 2*n+1 : 3*n+1], 0)
return [out]
if __name__ == "__main__":
main()

201
nilmtools/filter.py Executable file → Normal file
View File

@@ -4,63 +4,216 @@ import nilmdb.client
from nilmdb.utils.printf import *
from nilmdb.utils.time import parse_time, format_time
import nilmtools
import itertools
import time
import sys
import re
import argparse
class MissingDestination(Exception):
def __init__(self, src, layout, dest):
self.src = src
self.layout = layout
self.layout_type = layout.split('_')[0]
self.layout_count = int(layout.split('_')[1])
self.dest = dest
Exception.__init__(self, "destination path " + dest + " not found")
class Filter(object):
def __init__(self, description = "Filter data"):
self.args = None
def __init__(self):
self._parser = None
self._args = None
self._client = None
self.parse_args(description)
self._using_client = False
self.srcinfo = None
self.destinfo = None
def parse_args(self, description):
@property
def client(self):
if self._using_client:
raise Exception("Filter client is in use; make another")
return self._client
def setup_parser(self, description = "Filter data"):
parser = argparse.ArgumentParser(
description = description,
formatter_class = argparse.RawDescriptionHelpFormatter)
parser.add_argument("-u", "--url", action="store",
default="http://localhost:12380/",
help="Server URL (default: %(default)s)")
parser.add_argument("srcpath", action="store",
help="Path of source stream, e.g. /foo/bar")
parser.add_argument("destpath", action="store",
help="Path of destination stream, e.g. /foo/bar")
self.args = parser.parse_args()
formatter_class = argparse.RawDescriptionHelpFormatter,
version = nilmtools.__version__,
description = description)
group = parser.add_argument_group("General filter arguments")
group.add_argument("-u", "--url", action="store",
default="http://localhost:12380/",
help="Server URL (default: %(default)s)")
group.add_argument("-D", "--dry-run", action="store_true",
default = False,
help="Just print intervals that would be "
"processed")
group.add_argument("-s", "--start",
metavar="TIME", type=self.arg_time,
help="Starting timestamp for intervals "
"(free-form, inclusive)")
group.add_argument("-e", "--end",
metavar="TIME", type=self.arg_time,
help="Ending timestamp for intervals "
"(free-form, noninclusive)")
group.add_argument("srcpath", action="store",
help="Path of source stream, e.g. /foo/bar")
group.add_argument("destpath", action="store",
help="Path of destination stream, e.g. /foo/bar")
self._parser = parser
return parser
def parse_args(self):
args = self._parser.parse_args()
self._args = args
self._client = nilmdb.client.Client(args.url)
if args.srcpath == args.destpath:
raise Exception("source and destination path must be different")
# Open and print info about the streams
def stream_info_string(info):
return sprintf("%s (%s), %.2fM rows, %.2f hours\n",
info[0], info[1], info[4] / 1e6, info[5] / 3600)
src = self._client.stream_list(args.srcpath, extended = True)
if len(src) != 1:
raise Exception("source path " + args.srcpath + " not found")
print "Source:", stream_info_string(src[0])
self.srcinfo = src[0]
dest = self._client.stream_list(args.destpath, extended = True)
if len(dest) != 1:
raise Exception("destination path " + args.destpath + " not found")
print " Dest:", stream_info_string(dest[0])
raise MissingDestination(self.srcinfo[0], self.srcinfo[1],
args.destpath)
self.destinfo = dest[0]
print "Source:", self.stream_info_string(self.srcinfo)
print " Dest:", self.stream_info_string(self.destinfo)
if args.dry_run:
for interval in self.intervals():
print self.interval_string(interval)
raise SystemExit(0)
return args
def intervals(self):
"""Generate all the intervals that this filter should process"""
self._using_client = True
for i in self._client.stream_intervals(
args.srcpath, diffpath = args.destpath):
self._args.srcpath, diffpath = self._args.destpath,
start = self._args.start, end = self._args.end):
yield i
self._using_client = False
# Misc helpers
def arg_time(self, toparse):
"""Parse a time string argument"""
try:
return nilmdb.utils.time.parse_time(toparse).totimestamp()
except ValueError as e:
raise argparse.ArgumentTypeError(sprintf("%s \"%s\"",
str(e), toparse))
def stream_info_string(self, info):
"""Print stream info as a string"""
return sprintf("%s (%s), %.2fM rows, %.2f hours",
info[0], info[1], info[4] / 1e6, info[5] / 3600)
def interval_string(self, interval):
"""Print interval as a string"""
return sprintf("[ %s -> %s ]", format_time(interval[0]),
format_time(interval[1]))
def check_dest_metadata(self, data):
"""See if the metadata jives, and complain if it doesn't. If
there's no conflict, update the metadata to match 'data'."""
metadata = self._client.stream_get_metadata(self._args.destpath)
rows = self.destinfo[4]
for key in data:
wanted = str(data[key])
val = metadata.get(key, wanted)
if val != wanted and rows > 0:
m = "Metadata in destination stream:\n"
m += " %s = %s\n" % (key, val)
m += "doesn't match desired data:\n"
m += " %s = %s\n" % (key, wanted)
m += "Refusing to change it. You can change the stream's "
m += "metadata manually, or\n"
m += "remove existing data from the stream, to prevent "
m += "this error.\n"
raise Exception(m)
# All good -- write the metadata in case it's not already there
self._client.stream_update_metadata(self._args.destpath, data)
# Main processing helper
def process(self, function, rows, args = None, partial = False):
"""Process data in chunks of 'rows' data at a time.
function: function to process the data
rows: maximum number of rows to pass to 'function' at once
args: tuple containing extra arguments to pass to 'function'
partial: if true, less than 'rows' may be passed to 'function'.
if false, partial data at the end of an interval will
be dropped.
'function' should be defined like:
function(data, *args)
It will be passed an array containing up to 'rows' rows of
data from the source stream, and any arguments passed in
'args'. It should transform the data as desired, and return a
new array of data, which will be inserted into the destination
stream.
"""
if args is None:
args = []
extractor = nilmdb.client.Client(self._args.url).stream_extract
inserter = nilmdb.client.Client(self._args.url).stream_insert_context
src = self._args.srcpath
dest = self._args.destpath
# Figure out how to format output data
dest_layout = self.destinfo[1].split('_')[1]
def int_formatter(row):
return ("%.6f " % row[0]) + " ".join(str(int(x)) for x in row[1:])
def float_formatter(row):
return ("%.6f " % row[0]) + " ".join(repr(x) for x in row[1:])
if "int" in dest_layout:
formatter = int_formatter
else:
formatter = float_formatter
for (start, end) in self.intervals():
print "Processing", self.interval_string((start, end))
with inserter(dest, start, end) as insert_ctx:
src_array = []
for line in extractor(src, start, end):
# Read in data
src_array.append([ float(x) for x in line.split() ])
if len(src_array) == rows:
# Pass through filter function
dest_array = function(src_array, *args)
# Write result to destination
out = [ formatter(row) for row in dest_array ]
insert_ctx.insert("\n".join(out) + "\n")
# Clear source array
src_array = []
# Take care of partial chunk
if len(src_array) and partial:
dest_array = function(src_array, *args)
out = [ formatter(row) for row in dest_array ]
insert_ctx.insert("\n".join(out) + "\n")
def main():
# This is just a dummy function; actual filters can use the other
# functions to prepare stuff, and then do something with the data.
f = Filter()
for interval in f.intervals():
print "Generic filter: need to handle interval:", interval
parser = f.setup_parser()
args = f.parse_args()
for (start, end) in f.intervals():
print "Generic filter: need to handle", start, " to ", end
if __name__ == "__main__":
main()

View File

@@ -4,6 +4,7 @@ import nilmdb.client
from nilmdb.utils.printf import *
from nilmdb.utils.time import parse_time, format_time
import nilmtools
import time
import sys
import re
@@ -16,7 +17,10 @@ class ParseError(Exception):
super(ParseError, self).__init__(msg)
def parse_args():
parser = argparse.ArgumentParser(description = """\
parser = argparse.ArgumentParser(
formatter_class = argparse.RawDescriptionHelpFormatter,
version = nilmtools.__version__,
description = """\
Insert data from ethstream, either live (using the system time as a
reference) or prerecorded (using comments in the file as a reference).
@@ -24,18 +28,20 @@ def parse_args():
Small discrepencies between the accumulated timestamps and the
reference time are ignored; larger discrepencies cause gaps to be
created in the stream. Overlapping data returns an error.
""", formatter_class = argparse.RawDescriptionHelpFormatter)
""")
parser.add_argument("-u", "--url", action="store",
default="http://localhost:12380/",
help="NilmDB server URL (default: %(default)s)")
parser.add_argument("-r", "--rate", action="store", default=8000, type=float,
parser.add_argument("-r", "--rate", action="store", default=8000,
type=float,
help="Data rate in Hz (default: %(default)s)")
parser.add_argument("-l", "--live", action="store_true",
help="Live capture; use system time to verify rate")
parser.add_argument("path", action="store",
help="Path of stream, e.g. /foo/bar")
parser.add_argument("infile", type=argparse.FileType('r'), nargs='*',
default=[sys.stdin], help="Input files (default: stdin)")
default=[sys.stdin],
help="Input files (default: stdin)")
args = parser.parse_args()
printf("Stream path: %s\n", args.path)

View File

@@ -69,6 +69,7 @@ setup(name='nilmtools',
'console_scripts': [
'nilm-decimate = nilmtools.decimate:main',
'nilm-insert = nilmtools.insert:main',
'nilm-copy = nilmtools.copy:main',
],
},
zip_safe = False,