|
- #!/usr/bin/python
-
- import nilmdb
- from nilmdb.utils.printf import *
- from nilmdb.utils import datetime_tz
- from nilmdb.utils.time import parse_time, format_time
-
- import sys
- import re
- import argparse
- import subprocess
-
class ParseError(Exception):
    """Error raised for a problem at a specific line of an input file.

    The exception message has the form "<filename>:<line>: <error>",
    where <line> is the 1-based line number.
    """

    def __init__(self, filename, linenum, error):
        # linenum is 0-based (as produced by enumerate), so add one to
        # report the line number the way editors show it.
        location = str(linenum + 1)
        text = "".join([filename, ":", location, ": ", error])
        super(ParseError, self).__init__(text)
-
# Command-line interface.  RawDescriptionHelpFormatter is used so the
# description below is shown in --help with its line breaks intact.
parser = argparse.ArgumentParser(
    formatter_class=argparse.RawDescriptionHelpFormatter,
    description="""\
Insert data from ethstream, either live (using the system time as a
reference) or prerecorded (using comments in the file as a reference).

The data is assumed to have been recorded at the specified rate.
Small discrepencies between the accumulated timestamps and the
reference time are ignored; larger discrepencies cause gaps to be
created in the stream. Overlapping data returns an error.
""")

# Options
parser.add_argument(
    "-u", "--url",
    action="store",
    default="http://localhost:12380/",
    help="NilmDB server URL (default: %(default)s)")
parser.add_argument(
    "-r", "--rate",
    action="store",
    type=float,
    default=8000,
    help="Data rate in Hz (default: %(default)s)")
parser.add_argument(
    "-l", "--live",
    action="store_true",
    help="Live capture; use system time to verify rate")

# Positional arguments: the destination stream path, followed by zero
# or more input files (standard input when none are given).
parser.add_argument(
    "path",
    action="store",
    help="Path of stream, e.g. /foo/bar")
parser.add_argument(
    "infile",
    nargs='*',
    type=argparse.FileType('r'),
    default=[sys.stdin],
    help="Input files (default: stdin)")

args = parser.parse_args()
-
# Report the settings in effect, then create the NilmDB client.
printf("Stream path: %s\n", args.path)
printf(" Data rate: %s Hz\n", repr(args.rate))

client = nilmdb.Client(args.url)

# Timestamp state used while reading the input:
#   data_ts  - the timestamp that we'll use for the current line
#              (increased by 1 / rate every line)
#   clock_ts - the imprecise "real" timestamp (from the filename,
#              comments, or system clock)
data_ts = clock_ts = None

# Counter of data lines inserted successfully; consulted when deciding
# whether a truncated line can be tolerated.
lines_ok = 0
-
# Insert every input file into the stream through a single insert
# context.  data_ts, clock_ts and lines_ok are module-level state that
# carries over from one input file to the next.
with client.stream_insert_context(args.path) as stream:
    for f in args.infile:
        filename = f.name
        printf("Processing %s\n", filename)

        # If the filename ends in .gz, read it through "gzip -dc" just
        # to be nice.
        gzip_proc = None
        if filename.endswith(".gz"):
            gzip_proc = subprocess.Popen(["gzip", "-dc"],
                                         stdin=f, stdout=subprocess.PIPE)
            f = gzip_proc.stdout

        # Try to get a real timestamp from the filename
        try:
            # Subtract 1 hour because files are created at the end of
            # the hour.  Hopefully, we'll be able to use internal
            # comments and this value won't matter anyway.
            clock_ts = parse_time(filename).totimestamp() - 3600
            printf("Clock time updated to %s\n", format_time(clock_ts))
        except ValueError:
            pass

        # Read each line
        for (linenum, line) in enumerate(f):

            # Any comments?  Split them off and restore the newline
            # that went away with the comment text.
            if '#' in line:
                (line, comment) = line.split('#', 1)
                line += '\n'
            else:
                comment = None

            # Figure out our best guess about the real time
            if args.live:
                clock_ts = datetime_tz.datetime_tz.now().totimestamp()
            elif comment:
                try:
                    clock_ts = parse_time(comment).totimestamp()
                    printf("Clock time updated to %s\n",
                           format_time(clock_ts))
                except ValueError:
                    # Comment did not contain a parseable time
                    pass

            # If no content other than the newline, skip this line
            if len(line) <= 1:
                continue

            # Make a timestamp for this line
            if data_ts is None:
                if clock_ts is None:
                    err = "No idea what timestamp to use"
                    raise ParseError(filename, linenum, err)
                data_ts = clock_ts
            else:
                data_ts += 1.0 / args.rate

            # If we have a real timestamp, compare it to the line
            # timestamp, and make sure things are working out.
            if clock_ts is not None:
                if (data_ts - 10) > clock_ts:
                    # Accumulated line timestamps are in the future.
                    # If we were to set data_ts=clock_ts, we'd create
                    # an overlap, so we have to just bail out here.
                    err = sprintf("Data is coming in too fast: data time is %s "
                                  "but clock time is only %s",
                                  format_time(data_ts), format_time(clock_ts))
                    raise ParseError(filename, linenum, err)

                if (data_ts + 10) < clock_ts:
                    # Accumulated line timestamps are in the past.  We
                    # can just skip some time and leave a gap in the
                    # data.
                    printf("Skipping data timestamp forward from %s to %s "
                           "to match clock time\n",
                           format_time(data_ts), format_time(clock_ts))
                    stream.finalize()
                    data_ts = clock_ts

                # Don't use this clock time anymore until we update it
                clock_ts = None

            # Once in a while a line might be truncated, if we're at
            # the end of a file.  Accept it, but only if we've been
            # getting good lines too
            if line[-1] != '\n':
                if lines_ok > 10:
                    # Report the 1-based line number, as ParseError does
                    printf("Ignoring short line at %s:%d\n",
                           filename, linenum + 1)
                    # Require a fresh run of good lines before another
                    # short line is tolerated.
                    lines_ok = 0
                    continue
                raise ParseError(filename, linenum, "short line")

            stream.insert_line(repr(data_ts) + " " + line)
            lines_ok += 1

        # Reap the decompressor, if one was started for this file, so
        # that no finished child process is left behind.
        if gzip_proc is not None:
            gzip_proc.stdout.close()
            gzip_proc.wait()

printf("Done\n")
|