Compare commits


17 Commits

9 changed files with 203 additions and 169 deletions

View File

@@ -9,6 +9,13 @@ else
 endif
 
 test:
+	src/decimate.py
+
+test_insert:
+	@make install >/dev/null
+	src/insert.py --file --dry-run /test/foo </dev/null
+
+test_copy:
 	@make install >/dev/null
 	src/copy_wildcard.py -U "http://nilmdb.com/bucket/" -D /lees*

View File

@@ -6,8 +6,9 @@ Prerequisites:
 # Runtime and build environments
 sudo apt-get install python2.7 python2.7-dev python-setuptools
+sudo apt-get install python-numpy python-scipy python-matplotlib
 
-nilmdb (1.3.1+)
+nilmdb (1.5.0+)
 
 Install:

View File

@@ -61,7 +61,7 @@ setup(name='nilmtools',
       long_description = "NILM Database Tools",
       license = "Proprietary",
       author_email = 'jim@jtan.com',
-      install_requires = [ 'nilmdb >= 1.4.6',
+      install_requires = [ 'nilmdb >= 1.5.0',
                            'numpy',
                            'scipy',
                            'matplotlib',
@@ -75,6 +75,7 @@ setup(name='nilmtools',
           'nilm-decimate-auto = nilmtools.decimate_auto:main',
           'nilm-insert = nilmtools.insert:main',
           'nilm-copy = nilmtools.copy_one:main',
+          'nilm-prep = nilmtools.prep:main',
           'nilm-copy-wildcard = nilmtools.copy_wildcard:main',
           'nilm-sinefit = nilmtools.sinefit:main',
           ],

View File

@@ -5,6 +5,7 @@
 import nilmtools.filter
 import nilmdb.client
+from nilmdb.client.numpyclient import NumpyClient
 import numpy as np
 import sys
@@ -27,14 +28,14 @@ def main(argv = None):
     meta = f.client_src.stream_get_metadata(f.src.path)
     f.check_dest_metadata(meta)
 
-    # Copy all rows of data as ASCII strings
-    extractor = nilmdb.client.Client(f.src.url).stream_extract
-    inserter = nilmdb.client.Client(f.dest.url).stream_insert_context
+    # Copy all rows of data using the faster Numpy interfaces
+    extractor = NumpyClient(f.src.url).stream_extract_numpy
+    inserter = NumpyClient(f.dest.url).stream_insert_numpy_context
 
     for i in f.intervals():
         print "Processing", f.interval_string(i)
         with inserter(f.dest.path, i.start, i.end) as insert_ctx:
-            for row in extractor(f.src.path, i.start, i.end):
-                insert_ctx.insert(row + "\n")
+            for data in extractor(f.src.path, i.start, i.end):
+                insert_ctx.insert(data)
 
 if __name__ == "__main__":
     main()
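The change above swaps the line-by-line ASCII path for nilmdb's NumpyClient, so rows move as 2-D Numpy chunks instead of strings. A minimal standalone sketch of the same loop, assuming only the NumpyClient calls that appear in this diff (the URLs, paths, and function name are placeholders):

    from nilmdb.client.numpyclient import NumpyClient

    def copy_interval(src_url, dest_url, src_path, dest_path, start, end):
        # Extract Numpy chunks from the source and insert them unchanged
        # into the destination -- no ASCII round-trip.
        extractor = NumpyClient(src_url).stream_extract_numpy
        inserter = NumpyClient(dest_url).stream_insert_numpy_context
        with inserter(dest_path, start, end) as ctx:
            for chunk in extractor(src_path, start, end):
                ctx.insert(chunk)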

View File

@@ -41,41 +41,45 @@ def main(argv = None):
     # If source is decimated, we have to decimate a bit differently
     if "decimate_source" in f.client_src.stream_get_metadata(args.srcpath):
-        n = f.src.layout_count // 3
-        f.process_python(function = decimate_again, rows = args.factor,
-                         args = (n,))
+        again = True
     else:
-        n = f.src.layout_count
-        f.process_python(function = decimate_first, rows = args.factor,
-                         args = (n,))
+        again = False
+    f.process_numpy(decimate, args = (args.factor, again))
 
-def decimate_first(data, n):
-    """Decimate original data -- result has 3 times as many columns"""
-    # For this simple calculation, converting to a Numpy array
-    # and doing the math is slower than just doing it directly.
-    rows = iter(data)
-    r_sum = r_min = r_max = rows.next()
-    for row in rows:
-        r_sum = map(operator.add, r_sum, row)
-        r_min = map(min, r_min, row)
-        r_max = map(max, r_max, row)
-    r_mean = [ x / len(data) for x in r_sum ]
-    return [ [ r_mean[0] ] + r_mean[1:] + r_min[1:] + r_max[1:] ]
+def decimate(data, interval, args, insert_function, final):
+    """Decimate data"""
+    (factor, again) = args
+    (n, m) = data.shape
 
-def decimate_again(data, n):
-    """Decimate already-decimated data -- result has the same number
-    of columns"""
-    rows = iter(data)
-    r = rows.next()
-    r_sum = r[0:(n+1)]
-    r_min = r[(n+1):(2*n+1)]
-    r_max = r[(2*n+1):(3*n+1)]
-    for r in rows:
-        r_sum = map(operator.add, r_sum, r[0:(n+1)])
-        r_min = map(min, r_min, r[(n+1):(2*n+1)])
-        r_max = map(max, r_max, r[(2*n+1):(3*n+1)])
-    r_mean = [ x / len(data) for x in r_sum ]
-    return [ r_mean + r_min + r_max ]
+    # Figure out which columns to use as the source for mean, min, and max,
+    # depending on whether this is the first decimation or we're decimating
+    # again.  Note that we include the timestamp in the means.
+    if again:
+        c = (m - 1) // 3
+        # e.g. c = 3
+        # ts mean1 mean2 mean3 min1 min2 min3 max1 max2 max3
+        mean_col = slice(0, c + 1)
+        min_col = slice(c + 1, 2 * c + 1)
+        max_col = slice(2 * c + 1, 3 * c + 1)
+    else:
+        mean_col = slice(0, m)
+        min_col = slice(1, m)
+        max_col = slice(1, m)
+
+    # Discard extra rows that aren't a multiple of factor
+    n = n // factor * factor
+    data = data[:n,:]
+
+    # Reshape it into 3D so we can process 'factor' rows at a time
+    data = data.reshape(n // factor, factor, m)
+
+    # Fill the result
+    out = np.c_[ np.mean(data[:,:,mean_col], axis=1),
+                 np.min(data[:,:,min_col], axis=1),
+                 np.max(data[:,:,max_col], axis=1) ]
+
+    insert_function(out)
+    return n
 
 if __name__ == "__main__":
     main()
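The rewritten decimate() leans on a reshape: chopping n rows into shape (n // factor, factor, m) gives each group of 'factor' consecutive rows its own axis, so mean/min/max reduce over axis 1 in one vectorized pass. A self-contained toy check of the first-decimation case (hypothetical values, factor 2, one timestamp column plus one data column):

    import numpy as np

    data = np.array([[0., 10.],
                     [1., 20.],
                     [2.,  5.],
                     [3., 15.]])
    factor = 2
    (n, m) = data.shape
    n = n // factor * factor                 # drop rows beyond a full group
    chunks = data[:n, :].reshape(n // factor, factor, m)
    out = np.c_[np.mean(chunks[:, :, 0:m], axis=1),   # mean includes timestamp
                np.min(chunks[:, :, 1:m], axis=1),    # min/max skip timestamp
                np.max(chunks[:, :, 1:m], axis=1)]
    print(out)
    # [[ 0.5 15.  10.  20. ]
    #  [ 2.5 10.   5.  15. ]]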

View File

@@ -4,6 +4,7 @@ from __future__ import absolute_import
 import nilmdb.client
 from nilmdb.client import Client
+from nilmdb.client.numpyclient import NumpyClient
 from nilmdb.utils.printf import *
 from nilmdb.utils.time import (parse_time, timestamp_to_human,
                                timestamp_to_seconds)
@@ -247,72 +248,7 @@ class Filter(object):
         # All good -- write the metadata in case it's not already there
         self._client_dest.stream_update_metadata(self.dest.path, data)
 
-    # Main processing helper
-    def process_python(self, function, rows, args = None, partial = False):
-        """Process data in chunks of 'rows' data at a time.
-
-        This provides data as nested Python lists and expects the same
-        back.
-
-        function: function to process the data
-        rows: maximum number of rows to pass to 'function' at once
-        args: tuple containing extra arguments to pass to 'function'
-        partial: if true, less than 'rows' may be passed to 'function'.
-                 if false, partial data at the end of an interval will
-                 be dropped.
-
-        'function' should be defined like:
-           function(data, *args)
-
-        It will be passed a list containing up to 'rows' rows of
-        data from the source stream, and any arguments passed in
-        'args'.  It should transform the data as desired, and return a
-        new list of rdata, which will be inserted into the destination
-        stream.
-        """
-        if args is None:
-            args = []
-        extractor = Client(self.src.url).stream_extract
-        inserter = Client(self.dest.url).stream_insert_context
-
-        # Parse input data.  We use homogenous types for now, which
-        # means the timestamp type will be either float or int.
-        if "int" in self.src.layout_type:
-            parser = lambda line: [ int(x) for x in line.split() ]
-        else:
-            parser = lambda line: [ float(x) for x in line.split() ]
-
-        # Format output data.
-        formatter = lambda row: " ".join([repr(x) for x in row]) + "\n"
-
-        for interval in self.intervals():
-            print "Processing", self.interval_string(interval)
-            with inserter(self.dest.path,
-                          interval.start, interval.end) as insert_ctx:
-                src_array = []
-                for line in extractor(self.src.path,
-                                      interval.start, interval.end):
-                    # Read in data
-                    src_array.append([ float(x) for x in line.split() ])
-                    if len(src_array) == rows:
-                        # Pass through filter function
-                        dest_array = function(src_array, *args)
-                        # Write result to destination
-                        out = [ formatter(row) for row in dest_array ]
-                        insert_ctx.insert("".join(out))
-                        # Clear source array
-                        src_array = []
-                # Take care of partial chunk
-                if len(src_array) and partial:
-                    dest_array = function(src_array, *args)
-                    out = [ formatter(row) for row in dest_array ]
-                    insert_ctx.insert("".join(out))
-
-    # Like process_python, but provides Numpy arrays and allows for
-    # partial processing.
+    # The main filter processing method.
     def process_numpy(self, function, args = None, rows = 100000):
         """For all intervals that exist in self.src but don't exist in
         self.dest, call 'function' with a Numpy array corresponding to
@@ -342,8 +278,8 @@ class Filter(object):
         """
         if args is None:
             args = []
-        extractor = Client(self.src.url).stream_extract
-        inserter = Client(self.dest.url).stream_insert_context
+        extractor = NumpyClient(self.src.url).stream_extract_numpy
+        inserter = NumpyClient(self.dest.url).stream_insert_numpy_context
 
         # Format output data.
         formatter = lambda row: " ".join([repr(x) for x in row]) + "\n"
@@ -357,19 +293,12 @@ class Filter(object):
             print "Processing", self.interval_string(interval)
             with inserter(self.dest.path,
                           interval.start, interval.end) as insert_ctx:
-                def insert_function(array):
-                    s = cStringIO.StringIO()
-                    if len(np.shape(array)) != 2:
-                        raise Exception("array must be 2-dimensional")
-                    np.savetxt(s, array)
-                    insert_ctx.insert(s.getvalue())
-
-                extract = extractor(self.src.path, interval.start, interval.end)
+                insert_function = insert_ctx.insert
                 old_array = np.array([])
-                for batched in batch(extract, rows):
-                    # Read in this batch of data
-                    new_array = np.loadtxt(batched)
-
+                for new_array in extractor(self.src.path,
+                                           interval.start, interval.end,
+                                           layout = self.src.layout,
+                                           maxrows = rows):
                     # If we still had old data left, combine it
                     if old_array.shape[0] != 0:
                         array = np.vstack((old_array, new_array))
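process_numpy() now hands each filter function raw 2-D arrays straight from stream_extract_numpy, and leftover rows are carried into the next chunk by the vstack logic above. For reference, a trivial filter wired to the calling convention this diff establishes (only the signature and return value follow the diff; the scaling itself is a made-up example):

    import numpy as np

    def scale_filter(data, interval, args, insert_function, final):
        (column, factor) = args
        out = data.copy()
        out[:, column] *= factor      # scale one column; timestamps untouched
        insert_function(out)
        return data.shape[0]          # report every input row as consumed

    # f.process_numpy(scale_filter, args = (1, 2.0))  # e.g. double column 1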

View File

@@ -12,6 +12,7 @@ import sys
 import re
 import argparse
 import subprocess
+import textwrap
 
 class ParseError(Exception):
     def __init__(self, filename, error):
@@ -22,32 +23,103 @@ def parse_args(argv = None):
     parser = argparse.ArgumentParser(
         formatter_class = argparse.RawDescriptionHelpFormatter,
         version = nilmtools.__version__,
-        description = """\
-    Insert data from ethstream, either live (using the system time as a
-    reference) or prerecorded (using comments in the file as a reference).
-
-    The data is assumed to have been recorded at the specified rate.
-    Small discrepencies between the accumulated timestamps and the
-    reference time are ignored; larger discrepencies cause gaps to be
-    created in the stream.  Overlapping data returns an error.
-    """)
+        description = textwrap.dedent("""\
+          Insert large amount of data from an external source like ethstream.
+
+          This code tracks two timestamps:
+
+          (1) The 'data' timestamp is the precise timestamp corresponding to
+              a particular row of data, and is the timestamp that gets
+              inserted into the database.  It increases by 'data_delta' for
+              every row of input.
+
+              'data_delta' can come from one of two sources.  If '--delta'
+              is specified, it is pulled from the first column of data.  If
+              '--rate' is specified, 'data_delta' is set to a fixed value of
+              (1 / rate).
+
+          (2) The 'clock' timestamp is the less precise timestamp that gives
+              the absolute time.  It can come from two sources.  If '--live'
+              is specified, it is pulled directly from the system clock.  If
+              '--file' is specified, it is extracted from the input filename
+              every time a new file is opened for read, and from comments
+              that appear in the file.
+
+          Small discrepencies between 'data' and 'clock' are ignored.  If
+          the 'data' timestamp ever differs from the 'clock' timestamp by
+          more than 'max_gap' seconds:
+
+          - If 'data' is running behind, there is a gap in the data, so it
+            is stepped forward to match 'clock'.
+
+          - If 'data' is running ahead, there is overlap in the data, and an
+            error is raised.
+          """))
     parser.add_argument("-u", "--url", action="store",
                         default="http://localhost/nilmdb/",
                         help="NilmDB server URL (default: %(default)s)")
-    parser.add_argument("-r", "--rate", action="store", default=8000,
-                        type=float,
-                        help="Data rate in Hz (default: %(default)s)")
-    parser.add_argument("-l", "--live", action="store_true",
-                        help="Live capture; use system time to verify rate")
-    parser.add_argument("path", action="store",
+
+    group = parser.add_argument_group("Misc options")
+    group.add_argument("-D", "--dry-run", action="store_true",
+                       help="Parse files, but don't insert any data")
+    group.add_argument("-m", "--max-gap", action="store", default=10.0,
+                       metavar="SEC", type=float,
+                       help="Max discrepency between clock and data "
+                       "timestamps (default: %(default)s)")
+
+    group = parser.add_argument_group("Data timestamp delta")
+    exc = group.add_mutually_exclusive_group()
+    exc.add_argument("-r", "--rate", action="store", default=8000.0,
+                     type=float,
+                     help="Data_delta is constant 1/RATE "
+                     "(default: %(default)s Hz)")
+    exc.add_argument("-d", "--delta", action="store_true",
+                     help="Data_delta is the first number in each line")
+
+    group = parser.add_argument_group("Clock timestamp source")
+    exc = group.add_mutually_exclusive_group()
+    exc.add_argument("-l", "--live", action="store_true",
+                     help="Use live system time for clock timestamp")
+    exc.add_argument("-f", "--file", action="store_true", default=True,
+                     help="Use filename or comments for clock timestamp")
+    group.add_argument("-o", "--offset-filename", metavar="SEC",
+                       action="store", default=-3600.0, type=float,
+                       help="Offset to add to filename timestamps "
+                       "(default: %(default)s)")
+    group.add_argument("-O", "--offset-comment", metavar="SEC",
+                       action="store", default=0.0, type=float,
+                       help="Offset to add to comment timestamps "
+                       "(default: %(default)s)")
+
+    group = parser.add_argument_group("Database path")
+    group.add_argument("path", action="store",
                        help="Path of stream, e.g. /foo/bar")
-    parser.add_argument("infile", type=argparse.FileType('r'), nargs='*',
+
+    group = parser.add_argument_group("Input files")
+    group.add_argument("infile", type=argparse.FileType('r'), nargs='*',
                        default=[sys.stdin],
                        help="Input files (default: stdin)")
+
     args = parser.parse_args(argv)
 
     printf("    Stream path: %s\n", args.path)
-    printf("      Data rate: %s Hz\n", repr(args.rate))
+    printf(" Data timestamp: ")
+    if args.delta:
+        printf("delta on each input line\n")
+    else:
+        printf("fixed rate %s Hz\n", repr(args.rate))
+    printf("Clock timestamp: ")
+    if args.live:
+        printf("live system clock\n")
+    else:
+        printf("from filenames and comments\n")
+    printf("Filename offset: %s seconds\n", repr(args.offset_filename))
+    printf(" Comment offset: %s seconds\n", repr(args.offset_comment))
+    printf("        Max gap: %s seconds\n", repr(args.max_gap))
+    if args.dry_run:
+        printf("Dry run (no data will be inserted)\n")
 
     return args
@@ -56,22 +128,26 @@ def main(argv = None):
     client = nilmdb.client.Client(args.url)
 
-    # Local copies to save dictionary lookups
-    live = args.live
-
     # data_ts is the timestamp that we'll use for the current line
     data_ts_base = 0
     data_ts_inc = 0
     data_ts_rate = args.rate
+    data_ts_delta = 0
+
+    def get_data_ts():
+        if args.delta:
+            return data_ts_base + data_ts_delta
+        else:
+            return data_ts_base + rate_to_period(data_ts_rate,
+                                                 data_ts_inc)
 
     # clock_ts is the imprecise "real" timestamp (from the filename,
-    # comments, or or system clock)
+    # comments, or system clock)
     clock_ts = None
 
     def print_clock_updated():
-        printf("Clock time updated to %s\n", timestamp_to_human(clock_ts))
+        printf("Clock timestamp updated to %s\n", timestamp_to_human(clock_ts))
         if data_ts_base != 0:
-            diff = data_ts - clock_ts
+            diff = get_data_ts() - clock_ts
             if diff >= 0:
                 printf("  (data timestamp ahead by %.6f sec)\n",
                        timestamp_to_seconds(diff))
@@ -79,12 +155,17 @@ def main(argv = None):
                 printf("  (data timestamp behind by %.6f sec)\n",
                        timestamp_to_seconds(-diff))
 
+    offset_filename = seconds_to_timestamp(args.offset_filename)
+    offset_comment = seconds_to_timestamp(args.offset_comment)
+    max_gap = seconds_to_timestamp(args.max_gap)
+
     with client.stream_insert_context(args.path) as stream:
         for f in args.infile:
             filename = f.name
             printf("Processing %s\n", filename)
 
-            # If the filename ends in .gz, open it with gzcat instead.
+            # If the filename ends in .gz, re-open it with gzip to
+            # decompress.
             if filename.endswith(".gz"):
                 p = subprocess.Popen(["gzip", "-dc"],
                                      stdin = f, stdout = subprocess.PIPE)
@@ -95,7 +176,7 @@ def main(argv = None):
                 # Subtract 1 hour because files are created at the end
                 # of the hour.  Hopefully, we'll be able to use
                 # internal comments and this value won't matter anyway.
-                clock_ts = parse_time(filename) - seconds_to_timestamp(3600)
+                clock_ts = parse_time(filename) + offset_filename
                 print_clock_updated()
             except ValueError:
                 pass
@@ -104,8 +185,15 @@ def main(argv = None):
             # Read each line
             for line in f:
-                data_ts = data_ts_base + rate_to_period(data_ts_rate,
-                                                        data_ts_inc)
+                # Once in a while a line might be truncated, if we're
+                # at the end of a file.  Ignore it, but if we ignore
+                # too many, bail out.
+                if line[-1] != '\n':
+                    truncated_lines += 1
+                    if truncated_lines > 3:
+                        raise ParseError(filename, "too many short lines")
+                    printf("Ignoring short line in %s\n", filename)
+                    continue
 
                 # If no content other than the newline, skip it
                 if len(line) <= 1:
@@ -114,20 +202,32 @@ def main(argv = None):
                 # If line starts with a comment, look for a timestamp
                 if line[0] == '#':
                     try:
-                        clock_ts = parse_time(line[1:])
+                        clock_ts = parse_time(line[1:]) + offset_comment
                         print_clock_updated()
                     except ValueError:
                         pass
                     continue
 
+                # If --delta mode, increment data_ts_delta by the
+                # delta from the file.
+                if args.delta:
+                    try:
+                        (delta, line) = line.split(None, 1)
+                        data_ts_delta += float(delta)
+                    except ValueError:
+                        raise ParseError(filename, "can't parse delta")
+
+                # Calculate data_ts for this row
+                data_ts = get_data_ts()
+
                 # If inserting live, use clock timestamp
-                if live:
+                if args.live:
                     clock_ts = time_now()
 
                 # If we have a real timestamp, compare it to the data
                 # timestamp, and make sure things match up.
                 if clock_ts is not None:
-                    if (data_ts - seconds_to_timestamp(10)) > clock_ts:
+                    if (data_ts - max_gap) > clock_ts:
                         # Accumulated line timestamps are in the future.
                         # If we were to set data_ts=clock_ts, we'd create
                         # an overlap, so we have to just bail out here.
@@ -137,7 +237,7 @@ def main(argv = None):
                               timestamp_to_human(clock_ts))
                         raise ParseError(filename, err)
 
-                    if (data_ts + seconds_to_timestamp(10)) < clock_ts:
+                    if (data_ts + max_gap) < clock_ts:
                         # Accumulated line timetamps are in the past.  We
                         # can just skip some time and leave a gap in the
                         # data.
@@ -148,7 +248,7 @@ def main(argv = None):
                               timestamp_to_human(clock_ts))
                         stream.finalize()
                         data_ts_base = data_ts = clock_ts
-                        data_ts_inc = 0
+                        data_ts_inc = data_ts_delta = 0
 
                     # Don't use this clock time anymore until we update it
                     clock_ts = None
@@ -156,20 +256,11 @@ def main(argv = None):
                 if data_ts_base == 0:
                     raise ParseError(filename, "No idea what timestamp to use")
 
-                # This line is legit, so increment timestamp
+                # This line is legit, so increment timestamp (for --rate)
                 data_ts_inc += 1
 
-                # Once in a while a line might be truncated, if we're at
-                # the end of a file.  Ignore it, but if we ignore too many,
-                # bail out.
-                if line[-1] != '\n':
-                    truncated_lines += 1
-                    if truncated_lines > 3:
-                        raise ParseError(filename, "too many short lines")
-                    printf("Ignoring short line in %s\n", filename)
-                    continue
-
                 # Insert it
-                stream.insert("%d %s" % (data_ts, line))
+                if not args.dry_run:
+                    stream.insert("%d %s" % (data_ts, line))
 
     print "Done"

View File

@@ -8,7 +8,7 @@ import nilmdb.client
 from numpy import *
 import scipy.fftpack
 import scipy.signal
-from matplotlib import pyplot as p
+#from matplotlib import pyplot as p
 import bisect
 
 def main(argv = None):
@@ -111,7 +111,7 @@ def process(data, interval, args, insert_function, final):
     # Fill output data
     out[0, 0] = t_min
     for k in range(nharm):
-        Fk = F[2 * k + 1] * e**(rotation * 1j * k)
+        Fk = F[2 * k + 1] * e**(rotation * 1j * (k+1))
         out[0, 2 * k + 1] = -imag(Fk)     # Pk
         out[0, 2 * k + 2] = real(Fk)      # Qk
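The one-character fix matters because F[2 * k + 1] holds harmonic k+1, and a time shift rotates each harmonic's phase in proportion to its harmonic number, so the exponent must use (k+1), not k. A standalone numpy check of that proportionality (made-up signal and shift, independent of this repo):

    import numpy as np

    fs, f0, N = 1000, 50, 1000          # sample rate, fundamental, length
    t = np.arange(N) / float(fs)
    x = np.cos(2*np.pi*f0*t) + 0.5*np.cos(2*np.pi*3*f0*t)  # harmonics 1 and 3
    X0 = np.fft.rfft(x)
    X1 = np.fft.rfft(np.roll(x, 1))     # delay the signal by one sample
    base = 2 * np.pi * f0 / fs          # phase rotation of the fundamental
    for harm in (1, 3):
        idx = harm * f0 * N // fs       # FFT bin for this harmonic
        dphase = np.angle(X1[idx] / X0[idx])
        print(round(dphase / base, 3))  # -1.0 then -3.0: scales with harmonic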

View File

@@ -23,7 +23,7 @@ def main(argv = None):
     try:
         args = f.parse_args(argv)
     except nilmtools.filter.MissingDestination as e:
-        rec = "float32_4"
+        rec = "float32_3"
         print "Source is %s (%s)" % (e.src.path, e.src.layout)
         print "Destination %s doesn't exist" % (e.dest.path)
         print "You could make it with a command like:"