|
|
@@ -0,0 +1,121 @@ |
|
|
|
#!/usr/bin/python |
|
|
|
|
|
|
|
import nilmdb |
|
|
|
from nilmdb.utils.printf import * |
|
|
|
from nilmdb.utils import datetime_tz |
|
|
|
from nilmdb.utils.time import parse_time, format_time |
|
|
|
|
|
|
|
import sys |
|
|
|
import re |
|
|
|
import argparse |
|
|
|
|
|
|
|
class ParseError(Exception): |
|
|
|
def __init__(self, filename, linenum, error): |
|
|
|
s = filename + ":" + str(linenum) + ": " + error |
|
|
|
super(ParseError, self).__init__(error) |
|
|
|
|
|
|
|
parser = argparse.ArgumentParser(description = """\ |
|
|
|
Insert data from ethstream, either live (using the system time as a |
|
|
|
reference) or prerecorded (using comments in the file as a reference). |
|
|
|
|
|
|
|
The data is assumed to have been recorded at the specified rate. |
|
|
|
Small discrepencies between the accumulated timestamps and the |
|
|
|
reference time are ignored; larger discrepencies cause gaps to be |
|
|
|
created in the stream. Overlapping data returns an error. |
|
|
|
""", formatter_class = argparse.RawDescriptionHelpFormatter) |
|
|
|
parser.add_argument("-u", "--url", action="store", |
|
|
|
default="http://localhost:12380/", |
|
|
|
help="NilmDB server URL (default: %(default)s)") |
|
|
|
parser.add_argument("-r", "--rate", action="store", default=8000, type=float, |
|
|
|
help="Data rate in Hz (default: %(default)s)") |
|
|
|
parser.add_argument("-l", "--live", action="store_true", |
|
|
|
help="Live capture; use system time to verify rate") |
|
|
|
parser.add_argument("path", action="store", |
|
|
|
help="Path of stream, e.g. /foo/bar") |
|
|
|
parser.add_argument("infile", type=argparse.FileType('r'), nargs='*', |
|
|
|
default=[sys.stdin], help="Input files (default: stdin)") |
|
|
|
args = parser.parse_args() |
|
|
|
|
|
|
|
printf("Stream path: %s\n", args.path) |
|
|
|
printf(" Data rate: %s Hz\n", repr(args.rate)) |
|
|
|
|
|
|
|
client = nilmdb.Client(args.url) |
|
|
|
|
|
|
|
# data_ts is the timestamp that we'll use for the current line |
|
|
|
# (increased by 1 / rate every line) |
|
|
|
data_ts = None |
|
|
|
|
|
|
|
# clock_ts is the imprecise "real" timestamp (from the filename, |
|
|
|
# comments, or or system clock) |
|
|
|
clock_ts = None |
|
|
|
|
|
|
|
with client.stream_insert_context(args.path) as stream: |
|
|
|
for f in args.infile: |
|
|
|
printf("Processing %s\n", f.name) |
|
|
|
|
|
|
|
# Try to get a real timestamp from the filename |
|
|
|
try: |
|
|
|
clock_ts = parse_time(f.name).totimestamp() |
|
|
|
printf("Clock time updated to %s\n", format_time(clock_ts)) |
|
|
|
except ValueError: |
|
|
|
pass |
|
|
|
|
|
|
|
# Read each line |
|
|
|
for (linenum, line) in enumerate(f): |
|
|
|
|
|
|
|
# Any comments? |
|
|
|
if line.contains('#'): |
|
|
|
(line, comment) = line.split('#', 1) |
|
|
|
line += '\n' |
|
|
|
else: |
|
|
|
comment = None |
|
|
|
|
|
|
|
# Figure out our best guess about the real time |
|
|
|
if args.live: |
|
|
|
clock_ts = datetime_tz.datetime_tz.now().totimestamp() |
|
|
|
elif comment: |
|
|
|
try: |
|
|
|
clock_ts = parse_time(comment).totimestamp() |
|
|
|
printf("Clock time updated to %s\n", format_time(clock_ts)) |
|
|
|
except ValueError: |
|
|
|
pass |
|
|
|
|
|
|
|
# If no content other than the newline, skip this line |
|
|
|
if len(line) <= 1: |
|
|
|
continue |
|
|
|
|
|
|
|
# Make a timestamp for this line |
|
|
|
if data_ts is None: |
|
|
|
if clock_ts is None: |
|
|
|
err = "No idea what timestamp to use for the first line" |
|
|
|
raise ParseError(f.name, linenum, err) |
|
|
|
clock_ts = data_ts |
|
|
|
else: |
|
|
|
data_ts += 1.0 / args.rate |
|
|
|
|
|
|
|
# If we have a real timestamp, compare it to the line |
|
|
|
# timestamp, and make sure things are working out. |
|
|
|
if clock_ts is not None: |
|
|
|
if (data_ts - 10) > clock_ts: |
|
|
|
# Accumulated line timestamps are in the future. |
|
|
|
# If we were to set data_ts=clock_ts, we'd create |
|
|
|
# an overlap, so we have to just bail out here. |
|
|
|
err = sprintf("Data is coming in too fast: data time is %s " |
|
|
|
"but clock time is only %s", |
|
|
|
format_time(data_ts), format_time(clock_ts)) |
|
|
|
raise ParserError(f.name, linenum, err) |
|
|
|
|
|
|
|
if (data_ts + 10) < clock_ts: |
|
|
|
# Accumulated line timetamps are in the past. We |
|
|
|
# can just skip some time and leave a gap in the |
|
|
|
# data. |
|
|
|
printf("Skipping data timestamp forward from %s to %s " |
|
|
|
"to match clock time\n", |
|
|
|
format_time(data_ts), format_time(clock_ts)) |
|
|
|
stream.finalize() |
|
|
|
data_ts = clock_ts |
|
|
|
|
|
|
|
# Don't use this clock time anymore until we update it |
|
|
|
clock_ts = None |
|
|
|
|
|
|
|
stream.insert_line(line) |