#!/usr/bin/python
# Bulk-insert data from an external source (e.g. ethstream) into NilmDB.

import nilmdb.client
from nilmdb.utils.printf import *
from nilmdb.utils.time import (parse_time, timestamp_to_human,
                               timestamp_to_seconds, seconds_to_timestamp,
                               rate_to_period, now as time_now)

import nilmtools
import time
import sys
import re
import argparse
import subprocess
import textwrap

class ParseError(Exception):
    """Error raised while parsing an input file; message includes filename."""
    def __init__(self, filename, error):
        msg = filename + ": " + error
        super(ParseError, self).__init__(msg)

def parse_args(argv = None):
    """Parse command-line arguments, print a summary of the chosen
    configuration, and return the argparse namespace.

    argv: list of argument strings, or None to use sys.argv.
    """
    parser = argparse.ArgumentParser(
        formatter_class = argparse.RawDescriptionHelpFormatter,
        description = textwrap.dedent("""\
        Insert large amount of data from an external source like ethstream.

        This code tracks two timestamps:

        (1) The 'data' timestamp is the precise timestamp corresponding to
        a particular row of data, and is the timestamp that gets
        inserted into the database.  It increases by 'data_delta' for
        every row of input.

        'data_delta' can come from one of two sources.  If '--delta'
        is specified, it is pulled from the first column of data.  If
        '--rate' is specified, 'data_delta' is set to a fixed value of
        (1 / rate).

        (2) The 'clock' timestamp is the less precise timestamp that gives
        the absolute time.  It can come from two sources.  If '--live'
        is specified, it is pulled directly from the system clock.  If
        '--file' is specified, it is extracted from the input filename
        every time a new file is opened for read, and from comments
        that appear in the file.

        Small discrepancies between 'data' and 'clock' are ignored.  If
        the 'data' timestamp ever differs from the 'clock' timestamp
        by more than 'max_gap' seconds:

        - If 'data' is running behind, there is a gap in the data, so it
          is stepped forward to match 'clock'.

        - If 'data' is running ahead, there is overlap in the data, and
          an error is raised.
        """))
    # Note: ArgumentParser(version=...) is deprecated in Python 2.7 and
    # removed in Python 3; use an explicit version action instead.  This
    # provides the same "-v"/"--version" flags the old parameter did.
    parser.add_argument("-v", "--version", action="version",
                        version = nilmtools.__version__)
    parser.add_argument("-u", "--url", action="store",
                        default="http://localhost/nilmdb/",
                        help="NilmDB server URL (default: %(default)s)")

    group = parser.add_argument_group("Misc options")
    group.add_argument("-D", "--dry-run", action="store_true",
                       help="Parse files, but don't insert any data")
    group.add_argument("-m", "--max-gap", action="store", default=10.0,
                       metavar="SEC", type=float,
                       help="Max discrepancy between clock and data "
                       "timestamps (default: %(default)s)")

    group = parser.add_argument_group("Data timestamp delta")
    exc = group.add_mutually_exclusive_group()
    exc.add_argument("-r", "--rate", action="store", default=8000.0,
                     type=float,
                     help="Data_delta is constant 1/RATE "
                     "(default: %(default)s Hz)")
    exc.add_argument("-d", "--delta", action="store_true",
                     help="Data_delta is the first number in each line")

    group = parser.add_argument_group("Clock timestamp source")
    exc = group.add_mutually_exclusive_group()
    exc.add_argument("-l", "--live", action="store_true",
                     help="Use live system time for clock timestamp")
    exc.add_argument("-f", "--file", action="store_true", default=True,
                     help="Use filename or comments for clock timestamp")
    group.add_argument("-o", "--offset-filename", metavar="SEC",
                       action="store", default=-3600.0, type=float,
                       help="Offset to add to filename timestamps "
                       "(default: %(default)s)")
    group.add_argument("-O", "--offset-comment", metavar="SEC",
                       action="store", default=0.0, type=float,
                       help="Offset to add to comment timestamps "
                       "(default: %(default)s)")

    group = parser.add_argument_group("Database path")
    group.add_argument("path", action="store",
                       help="Path of stream, e.g. /foo/bar")

    group = parser.add_argument_group("Input files")
    group.add_argument("infile", type=argparse.FileType('r'), nargs='*',
                       default=[sys.stdin],
                       help="Input files (default: stdin)")

    args = parser.parse_args(argv)

    # Echo the effective configuration so the operator can sanity-check it.
    printf(" Stream path: %s\n", args.path)
    printf(" Data timestamp: ")
    if args.delta:
        printf("delta on each input line\n")
    else:
        printf("fixed rate %s Hz\n", repr(args.rate))
    printf(" Clock timestamp: ")
    if args.live:
        printf("live system clock\n")
    else:
        printf("from filenames and comments\n")
    printf(" Filename offset: %s seconds\n", repr(args.offset_filename))
    printf(" Comment offset: %s seconds\n", repr(args.offset_comment))
    printf(" Max gap: %s seconds\n", repr(args.max_gap))
    if args.dry_run:
        printf("Dry run (no data will be inserted)\n")

    return args

