Speed up inserter by a factor of about 2; misc cleanups
This commit is contained in:
parent
fc6b2dfa5e
commit
a9db02d73e
117
insert.py
117
insert.py
|
@ -2,17 +2,17 @@
|
|||
|
||||
import nilmdb
|
||||
from nilmdb.utils.printf import *
|
||||
from nilmdb.utils import datetime_tz
|
||||
from nilmdb.utils.time import parse_time, format_time
|
||||
|
||||
import time
|
||||
import sys
|
||||
import re
|
||||
import argparse
|
||||
import subprocess
|
||||
|
||||
class ParseError(Exception):
|
||||
def __init__(self, filename, linenum, error):
|
||||
msg = filename + ":" + str(linenum+1) + ": " + error
|
||||
def __init__(self, filename, error):
|
||||
msg = filename + ": " + error
|
||||
super(ParseError, self).__init__(msg)
|
||||
|
||||
parser = argparse.ArgumentParser(description = """\
|
||||
|
@ -42,23 +42,33 @@ printf(" Data rate: %s Hz\n", repr(args.rate))
|
|||
|
||||
client = nilmdb.Client(args.url)
|
||||
|
||||
# Local copies to save dictionary lookups
|
||||
live = args.live
|
||||
|
||||
# data_ts is the timestamp that we'll use for the current line
|
||||
# (increased by 1 / rate every line)
|
||||
data_ts = None
|
||||
data_ts_base = 0
|
||||
data_ts_inc = 0
|
||||
data_ts_step = 1.0 / args.rate
|
||||
|
||||
# clock_ts is the imprecise "real" timestamp (from the filename,
|
||||
# comments, or system clock)
|
||||
clock_ts = None
|
||||
|
||||
lines_ok = 0
|
||||
def print_clock_updated():
|
||||
printf("Clock time updated to %s\n", format_time(clock_ts))
|
||||
if data_ts_base != 0:
|
||||
diff = data_ts - clock_ts
|
||||
if diff >= 0:
|
||||
printf(" (data timestamp ahead by %.6fs)\n", diff)
|
||||
else:
|
||||
printf(" (data timestamp behind by %.6fs)\n", diff)
|
||||
|
||||
with client.stream_insert_context(args.path) as stream:
|
||||
for f in args.infile:
|
||||
filename = f.name
|
||||
printf("Processing %s\n", filename)
|
||||
|
||||
# If the filename ends in .gz, open it with gzcat just to be
|
||||
# nice.
|
||||
# If the filename ends in .gz, open it with gzcat instead.
|
||||
if filename.endswith(".gz"):
|
||||
p = subprocess.Popen(["gzip", "-dc"],
|
||||
stdin = f, stdout = subprocess.PIPE)
|
||||
|
@ -70,45 +80,35 @@ with client.stream_insert_context(args.path) as stream:
|
|||
# Hopefully, we'll be able to use internal comments and this value
|
||||
# won't matter anyway.
|
||||
clock_ts = parse_time(filename).totimestamp() - 3600
|
||||
printf("Clock time updated to %s\n", format_time(clock_ts))
|
||||
print_clock_updated()
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
truncated_lines = 0
|
||||
|
||||
# Read each line
|
||||
for (linenum, line) in enumerate(f):
|
||||
for line in f:
|
||||
data_ts = data_ts_base + data_ts_inc * data_ts_step
|
||||
|
||||
# Any comments?
|
||||
if line.find('#') >= 0:
|
||||
(line, comment) = line.split('#', 1)
|
||||
line += '\n'
|
||||
else:
|
||||
comment = None
|
||||
|
||||
# Figure out our best guess about the real time
|
||||
if args.live:
|
||||
clock_ts = datetime_tz.datetime_tz.now().totimestamp()
|
||||
elif comment:
|
||||
try:
|
||||
clock_ts = parse_time(comment).totimestamp()
|
||||
printf("Clock time updated to %s\n", format_time(clock_ts))
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
# If no content other than the newline, skip this line
|
||||
# If no content other than the newline, skip it
|
||||
if len(line) <= 1:
|
||||
continue
|
||||
|
||||
# Make a timestamp for this line
|
||||
if data_ts is None:
|
||||
if clock_ts is None:
|
||||
err = "No idea what timestamp to use"
|
||||
raise ParseError(filename, linenum, err)
|
||||
data_ts = clock_ts
|
||||
else:
|
||||
data_ts += 1.0 / args.rate
|
||||
# If line starts with a comment, look for a timestamp
|
||||
if line[0] == '#':
|
||||
try:
|
||||
clock_ts = parse_time(line[1:]).totimestamp()
|
||||
print_clock_updated()
|
||||
except ValueError:
|
||||
pass
|
||||
continue
|
||||
|
||||
# If we have a real timestamp, compare it to the line
|
||||
# timestamp, and make sure things are working out.
|
||||
# If inserting live, use clock timestamp
|
||||
if live:
|
||||
clock_ts = time.time()
|
||||
|
||||
# If we have a real timestamp, compare it to the data
|
||||
# timestamp, and make sure things match up.
|
||||
if clock_ts is not None:
|
||||
if (data_ts - 10) > clock_ts:
|
||||
# Accumulated line timestamps are in the future.
|
||||
|
@ -117,32 +117,39 @@ with client.stream_insert_context(args.path) as stream:
|
|||
err = sprintf("Data is coming in too fast: data time is %s "
|
||||
"but clock time is only %s",
|
||||
format_time(data_ts), format_time(clock_ts))
|
||||
raise ParserError(filename, linenum, err)
|
||||
raise ParseError(filename, err)
|
||||
|
||||
if (data_ts + 10) < clock_ts:
|
||||
# Accumulated line timestamps are in the past. We
|
||||
# can just skip some time and leave a gap in the
|
||||
# data.
|
||||
printf("Skipping data timestamp forward from %s to %s "
|
||||
"to match clock time\n",
|
||||
format_time(data_ts), format_time(clock_ts))
|
||||
if data_ts_base != 0:
|
||||
printf("Skipping data timestamp forward from %s to %s "
|
||||
"to match clock time\n",
|
||||
format_time(data_ts), format_time(clock_ts))
|
||||
stream.finalize()
|
||||
data_ts = clock_ts
|
||||
data_ts_base = data_ts = clock_ts
|
||||
data_ts_inc = 0
|
||||
|
||||
# Don't use this clock time anymore until we update it
|
||||
clock_ts = None
|
||||
|
||||
# Once in a while a line might be truncated, if we're at
|
||||
# the end of a file. Accept it, but only if we've been
|
||||
# getting good lines too
|
||||
if line[-1] != '\n':
|
||||
if lines_ok > 10:
|
||||
printf("Ignoring short line at %s:%d\n", filename, linenum)
|
||||
continue
|
||||
else:
|
||||
raise ParserError(filename, linenum, "short line")
|
||||
lines_ok = 0
|
||||
if data_ts_base == 0:
|
||||
raise ParseError(filename, "No idea what timestamp to use")
|
||||
|
||||
stream.insert_line(repr(data_ts) + " " + line)
|
||||
lines_ok += 1
|
||||
# This line is legit, so increment timestamp
|
||||
data_ts_inc += 1
|
||||
|
||||
# Once in a while a line might be truncated, if we're at
|
||||
# the end of a file. Ignore it, but if we ignore too many,
|
||||
# bail out.
|
||||
if line[-1] != '\n':
|
||||
truncated_lines += 1
|
||||
if truncated_lines > 3:
|
||||
raise ParseError(filename, "too many short lines")
|
||||
printf("Ignoring short line in %s\n", filename)
|
||||
continue
|
||||
|
||||
# Insert it
|
||||
stream.insert("%.6f %s" % (data_ts, line))
|
||||
print "Done"
|
||||
|
|
Loading…
Reference in New Issue
Block a user