diff --git a/insert.py b/insert.py index d803cf5..a08a19d 100755 --- a/insert.py +++ b/insert.py @@ -2,17 +2,17 @@ import nilmdb from nilmdb.utils.printf import * -from nilmdb.utils import datetime_tz from nilmdb.utils.time import parse_time, format_time +import time import sys import re import argparse import subprocess class ParseError(Exception): - def __init__(self, filename, linenum, error): - msg = filename + ":" + str(linenum+1) + ": " + error + def __init__(self, filename, error): + msg = filename + ": " + error super(ParseError, self).__init__(msg) parser = argparse.ArgumentParser(description = """\ @@ -42,23 +42,33 @@ printf(" Data rate: %s Hz\n", repr(args.rate)) client = nilmdb.Client(args.url) +# Local copies to save dictionary lookups +live = args.live + # data_ts is the timestamp that we'll use for the current line -# (increased by 1 / rate every line) -data_ts = None +data_ts_base = 0 +data_ts_inc = 0 +data_ts_step = 1.0 / args.rate # clock_ts is the imprecise "real" timestamp (from the filename, # comments, or or system clock) clock_ts = None -lines_ok = 0 +def print_clock_updated(): + printf("Clock time updated to %s\n", format_time(clock_ts)) + if data_ts_base != 0: + diff = data_ts - clock_ts + if diff >= 0: + printf(" (data timestamp ahead by %.6fs)\n", diff) + else: + printf(" (data timestamp behind by %.6fs)\n", -diff) with client.stream_insert_context(args.path) as stream: for f in args.infile: filename = f.name printf("Processing %s\n", filename) - # If the filename ends in .gz, open it with gzcat just to be - # nice. + # If the filename ends in .gz, open it with gzcat instead. if filename.endswith(".gz"): p = subprocess.Popen(["gzip", "-dc"], stdin = f, stdout = subprocess.PIPE) @@ -70,45 +80,35 @@ with client.stream_insert_context(args.path) as stream: # Hopefully, we'll be able to use internal comments and this value # won't matter anyway. 
clock_ts = parse_time(filename).totimestamp() - 3600 - printf("Clock time updated to %s\n", format_time(clock_ts)) + print_clock_updated() except ValueError: pass + truncated_lines = 0 + # Read each line - for (linenum, line) in enumerate(f): - - # Any comments? - if line.find('#') >= 0: - (line, comment) = line.split('#', 1) - line += '\n' - else: - comment = None - - # Figure out our best guess about the real time - if args.live: - clock_ts = datetime_tz.datetime_tz.now().totimestamp() - elif comment: + for line in f: + data_ts = data_ts_base + data_ts_inc * data_ts_step + + # If no content other than the newline, skip it + if len(line) <= 1: + continue + + # If line starts with a comment, look for a timestamp + if line[0] == '#': try: - clock_ts = parse_time(comment).totimestamp() - printf("Clock time updated to %s\n", format_time(clock_ts)) + clock_ts = parse_time(line[1:]).totimestamp() + print_clock_updated() except ValueError: pass - - # If no content other than the newline, skip this line - if len(line) <= 1: continue - # Make a timestamp for this line - if data_ts is None: - if clock_ts is None: - err = "No idea what timestamp to use" - raise ParseError(filename, linenum, err) - data_ts = clock_ts - else: - data_ts += 1.0 / args.rate - - # If we have a real timestamp, compare it to the line - # timestamp, and make sure things are working out. + # If inserting live, use clock timestamp + if live: + clock_ts = time.time() + + # If we have a real timestamp, compare it to the data + # timestamp, and make sure things match up. if clock_ts is not None: if (data_ts - 10) > clock_ts: # Accumulated line timestamps are in the future. 
@@ -117,32 +117,39 @@ with client.stream_insert_context(args.path) as stream: err = sprintf("Data is coming in too fast: data time is %s " "but clock time is only %s", format_time(data_ts), format_time(clock_ts)) - raise ParserError(filename, linenum, err) + raise ParseError(filename, err) if (data_ts + 10) < clock_ts: # Accumulated line timetamps are in the past. We # can just skip some time and leave a gap in the # data. - printf("Skipping data timestamp forward from %s to %s " - "to match clock time\n", - format_time(data_ts), format_time(clock_ts)) + if data_ts_base != 0: + printf("Skipping data timestamp forward from %s to %s " + "to match clock time\n", + format_time(data_ts), format_time(clock_ts)) stream.finalize() - data_ts = clock_ts + data_ts_base = data_ts = clock_ts + data_ts_inc = 0 # Don't use this clock time anymore until we update it clock_ts = None + if data_ts_base == 0: + raise ParseError(filename, "No idea what timestamp to use") + + # This line is legit, so increment timestamp + data_ts_inc += 1 + # Once in a while a line might be truncated, if we're at - # the end of a file. Accept it, but only if we've been - # getting good lines too + # the end of a file. Ignore it, but if we ignore too many, + # bail out. if line[-1] != '\n': - if lines_ok > 10: - printf("Ignoring short line at %s:%d\n", filename, linenum) - continue - else: - raise ParserError(filename, linenum, "short line") - lines_ok = 0 - - stream.insert_line(repr(data_ts) + " " + line) - lines_ok += 1 + truncated_lines += 1 + if truncated_lines > 3: + raise ParseError(filename, "too many short lines") + printf("Ignoring short line in %s\n", filename) + continue + + # Insert it + stream.insert("%.6f %s" % (data_ts, line)) print "Done"