def main(argv = None):
    """Read rows from the input files and insert them into the database,
    reconciling the precise per-row 'data' timestamps against the coarse
    'clock' timestamps as described in the program help text.
    """
    args = parse_args(argv)

    client = nilmdb.client.Client(args.url)

    # data_ts is the timestamp that we'll use for the current line
    data_ts_base = 0
    data_ts_inc = 0
    data_ts_rate = args.rate
    data_ts_delta = 0
    def get_data_ts():
        # Current data timestamp: either accumulated per-line deltas
        # (--delta) or a fixed-rate row counter (--rate).
        if args.delta:
            return data_ts_base + data_ts_delta
        else:
            return data_ts_base + rate_to_period(data_ts_rate,
                                                 data_ts_inc)

    # clock_ts is the imprecise "real" timestamp (from the filename,
    # comments, or system clock)
    clock_ts = None
    def print_clock_updated():
        printf("Clock timestamp updated to %s\n",
               timestamp_to_human(clock_ts))
        if data_ts_base != 0:
            diff = get_data_ts() - clock_ts
            if diff >= 0:
                printf(" (data timestamp ahead by %.6f sec)\n",
                       timestamp_to_seconds(diff))
            else:
                printf(" (data timestamp behind by %.6f sec)\n",
                       timestamp_to_seconds(-diff))

    # Pre-convert second-based options into internal timestamp units.
    offset_filename = seconds_to_timestamp(args.offset_filename)
    offset_comment = seconds_to_timestamp(args.offset_comment)
    max_gap = seconds_to_timestamp(args.max_gap)

    with client.stream_insert_context(args.path) as stream:
        for f in args.infile:
            filename = f.name
            printf("Processing %s\n", filename)

            # If the filename ends in .gz, re-open it with gzip to
            # decompress.  Track the subprocess so we can reap it when
            # the file is done (otherwise it would linger as a zombie).
            gz_proc = None
            if filename.endswith(".gz"):
                gz_proc = subprocess.Popen(["gzip", "-dc"],
                                           stdin = f,
                                           stdout = subprocess.PIPE)
                f = gz_proc.stdout

            # Try to get a real timestamp from the filename
            try:
                # Subtract 1 hour because files are created at the end
                # of the hour.  Hopefully, we'll be able to use
                # internal comments and this value won't matter anyway.
                clock_ts = parse_time(filename) + offset_filename
                print_clock_updated()
            except ValueError:
                pass

            truncated_lines = 0

            # Read each line
            for line in f:

                # Once in a while a line might be truncated, if we're
                # at the end of a file.  Ignore it, but if we ignore
                # too many, bail out.
                if line[-1] != '\n':
                    truncated_lines += 1
                    if truncated_lines > 3:
                        raise ParseError(filename, "too many short lines")
                    printf("Ignoring short line in %s\n", filename)
                    continue

                # If no content other than the newline, skip it
                if len(line) <= 1:
                    continue

                # If line starts with a comment, look for a timestamp
                if line[0] == '#':
                    try:
                        clock_ts = parse_time(line[1:]) + offset_comment
                        print_clock_updated()
                    except ValueError:
                        pass
                    continue

                # If --delta mode, increment data_ts_delta by the
                # delta from the file.
                if args.delta:
                    try:
                        (delta, line) = line.split(None, 1)
                        data_ts_delta += float(delta)
                    except ValueError:
                        raise ParseError(filename, "can't parse delta")

                # Calculate data_ts for this row
                data_ts = get_data_ts()

                # If inserting live, use clock timestamp
                if args.live:
                    clock_ts = time_now()

                # If we have a real timestamp, compare it to the data
                # timestamp, and make sure things match up.
                if clock_ts is not None:
                    if (data_ts - max_gap) > clock_ts:
                        # Accumulated line timestamps are in the future.
                        # If we were to set data_ts=clock_ts, we'd create
                        # an overlap, so we have to just bail out here.
                        err = sprintf("Data is coming in too fast: data time "
                                      "is %s but clock time is only %s",
                                      timestamp_to_human(data_ts),
                                      timestamp_to_human(clock_ts))
                        raise ParseError(filename, err)

                    if (data_ts + max_gap) < clock_ts:
                        # Accumulated line timestamps are in the past.  We
                        # can just skip some time and leave a gap in the
                        # data.
                        if data_ts_base != 0:
                            printf("Skipping data timestamp forward from "
                                   "%s to %s to match clock time\n",
                                   timestamp_to_human(data_ts),
                                   timestamp_to_human(clock_ts))
                            stream.finalize()
                        data_ts_base = data_ts = clock_ts
                        data_ts_inc = data_ts_delta = 0

                    # Don't use this clock time anymore until we update it
                    clock_ts = None

                if data_ts_base == 0:
                    raise ParseError(filename,
                                     "No idea what timestamp to use")

                # This line is legit, so increment timestamp (for --rate)
                data_ts_inc += 1

                # Insert it
                if not args.dry_run:
                    stream.insert("%d %s" % (data_ts, line))

            # Reap the gzip subprocess, if any, to avoid zombies and
            # leaked pipe file descriptors.
            if gz_proc is not None:
                f.close()
                gz_proc.wait()

    print("Done")

if __name__ == "__main__":
    main()