|
- #!/usr/bin/python
-
- import nilmdb.client
- from nilmdb.utils.printf import *
- from nilmdb.utils.time import (parse_time, timestamp_to_human,
- timestamp_to_seconds, seconds_to_timestamp,
- rate_to_period, now as time_now)
-
- import nilmtools
- import time
- import sys
- import re
- import argparse
- import subprocess
- import textwrap
-
class ParseError(Exception):
    """Error raised when an input file cannot be processed.

    The exception message is "<filename>: <error>".  The two parts are
    also kept as the `filename` and `error` attributes so that a
    handler can inspect them without re-parsing the message.
    """
    def __init__(self, filename, error):
        msg = filename + ": " + error
        super(ParseError, self).__init__(msg)
        # Keep the individual pieces available to callers
        self.filename = filename
        self.error = error
-
def parse_args(argv = None):
    """Parse the command line and print a summary of the chosen settings.

    argv: list of argument strings, or None to let argparse read
          sys.argv.

    Returns the argparse namespace.  Attributes set here: url, dry_run,
    max_gap, rate, delta, live, file, offset_filename, offset_comment,
    path, and infile (a list of open file objects, defaulting to
    [sys.stdin]).

    As a side effect, a human-readable summary of the settings is
    written with printf() before returning.
    """
    parser = argparse.ArgumentParser(
        formatter_class = argparse.RawDescriptionHelpFormatter,
        description = textwrap.dedent("""\
        Insert large amount of data from an external source like ethstream.

        This code tracks two timestamps:

        (1) The 'data' timestamp is the precise timestamp corresponding to
        a particular row of data, and is the timestamp that gets
        inserted into the database. It increases by 'data_delta' for
        every row of input.

        'data_delta' can come from one of two sources. If '--delta'
        is specified, it is pulled from the first column of data. If
        '--rate' is specified, 'data_delta' is set to a fixed value of
        (1 / rate).

        (2) The 'clock' timestamp is the less precise timestamp that gives
        the absolute time. It can come from two sources. If '--live'
        is specified, it is pulled directly from the system clock. If
        '--file' is specified, it is extracted from the input filename
        every time a new file is opened for read, and from comments
        that appear in the file.

        Small discrepencies between 'data' and 'clock' are ignored. If
        the 'data' timestamp ever differs from the 'clock' timestamp by
        more than 'max_gap' seconds:

        - If 'data' is running behind, there is a gap in the data, so it
        is stepped forward to match 'clock'.

        - If 'data' is running ahead, there is overlap in the data, and an
        error is raised.
        """))
    # The 'version' keyword of ArgumentParser is deprecated in Python
    # 2.7 and gone in Python 3; an explicit "version" action provides
    # the same -v/--version flags on both.
    parser.add_argument("-v", "--version", action="version",
                        version = nilmtools.__version__)
    parser.add_argument("-u", "--url", action="store",
                        default="http://localhost/nilmdb/",
                        help="NilmDB server URL (default: %(default)s)")
    group = parser.add_argument_group("Misc options")
    group.add_argument("-D", "--dry-run", action="store_true",
                       help="Parse files, but don't insert any data")
    group.add_argument("-m", "--max-gap", action="store", default=10.0,
                       metavar="SEC", type=float,
                       help="Max discrepency between clock and data "
                       "timestamps (default: %(default)s)")

    # How the per-row timestamp increment is determined
    group = parser.add_argument_group("Data timestamp delta")
    exc = group.add_mutually_exclusive_group()
    exc.add_argument("-r", "--rate", action="store", default=8000.0,
                     type=float,
                     help="Data_delta is constant 1/RATE "
                     "(default: %(default)s Hz)")
    exc.add_argument("-d", "--delta", action="store_true",
                     help="Data_delta is the first number in each line")

    # Where the absolute ("clock") time comes from
    group = parser.add_argument_group("Clock timestamp source")
    exc = group.add_mutually_exclusive_group()
    exc.add_argument("-l", "--live", action="store_true",
                     help="Use live system time for clock timestamp")
    exc.add_argument("-f", "--file", action="store_true", default=True,
                     help="Use filename or comments for clock timestamp")
    group.add_argument("-o", "--offset-filename", metavar="SEC",
                       action="store", default=-3600.0, type=float,
                       help="Offset to add to filename timestamps "
                       "(default: %(default)s)")
    group.add_argument("-O", "--offset-comment", metavar="SEC",
                       action="store", default=0.0, type=float,
                       help="Offset to add to comment timestamps "
                       "(default: %(default)s)")

    group = parser.add_argument_group("Database path")
    group.add_argument("path", action="store",
                       help="Path of stream, e.g. /foo/bar")

    group = parser.add_argument_group("Input files")
    group.add_argument("infile", type=argparse.FileType('r'), nargs='*',
                       default=[sys.stdin],
                       help="Input files (default: stdin)")

    args = parser.parse_args(argv)

    # Echo the effective configuration so it shows up in the output
    printf(" Stream path: %s\n", args.path)

    printf(" Data timestamp: ")
    if args.delta:
        printf("delta on each input line\n")
    else:
        printf("fixed rate %s Hz\n", repr(args.rate))

    printf(" Clock timestamp: ")
    if args.live:
        printf("live system clock\n")
    else:
        printf("from filenames and comments\n")
        printf(" Filename offset: %s seconds\n", repr(args.offset_filename))
        printf(" Comment offset: %s seconds\n", repr(args.offset_comment))

    printf(" Max gap: %s seconds\n", repr(args.max_gap))
    if args.dry_run:
        printf("Dry run (no data will be inserted)\n")

    return args
