diff --git a/Makefile b/Makefile index 523c978..7dd7d06 100644 --- a/Makefile +++ b/Makefile @@ -1,8 +1,10 @@ test: - nilmtool remove /lees-compressor/noleak/raw~4 -s 2000 -e 2020 - nilmtool remove /lees-compressor/noleak/raw~16 -s 2000 -e 2020 - python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/noleak/raw /lees-compressor/noleak/raw~4 - python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/noleak/raw~4 /lees-compressor/noleak/raw~16 + -@nilmtool destroy /lees-compressor/no-leak/raw/4 || true + -@nilmtool destroy /lees-compressor/no-leak/raw/16 || true + -@nilmtool create /lees-compressor/no-leak/raw/4 float32_18 || true + -@nilmtool create /lees-compressor/no-leak/raw/16 float32_18 || true + time python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/1 /lees-compressor/no-leak/raw/4 + python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/4 /lees-compressor/no-leak/raw/16 all: @echo "Try 'make install'" diff --git a/nilmtools/decimate.py b/nilmtools/decimate.py index 14f4488..e449923 100755 --- a/nilmtools/decimate.py +++ b/nilmtools/decimate.py @@ -3,6 +3,7 @@ import nilmtools.filter import nilmdb.client import numpy as np +import operator def main(): f = nilmtools.filter.Filter() @@ -16,55 +17,63 @@ def main(): except nilmtools.filter.MissingDestination as e: # If no destination, suggest how to create it by figuring out # a recommended layout. 
- print "Source is %s (%s)" % (e.src, e.layout) - print "Destination %s doesn't exist" % (e.dest) - if "decimate_source" in f.client.stream_get_metadata(e.src): - rec = e.layout - elif 'int32' in e.layout_type or 'float64' in e.layout_type: - rec = 'float64_' + str(e.layout_count * 3) + src = e.src + dest = e.dest + print "Source is %s (%s)" % (src.path, src.layout) + print "Destination %s doesn't exist" % (dest.path) + if "decimate_source" in f.client.stream_get_metadata(src.path): + rec = src.layout + elif 'int32' in src.layout_type or 'float64' in src.layout_type: + rec = 'float64_' + str(src.layout_count * 3) else: - rec = 'float32_' + str(e.layout_count * 3) + rec = 'float32_' + str(src.layout_count * 3) print "You could make it with a command like:" - print " nilmtool create", e.dest, rec + print " nilmtool create", dest.path, rec raise SystemExit(1) + if not (args.factor >= 2): + raise Exception("factor needs to be 2 or more") + f.check_dest_metadata({ "decimate_source": args.srcpath, "decimate_factor": args.factor }) # If source is decimated, we have to decimate a bit differently if "decimate_source" in f.client.stream_get_metadata(args.srcpath): - f.process(function = decimate_again, rows = args.factor) + n = f.src.layout_count // 3 + f.process_python(function = decimate_again, rows = args.factor, + args = (n,)) else: - f.process(function = decimate_first, rows = args.factor) + n = f.src.layout_count + f.process_python(function = decimate_first, rows = args.factor, + args = (n,)) -def decimate_first(data): +def decimate_first(data, n): """Decimate original data -- result has 3 times as many columns""" - data = np.array(data) - rows, cols = data.shape - n = cols - 1 - out = np.zeros(1 + 3 * n) - - out[0] = np.mean(data[:, 0], 0) - out[ 1 : n+1 ] = np.mean(data[:, 1 : n+1], 0) - out[ n+1 : 2*n+1] = np.min( data[:, 1 : n+1], 0) - out[2*n+1 : 3*n+1] = np.max( data[:, 1 : n+1], 0) - - return [out] + # For this simple calculation, converting to a Numpy array + # 
and doing the math is slower than just doing it directly. + rows = iter(data) + r_sum = r_min = r_max = rows.next() + for row in rows: + r_sum = map(operator.add, r_sum, row) + r_min = map(min, r_min, row) + r_max = map(max, r_max, row) + r_mean = [ x / len(data) for x in r_sum ] + return [ [ r_mean[0] ] + r_mean[1:] + r_min[1:] + r_max[1:] ] -def decimate_again(data): +def decimate_again(data, n): """Decimate already-decimated data -- result has the same number of columns""" - data = np.array(data) - rows, cols = data.shape - n = (cols - 1) // 3 - out = np.zeros(1 + 3 * n) - - out[0] = np.mean(data[:, 0], 0) - out[ 1 : n+1 ] = np.mean(data[:, 1 : n+1], 0) - out[ n+1 : 2*n+1] = np.min( data[:, n+1 : 2*n+1], 0) - out[2*n+1 : 3*n+1] = np.max( data[:, 2*n+1 : 3*n+1], 0) - - return [out] + rows = iter(data) + r = rows.next() + r_sum = r[0:(n+1)] + r_min = r[(n+1):(2*n+1)] + r_max = r[(2*n+1):(3*n+1)] + for r in rows: + r_sum = map(operator.add, r_sum, r[0:(n+1)]) + r_min = map(min, r_min, r[(n+1):(2*n+1)]) + r_max = map(max, r_max, r[(2*n+1):(3*n+1)]) + r_mean = [ x / len(data) for x in r_sum ] + return [ r_mean + r_min + r_max ] if __name__ == "__main__": main() diff --git a/nilmtools/filter.py b/nilmtools/filter.py index bbb01d4..84d47c8 100644 --- a/nilmtools/filter.py +++ b/nilmtools/filter.py @@ -2,7 +2,9 @@ import nilmdb.client from nilmdb.utils.printf import * -from nilmdb.utils.time import parse_time, format_time +from nilmdb.utils.time import (parse_time, timestamp_to_human, + timestamp_to_seconds) +from nilmdb.utils.interval import Interval import nilmtools @@ -13,13 +15,32 @@ import re import argparse class MissingDestination(Exception): - def __init__(self, src, layout, dest): + def __init__(self, src, dest): self.src = src - self.layout = layout - self.layout_type = layout.split('_')[0] - self.layout_count = int(layout.split('_')[1]) self.dest = dest - Exception.__init__(self, "destination path " + dest + " not found") + Exception.__init__(self, 
"destination path " + dest.path + " not found") + +class StreamInfo(object): + def __init__(self, info): + self.info = info + try: + self.path = info[0] + self.layout = info[1] + self.layout_type = self.layout.split('_')[0] + self.layout_count = int(self.layout.split('_')[1]) + self.total_count = self.layout_count + 1 + self.timestamp_min = info[2] + self.timestamp_max = info[3] + self.rows = info[4] + self.seconds = nilmdb.utils.time.timestamp_to_seconds(info[5]) + except (IndexError, TypeError): + pass + + def __str__(self): + """Print stream info as a string""" + return sprintf("%s (%s), %.2fM rows, %.2f hours", + self.path, self.layout, self.rows / 1e6, + self.seconds / 3600.0) class Filter(object): @@ -28,8 +49,8 @@ class Filter(object): self._args = None self._client = None self._using_client = False - self.srcinfo = None - self.destinfo = None + self.src = None + self.dest = None @property def client(self): @@ -65,6 +86,11 @@ class Filter(object): self._parser = parser return parser + def interval_string(self, interval): + return sprintf("[ %s -> %s ]", + timestamp_to_human(interval.start), + timestamp_to_human(interval.end)) + def parse_args(self): args = self._parser.parse_args() self._args = args @@ -77,16 +103,15 @@ class Filter(object): src = self._client.stream_list(args.srcpath, extended = True) if len(src) != 1: raise Exception("source path " + args.srcpath + " not found") - self.srcinfo = src[0] + self.src = StreamInfo(src[0]) dest = self._client.stream_list(args.destpath, extended = True) if len(dest) != 1: - raise MissingDestination(self.srcinfo[0], self.srcinfo[1], - args.destpath) - self.destinfo = dest[0] + raise MissingDestination(self.src, StreamInfo([args.destpath])) + self.dest = StreamInfo(dest[0]) - print "Source:", self.stream_info_string(self.srcinfo) - print " Dest:", self.stream_info_string(self.destinfo) + print "Source:", self.src + print " Dest:", self.dest if args.dry_run: for interval in self.intervals(): @@ -98,40 +123,39 @@ class
Filter(object): def intervals(self): """Generate all the intervals that this filter should process""" self._using_client = True - for i in self._client.stream_intervals( + saved_int = None + for (start, end) in self._client.stream_intervals( self._args.srcpath, diffpath = self._args.destpath, start = self._args.start, end = self._args.end): - yield i + + # Join adjacent intervals + if saved_int is not None: + if saved_int.end == start: + start = saved_int.start + else: + yield saved_int + saved_int = Interval(start, end) + if saved_int is not None: + yield saved_int self._using_client = False # Misc helpers def arg_time(self, toparse): """Parse a time string argument""" try: - return nilmdb.utils.time.parse_time(toparse).totimestamp() + return nilmdb.utils.time.parse_time(toparse) except ValueError as e: raise argparse.ArgumentTypeError(sprintf("%s \"%s\"", str(e), toparse)) - def stream_info_string(self, info): - """Print stream info as a string""" - return sprintf("%s (%s), %.2fM rows, %.2f hours", - info[0], info[1], info[4] / 1e6, info[5] / 3600) - - def interval_string(self, interval): - """Print interval as a string""" - return sprintf("[ %s -> %s ]", format_time(interval[0]), - format_time(interval[1])) - def check_dest_metadata(self, data): """See if the metadata jives, and complain if it doesn't. 
If there's no conflict, update the metadata to match 'data'.""" metadata = self._client.stream_get_metadata(self._args.destpath) - rows = self.destinfo[4] for key in data: wanted = str(data[key]) val = metadata.get(key, wanted) - if val != wanted and rows > 0: + if val != wanted and self.dest.rows > 0: m = "Metadata in destination stream:\n" m += " %s = %s\n" % (key, val) m += "doesn't match desired data:\n" @@ -145,9 +169,12 @@ class Filter(object): self._client.stream_update_metadata(self._args.destpath, data) # Main processing helper - def process(self, function, rows, args = None, partial = False): + def process_python(self, function, rows, args = None, partial = False): """Process data in chunks of 'rows' data at a time. + This provides data as nested Python lists and expects the same + back. + function: function to process the data rows: maximum number of rows to pass to 'function' at once args: tuple containing extra arguments to pass to 'function' @@ -157,10 +184,10 @@ class Filter(object): 'function' should be defined like: function(data, *args) - It will be passed an array containing up to 'rows' rows of + It will be passed a list containing up to 'rows' rows of data from the source stream, and any arguments passed in 'args'. It should transform the data as desired, and return a - new array of data, which will be inserted into the destination + new list of data, which will be inserted into the destination stream. """ if args is None: @@ -170,22 +197,21 @@ class Filter(object): src = self._args.srcpath dest = self._args.destpath - # Figure out how to format output data - dest_layout = self.destinfo[1].split('_')[1] - def int_formatter(row): - return ("%.6f " % row[0]) + " ".join(str(int(x)) for x in row[1:]) - def float_formatter(row): - return ("%.6f " % row[0]) + " ".join(repr(x) for x in row[1:]) - if "int" in dest_layout: - formatter = int_formatter + # Parse input data.
We use homogenous types for now, which + # means the timestamp type will be either float or int. + if "int" in self.src.layout_type: + parser = lambda line: [ int(x) for x in line.split() ] else: - formatter = float_formatter + parser = lambda line: [ float(x) for x in line.split() ] + + # Format output data. + formatter = lambda row: " ".join([repr(x) for x in row]) + "\n" - for (start, end) in self.intervals(): - print "Processing", self.interval_string((start, end)) - with inserter(dest, start, end) as insert_ctx: + for interval in self.intervals(): + print "Processing", self.interval_string(interval) + with inserter(dest, interval.start, interval.end) as insert_ctx: src_array = [] - for line in extractor(src, start, end): + for line in extractor(src, interval.start, interval.end): # Read in data src_array.append([ float(x) for x in line.split() ]) @@ -195,7 +221,7 @@ class Filter(object): # Write result to destination out = [ formatter(row) for row in dest_array ] - insert_ctx.insert("\n".join(out) + "\n") + insert_ctx.insert("".join(out)) # Clear source array src_array = [] @@ -204,7 +230,7 @@ class Filter(object): if len(src_array) and partial: dest_array = function(src_array, *args) out = [ formatter(row) for row in dest_array ] - insert_ctx.insert("\n".join(out) + "\n") + insert_ctx.insert("".join(out)) def main(): # This is just a dummy function; actual filters can use the other @@ -212,8 +238,8 @@ def main(): f = Filter() parser = f.setup_parser() args = f.parse_args() - for (start, end) in f.intervals(): - print "Generic filter: need to handle", start, " to ", end + for i in f.intervals(): + print "Generic filter: need to handle", f.interval_string(i) if __name__ == "__main__": main() diff --git a/nilmtools/insert.py b/nilmtools/insert.py index 9c8ce72..fd81e1e 100755 --- a/nilmtools/insert.py +++ b/nilmtools/insert.py @@ -2,7 +2,9 @@ import nilmdb.client from nilmdb.utils.printf import * -from nilmdb.utils.time import parse_time, format_time +from 
nilmdb.utils.time import (parse_time, timestamp_to_human, + timestamp_to_seconds, seconds_to_timestamp, + rate_to_period, now as time_now) import nilmtools import time @@ -61,20 +63,22 @@ def main(args = None): # data_ts is the timestamp that we'll use for the current line data_ts_base = 0 data_ts_inc = 0 - data_ts_step = 1.0 / args.rate + data_ts_rate = args.rate # clock_ts is the imprecise "real" timestamp (from the filename, # comments, or or system clock) clock_ts = None def print_clock_updated(): - printf("Clock time updated to %s\n", format_time(clock_ts)) + printf("Clock time updated to %s\n", timestamp_to_human(clock_ts)) if data_ts_base != 0: diff = data_ts - clock_ts if diff >= 0: - printf(" (data timestamp ahead by %.6f sec)\n", diff) + printf(" (data timestamp ahead by %.6f sec)\n", + timestamp_to_seconds(diff)) else: - printf(" (data timestamp behind by %.6f sec)\n", -diff) + printf(" (data timestamp behind by %.6f sec)\n", + timestamp_to_seconds(-diff)) with client.stream_insert_context(args.path) as stream: for f in args.infile: @@ -92,7 +96,7 @@ def main(args = None): # Subtract 1 hour because files are created at the end # of the hour. Hopefully, we'll be able to use # internal comments and this value won't matter anyway. 
- clock_ts = parse_time(filename).totimestamp() - 3600 + clock_ts = parse_time(filename) - seconds_to_timestamp(3600) print_clock_updated() except ValueError: pass @@ -101,7 +105,8 @@ def main(args = None): # Read each line for line in f: - data_ts = data_ts_base + data_ts_inc * data_ts_step + data_ts = data_ts_base + rate_to_period(data_ts_rate, + data_ts_inc) # If no content other than the newline, skip it if len(line) <= 1: @@ -110,7 +115,7 @@ def main(args = None): # If line starts with a comment, look for a timestamp if line[0] == '#': try: - clock_ts = parse_time(line[1:]).totimestamp() + clock_ts = parse_time(line[1:]) print_clock_updated() except ValueError: pass @@ -118,30 +123,30 @@ def main(args = None): # If inserting live, use clock timestamp if live: - clock_ts = time.time() + clock_ts = time_now() # If we have a real timestamp, compare it to the data # timestamp, and make sure things match up. if clock_ts is not None: - if (data_ts - 10) > clock_ts: + if (data_ts - seconds_to_timestamp(10)) > clock_ts: # Accumulated line timestamps are in the future. # If we were to set data_ts=clock_ts, we'd create # an overlap, so we have to just bail out here. err = sprintf("Data is coming in too fast: data time " "is %s but clock time is only %s", - format_time(data_ts), - format_time(clock_ts)) + timestamp_to_human(data_ts), + timestamp_to_human(clock_ts)) raise ParseError(filename, err) - if (data_ts + 10) < clock_ts: + if (data_ts + seconds_to_timestamp(10)) < clock_ts: # Accumulated line timetamps are in the past. We # can just skip some time and leave a gap in the # data. 
if data_ts_base != 0: printf("Skipping data timestamp forward from " "%s to %s to match clock time\n", - format_time(data_ts), - format_time(clock_ts)) + timestamp_to_human(data_ts), + timestamp_to_human(clock_ts)) stream.finalize() data_ts_base = data_ts = clock_ts data_ts_inc = 0 @@ -166,7 +171,7 @@ def main(args = None): continue # Insert it - stream.insert("%.6f %s" % (data_ts, line)) + stream.insert("%d %s" % (data_ts, line)) print "Done" if __name__ == "__main__": diff --git a/scripts/decimate-it.sh b/scripts/decimate-it.sh new file mode 100755 index 0000000..eb1d65c --- /dev/null +++ b/scripts/decimate-it.sh @@ -0,0 +1,6 @@ +#!/bin/bash + +for i in "$@"; do + nilmtool create /lees-compressor/no-leak/raw/$((4**$i)) float32_18 + /usr/bin/time -v nilm-decimate /lees-compressor/no-leak/raw/$((4**($i-1))) /lees-compressor/no-leak/raw/$((4**$i)) +done