Compare commits
10 Commits
nilmtools-
...
nilmtools-
Author | SHA1 | Date | |
---|---|---|---|
60f09427cf | |||
d6d31190eb | |||
2ec574c59d | |||
1988955671 | |||
36e5af4be1 | |||
ca175bd9dd | |||
aa9656bc10 | |||
10ab2cc2de | |||
eb6d7a8809 | |||
c8be6755ae |
7
Makefile
7
Makefile
@@ -9,6 +9,13 @@ else
|
||||
endif
|
||||
|
||||
test:
|
||||
src/decimate.py
|
||||
|
||||
test_insert:
|
||||
@make install >/dev/null
|
||||
src/insert.py --file --dry-run /test/foo </dev/null
|
||||
|
||||
test_copy:
|
||||
@make install >/dev/null
|
||||
src/copy_wildcard.py -U "http://nilmdb.com/bucket/" -D /lees*
|
||||
|
||||
|
@@ -6,6 +6,7 @@ Prerequisites:
|
||||
|
||||
# Runtime and build environments
|
||||
sudo apt-get install python2.7 python2.7-dev python-setuptools
|
||||
sudo apt-get install python-numpy python-scipy python-matplotlib
|
||||
|
||||
nilmdb (1.3.1+)
|
||||
|
||||
|
1
setup.py
1
setup.py
@@ -75,6 +75,7 @@ setup(name='nilmtools',
|
||||
'nilm-decimate-auto = nilmtools.decimate_auto:main',
|
||||
'nilm-insert = nilmtools.insert:main',
|
||||
'nilm-copy = nilmtools.copy_one:main',
|
||||
'nilm-prep = nilmtools.prep:main',
|
||||
'nilm-copy-wildcard = nilmtools.copy_wildcard:main',
|
||||
'nilm-sinefit = nilmtools.sinefit:main',
|
||||
],
|
||||
|
@@ -41,41 +41,45 @@ def main(argv = None):
|
||||
|
||||
# If source is decimated, we have to decimate a bit differently
|
||||
if "decimate_source" in f.client_src.stream_get_metadata(args.srcpath):
|
||||
n = f.src.layout_count // 3
|
||||
f.process_python(function = decimate_again, rows = args.factor,
|
||||
args = (n,))
|
||||
again = True
|
||||
else:
|
||||
n = f.src.layout_count
|
||||
f.process_python(function = decimate_first, rows = args.factor,
|
||||
args = (n,))
|
||||
again = False
|
||||
f.process_numpy(decimate, args = (args.factor, again))
|
||||
|
||||
def decimate_first(data, n):
|
||||
"""Decimate original data -- result has 3 times as many columns"""
|
||||
# For this simple calculation, converting to a Numpy array
|
||||
# and doing the math is slower than just doing it directly.
|
||||
rows = iter(data)
|
||||
r_sum = r_min = r_max = rows.next()
|
||||
for row in rows:
|
||||
r_sum = map(operator.add, r_sum, row)
|
||||
r_min = map(min, r_min, row)
|
||||
r_max = map(max, r_max, row)
|
||||
r_mean = [ x / len(data) for x in r_sum ]
|
||||
return [ [ r_mean[0] ] + r_mean[1:] + r_min[1:] + r_max[1:] ]
|
||||
def decimate(data, interval, args, insert_function, final):
|
||||
"""Decimate data"""
|
||||
(factor, again) = args
|
||||
(n, m) = data.shape
|
||||
|
||||
def decimate_again(data, n):
|
||||
"""Decimate already-decimated data -- result has the same number
|
||||
of columns"""
|
||||
rows = iter(data)
|
||||
r = rows.next()
|
||||
r_sum = r[0:(n+1)]
|
||||
r_min = r[(n+1):(2*n+1)]
|
||||
r_max = r[(2*n+1):(3*n+1)]
|
||||
for r in rows:
|
||||
r_sum = map(operator.add, r_sum, r[0:(n+1)])
|
||||
r_min = map(min, r_min, r[(n+1):(2*n+1)])
|
||||
r_max = map(max, r_max, r[(2*n+1):(3*n+1)])
|
||||
r_mean = [ x / len(data) for x in r_sum ]
|
||||
return [ r_mean + r_min + r_max ]
|
||||
# Figure out which columns to use as the source for mean, min, and max,
|
||||
# depending on whether this is the first decimation or we're decimating
|
||||
# again. Note that we include the timestamp in the means.
|
||||
if again:
|
||||
c = (m - 1) // 3
|
||||
# e.g. c = 3
|
||||
# ts mean1 mean2 mean3 min1 min2 min3 max1 max2 max3
|
||||
mean_col = slice(0, c + 1)
|
||||
min_col = slice(c + 1, 2 * c + 1)
|
||||
max_col = slice(2 * c + 1, 3 * c + 1)
|
||||
else:
|
||||
mean_col = slice(0, m)
|
||||
min_col = slice(1, m)
|
||||
max_col = slice(1, m)
|
||||
|
||||
# Discard extra rows that aren't a multiple of factor
|
||||
n = n // factor * factor
|
||||
data = data[:n,:]
|
||||
|
||||
# Reshape it into 3D so we can process 'factor' rows at a time
|
||||
data.shape = (n // factor, factor, m)
|
||||
|
||||
# Fill the result
|
||||
out = np.c_[ np.mean(data[:,:,mean_col], axis=1),
|
||||
np.min(data[:,:,min_col], axis=1),
|
||||
np.max(data[:,:,max_col], axis=1) ]
|
||||
|
||||
insert_function(out)
|
||||
return n
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
@@ -367,8 +367,11 @@ class Filter(object):
|
||||
extract = extractor(self.src.path, interval.start, interval.end)
|
||||
old_array = np.array([])
|
||||
for batched in batch(extract, rows):
|
||||
# Read in this batch of data
|
||||
new_array = np.loadtxt(batched)
|
||||
# Read in this batch of data. This turns out to
|
||||
# be a very fast way to read and convert it (order
|
||||
# of magnitude faster than numpy.loadtxt)
|
||||
new_array = np.fromstring("\n".join(batched), sep=' ')
|
||||
new_array = new_array.reshape(-1, self.src.total_count)
|
||||
|
||||
# If we still had old data left, combine it
|
||||
if old_array.shape[0] != 0:
|
||||
|
185
src/insert.py
185
src/insert.py
@@ -12,6 +12,7 @@ import sys
|
||||
import re
|
||||
import argparse
|
||||
import subprocess
|
||||
import textwrap
|
||||
|
||||
class ParseError(Exception):
|
||||
def __init__(self, filename, error):
|
||||
@@ -22,32 +23,103 @@ def parse_args(argv = None):
|
||||
parser = argparse.ArgumentParser(
|
||||
formatter_class = argparse.RawDescriptionHelpFormatter,
|
||||
version = nilmtools.__version__,
|
||||
description = """\
|
||||
Insert data from ethstream, either live (using the system time as a
|
||||
reference) or prerecorded (using comments in the file as a reference).
|
||||
description = textwrap.dedent("""\
|
||||
Insert large amount of data from an external source like ethstream.
|
||||
|
||||
The data is assumed to have been recorded at the specified rate.
|
||||
Small discrepencies between the accumulated timestamps and the
|
||||
reference time are ignored; larger discrepencies cause gaps to be
|
||||
created in the stream. Overlapping data returns an error.
|
||||
""")
|
||||
This code tracks two timestamps:
|
||||
|
||||
(1) The 'data' timestamp is the precise timestamp corresponding to
|
||||
a particular row of data, and is the timestamp that gets
|
||||
inserted into the database. It increases by 'data_delta' for
|
||||
every row of input.
|
||||
|
||||
'data_delta' can come from one of two sources. If '--delta'
|
||||
is specified, it is pulled from the first column of data. If
|
||||
'--rate' is specified, 'data_delta' is set to a fixed value of
|
||||
(1 / rate).
|
||||
|
||||
(2) The 'clock' timestamp is the less precise timestamp that gives
|
||||
the absolute time. It can come from two sources. If '--live'
|
||||
is specified, it is pulled directly from the system clock. If
|
||||
'--file' is specified, it is extracted from the input filename
|
||||
every time a new file is opened for read, and from comments
|
||||
that appear in the file.
|
||||
|
||||
Small discrepencies between 'data' and 'clock' are ignored. If
|
||||
the 'data' timestamp ever differs from the 'clock' timestamp by
|
||||
more than 'max_gap' seconds:
|
||||
|
||||
- If 'data' is running behind, there is a gap in the data, so it
|
||||
is stepped forward to match 'clock'.
|
||||
|
||||
- If 'data' is running ahead, there is overlap in the data, and an
|
||||
error is raised.
|
||||
"""))
|
||||
parser.add_argument("-u", "--url", action="store",
|
||||
default="http://localhost/nilmdb/",
|
||||
help="NilmDB server URL (default: %(default)s)")
|
||||
parser.add_argument("-r", "--rate", action="store", default=8000,
|
||||
type=float,
|
||||
help="Data rate in Hz (default: %(default)s)")
|
||||
parser.add_argument("-l", "--live", action="store_true",
|
||||
help="Live capture; use system time to verify rate")
|
||||
parser.add_argument("path", action="store",
|
||||
help="Path of stream, e.g. /foo/bar")
|
||||
parser.add_argument("infile", type=argparse.FileType('r'), nargs='*',
|
||||
default=[sys.stdin],
|
||||
help="Input files (default: stdin)")
|
||||
group = parser.add_argument_group("Misc options")
|
||||
group.add_argument("-D", "--dry-run", action="store_true",
|
||||
help="Parse files, but don't insert any data")
|
||||
group.add_argument("-m", "--max-gap", action="store", default=10.0,
|
||||
metavar="SEC", type=float,
|
||||
help="Max discrepency between clock and data "
|
||||
"timestamps (default: %(default)s)")
|
||||
|
||||
group = parser.add_argument_group("Data timestamp delta")
|
||||
exc = group.add_mutually_exclusive_group()
|
||||
exc.add_argument("-r", "--rate", action="store", default=8000.0,
|
||||
type=float,
|
||||
help="Data_delta is constant 1/RATE "
|
||||
"(default: %(default)s Hz)")
|
||||
exc.add_argument("-d", "--delta", action="store_true",
|
||||
help="Data_delta is the first number in each line")
|
||||
|
||||
group = parser.add_argument_group("Clock timestamp source")
|
||||
exc = group.add_mutually_exclusive_group()
|
||||
exc.add_argument("-l", "--live", action="store_true",
|
||||
help="Use live system time for clock timestamp")
|
||||
exc.add_argument("-f", "--file", action="store_true", default=True,
|
||||
help="Use filename or comments for clock timestamp")
|
||||
group.add_argument("-o", "--offset-filename", metavar="SEC",
|
||||
action="store", default=-3600.0, type=float,
|
||||
help="Offset to add to filename timestamps "
|
||||
"(default: %(default)s)")
|
||||
group.add_argument("-O", "--offset-comment", metavar="SEC",
|
||||
action="store", default=0.0, type=float,
|
||||
help="Offset to add to comment timestamps "
|
||||
"(default: %(default)s)")
|
||||
|
||||
group = parser.add_argument_group("Database path")
|
||||
group.add_argument("path", action="store",
|
||||
help="Path of stream, e.g. /foo/bar")
|
||||
|
||||
group = parser.add_argument_group("Input files")
|
||||
group.add_argument("infile", type=argparse.FileType('r'), nargs='*',
|
||||
default=[sys.stdin],
|
||||
help="Input files (default: stdin)")
|
||||
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
printf("Stream path: %s\n", args.path)
|
||||
printf(" Data rate: %s Hz\n", repr(args.rate))
|
||||
printf(" Stream path: %s\n", args.path)
|
||||
|
||||
printf(" Data timestamp: ")
|
||||
if args.delta:
|
||||
printf("delta on each input line\n")
|
||||
else:
|
||||
printf("fixed rate %s Hz\n", repr(args.rate))
|
||||
|
||||
printf(" Clock timestamp: ")
|
||||
if args.live:
|
||||
printf("live system clock\n")
|
||||
else:
|
||||
printf("from filenames and comments\n")
|
||||
printf(" Filename offset: %s seconds\n", repr(args.offset_filename))
|
||||
printf(" Comment offset: %s seconds\n", repr(args.offset_comment))
|
||||
|
||||
printf(" Max gap: %s seconds\n", repr(args.max_gap))
|
||||
if args.dry_run:
|
||||
printf("Dry run (no data will be inserted)\n")
|
||||
|
||||
return args
|
||||
|
||||
@@ -56,22 +128,26 @@ def main(argv = None):
|
||||
|
||||
client = nilmdb.client.Client(args.url)
|
||||
|
||||
# Local copies to save dictionary lookups
|
||||
live = args.live
|
||||
|
||||
# data_ts is the timestamp that we'll use for the current line
|
||||
data_ts_base = 0
|
||||
data_ts_inc = 0
|
||||
data_ts_rate = args.rate
|
||||
data_ts_delta = 0
|
||||
def get_data_ts():
|
||||
if args.delta:
|
||||
return data_ts_base + data_ts_delta
|
||||
else:
|
||||
return data_ts_base + rate_to_period(data_ts_rate,
|
||||
data_ts_inc)
|
||||
|
||||
# clock_ts is the imprecise "real" timestamp (from the filename,
|
||||
# comments, or or system clock)
|
||||
# comments, or system clock)
|
||||
clock_ts = None
|
||||
|
||||
def print_clock_updated():
|
||||
printf("Clock time updated to %s\n", timestamp_to_human(clock_ts))
|
||||
printf("Clock timestamp updated to %s\n", timestamp_to_human(clock_ts))
|
||||
if data_ts_base != 0:
|
||||
diff = data_ts - clock_ts
|
||||
diff = get_data_ts() - clock_ts
|
||||
if diff >= 0:
|
||||
printf(" (data timestamp ahead by %.6f sec)\n",
|
||||
timestamp_to_seconds(diff))
|
||||
@@ -79,12 +155,17 @@ def main(argv = None):
|
||||
printf(" (data timestamp behind by %.6f sec)\n",
|
||||
timestamp_to_seconds(-diff))
|
||||
|
||||
offset_filename = seconds_to_timestamp(args.offset_filename)
|
||||
offset_comment = seconds_to_timestamp(args.offset_comment)
|
||||
max_gap = seconds_to_timestamp(args.max_gap)
|
||||
|
||||
with client.stream_insert_context(args.path) as stream:
|
||||
for f in args.infile:
|
||||
filename = f.name
|
||||
printf("Processing %s\n", filename)
|
||||
|
||||
# If the filename ends in .gz, open it with gzcat instead.
|
||||
# If the filename ends in .gz, re-open it with gzip to
|
||||
# decompress.
|
||||
if filename.endswith(".gz"):
|
||||
p = subprocess.Popen(["gzip", "-dc"],
|
||||
stdin = f, stdout = subprocess.PIPE)
|
||||
@@ -95,7 +176,7 @@ def main(argv = None):
|
||||
# Subtract 1 hour because files are created at the end
|
||||
# of the hour. Hopefully, we'll be able to use
|
||||
# internal comments and this value won't matter anyway.
|
||||
clock_ts = parse_time(filename) - seconds_to_timestamp(3600)
|
||||
clock_ts = parse_time(filename) + offset_filename
|
||||
print_clock_updated()
|
||||
except ValueError:
|
||||
pass
|
||||
@@ -104,8 +185,15 @@ def main(argv = None):
|
||||
|
||||
# Read each line
|
||||
for line in f:
|
||||
data_ts = data_ts_base + rate_to_period(data_ts_rate,
|
||||
data_ts_inc)
|
||||
# Once in a while a line might be truncated, if we're
|
||||
# at the end of a file. Ignore it, but if we ignore
|
||||
# too many, bail out.
|
||||
if line[-1] != '\n':
|
||||
truncated_lines += 1
|
||||
if truncated_lines > 3:
|
||||
raise ParseError(filename, "too many short lines")
|
||||
printf("Ignoring short line in %s\n", filename)
|
||||
continue
|
||||
|
||||
# If no content other than the newline, skip it
|
||||
if len(line) <= 1:
|
||||
@@ -114,20 +202,32 @@ def main(argv = None):
|
||||
# If line starts with a comment, look for a timestamp
|
||||
if line[0] == '#':
|
||||
try:
|
||||
clock_ts = parse_time(line[1:])
|
||||
clock_ts = parse_time(line[1:]) + offset_comment
|
||||
print_clock_updated()
|
||||
except ValueError:
|
||||
pass
|
||||
continue
|
||||
|
||||
# If --delta mode, increment data_ts_delta by the
|
||||
# delta from the file.
|
||||
if args.delta:
|
||||
try:
|
||||
(delta, line) = line.split(None, 1)
|
||||
data_ts_delta += float(delta)
|
||||
except ValueError:
|
||||
raise ParseError(filename, "can't parse delta")
|
||||
|
||||
# Calculate data_ts for this row
|
||||
data_ts = get_data_ts()
|
||||
|
||||
# If inserting live, use clock timestamp
|
||||
if live:
|
||||
if args.live:
|
||||
clock_ts = time_now()
|
||||
|
||||
# If we have a real timestamp, compare it to the data
|
||||
# timestamp, and make sure things match up.
|
||||
if clock_ts is not None:
|
||||
if (data_ts - seconds_to_timestamp(10)) > clock_ts:
|
||||
if (data_ts - max_gap) > clock_ts:
|
||||
# Accumulated line timestamps are in the future.
|
||||
# If we were to set data_ts=clock_ts, we'd create
|
||||
# an overlap, so we have to just bail out here.
|
||||
@@ -137,7 +237,7 @@ def main(argv = None):
|
||||
timestamp_to_human(clock_ts))
|
||||
raise ParseError(filename, err)
|
||||
|
||||
if (data_ts + seconds_to_timestamp(10)) < clock_ts:
|
||||
if (data_ts + max_gap) < clock_ts:
|
||||
# Accumulated line timetamps are in the past. We
|
||||
# can just skip some time and leave a gap in the
|
||||
# data.
|
||||
@@ -148,7 +248,7 @@ def main(argv = None):
|
||||
timestamp_to_human(clock_ts))
|
||||
stream.finalize()
|
||||
data_ts_base = data_ts = clock_ts
|
||||
data_ts_inc = 0
|
||||
data_ts_inc = data_ts_delta = 0
|
||||
|
||||
# Don't use this clock time anymore until we update it
|
||||
clock_ts = None
|
||||
@@ -156,21 +256,12 @@ def main(argv = None):
|
||||
if data_ts_base == 0:
|
||||
raise ParseError(filename, "No idea what timestamp to use")
|
||||
|
||||
# This line is legit, so increment timestamp
|
||||
# This line is legit, so increment timestamp (for --rate)
|
||||
data_ts_inc += 1
|
||||
|
||||
# Once in a while a line might be truncated, if we're at
|
||||
# the end of a file. Ignore it, but if we ignore too many,
|
||||
# bail out.
|
||||
if line[-1] != '\n':
|
||||
truncated_lines += 1
|
||||
if truncated_lines > 3:
|
||||
raise ParseError(filename, "too many short lines")
|
||||
printf("Ignoring short line in %s\n", filename)
|
||||
continue
|
||||
|
||||
# Insert it
|
||||
stream.insert("%d %s" % (data_ts, line))
|
||||
if not args.dry_run:
|
||||
stream.insert("%d %s" % (data_ts, line))
|
||||
print "Done"
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
@@ -111,7 +111,7 @@ def process(data, interval, args, insert_function, final):
|
||||
# Fill output data
|
||||
out[0, 0] = t_min
|
||||
for k in range(nharm):
|
||||
Fk = F[2 * k + 1] * e**(rotation * 1j * k)
|
||||
Fk = F[2 * k + 1] * e**(rotation * 1j * (k+1))
|
||||
out[0, 2 * k + 1] = -imag(Fk) # Pk
|
||||
out[0, 2 * k + 2] = real(Fk) # Qk
|
||||
|
||||
|
@@ -23,7 +23,7 @@ def main(argv = None):
|
||||
try:
|
||||
args = f.parse_args(argv)
|
||||
except nilmtools.filter.MissingDestination as e:
|
||||
rec = "float32_4"
|
||||
rec = "float32_3"
|
||||
print "Source is %s (%s)" % (e.src.path, e.src.layout)
|
||||
print "Destination %s doesn't exist" % (e.dest.path)
|
||||
print "You could make it with a command like:"
|
||||
|
Reference in New Issue
Block a user