-
def main(argv = None):
    """Read rows from the input files and insert them into a NilmDB stream.

    argv: list of argument strings passed on to parse_args(), or None.

    Each data row gets a 'data' timestamp computed from a base value
    plus either the accumulated per-line deltas (--delta) or the row
    count converted through rate_to_period() (--rate).  A 'clock'
    timestamp taken from the filename, from '#' comment lines, or from
    the system clock (--live) is compared against it:

      - data more than max_gap behind the clock: the data timestamp is
        stepped forward to the clock value, leaving a gap.
      - data more than max_gap ahead of the clock: ParseError is raised.

    Raises ParseError for too many truncated lines, an unparseable
    delta, data running ahead of the clock, or when no timestamp could
    be determined for a row.
    """
    args = parse_args(argv)

    client = nilmdb.client.Client(args.url)

    # data_ts is the timestamp that we'll use for the current line
    data_ts_base = 0
    data_ts_inc = 0
    data_ts_rate = args.rate
    data_ts_delta = 0
    def get_data_ts():
        # Reads the enclosing variables at call time, so it always
        # reflects the current base / delta / row count.
        if args.delta:
            return data_ts_base + data_ts_delta
        else:
            return data_ts_base + rate_to_period(data_ts_rate,
                                                 data_ts_inc)

    # clock_ts is the imprecise "real" timestamp (from the filename,
    # comments, or system clock)
    clock_ts = None

    def print_clock_updated():
        printf("Clock timestamp updated to %s\n", timestamp_to_human(clock_ts))
        if data_ts_base != 0:
            diff = get_data_ts() - clock_ts
            if diff >= 0:
                printf(" (data timestamp ahead by %.6f sec)\n",
                       timestamp_to_seconds(diff))
            else:
                printf(" (data timestamp behind by %.6f sec)\n",
                       timestamp_to_seconds(-diff))

    offset_filename = seconds_to_timestamp(args.offset_filename)
    offset_comment = seconds_to_timestamp(args.offset_comment)
    max_gap = seconds_to_timestamp(args.max_gap)

    with client.stream_insert_context(args.path) as stream:
        for f in args.infile:
            filename = f.name
            printf("Processing %s\n", filename)

            # If the filename ends in .gz, re-open it with gzip to
            # decompress.
            gzip_proc = None
            if filename.endswith(".gz"):
                gzip_proc = subprocess.Popen(["gzip", "-dc"],
                                             stdin = f,
                                             stdout = subprocess.PIPE)
                f = gzip_proc.stdout

            try:
                # Try to get a real timestamp from the filename
                try:
                    # Subtract 1 hour because files are created at the end
                    # of the hour.  Hopefully, we'll be able to use
                    # internal comments and this value won't matter anyway.
                    clock_ts = parse_time(filename) + offset_filename
                    print_clock_updated()
                except ValueError:
                    pass

                truncated_lines = 0

                # Read each line
                for line in f:
                    # Once in a while a line might be truncated, if we're
                    # at the end of a file.  Ignore it, but if we ignore
                    # too many, bail out.
                    if line[-1] != '\n':
                        truncated_lines += 1
                        if truncated_lines > 3:
                            raise ParseError(filename, "too many short lines")
                        printf("Ignoring short line in %s\n", filename)
                        continue

                    # If no content other than the newline, skip it
                    if len(line) <= 1:
                        continue

                    # If line starts with a comment, look for a timestamp
                    if line[0] == '#':
                        try:
                            clock_ts = parse_time(line[1:]) + offset_comment
                            print_clock_updated()
                        except ValueError:
                            pass
                        continue

                    # If --delta mode, increment data_ts_delta by the
                    # delta from the file.
                    if args.delta:
                        try:
                            (delta, line) = line.split(None, 1)
                            data_ts_delta += float(delta)
                        except ValueError:
                            raise ParseError(filename, "can't parse delta")

                    # Calculate data_ts for this row
                    data_ts = get_data_ts()

                    # If inserting live, use clock timestamp
                    if args.live:
                        clock_ts = time_now()

                    # If we have a real timestamp, compare it to the data
                    # timestamp, and make sure things match up.
                    if clock_ts is not None:
                        if (data_ts - max_gap) > clock_ts:
                            # Accumulated line timestamps are in the future.
                            # If we were to set data_ts=clock_ts, we'd create
                            # an overlap, so we have to just bail out here.
                            err = sprintf("Data is coming in too fast: data time "
                                          "is %s but clock time is only %s",
                                          timestamp_to_human(data_ts),
                                          timestamp_to_human(clock_ts))
                            raise ParseError(filename, err)

                        if (data_ts + max_gap) < clock_ts:
                            # Accumulated line timestamps are in the past.  We
                            # can just skip some time and leave a gap in the
                            # data.
                            if data_ts_base != 0:
                                printf("Skipping data timestamp forward from "
                                       "%s to %s to match clock time\n",
                                       timestamp_to_human(data_ts),
                                       timestamp_to_human(clock_ts))
                            stream.finalize()
                            data_ts_base = data_ts = clock_ts
                            data_ts_inc = data_ts_delta = 0

                        # Don't use this clock time anymore until we update it
                        clock_ts = None

                    if data_ts_base == 0:
                        raise ParseError(filename, "No idea what timestamp to use")

                    # This line is legit, so increment timestamp (for --rate)
                    data_ts_inc += 1

                    # Insert it
                    if not args.dry_run:
                        stream.insert("%d %s" % (data_ts, line))
            finally:
                # Always close the pipe and reap the gzip child so that
                # no zombie process or open descriptor is left behind,
                # even when a ParseError aborts the loop.
                if gzip_proc is not None:
                    gzip_proc.stdout.close()
                    gzip_proc.wait()

            # Truncated input is tolerated above, so a failing gzip is
            # only reported rather than treated as fatal.
            if gzip_proc is not None and gzip_proc.returncode != 0:
                printf("Warning: gzip exited with status %d while reading %s\n",
                       gzip_proc.returncode, filename)
    printf("Done\n")
-
# Run the tool only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|