This found a small number of real bugs too. For example, this one looked weird because of a 2to3 conversion, but it was wrong both before and after: `- except IndexError as TypeError:` was replaced by `+ except (IndexError, TypeError):`
279 lines
11 KiB
Python
Executable File
279 lines
11 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import nilmdb.client
|
|
from nilmdb.utils.printf import printf, sprintf
|
|
from nilmdb.utils.time import (parse_time, timestamp_to_human,
|
|
timestamp_to_seconds, seconds_to_timestamp,
|
|
rate_to_period, now as time_now)
|
|
|
|
import os
|
|
import nilmtools
|
|
import sys
|
|
import argparse
|
|
import subprocess
|
|
import textwrap
|
|
|
|
|
|
class ParseError(Exception):
    """Raised when an input file cannot be turned into stream data.

    The exception message has the form "<filename>: <error>".
    """

    def __init__(self, filename, error):
        # Build "<filename>: <error>"; like string concatenation, join
        # raises TypeError if either piece is not a str.
        message = ": ".join((filename, error))
        super().__init__(message)
|
|
|
|
|
|
def parse_args(argv=None):
    """Parse command-line options and print a summary of the chosen settings.

    Args:
        argv: list of argument strings to parse; None lets argparse use
            the process command line.

    Returns:
        The argparse.Namespace holding the parsed options.
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=textwrap.dedent("""\
        Insert large amount of data from an external source like ethstream.

        This code tracks two timestamps:

        (1) The 'data' timestamp is the precise timestamp corresponding to
            a particular row of data, and is the timestamp that gets
            inserted into the database. It increases by 'data_delta' for
            every row of input.

            'data_delta' can come from one of two sources. If '--delta'
            is specified, it is pulled from the first column of data. If
            '--rate' is specified, 'data_delta' is set to a fixed value of
            (1 / rate).

        (2) The 'clock' timestamp is the less precise timestamp that gives
            the absolute time. It can come from two sources. If '--live'
            is specified, it is pulled directly from the system clock. If
            '--file' is specified, it is extracted from the input filename
            every time a new file is opened for read, and from comments
            that appear in the file.

        Small discrepencies between 'data' and 'clock' are ignored. If
        the 'data' timestamp ever differs from the 'clock' timestamp by
        more than 'max_gap' seconds:

        - If 'data' is running behind, there is a gap in the data, so it
          is stepped forward to match 'clock'.

        - If 'data' is running ahead, there is overlap in the data, and an
          error is raised. If '--skip' is specified, the current file
          is skipped instead of raising an error.
        """))
    def_url = os.environ.get("NILMDB_URL", "http://localhost/nilmdb/")
    parser.add_argument("-u", "--url", action="store", default=def_url,
                        help="NilmDB server URL (default: %(default)s)")
    parser.add_argument("-v", "--version", action="version",
                        version=nilmtools.__version__)

    group = parser.add_argument_group("Misc options")
    group.add_argument("-D", "--dry-run", action="store_true",
                       help="Parse files, but don't insert any data")
    group.add_argument("-s", "--skip", action="store_true",
                       help="Skip files if the data would overlap")
    group.add_argument("-m", "--max-gap", action="store", default=10.0,
                       metavar="SEC", type=float,
                       help="Max discrepency between clock and data "
                       "timestamps (default: %(default)s)")

    group = parser.add_argument_group("Data timestamp delta")
    exc = group.add_mutually_exclusive_group()
    exc.add_argument("-r", "--rate", action="store", default=8000.0,
                     type=float,
                     help="Data_delta is constant 1/RATE "
                     "(default: %(default)s Hz)")
    exc.add_argument("-d", "--delta", action="store_true",
                     help="Data_delta is the first number in each line")

    group = parser.add_argument_group("Clock timestamp source")
    exc = group.add_mutually_exclusive_group()
    exc.add_argument("-l", "--live", action="store_true",
                     help="Use live system time for clock timestamp")
    # Because of default=True, args.file is always True; the clock source
    # is effectively selected by args.live alone.
    exc.add_argument("-f", "--file", action="store_true", default=True,
                     help="Use filename or comments for clock timestamp")
    group.add_argument("-o", "--offset-filename", metavar="SEC",
                       action="store", default=-3600.0, type=float,
                       help="Offset to add to filename timestamps "
                       "(default: %(default)s)")
    group.add_argument("-O", "--offset-comment", metavar="SEC",
                       action="store", default=0.0, type=float,
                       help="Offset to add to comment timestamps "
                       "(default: %(default)s)")

    group = parser.add_argument_group("Database path")
    group.add_argument("path", action="store",
                       help="Path of stream, e.g. /foo/bar")

    group = parser.add_argument_group("Input files")
    # FileType('rb') opens named files in binary mode, and main() parses
    # bytes, so the stdin default must be binary as well.  argparse does
    # not apply type= to a non-string default, so plain sys.stdin would
    # be a text stream here.  Fall back to sys.stdin only if it has been
    # replaced by an object without a .buffer attribute.
    group.add_argument("infile", type=argparse.FileType('rb'), nargs='*',
                       default=[getattr(sys.stdin, "buffer", sys.stdin)],
                       help="Input files (default: stdin)")

    args = parser.parse_args(argv)

    printf(" Stream path: %s\n", args.path)

    printf(" Data timestamp: ")
    if args.delta:
        printf("delta on each input line\n")
    else:
        printf("fixed rate %s Hz\n", repr(args.rate))

    printf(" Clock timestamp: ")
    if args.live:
        printf("live system clock\n")
    else:
        printf("from filenames and comments\n")
        printf(" Filename offset: %s seconds\n", repr(args.offset_filename))
        printf(" Comment offset: %s seconds\n", repr(args.offset_comment))

    printf(" Max gap: %s seconds\n", repr(args.max_gap))
    if args.dry_run:
        printf("Dry run (no data will be inserted)\n")

    return args
|
|
|
|
|
|
def main(argv=None):
    """Insert rows from the input files into a NilmDB stream.

    Options are parsed with parse_args().  Each input file is read line by
    line.  Every data row gets a 'data' timestamp and is inserted into the
    stream at args.path (unless --dry-run).  The 'data' timestamp is
    reconciled against a 'clock' timestamp.  The clock comes from the
    filename, from '#' comment lines, or from the system clock (--live).

    Args:
        argv: argument list passed to parse_args(); None lets argparse use
            the process command line.

    Raises:
        ParseError: if a delta column can't be parsed (--delta), if the
            data timestamp runs more than max_gap ahead of the clock and
            --skip is not given, or if a data row is reached before any
            clock timestamp is known.
    """
    args = parse_args(argv)

    client = nilmdb.client.Client(args.url)

    # data_ts is the timestamp that we'll use for the current line
    data_ts_base = 0
    data_ts_inc = 0
    data_ts_rate = args.rate
    data_ts_delta = 0

    def get_data_ts():
        # Current row's timestamp: the base plus either the accumulated
        # per-line deltas (--delta) or data_ts_inc rows at the fixed rate.
        if args.delta:
            return data_ts_base + data_ts_delta
        else:
            return data_ts_base + rate_to_period(data_ts_rate,
                                                 data_ts_inc)

    # clock_ts is the imprecise "real" timestamp (from the filename,
    # comments, or system clock)
    clock_ts = None

    def print_clock_updated():
        # Report the new clock timestamp.  Once a data timestamp base
        # exists, also report how far the data timestamp has drifted.
        printf("Clock timestamp updated to %s\n", timestamp_to_human(clock_ts))
        if data_ts_base != 0:
            diff = get_data_ts() - clock_ts
            if diff >= 0:
                printf(" (data timestamp ahead by %.6f sec)\n",
                       timestamp_to_seconds(diff))
            else:
                printf(" (data timestamp behind by %.6f sec)\n",
                       timestamp_to_seconds(-diff))

    offset_filename = seconds_to_timestamp(args.offset_filename)
    offset_comment = seconds_to_timestamp(args.offset_comment)
    max_gap = seconds_to_timestamp(args.max_gap)

    # stdin streams are never closed below, so they remain usable by
    # whoever called main().
    stdin_streams = (sys.stdin, getattr(sys.stdin, "buffer", None))

    with client.stream_insert_context(args.path) as stream:
        for infile in args.infile:
            filename = infile.name
            printf("Processing %s\n", filename)

            # If the filename ends in .gz, pipe it through gzip to
            # decompress, and read gzip's output instead.
            gzip_proc = None
            f = infile
            if filename.endswith(".gz"):
                gzip_proc = subprocess.Popen(["gzip", "-dc"],
                                             stdin=infile,
                                             stdout=subprocess.PIPE)
                f = gzip_proc.stdout

            try:
                # Try to get a real timestamp from the filename
                try:
                    # The default --offset-filename (-3600 s) subtracts
                    # 1 hour, because files are created at the end of the
                    # hour.  Hopefully, we'll be able to use internal
                    # comments and this value won't matter anyway.
                    clock_ts = parse_time(filename) + offset_filename
                    print_clock_updated()
                except ValueError:
                    pass

                # Read each line
                for line in f:
                    # The last line in the file may be truncated.
                    # Ignore it; we shouldn't ever see more than one at
                    # the end.
                    if line[-1] != b'\n'[0]:
                        printf("Ignoring short line in %s\n", filename)
                        continue

                    # If no content other than the newline, skip it
                    if len(line) <= 1:
                        continue

                    # If line starts with a comment, look for a timestamp
                    if line[0] == b'#'[0]:
                        try:
                            comment = line[1:].decode('utf-8',
                                                      errors='ignore')
                            clock_ts = parse_time(comment) + offset_comment
                            print_clock_updated()
                        except ValueError:
                            pass
                        # for some reason the following line doesn't show
                        # up as being covered, even though it definitely runs
                        continue  # pragma: no cover

                    # If --delta mode, increment data_ts_delta by the
                    # delta from the file.
                    if args.delta:
                        try:
                            (delta, line) = line.split(None, 1)
                            data_ts_delta += float(delta)
                        except ValueError as exc:
                            raise ParseError(filename,
                                             "can't parse delta") from exc

                    # Calculate data_ts for this row
                    data_ts = get_data_ts()

                    # If inserting live, use clock timestamp
                    if args.live:
                        clock_ts = time_now()

                    # If we have a real timestamp, compare it to the data
                    # timestamp, and make sure things match up.
                    if clock_ts is not None:
                        if (data_ts - max_gap) > clock_ts:
                            # Accumulated line timestamps are in the future.
                            # If we were to set data_ts=clock_ts, we'd create
                            # an overlap, so we have to just bail out here.
                            err = sprintf("Data is coming in too fast: data time "
                                          "is %s but clock time is only %s",
                                          timestamp_to_human(data_ts),
                                          timestamp_to_human(clock_ts))
                            if args.skip:
                                printf("%s\n", err)
                                printf("Skipping the remainder of this file\n")
                                break
                            raise ParseError(filename, err)

                        if (data_ts + max_gap) < clock_ts:
                            # Accumulated line timestamps are in the past.
                            # We can just skip some time and leave a gap in
                            # the data.
                            if data_ts_base != 0:
                                printf("Skipping data timestamp forward from "
                                       "%s to %s to match clock time\n",
                                       timestamp_to_human(data_ts),
                                       timestamp_to_human(clock_ts))
                                stream.finalize()
                            data_ts_base = data_ts = clock_ts
                            data_ts_inc = data_ts_delta = 0

                        # Don't use this clock time anymore until we update it
                        clock_ts = None

                    if data_ts_base == 0:
                        raise ParseError(filename, "No idea what timestamp to use")

                    # This line is legit, so increment timestamp (for --rate)
                    data_ts_inc += 1

                    # Insert it
                    if not args.dry_run:
                        stream.insert(b"%d %s" % (data_ts, line))
            finally:
                # Release per-file resources on every exit path: normal EOF,
                # a --skip break, or a raised ParseError.  Closing gzip's
                # output first lets a still-running gzip exit on a broken
                # pipe instead of blocking forever; then reap the child.
                if gzip_proc is not None:
                    gzip_proc.stdout.close()
                    gzip_proc.wait()
                if infile not in stdin_streams:
                    infile.close()
    print("Done")
|
|
|
|
|
|
# Script entry point: parse the process command line and run the insert.
if __name__ == "__main__":
    main()
|