|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- #!/usr/bin/python
-
- import nilmdb.client
- from nilmdb.utils.printf import *
- from nilmdb.utils.time import parse_time, format_time
-
- import time
- import sys
- import re
- import argparse
- import subprocess
-
class ParseError(Exception):
    """Error raised when an input file cannot be turned into stream data.

    The exception message is the offending file name and the problem
    description joined as ``"<filename>: <error>"``.
    """

    def __init__(self, filename, error):
        # Both arguments are expected to be strings; they are concatenated
        # directly so that the message always leads with the file name.
        Exception.__init__(self, filename + ": " + error)
-
# Command-line interface: server URL, sample rate, live/prerecorded mode,
# destination stream path, and zero or more input files (stdin by default).
parser = argparse.ArgumentParser(description = """\
Insert data from ethstream, either live (using the system time as a
reference) or prerecorded (using comments in the file as a reference).

The data is assumed to have been recorded at the specified rate.
Small discrepancies between the accumulated timestamps and the
reference time are ignored; larger discrepancies cause gaps to be
created in the stream. Overlapping data returns an error.
""", formatter_class = argparse.RawDescriptionHelpFormatter)
parser.add_argument("-u", "--url", action="store",
                    default="http://localhost:12380/",
                    help="NilmDB server URL (default: %(default)s)")
parser.add_argument("-r", "--rate", action="store", default=8000, type=float,
                    help="Data rate in Hz (default: %(default)s)")
parser.add_argument("-l", "--live", action="store_true",
                    help="Live capture; use system time to verify rate")
parser.add_argument("path", action="store",
                    help="Path of stream, e.g. /foo/bar")
parser.add_argument("infile", type=argparse.FileType('r'), nargs='*',
                    default=[sys.stdin], help="Input files (default: stdin)")
args = parser.parse_args()

# The per-sample time step is computed as 1.0 / rate, so a zero rate would
# die with ZeroDivisionError and a negative one would make the accumulated
# data timestamps run backwards.  Reject both up front with a usage error.
# ("not x > 0" rather than "x <= 0" so that NaN is rejected as well.)
if not args.rate > 0:
    parser.error("--rate must be a positive number (got %r)" % args.rate)
-
# Echo the effective settings before doing any work.
printf("Stream path: %s\n", args.path)
printf(" Data rate: %s Hz\n", repr(args.rate))

# Connection to the NilmDB server that the data is inserted through.
client = nilmdb.client.Client(args.url)

# Plain module-level name for the --live flag, so the per-line loop does
# not have to go through the args namespace.
live = args.live

# clock_ts is the imprecise "real" timestamp (taken from the filename,
# from comments in the data, or from the system clock).  It is None
# whenever no fresh reference is waiting to be compared against the data.
clock_ts = None

# The timestamp used for the current data line is
#     data_ts_base + data_ts_inc * data_ts_step
# i.e. a base time plus the number of lines seen since that base, spaced
# at one sample period (the rate is given in Hz, so the step is 1 / rate).
data_ts_step = 1.0 / args.rate
data_ts_base, data_ts_inc = 0, 0
-
def print_clock_updated():
    """Announce a new clock reference and the data timestamp's offset from it.

    Reads the module-level ``clock_ts``, ``data_ts_base`` and ``data_ts``.
    The offset line is printed only when ``data_ts_base`` is non-zero;
    while it is still zero no data timestamp has been established, so
    there is nothing meaningful to compare against.
    """
    printf("Clock time updated to %s\n", format_time(clock_ts))
    if data_ts_base != 0:
        diff = data_ts - clock_ts
        if diff >= 0:
            printf(" (data timestamp ahead by %.6fs)\n", diff)
        else:
            # diff is negative here; report its magnitude so the message
            # reads "behind by 1.5s" rather than "behind by -1.5s".
            printf(" (data timestamp behind by %.6fs)\n", -diff)
