nilmtools/nilmtools/insert.py
Jim Paris cfc66b6847 Fix flake8 errors throughout code
This found a small number of real bugs too, for example,
this one that looked weird because of a 2to3 conversion,
but was wrong both before and after:
-        except IndexError as TypeError:
+        except (IndexError, TypeError):
2020-08-06 17:58:41 -04:00

279 lines
11 KiB
Python
Executable File

#!/usr/bin/env python3
import nilmdb.client
from nilmdb.utils.printf import printf, sprintf
from nilmdb.utils.time import (parse_time, timestamp_to_human,
timestamp_to_seconds, seconds_to_timestamp,
rate_to_period, now as time_now)
import os
import nilmtools
import sys
import argparse
import subprocess
import textwrap
class ParseError(Exception):
def __init__(self, filename, error):
msg = filename + ": " + error
super(ParseError, self).__init__(msg)
def parse_args(argv=None):
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent("""\
Insert large amount of data from an external source like ethstream.
This code tracks two timestamps:
(1) The 'data' timestamp is the precise timestamp corresponding to
a particular row of data, and is the timestamp that gets
inserted into the database. It increases by 'data_delta' for
every row of input.
'data_delta' can come from one of two sources. If '--delta'
is specified, it is pulled from the first column of data. If
'--rate' is specified, 'data_delta' is set to a fixed value of
(1 / rate).
(2) The 'clock' timestamp is the less precise timestamp that gives
the absolute time. It can come from two sources. If '--live'
is specified, it is pulled directly from the system clock. If
'--file' is specified, it is extracted from the input filename
every time a new file is opened for read, and from comments
that appear in the file.
Small discrepencies between 'data' and 'clock' are ignored. If
the 'data' timestamp ever differs from the 'clock' timestamp by
more than 'max_gap' seconds:
- If 'data' is running behind, there is a gap in the data, so it
is stepped forward to match 'clock'.
- If 'data' is running ahead, there is overlap in the data, and an
error is raised. If '--skip' is specified, the current file
is skipped instead of raising an error.
"""))
def_url = os.environ.get("NILMDB_URL", "http://localhost/nilmdb/")
parser.add_argument("-u", "--url", action="store", default=def_url,
help="NilmDB server URL (default: %(default)s)")
parser.add_argument("-v", "--version", action="version",
version=nilmtools.__version__)
group = parser.add_argument_group("Misc options")
group.add_argument("-D", "--dry-run", action="store_true",
help="Parse files, but don't insert any data")
group.add_argument("-s", "--skip", action="store_true",
help="Skip files if the data would overlap")
group.add_argument("-m", "--max-gap", action="store", default=10.0,
metavar="SEC", type=float,
help="Max discrepency between clock and data "
"timestamps (default: %(default)s)")
group = parser.add_argument_group("Data timestamp delta")
exc = group.add_mutually_exclusive_group()
exc.add_argument("-r", "--rate", action="store", default=8000.0,
type=float,
help="Data_delta is constant 1/RATE "
"(default: %(default)s Hz)")
exc.add_argument("-d", "--delta", action="store_true",
help="Data_delta is the first number in each line")
group = parser.add_argument_group("Clock timestamp source")
exc = group.add_mutually_exclusive_group()
exc.add_argument("-l", "--live", action="store_true",
help="Use live system time for clock timestamp")
exc.add_argument("-f", "--file", action="store_true", default=True,
help="Use filename or comments for clock timestamp")
group.add_argument("-o", "--offset-filename", metavar="SEC",
action="store", default=-3600.0, type=float,
help="Offset to add to filename timestamps "
"(default: %(default)s)")
group.add_argument("-O", "--offset-comment", metavar="SEC",
action="store", default=0.0, type=float,
help="Offset to add to comment timestamps "
"(default: %(default)s)")
group = parser.add_argument_group("Database path")
group.add_argument("path", action="store",
help="Path of stream, e.g. /foo/bar")
group = parser.add_argument_group("Input files")
group.add_argument("infile", type=argparse.FileType('rb'), nargs='*',
default=[sys.stdin],
help="Input files (default: stdin)")
args = parser.parse_args(argv)
printf(" Stream path: %s\n", args.path)
printf(" Data timestamp: ")
if args.delta:
printf("delta on each input line\n")
else:
printf("fixed rate %s Hz\n", repr(args.rate))
printf(" Clock timestamp: ")
if args.live:
printf("live system clock\n")
else:
printf("from filenames and comments\n")
printf(" Filename offset: %s seconds\n", repr(args.offset_filename))
printf(" Comment offset: %s seconds\n", repr(args.offset_comment))
printf(" Max gap: %s seconds\n", repr(args.max_gap))
if args.dry_run:
printf("Dry run (no data will be inserted)\n")
return args
def main(argv=None):
args = parse_args(argv)
client = nilmdb.client.Client(args.url)
# data_ts is the timestamp that we'll use for the current line
data_ts_base = 0
data_ts_inc = 0
data_ts_rate = args.rate
data_ts_delta = 0
def get_data_ts():
if args.delta:
return data_ts_base + data_ts_delta
else:
return data_ts_base + rate_to_period(data_ts_rate,
data_ts_inc)
# clock_ts is the imprecise "real" timestamp (from the filename,
# comments, or system clock)
clock_ts = None
def print_clock_updated():
printf("Clock timestamp updated to %s\n", timestamp_to_human(clock_ts))
if data_ts_base != 0:
diff = get_data_ts() - clock_ts
if diff >= 0:
printf(" (data timestamp ahead by %.6f sec)\n",
timestamp_to_seconds(diff))
else:
printf(" (data timestamp behind by %.6f sec)\n",
timestamp_to_seconds(-diff))
offset_filename = seconds_to_timestamp(args.offset_filename)
offset_comment = seconds_to_timestamp(args.offset_comment)
max_gap = seconds_to_timestamp(args.max_gap)
with client.stream_insert_context(args.path) as stream:
for f in args.infile:
filename = f.name
printf("Processing %s\n", filename)
# If the filename ends in .gz, re-open it with gzip to
# decompress.
if filename.endswith(".gz"):
p = subprocess.Popen(["gzip", "-dc"],
stdin=f, stdout=subprocess.PIPE)
f = p.stdout
# Try to get a real timestamp from the filename
try:
# Subtract 1 hour because files are created at the end
# of the hour. Hopefully, we'll be able to use
# internal comments and this value won't matter anyway.
clock_ts = parse_time(filename) + offset_filename
print_clock_updated()
except ValueError:
pass
# Read each line
for line in f:
# The last line in the file may be truncated.
# Ignore it; we shouldn't ever see more than one at the end.
if line[-1] != b'\n'[0]:
printf("Ignoring short line in %s\n", filename)
continue
# If no content other than the newline, skip it
if len(line) <= 1:
continue
# If line starts with a comment, look for a timestamp
if line[0] == b'#'[0]:
try:
comment = line[1:].decode('utf-8', errors='ignore')
clock_ts = parse_time(comment) + offset_comment
print_clock_updated()
except ValueError:
pass
# for some reason the following line doesn't show up as
# being covered, even though it definitely runs
continue # pragma: no cover
# If --delta mode, increment data_ts_delta by the
# delta from the file.
if args.delta:
try:
(delta, line) = line.split(None, 1)
data_ts_delta += float(delta)
except ValueError:
raise ParseError(filename, "can't parse delta")
# Calculate data_ts for this row
data_ts = get_data_ts()
# If inserting live, use clock timestamp
if args.live:
clock_ts = time_now()
# If we have a real timestamp, compare it to the data
# timestamp, and make sure things match up.
if clock_ts is not None:
if (data_ts - max_gap) > clock_ts:
# Accumulated line timestamps are in the future.
# If we were to set data_ts=clock_ts, we'd create
# an overlap, so we have to just bail out here.
err = sprintf("Data is coming in too fast: data time "
"is %s but clock time is only %s",
timestamp_to_human(data_ts),
timestamp_to_human(clock_ts))
if args.skip:
printf("%s\n", err)
printf("Skipping the remainder of this file\n")
break
raise ParseError(filename, err)
if (data_ts + max_gap) < clock_ts:
# Accumulated line timetamps are in the past. We
# can just skip some time and leave a gap in the
# data.
if data_ts_base != 0:
printf("Skipping data timestamp forward from "
"%s to %s to match clock time\n",
timestamp_to_human(data_ts),
timestamp_to_human(clock_ts))
stream.finalize()
data_ts_base = data_ts = clock_ts
data_ts_inc = data_ts_delta = 0
# Don't use this clock time anymore until we update it
clock_ts = None
if data_ts_base == 0:
raise ParseError(filename, "No idea what timestamp to use")
# This line is legit, so increment timestamp (for --rate)
data_ts_inc += 1
# Insert it
if not args.dry_run:
stream.insert(b"%d %s" % (data_ts, line))
print("Done")
if __name__ == "__main__":
main()