Compare commits

..

8 Commits

6 changed files with 188 additions and 82 deletions

View File

@@ -9,6 +9,13 @@ else
endif
test:
src/decimate.py
test_insert:
@make install >/dev/null
src/insert.py --file --dry-run /test/foo </dev/null
test_copy:
@make install >/dev/null
src/copy_wildcard.py -U "http://nilmdb.com/bucket/" -D /lees*

View File

@@ -75,6 +75,7 @@ setup(name='nilmtools',
'nilm-decimate-auto = nilmtools.decimate_auto:main',
'nilm-insert = nilmtools.insert:main',
'nilm-copy = nilmtools.copy_one:main',
'nilm-prep = nilmtools.prep:main',
'nilm-copy-wildcard = nilmtools.copy_wildcard:main',
'nilm-sinefit = nilmtools.sinefit:main',
],

View File

@@ -41,41 +41,45 @@ def main(argv = None):
# If source is decimated, we have to decimate a bit differently
if "decimate_source" in f.client_src.stream_get_metadata(args.srcpath):
n = f.src.layout_count // 3
f.process_python(function = decimate_again, rows = args.factor,
args = (n,))
again = True
else:
n = f.src.layout_count
f.process_python(function = decimate_first, rows = args.factor,
args = (n,))
again = False
f.process_numpy(decimate, args = (args.factor, again))
def decimate_first(data, n):
"""Decimate original data -- result has 3 times as many columns"""
# For this simple calculation, converting to a Numpy array
# and doing the math is slower than just doing it directly.
rows = iter(data)
r_sum = r_min = r_max = rows.next()
for row in rows:
r_sum = map(operator.add, r_sum, row)
r_min = map(min, r_min, row)
r_max = map(max, r_max, row)
r_mean = [ x / len(data) for x in r_sum ]
return [ [ r_mean[0] ] + r_mean[1:] + r_min[1:] + r_max[1:] ]
def decimate(data, interval, args, insert_function, final):
"""Decimate data"""
(factor, again) = args
(n, m) = data.shape
def decimate_again(data, n):
"""Decimate already-decimated data -- result has the same number
of columns"""
rows = iter(data)
r = rows.next()
r_sum = r[0:(n+1)]
r_min = r[(n+1):(2*n+1)]
r_max = r[(2*n+1):(3*n+1)]
for r in rows:
r_sum = map(operator.add, r_sum, r[0:(n+1)])
r_min = map(min, r_min, r[(n+1):(2*n+1)])
r_max = map(max, r_max, r[(2*n+1):(3*n+1)])
r_mean = [ x / len(data) for x in r_sum ]
return [ r_mean + r_min + r_max ]
# Figure out which columns to use as the source for mean, min, and max,
# depending on whether this is the first decimation or we're decimating
# again. Note that we include the timestamp in the means.
if again:
c = (m - 1) // 3
# e.g. c = 3
# ts mean1 mean2 mean3 min1 min2 min3 max1 max2 max3
mean_col = slice(0, c + 1)
min_col = slice(c + 1, 2 * c + 1)
max_col = slice(2 * c + 1, 3 * c + 1)
else:
mean_col = slice(0, m)
min_col = slice(1, m)
max_col = slice(1, m)
# Discard extra rows that aren't a multiple of factor
n = n // factor * factor
data = data[:n,:]
# Reshape it into 3D so we can process 'factor' rows at a time
data.shape = (n // factor, factor, m)
# Fill the result
out = np.c_[ np.mean(data[:,:,mean_col], axis=1),
np.min(data[:,:,min_col], axis=1),
np.max(data[:,:,max_col], axis=1) ]
insert_function(out)
return n
if __name__ == "__main__":
main()

View File

@@ -367,8 +367,11 @@ class Filter(object):
extract = extractor(self.src.path, interval.start, interval.end)
old_array = np.array([])
for batched in batch(extract, rows):
# Read in this batch of data
new_array = np.loadtxt(batched)
# Read in this batch of data. This turns out to
# be a very fast way to read and convert it (order
# of magnitude faster than numpy.loadtxt)
new_array = np.fromstring("\n".join(batched), sep=' ')
new_array = new_array.reshape(-1, self.src.total_count)
# If we still had old data left, combine it
if old_array.shape[0] != 0:

View File

@@ -12,6 +12,7 @@ import sys
import re
import argparse
import subprocess
import textwrap
class ParseError(Exception):
def __init__(self, filename, error):
@@ -22,32 +23,103 @@ def parse_args(argv = None):
parser = argparse.ArgumentParser(
formatter_class = argparse.RawDescriptionHelpFormatter,
version = nilmtools.__version__,
description = """\
Insert data from ethstream, either live (using the system time as a
reference) or prerecorded (using comments in the file as a reference).
description = textwrap.dedent("""\
Insert large amount of data from an external source like ethstream.
The data is assumed to have been recorded at the specified rate.
Small discrepencies between the accumulated timestamps and the
reference time are ignored; larger discrepencies cause gaps to be
created in the stream. Overlapping data returns an error.
""")
This code tracks two timestamps:
(1) The 'data' timestamp is the precise timestamp corresponding to
a particular row of data, and is the timestamp that gets
inserted into the database. It increases by 'data_delta' for
every row of input.
'data_delta' can come from one of two sources. If '--delta'
is specified, it is pulled from the first column of data. If
'--rate' is specified, 'data_delta' is set to a fixed value of
(1 / rate).
(2) The 'clock' timestamp is the less precise timestamp that gives
the absolute time. It can come from two sources. If '--live'
is specified, it is pulled directly from the system clock. If
'--file' is specified, it is extracted from the input filename
every time a new file is opened for read, and from comments
that appear in the file.
Small discrepencies between 'data' and 'clock' are ignored. If
the 'data' timestamp ever differs from the 'clock' timestamp by
more than 'max_gap' seconds:
- If 'data' is running behind, there is a gap in the data, so it
is stepped forward to match 'clock'.
- If 'data' is running ahead, there is overlap in the data, and an
error is raised.
"""))
parser.add_argument("-u", "--url", action="store",
default="http://localhost/nilmdb/",
help="NilmDB server URL (default: %(default)s)")
parser.add_argument("-r", "--rate", action="store", default=8000,
group = parser.add_argument_group("Misc options")
group.add_argument("-D", "--dry-run", action="store_true",
help="Parse files, but don't insert any data")
group.add_argument("-m", "--max-gap", action="store", default=10.0,
metavar="SEC", type=float,
help="Max discrepency between clock and data "
"timestamps (default: %(default)s)")
group = parser.add_argument_group("Data timestamp delta")
exc = group.add_mutually_exclusive_group()
exc.add_argument("-r", "--rate", action="store", default=8000.0,
type=float,
help="Data rate in Hz (default: %(default)s)")
parser.add_argument("-l", "--live", action="store_true",
help="Live capture; use system time to verify rate")
parser.add_argument("path", action="store",
help="Data_delta is constant 1/RATE "
"(default: %(default)s Hz)")
exc.add_argument("-d", "--delta", action="store_true",
help="Data_delta is the first number in each line")
group = parser.add_argument_group("Clock timestamp source")
exc = group.add_mutually_exclusive_group()
exc.add_argument("-l", "--live", action="store_true",
help="Use live system time for clock timestamp")
exc.add_argument("-f", "--file", action="store_true", default=True,
help="Use filename or comments for clock timestamp")
group.add_argument("-o", "--offset-filename", metavar="SEC",
action="store", default=-3600.0, type=float,
help="Offset to add to filename timestamps "
"(default: %(default)s)")
group.add_argument("-O", "--offset-comment", metavar="SEC",
action="store", default=0.0, type=float,
help="Offset to add to comment timestamps "
"(default: %(default)s)")
group = parser.add_argument_group("Database path")
group.add_argument("path", action="store",
help="Path of stream, e.g. /foo/bar")
parser.add_argument("infile", type=argparse.FileType('r'), nargs='*',
group = parser.add_argument_group("Input files")
group.add_argument("infile", type=argparse.FileType('r'), nargs='*',
default=[sys.stdin],
help="Input files (default: stdin)")
args = parser.parse_args(argv)
printf("Stream path: %s\n", args.path)
printf(" Data rate: %s Hz\n", repr(args.rate))
printf(" Stream path: %s\n", args.path)
printf(" Data timestamp: ")
if args.delta:
printf("delta on each input line\n")
else:
printf("fixed rate %s Hz\n", repr(args.rate))
printf(" Clock timestamp: ")
if args.live:
printf("live system clock\n")
else:
printf("from filenames and comments\n")
printf(" Filename offset: %s seconds\n", repr(args.offset_filename))
printf(" Comment offset: %s seconds\n", repr(args.offset_comment))
printf(" Max gap: %s seconds\n", repr(args.max_gap))
if args.dry_run:
printf("Dry run (no data will be inserted)\n")
return args
@@ -56,22 +128,26 @@ def main(argv = None):
client = nilmdb.client.Client(args.url)
# Local copies to save dictionary lookups
live = args.live
# data_ts is the timestamp that we'll use for the current line
data_ts_base = 0
data_ts_inc = 0
data_ts_rate = args.rate
data_ts_delta = 0
def get_data_ts():
if args.delta:
return data_ts_base + data_ts_delta
else:
return data_ts_base + rate_to_period(data_ts_rate,
data_ts_inc)
# clock_ts is the imprecise "real" timestamp (from the filename,
# comments, or or system clock)
# comments, or system clock)
clock_ts = None
def print_clock_updated():
printf("Clock time updated to %s\n", timestamp_to_human(clock_ts))
printf("Clock timestamp updated to %s\n", timestamp_to_human(clock_ts))
if data_ts_base != 0:
diff = data_ts - clock_ts
diff = get_data_ts() - clock_ts
if diff >= 0:
printf(" (data timestamp ahead by %.6f sec)\n",
timestamp_to_seconds(diff))
@@ -79,12 +155,17 @@ def main(argv = None):
printf(" (data timestamp behind by %.6f sec)\n",
timestamp_to_seconds(-diff))
offset_filename = seconds_to_timestamp(args.offset_filename)
offset_comment = seconds_to_timestamp(args.offset_comment)
max_gap = seconds_to_timestamp(args.max_gap)
with client.stream_insert_context(args.path) as stream:
for f in args.infile:
filename = f.name
printf("Processing %s\n", filename)
# If the filename ends in .gz, open it with gzcat instead.
# If the filename ends in .gz, re-open it with gzip to
# decompress.
if filename.endswith(".gz"):
p = subprocess.Popen(["gzip", "-dc"],
stdin = f, stdout = subprocess.PIPE)
@@ -95,7 +176,7 @@ def main(argv = None):
# Subtract 1 hour because files are created at the end
# of the hour. Hopefully, we'll be able to use
# internal comments and this value won't matter anyway.
clock_ts = parse_time(filename) - seconds_to_timestamp(3600)
clock_ts = parse_time(filename) + offset_filename
print_clock_updated()
except ValueError:
pass
@@ -104,8 +185,15 @@ def main(argv = None):
# Read each line
for line in f:
data_ts = data_ts_base + rate_to_period(data_ts_rate,
data_ts_inc)
# Once in a while a line might be truncated, if we're
# at the end of a file. Ignore it, but if we ignore
# too many, bail out.
if line[-1] != '\n':
truncated_lines += 1
if truncated_lines > 3:
raise ParseError(filename, "too many short lines")
printf("Ignoring short line in %s\n", filename)
continue
# If no content other than the newline, skip it
if len(line) <= 1:
@@ -114,20 +202,32 @@ def main(argv = None):
# If line starts with a comment, look for a timestamp
if line[0] == '#':
try:
clock_ts = parse_time(line[1:])
clock_ts = parse_time(line[1:]) + offset_comment
print_clock_updated()
except ValueError:
pass
continue
# If --delta mode, increment data_ts_delta by the
# delta from the file.
if args.delta:
try:
(delta, line) = line.split(None, 1)
data_ts_delta += float(delta)
except ValueError:
raise ParseError(filename, "can't parse delta")
# Calculate data_ts for this row
data_ts = get_data_ts()
# If inserting live, use clock timestamp
if live:
if args.live:
clock_ts = time_now()
# If we have a real timestamp, compare it to the data
# timestamp, and make sure things match up.
if clock_ts is not None:
if (data_ts - seconds_to_timestamp(10)) > clock_ts:
if (data_ts - max_gap) > clock_ts:
# Accumulated line timestamps are in the future.
# If we were to set data_ts=clock_ts, we'd create
# an overlap, so we have to just bail out here.
@@ -137,7 +237,7 @@ def main(argv = None):
timestamp_to_human(clock_ts))
raise ParseError(filename, err)
if (data_ts + seconds_to_timestamp(10)) < clock_ts:
if (data_ts + max_gap) < clock_ts:
# Accumulated line timetamps are in the past. We
# can just skip some time and leave a gap in the
# data.
@@ -148,7 +248,7 @@ def main(argv = None):
timestamp_to_human(clock_ts))
stream.finalize()
data_ts_base = data_ts = clock_ts
data_ts_inc = 0
data_ts_inc = data_ts_delta = 0
# Don't use this clock time anymore until we update it
clock_ts = None
@@ -156,20 +256,11 @@ def main(argv = None):
if data_ts_base == 0:
raise ParseError(filename, "No idea what timestamp to use")
# This line is legit, so increment timestamp
# This line is legit, so increment timestamp (for --rate)
data_ts_inc += 1
# Once in a while a line might be truncated, if we're at
# the end of a file. Ignore it, but if we ignore too many,
# bail out.
if line[-1] != '\n':
truncated_lines += 1
if truncated_lines > 3:
raise ParseError(filename, "too many short lines")
printf("Ignoring short line in %s\n", filename)
continue
# Insert it
if not args.dry_run:
stream.insert("%d %s" % (data_ts, line))
print "Done"

View File

@@ -111,7 +111,7 @@ def process(data, interval, args, insert_function, final):
# Fill output data
out[0, 0] = t_min
for k in range(nharm):
Fk = F[2 * k + 1] * e**(rotation * 1j * k)
Fk = F[2 * k + 1] * e**(rotation * 1j * (k+1))
out[0, 2 * k + 1] = -imag(Fk) # Pk
out[0, 2 * k + 2] = real(Fk) # Qk