-
# Main loop: read every input file line by line, assign each data line a
# timestamp derived from the sample rate, reconcile that timestamp with
# whatever clock reference is available, and insert the line into the stream.

# Largest difference (in seconds) tolerated between the accumulated data
# timestamp and the clock reference before taking corrective action.
max_skew = 10

with client.stream_insert_context(args.path) as stream:
    for f in args.infile:
        filename = f.name
        printf("Processing %s\n", filename)

        # If the filename ends in .gz, read it through "gzip -dc" instead.
        # Keep a handle on the child so it can be reaped once this file
        # has been consumed.
        gzip_proc = None
        if filename.endswith(".gz"):
            gzip_proc = subprocess.Popen(["gzip", "-dc"],
                                         stdin = f, stdout = subprocess.PIPE)
            f = gzip_proc.stdout

        # Try to get a real timestamp from the filename
        try:
            # Subtract 1 hour because files are created at the end of the hour.
            # Hopefully, we'll be able to use internal comments and this value
            # won't matter anyway.
            clock_ts = parse_time(filename).totimestamp() - 3600
            print_clock_updated()
        except ValueError:
            # Filename carries no parseable time; rely on other references.
            pass

        truncated_lines = 0

        # Read each line
        for line in f:
            data_ts = data_ts_base + data_ts_inc * data_ts_step

            # If no content other than the newline, skip it
            if len(line) <= 1:
                continue

            # If line starts with a comment, look for a timestamp
            if line[0] == '#':
                try:
                    clock_ts = parse_time(line[1:]).totimestamp()
                    print_clock_updated()
                except ValueError:
                    # Ordinary comment without a timestamp; ignore it.
                    pass
                continue

            # If inserting live, use clock timestamp
            if live:
                clock_ts = time.time()

            # If we have a real timestamp, compare it to the data
            # timestamp, and make sure things match up.
            if clock_ts is not None:
                if (data_ts - max_skew) > clock_ts:
                    # Accumulated line timestamps are in the future.
                    # If we were to set data_ts=clock_ts, we'd create
                    # an overlap, so we have to just bail out here.
                    err = sprintf("Data is coming in too fast: data time is %s "
                                  "but clock time is only %s",
                                  format_time(data_ts), format_time(clock_ts))
                    raise ParseError(filename, err)

                if (data_ts + max_skew) < clock_ts:
                    # Accumulated line timestamps are in the past.  We
                    # can just skip some time and leave a gap in the
                    # data.
                    if data_ts_base != 0:
                        printf("Skipping data timestamp forward from %s to %s "
                               "to match clock time\n",
                               format_time(data_ts), format_time(clock_ts))
                    stream.finalize()
                    data_ts_base = data_ts = clock_ts
                    data_ts_inc = 0

                # Don't use this clock time anymore until we update it
                clock_ts = None

            # data_ts_base stays 0 until some clock reference has been
            # adopted above; without one there is nothing to stamp with.
            if data_ts_base == 0:
                raise ParseError(filename, "No idea what timestamp to use")

            # This line is legit, so increment timestamp
            data_ts_inc += 1

            # Once in a while a line might be truncated, if we're at
            # the end of a file.  Ignore it, but if we ignore too many,
            # bail out.
            if line[-1] != '\n':
                truncated_lines += 1
                if truncated_lines > 3:
                    raise ParseError(filename, "too many short lines")
                printf("Ignoring short line in %s\n", filename)
                continue

            # Insert it
            stream.insert("%.6f %s" % (data_ts, line))

        # Reap the decompressor so it does not linger as a zombie and its
        # pipe is released.  A non-zero exit is only reported, not raised:
        # truncated input is tolerated elsewhere in this loop.
        if gzip_proc is not None:
            gzip_proc.stdout.close()
            if gzip_proc.wait() != 0:
                printf("Warning: gzip exited with status %d for %s\n",
                       gzip_proc.returncode, filename)

printf("Done\n")
|