Browse Source

Improve nilm-insert to support deltas, etc, for accelerometer data

Jim Paris 9 years ago
2 changed files with 142 additions and 48 deletions
  1. +4
  2. +138

+ 4
- 0
Makefile View File

@@ -9,6 +9,10 @@ else

@make install >/dev/null
src/ --file --dry-run /test/foo </dev/null

@make install >/dev/null
src/ -U "" -D /lees*

+ 138
- 48
src/ View File

@@ -12,6 +12,7 @@ import sys
import re
import argparse
import subprocess
import textwrap

class ParseError(Exception):
def __init__(self, filename, error):
@@ -22,32 +23,103 @@ def parse_args(argv = None):
parser = argparse.ArgumentParser(
formatter_class = argparse.RawDescriptionHelpFormatter,
version = nilmtools.__version__,
description = """\
Insert data from ethstream, either live (using the system time as a
reference) or prerecorded (using comments in the file as a reference).

The data is assumed to have been recorded at the specified rate.
Small discrepencies between the accumulated timestamps and the
reference time are ignored; larger discrepencies cause gaps to be
created in the stream. Overlapping data returns an error.
description = textwrap.dedent("""\
Insert large amount of data from an external source like ethstream.

This code tracks two timestamps:

(1) The 'data' timestamp is the precise timestamp corresponding to
a particular row of data, and is the timestamp that gets
inserted into the database. It increases by 'data_delta' for
every row of input.

'data_delta' can come from one of two sources. If '--delta'
is specified, it is pulled from the first column of data. If
'--rate' is specified, 'data_delta' is set to a fixed value of
(1 / rate).

(2) The 'clock' timestamp is the less precise timestamp that gives
the absolute time. It can come from two sources. If '--live'
is specified, it is pulled directly from the system clock. If
'--file' is specified, it is extracted from the input filename
every time a new file is opened for read, and from comments
that appear in the file.

Small discrepencies between 'data' and 'clock' are ignored. If
the 'data' timestamp ever differs from the 'clock' timestamp by
more than 'max_gap' seconds:

- If 'data' is running behind, there is a gap in the data, so it
is stepped forward to match 'clock'.

- If 'data' is running ahead, there is overlap in the data, and an
error is raised.
parser.add_argument("-u", "--url", action="store",
help="NilmDB server URL (default: %(default)s)")
parser.add_argument("-r", "--rate", action="store", default=8000,
help="Data rate in Hz (default: %(default)s)")
parser.add_argument("-l", "--live", action="store_true",
help="Live capture; use system time to verify rate")
parser.add_argument("path", action="store",
help="Path of stream, e.g. /foo/bar")
parser.add_argument("infile", type=argparse.FileType('r'), nargs='*',
help="Input files (default: stdin)")
group = parser.add_argument_group("Misc options")
group.add_argument("-D", "--dry-run", action="store_true",
help="Parse files, but don't insert any data")
group.add_argument("-m", "--max-gap", action="store", default=10.0,
metavar="SEC", type=float,
help="Max discrepency between clock and data "
"timestamps (default: %(default)s)")

group = parser.add_argument_group("Data timestamp delta")
exc = group.add_mutually_exclusive_group()
exc.add_argument("-r", "--rate", action="store", default=8000.0,
help="Data_delta is constant 1/RATE "
"(default: %(default)s Hz)")
exc.add_argument("-d", "--delta", action="store_true",
help="Data_delta is the first number in each line")

group = parser.add_argument_group("Clock timestamp source")
exc = group.add_mutually_exclusive_group()
exc.add_argument("-l", "--live", action="store_true",
help="Use live system time for clock timestamp")
exc.add_argument("-f", "--file", action="store_true", default=True,
help="Use filename or comments for clock timestamp")
group.add_argument("-o", "--offset-filename", metavar="SEC",
action="store", default=-3600.0, type=float,
help="Offset to add to filename timestamps "
"(default: %(default)s)")
group.add_argument("-O", "--offset-comment", metavar="SEC",
action="store", default=0.0, type=float,
help="Offset to add to comment timestamps "
"(default: %(default)s)")

group = parser.add_argument_group("Database path")
group.add_argument("path", action="store",
help="Path of stream, e.g. /foo/bar")

group = parser.add_argument_group("Input files")
group.add_argument("infile", type=argparse.FileType('r'), nargs='*',
help="Input files (default: stdin)")

args = parser.parse_args(argv)

printf("Stream path: %s\n", args.path)
printf(" Data rate: %s Hz\n", repr(args.rate))
printf(" Stream path: %s\n", args.path)

printf(" Data timestamp: ")
printf("delta on each input line\n")
printf("fixed rate %s Hz\n", repr(args.rate))

printf(" Clock timestamp: ")
printf("live system clock\n")
printf("from filenames and comments\n")
printf(" Filename offset: %s seconds\n", repr(args.offset_filename))
printf(" Comment offset: %s seconds\n", repr(args.offset_comment))

printf(" Max gap: %s seconds\n", repr(args.max_gap))
if args.dry_run:
printf("Dry run (no data will be inserted)\n")

return args

@@ -56,22 +128,25 @@ def main(argv = None):

client = nilmdb.client.Client(args.url)

# Local copies to save dictionary lookups
live =

# data_ts is the timestamp that we'll use for the current line
data_ts_base = 0
data_ts_inc = 0
data_ts_rate = args.rate
def get_data_ts():
return data_ts_base
return data_ts_base + rate_to_period(data_ts_rate,

# clock_ts is the imprecise "real" timestamp (from the filename,
# comments, or or system clock)
# comments, or system clock)
clock_ts = None

def print_clock_updated():
printf("Clock time updated to %s\n", timestamp_to_human(clock_ts))
printf("Clock timestamp updated to %s\n", timestamp_to_human(clock_ts))
if data_ts_base != 0:
diff = data_ts - clock_ts
diff = get_data_ts() - clock_ts
if diff >= 0:
printf(" (data timestamp ahead by %.6f sec)\n",
@@ -79,12 +154,17 @@ def main(argv = None):
printf(" (data timestamp behind by %.6f sec)\n",

offset_filename = seconds_to_timestamp(args.offset_filename)
offset_comment = seconds_to_timestamp(args.offset_comment)
max_gap = seconds_to_timestamp(args.max_gap)

with client.stream_insert_context(args.path) as stream:
for f in args.infile:
filename =
printf("Processing %s\n", filename)

# If the filename ends in .gz, open it with gzcat instead.
# If the filename ends in .gz, re-open it with gzip to
# decompress.
if filename.endswith(".gz"):
p = subprocess.Popen(["gzip", "-dc"],
stdin = f, stdout = subprocess.PIPE)
@@ -95,7 +175,7 @@ def main(argv = None):
# Subtract 1 hour because files are created at the end
# of the hour. Hopefully, we'll be able to use
# internal comments and this value won't matter anyway.
clock_ts = parse_time(filename) - seconds_to_timestamp(3600)
clock_ts = parse_time(filename) + offset_filename
except ValueError:
@@ -104,8 +184,15 @@ def main(argv = None):

# Read each line
for line in f:
data_ts = data_ts_base + rate_to_period(data_ts_rate,
# Once in a while a line might be truncated, if we're
# at the end of a file. Ignore it, but if we ignore
# too many, bail out.
if line[-1] != '\n':
truncated_lines += 1
if truncated_lines > 3:
raise ParseError(filename, "too many short lines")
printf("Ignoring short line in %s\n", filename)

# If no content other than the newline, skip it
if len(line) <= 1:
@@ -114,20 +201,32 @@ def main(argv = None):
# If line starts with a comment, look for a timestamp
if line[0] == '#':
clock_ts = parse_time(line[1:])
clock_ts = parse_time(line[1:]) + offset_comment
except ValueError:

# If --delta mode, increment data_ts_base by the delta
# from the file.
(delta, line) = line.split(None, 1)
data_ts_base += float(delta)
except ValueError:
raise ParseError(filename, "can't parse delta")

# Calculate data_ts for this row
data_ts = get_data_ts()

# If inserting live, use clock timestamp
if live:
clock_ts = time_now()

# If we have a real timestamp, compare it to the data
# timestamp, and make sure things match up.
if clock_ts is not None:
if (data_ts - seconds_to_timestamp(10)) > clock_ts:
if (data_ts - max_gap) > clock_ts:
# Accumulated line timestamps are in the future.
# If we were to set data_ts=clock_ts, we'd create
# an overlap, so we have to just bail out here.
@@ -137,7 +236,7 @@ def main(argv = None):
raise ParseError(filename, err)

if (data_ts + seconds_to_timestamp(10)) < clock_ts:
if (data_ts + max_gap) < clock_ts:
# Accumulated line timetamps are in the past. We
# can just skip some time and leave a gap in the
# data.
@@ -147,7 +246,7 @@ def main(argv = None):
data_ts_base = data_ts = clock_ts
data_ts_base = clock_ts
data_ts_inc = 0

# Don't use this clock time anymore until we update it
@@ -156,21 +255,12 @@ def main(argv = None):
if data_ts_base == 0:
raise ParseError(filename, "No idea what timestamp to use")

# This line is legit, so increment timestamp
# This line is legit, so increment timestamp (for --rate)
data_ts_inc += 1

# Once in a while a line might be truncated, if we're at
# the end of a file. Ignore it, but if we ignore too many,
# bail out.
if line[-1] != '\n':
truncated_lines += 1
if truncated_lines > 3:
raise ParseError(filename, "too many short lines")
printf("Ignoring short line in %s\n", filename)

# Insert it
stream.insert("%d %s" % (data_ts, line))
if not args.dry_run:
stream.insert("%d %s" % (data_ts, line))
print "Done"

if __name__ == "__main__":