Compare commits

..

2 Commits

Author SHA1 Message Date
de68956f76 Update copy tool 2013-03-16 23:13:45 -04:00
e73dd313d5 Reworked things to match nilmdb updates; a bit faster 2013-03-16 14:46:49 -04:00
6 changed files with 159 additions and 111 deletions

View File

@@ -1,8 +1,10 @@
test: test:
nilmtool remove /lees-compressor/noleak/raw~4 -s 2000 -e 2020 -@nilmtool destroy /lees-compressor/no-leak/raw/4 || true
nilmtool remove /lees-compressor/noleak/raw~16 -s 2000 -e 2020 -@nilmtool destroy /lees-compressor/no-leak/raw/16 || true
python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/noleak/raw /lees-compressor/noleak/raw~4 -@nilmtool create /lees-compressor/no-leak/raw/4 float32_18 || true
python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/noleak/raw~4 /lees-compressor/noleak/raw~16 -@nilmtool create /lees-compressor/no-leak/raw/16 float32_18 || true
time python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/1 /lees-compressor/no-leak/raw/4
python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/4 /lees-compressor/no-leak/raw/16
all: all:
@echo "Try 'make install'" @echo "Try 'make install'"

16
nilmtools/copy.py Normal file → Executable file
View File

@@ -13,23 +13,23 @@ def main():
try: try:
args = f.parse_args() args = f.parse_args()
except nilmtools.filter.MissingDestination as e: except nilmtools.filter.MissingDestination as e:
print "Source is %s (%s)" % (e.src, e.layout) print "Source is %s (%s)" % (e.src.path, e.src.layout)
print "Destination %s doesn't exist" % (e.dest) print "Destination %s doesn't exist" % (e.dest.path)
print "You could make it with a command like:" print "You could make it with a command like:"
print " nilmtool create", e.dest, e.layout print " nilmtool create", e.dest.path, e.src.layout
raise SystemExit(1) raise SystemExit(1)
# Copy metadata # Copy metadata
meta = f.client.stream_get_metadata(args.srcpath) meta = f.client.stream_get_metadata(f.src.path)
f.check_dest_metadata(meta) f.check_dest_metadata(meta)
# Copy all rows of data as ASCII strings # Copy all rows of data as ASCII strings
extractor = nilmdb.client.Client(args.url).stream_extract extractor = nilmdb.client.Client(args.url).stream_extract
inserter = nilmdb.client.Client(args.url).stream_insert_context inserter = nilmdb.client.Client(args.url).stream_insert_context
for (start, end) in f.intervals(): for i in f.intervals():
print "Processing", f.interval_string((start, end)) print "Processing", f.interval_string(i)
with inserter(args.destpath, start, end) as insert_ctx: with inserter(f.dest.path, i.start, i.end) as insert_ctx:
for row in extractor(args.srcpath, start, end): for row in extractor(f.src.path, i.start, i.end):
insert_ctx.insert(row + "\n") insert_ctx.insert(row + "\n")
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -3,6 +3,7 @@
import nilmtools.filter import nilmtools.filter
import nilmdb.client import nilmdb.client
import numpy as np import numpy as np
import operator
def main(): def main():
f = nilmtools.filter.Filter() f = nilmtools.filter.Filter()
@@ -16,55 +17,63 @@ def main():
except nilmtools.filter.MissingDestination as e: except nilmtools.filter.MissingDestination as e:
# If no destination, suggest how to create it by figuring out # If no destination, suggest how to create it by figuring out
# a recommended layout. # a recommended layout.
print "Source is %s (%s)" % (e.src, e.layout) src = e.src
print "Destination %s doesn't exist" % (e.dest) dest = e.dest
if "decimate_source" in f.client.stream_get_metadata(e.src): print "Source is %s (%s)" % (src.path, src.layout)
rec = e.layout print "Destination %s doesn't exist" % (dest.path)
elif 'int32' in e.layout_type or 'float64' in e.layout_type: if "decimate_source" in f.client.stream_get_metadata(src.path):
rec = 'float64_' + str(e.layout_count * 3) rec = src.layout
elif 'int32' in src.layout_type or 'float64' in src.layout_type:
rec = 'float64_' + str(src.layout_count * 3)
else: else:
rec = 'float32_' + str(e.layout_count * 3) rec = 'float32_' + str(src.layout_count * 3)
print "You could make it with a command like:" print "You could make it with a command like:"
print " nilmtool create", e.dest, rec print " nilmtool create", dest.path, rec
raise SystemExit(1) raise SystemExit(1)
if not (args.factor >= 2):
raise Exception("factor needs to be 2 or more")
f.check_dest_metadata({ "decimate_source": args.srcpath, f.check_dest_metadata({ "decimate_source": args.srcpath,
"decimate_factor": args.factor }) "decimate_factor": args.factor })
# If source is decimated, we have to decimate a bit differently # If source is decimated, we have to decimate a bit differently
if "decimate_source" in f.client.stream_get_metadata(args.srcpath): if "decimate_source" in f.client.stream_get_metadata(args.srcpath):
f.process(function = decimate_again, rows = args.factor) n = f.src.layout_count // 3
f.process_python(function = decimate_again, rows = args.factor,
args = (n,))
else: else:
f.process(function = decimate_first, rows = args.factor) n = f.src.layout_count
f.process_python(function = decimate_first, rows = args.factor,
args = (n,))
def decimate_first(data): def decimate_first(data, n):
"""Decimate original data -- result has 3 times as many columns""" """Decimate original data -- result has 3 times as many columns"""
data = np.array(data) # For this simple calculation, converting to a Numpy array
rows, cols = data.shape # and doing the math is slower than just doing it directly.
n = cols - 1 rows = iter(data)
out = np.zeros(1 + 3 * n) r_sum = r_min = r_max = rows.next()
for row in rows:
r_sum = map(operator.add, r_sum, row)
r_min = map(min, r_min, row)
r_max = map(max, r_max, row)
r_mean = [ x / len(data) for x in r_sum ]
return [ [ r_mean[0] ] + r_mean[1:] + r_min[1:] + r_max[1:] ]
out[0] = np.mean(data[:, 0], 0) def decimate_again(data, n):
out[ 1 : n+1 ] = np.mean(data[:, 1 : n+1], 0)
out[ n+1 : 2*n+1] = np.min( data[:, 1 : n+1], 0)
out[2*n+1 : 3*n+1] = np.max( data[:, 1 : n+1], 0)
return [out]
def decimate_again(data):
"""Decimate already-decimated data -- result has the same number """Decimate already-decimated data -- result has the same number
of columns""" of columns"""
data = np.array(data) rows = iter(data)
rows, cols = data.shape r = rows.next()
n = (cols - 1) // 3 r_sum = r[0:(n+1)]
out = np.zeros(1 + 3 * n) r_min = r[(n+1):(2*n+1)]
r_max = r[(2*n+1):(3*n+1)]
out[0] = np.mean(data[:, 0], 0) for r in rows:
out[ 1 : n+1 ] = np.mean(data[:, 1 : n+1], 0) r_sum = map(operator.add, r_sum, r[0:(n+1)])
out[ n+1 : 2*n+1] = np.min( data[:, n+1 : 2*n+1], 0) r_min = map(min, r_min, r[(n+1):(2*n+1)])
out[2*n+1 : 3*n+1] = np.max( data[:, 2*n+1 : 3*n+1], 0) r_max = map(max, r_max, r[(2*n+1):(3*n+1)])
r_mean = [ x / len(data) for x in r_sum ]
return [out] return [ r_mean + r_min + r_max ]
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@@ -2,7 +2,9 @@
import nilmdb.client import nilmdb.client
from nilmdb.utils.printf import * from nilmdb.utils.printf import *
from nilmdb.utils.time import parse_time, format_time from nilmdb.utils.time import (parse_time, timestamp_to_human,
timestamp_to_seconds)
from nilmdb.utils.interval import Interval
import nilmtools import nilmtools
@@ -13,13 +15,32 @@ import re
import argparse import argparse
class MissingDestination(Exception): class MissingDestination(Exception):
def __init__(self, src, layout, dest): def __init__(self, src, dest):
self.src = src self.src = src
self.layout = layout
self.layout_type = layout.split('_')[0]
self.layout_count = int(layout.split('_')[1])
self.dest = dest self.dest = dest
Exception.__init__(self, "destination path " + dest + " not found") Exception.__init__(self, "destination path " + dest.path + " not found")
class StreamInfo(object):
def __init__(self, info):
self.info = info
try:
self.path = info[0]
self.layout = info[1]
self.layout_type = self.layout.split('_')[0]
self.layout_count = int(self.layout.split('_')[1])
self.total_count = self.layout_count + 1
self.timestamp_min = info[2]
self.timestamp_max = info[3]
self.rows = info[4]
self.seconds = nilmdb.utils.time.timestamp_to_seconds(info[5])
except IndexError, TypeError:
pass
def __str__(self):
"""Print stream info as a string"""
return sprintf("%s (%s), %.2fM rows, %.2f hours",
self.path, self.layout, self.rows / 1e6,
self.seconds / 3600.0)
class Filter(object): class Filter(object):
@@ -28,8 +49,8 @@ class Filter(object):
self._args = None self._args = None
self._client = None self._client = None
self._using_client = False self._using_client = False
self.srcinfo = None self.src = None
self.destinfo = None self.dest = None
@property @property
def client(self): def client(self):
@@ -65,6 +86,11 @@ class Filter(object):
self._parser = parser self._parser = parser
return parser return parser
def interval_string(self, interval):
return sprintf("[ %s -> %s ]",
timestamp_to_human(interval.start),
timestamp_to_human(interval.end))
def parse_args(self): def parse_args(self):
args = self._parser.parse_args() args = self._parser.parse_args()
self._args = args self._args = args
@@ -77,16 +103,15 @@ class Filter(object):
src = self._client.stream_list(args.srcpath, extended = True) src = self._client.stream_list(args.srcpath, extended = True)
if len(src) != 1: if len(src) != 1:
raise Exception("source path " + args.srcpath + " not found") raise Exception("source path " + args.srcpath + " not found")
self.srcinfo = src[0] self.src = StreamInfo(src[0])
dest = self._client.stream_list(args.destpath, extended = True) dest = self._client.stream_list(args.destpath, extended = True)
if len(dest) != 1: if len(dest) != 1:
raise MissingDestination(self.srcinfo[0], self.srcinfo[1], raise MissingDestination(self.src, StreamInfo([args.destpath]))
args.destpath) self.dest = StreamInfo(dest[0])
self.destinfo = dest[0]
print "Source:", self.stream_info_string(self.srcinfo) print "Source:", self.src
print " Dest:", self.stream_info_string(self.destinfo) print " Dest:", self.dest
if args.dry_run: if args.dry_run:
for interval in self.intervals(): for interval in self.intervals():
@@ -98,40 +123,39 @@ class Filter(object):
def intervals(self): def intervals(self):
"""Generate all the intervals that this filter should process""" """Generate all the intervals that this filter should process"""
self._using_client = True self._using_client = True
for i in self._client.stream_intervals( saved_int = None
for (start, end) in self._client.stream_intervals(
self._args.srcpath, diffpath = self._args.destpath, self._args.srcpath, diffpath = self._args.destpath,
start = self._args.start, end = self._args.end): start = self._args.start, end = self._args.end):
yield i
# Join adjacent intervals
if saved_int is not None:
if saved_int.end == start:
start = saved_int.start
else:
yield saved_int
saved_int = Interval(start, end)
if saved_int is not None:
yield saved_int
self._using_client = False self._using_client = False
# Misc helpers # Misc helpers
def arg_time(self, toparse): def arg_time(self, toparse):
"""Parse a time string argument""" """Parse a time string argument"""
try: try:
return nilmdb.utils.time.parse_time(toparse).totimestamp() return nilmdb.utils.time.parse_time(toparse)
except ValueError as e: except ValueError as e:
raise argparse.ArgumentTypeError(sprintf("%s \"%s\"", raise argparse.ArgumentTypeError(sprintf("%s \"%s\"",
str(e), toparse)) str(e), toparse))
def stream_info_string(self, info):
"""Print stream info as a string"""
return sprintf("%s (%s), %.2fM rows, %.2f hours",
info[0], info[1], info[4] / 1e6, info[5] / 3600)
def interval_string(self, interval):
"""Print interval as a string"""
return sprintf("[ %s -> %s ]", format_time(interval[0]),
format_time(interval[1]))
def check_dest_metadata(self, data): def check_dest_metadata(self, data):
"""See if the metadata jives, and complain if it doesn't. If """See if the metadata jives, and complain if it doesn't. If
there's no conflict, update the metadata to match 'data'.""" there's no conflict, update the metadata to match 'data'."""
metadata = self._client.stream_get_metadata(self._args.destpath) metadata = self._client.stream_get_metadata(self._args.destpath)
rows = self.destinfo[4]
for key in data: for key in data:
wanted = str(data[key]) wanted = str(data[key])
val = metadata.get(key, wanted) val = metadata.get(key, wanted)
if val != wanted and rows > 0: if val != wanted and self.dest.rows > 0:
m = "Metadata in destination stream:\n" m = "Metadata in destination stream:\n"
m += " %s = %s\n" % (key, val) m += " %s = %s\n" % (key, val)
m += "doesn't match desired data:\n" m += "doesn't match desired data:\n"
@@ -145,9 +169,12 @@ class Filter(object):
self._client.stream_update_metadata(self._args.destpath, data) self._client.stream_update_metadata(self._args.destpath, data)
# Main processing helper # Main processing helper
def process(self, function, rows, args = None, partial = False): def process_python(self, function, rows, args = None, partial = False):
"""Process data in chunks of 'rows' data at a time. """Process data in chunks of 'rows' data at a time.
This provides data as nested Python lists and expects the same
back.
function: function to process the data function: function to process the data
rows: maximum number of rows to pass to 'function' at once rows: maximum number of rows to pass to 'function' at once
args: tuple containing extra arguments to pass to 'function' args: tuple containing extra arguments to pass to 'function'
@@ -157,10 +184,10 @@ class Filter(object):
'function' should be defined like: 'function' should be defined like:
function(data, *args) function(data, *args)
It will be passed an array containing up to 'rows' rows of It will be passed a list containing up to 'rows' rows of
data from the source stream, and any arguments passed in data from the source stream, and any arguments passed in
'args'. It should transform the data as desired, and return a 'args'. It should transform the data as desired, and return a
new array of data, which will be inserted into the destination new list of rdata, which will be inserted into the destination
stream. stream.
""" """
if args is None: if args is None:
@@ -170,22 +197,21 @@ class Filter(object):
src = self._args.srcpath src = self._args.srcpath
dest = self._args.destpath dest = self._args.destpath
# Figure out how to format output data # Parse input data. We use homogenous types for now, which
dest_layout = self.destinfo[1].split('_')[1] # means the timestamp type will be either float or int.
def int_formatter(row): if "int" in self.src.layout_type:
return ("%.6f " % row[0]) + " ".join(str(int(x)) for x in row[1:]) parser = lambda line: [ int(x) for x in line.split() ]
def float_formatter(row):
return ("%.6f " % row[0]) + " ".join(repr(x) for x in row[1:])
if "int" in dest_layout:
formatter = int_formatter
else: else:
formatter = float_formatter parser = lambda line: [ float(x) for x in line.split() ]
for (start, end) in self.intervals(): # Format output data.
print "Processing", self.interval_string((start, end)) formatter = lambda row: " ".join([repr(x) for x in row]) + "\n"
with inserter(dest, start, end) as insert_ctx:
for interval in self.intervals():
print "Processing", self.interval_string(interval)
with inserter(dest, interval.start, interval.end) as insert_ctx:
src_array = [] src_array = []
for line in extractor(src, start, end): for line in extractor(src, interval.start, interval.end):
# Read in data # Read in data
src_array.append([ float(x) for x in line.split() ]) src_array.append([ float(x) for x in line.split() ])
@@ -195,7 +221,7 @@ class Filter(object):
# Write result to destination # Write result to destination
out = [ formatter(row) for row in dest_array ] out = [ formatter(row) for row in dest_array ]
insert_ctx.insert("\n".join(out) + "\n") insert_ctx.insert("".join(out))
# Clear source array # Clear source array
src_array = [] src_array = []
@@ -204,7 +230,7 @@ class Filter(object):
if len(src_array) and partial: if len(src_array) and partial:
dest_array = function(src_array, *args) dest_array = function(src_array, *args)
out = [ formatter(row) for row in dest_array ] out = [ formatter(row) for row in dest_array ]
insert_ctx.insert("\n".join(out) + "\n") insert_ctx.insert("".join(out))
def main(): def main():
# This is just a dummy function; actual filters can use the other # This is just a dummy function; actual filters can use the other
@@ -212,8 +238,8 @@ def main():
f = Filter() f = Filter()
parser = f.setup_parser() parser = f.setup_parser()
args = f.parse_args() args = f.parse_args()
for (start, end) in f.intervals(): for i in f.intervals():
print "Generic filter: need to handle", start, " to ", end print "Generic filter: need to handle", f.interval_string(i)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@@ -2,7 +2,9 @@
import nilmdb.client import nilmdb.client
from nilmdb.utils.printf import * from nilmdb.utils.printf import *
from nilmdb.utils.time import parse_time, format_time from nilmdb.utils.time import (parse_time, timestamp_to_human,
timestamp_to_seconds, seconds_to_timestamp,
rate_to_period, now as time_now)
import nilmtools import nilmtools
import time import time
@@ -61,20 +63,22 @@ def main(args = None):
# data_ts is the timestamp that we'll use for the current line # data_ts is the timestamp that we'll use for the current line
data_ts_base = 0 data_ts_base = 0
data_ts_inc = 0 data_ts_inc = 0
data_ts_step = 1.0 / args.rate data_ts_rate = args.rate
# clock_ts is the imprecise "real" timestamp (from the filename, # clock_ts is the imprecise "real" timestamp (from the filename,
# comments, or or system clock) # comments, or or system clock)
clock_ts = None clock_ts = None
def print_clock_updated(): def print_clock_updated():
printf("Clock time updated to %s\n", format_time(clock_ts)) printf("Clock time updated to %s\n", timestamp_to_human(clock_ts))
if data_ts_base != 0: if data_ts_base != 0:
diff = data_ts - clock_ts diff = data_ts - clock_ts
if diff >= 0: if diff >= 0:
printf(" (data timestamp ahead by %.6f sec)\n", diff) printf(" (data timestamp ahead by %.6f sec)\n",
timestamp_to_seconds(diff))
else: else:
printf(" (data timestamp behind by %.6f sec)\n", -diff) printf(" (data timestamp behind by %.6f sec)\n",
timestamp_to_seconds(-diff))
with client.stream_insert_context(args.path) as stream: with client.stream_insert_context(args.path) as stream:
for f in args.infile: for f in args.infile:
@@ -92,7 +96,7 @@ def main(args = None):
# Subtract 1 hour because files are created at the end # Subtract 1 hour because files are created at the end
# of the hour. Hopefully, we'll be able to use # of the hour. Hopefully, we'll be able to use
# internal comments and this value won't matter anyway. # internal comments and this value won't matter anyway.
clock_ts = parse_time(filename).totimestamp() - 3600 clock_ts = parse_time(filename) - seconds_to_timestamp(3600)
print_clock_updated() print_clock_updated()
except ValueError: except ValueError:
pass pass
@@ -101,7 +105,8 @@ def main(args = None):
# Read each line # Read each line
for line in f: for line in f:
data_ts = data_ts_base + data_ts_inc * data_ts_step data_ts = data_ts_base + rate_to_period(data_ts_rate,
data_ts_inc)
# If no content other than the newline, skip it # If no content other than the newline, skip it
if len(line) <= 1: if len(line) <= 1:
@@ -110,7 +115,7 @@ def main(args = None):
# If line starts with a comment, look for a timestamp # If line starts with a comment, look for a timestamp
if line[0] == '#': if line[0] == '#':
try: try:
clock_ts = parse_time(line[1:]).totimestamp() clock_ts = parse_time(line[1:])
print_clock_updated() print_clock_updated()
except ValueError: except ValueError:
pass pass
@@ -118,30 +123,30 @@ def main(args = None):
# If inserting live, use clock timestamp # If inserting live, use clock timestamp
if live: if live:
clock_ts = time.time() clock_ts = time_now()
# If we have a real timestamp, compare it to the data # If we have a real timestamp, compare it to the data
# timestamp, and make sure things match up. # timestamp, and make sure things match up.
if clock_ts is not None: if clock_ts is not None:
if (data_ts - 10) > clock_ts: if (data_ts - seconds_to_timestamp(10)) > clock_ts:
# Accumulated line timestamps are in the future. # Accumulated line timestamps are in the future.
# If we were to set data_ts=clock_ts, we'd create # If we were to set data_ts=clock_ts, we'd create
# an overlap, so we have to just bail out here. # an overlap, so we have to just bail out here.
err = sprintf("Data is coming in too fast: data time " err = sprintf("Data is coming in too fast: data time "
"is %s but clock time is only %s", "is %s but clock time is only %s",
format_time(data_ts), timestamp_to_human(data_ts),
format_time(clock_ts)) timestamp_to_human(clock_ts))
raise ParseError(filename, err) raise ParseError(filename, err)
if (data_ts + 10) < clock_ts: if (data_ts + seconds_to_timestamp(10)) < clock_ts:
# Accumulated line timetamps are in the past. We # Accumulated line timetamps are in the past. We
# can just skip some time and leave a gap in the # can just skip some time and leave a gap in the
# data. # data.
if data_ts_base != 0: if data_ts_base != 0:
printf("Skipping data timestamp forward from " printf("Skipping data timestamp forward from "
"%s to %s to match clock time\n", "%s to %s to match clock time\n",
format_time(data_ts), timestamp_to_human(data_ts),
format_time(clock_ts)) timestamp_to_human(clock_ts))
stream.finalize() stream.finalize()
data_ts_base = data_ts = clock_ts data_ts_base = data_ts = clock_ts
data_ts_inc = 0 data_ts_inc = 0
@@ -166,7 +171,7 @@ def main(args = None):
continue continue
# Insert it # Insert it
stream.insert("%.6f %s" % (data_ts, line)) stream.insert("%d %s" % (data_ts, line))
print "Done" print "Done"
if __name__ == "__main__": if __name__ == "__main__":

6
scripts/decimate-it.sh Executable file
View File

@@ -0,0 +1,6 @@
#!/bin/bash
for i in "$@"; do
nilmtool create /lees-compressor/no-leak/raw/$((4**$i)) float32_18
/usr/bin/time -v nilm-decimate /lees-compressor/no-leak/raw/$((4**($i-1))) /lees-compressor/no-leak/raw/$((4**$i))
done