Compare commits

...

17 Commits

9 changed files with 203 additions and 169 deletions

View File

@@ -9,6 +9,13 @@ else
endif
test:
src/decimate.py
test_insert:
@make install >/dev/null
src/insert.py --file --dry-run /test/foo </dev/null
test_copy:
@make install >/dev/null
src/copy_wildcard.py -U "http://nilmdb.com/bucket/" -D /lees*

View File

@@ -6,8 +6,9 @@ Prerequisites:
# Runtime and build environments
sudo apt-get install python2.7 python2.7-dev python-setuptools
sudo apt-get install python-numpy python-scipy python-matplotlib
nilmdb (1.3.1+)
nilmdb (1.5.0+)
Install:

View File

@@ -61,7 +61,7 @@ setup(name='nilmtools',
long_description = "NILM Database Tools",
license = "Proprietary",
author_email = 'jim@jtan.com',
install_requires = [ 'nilmdb >= 1.4.6',
install_requires = [ 'nilmdb >= 1.5.0',
'numpy',
'scipy',
'matplotlib',
@@ -75,6 +75,7 @@ setup(name='nilmtools',
'nilm-decimate-auto = nilmtools.decimate_auto:main',
'nilm-insert = nilmtools.insert:main',
'nilm-copy = nilmtools.copy_one:main',
'nilm-prep = nilmtools.prep:main',
'nilm-copy-wildcard = nilmtools.copy_wildcard:main',
'nilm-sinefit = nilmtools.sinefit:main',
],

View File

@@ -5,6 +5,7 @@
import nilmtools.filter
import nilmdb.client
from nilmdb.client.numpyclient import NumpyClient
import numpy as np
import sys
@@ -27,14 +28,14 @@ def main(argv = None):
meta = f.client_src.stream_get_metadata(f.src.path)
f.check_dest_metadata(meta)
# Copy all rows of data as ASCII strings
extractor = nilmdb.client.Client(f.src.url).stream_extract
inserter = nilmdb.client.Client(f.dest.url).stream_insert_context
# Copy all rows of data using the faster Numpy interfaces
extractor = NumpyClient(f.src.url).stream_extract_numpy
inserter = NumpyClient(f.dest.url).stream_insert_numpy_context
for i in f.intervals():
print "Processing", f.interval_string(i)
with inserter(f.dest.path, i.start, i.end) as insert_ctx:
for row in extractor(f.src.path, i.start, i.end):
insert_ctx.insert(row + "\n")
for data in extractor(f.src.path, i.start, i.end):
insert_ctx.insert(data)
if __name__ == "__main__":
main()
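
The change above swaps the line-based ASCII client for nilmdb's NumpyClient, which moves data in bulk arrays instead of formatting and re-parsing text. A minimal sketch of the same copy loop in isolation, using the stream_extract_numpy / stream_insert_numpy_context API shown in the diff (URLs, paths, and the time range here are placeholders):

    from nilmdb.client.numpyclient import NumpyClient

    src = NumpyClient("http://localhost/nilmdb/")
    dest = NumpyClient("http://localhost/nilmdb/")
    start, end = 0, 1000000000    # placeholder interval bounds

    with dest.stream_insert_numpy_context("/demo/dest", start, end) as ctx:
        # stream_extract_numpy yields 2-D arrays (rows x columns),
        # which the insert context accepts directly -- no string
        # formatting or parsing in between.
        for chunk in src.stream_extract_numpy("/demo/src", start, end):
            ctx.insert(chunk)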

View File

@@ -41,41 +41,45 @@ def main(argv = None):
# If source is decimated, we have to decimate a bit differently
if "decimate_source" in f.client_src.stream_get_metadata(args.srcpath):
n = f.src.layout_count // 3
f.process_python(function = decimate_again, rows = args.factor,
args = (n,))
again = True
else:
n = f.src.layout_count
f.process_python(function = decimate_first, rows = args.factor,
args = (n,))
again = False
f.process_numpy(decimate, args = (args.factor, again))
def decimate_first(data, n):
"""Decimate original data -- result has 3 times as many columns"""
# For this simple calculation, converting to a Numpy array
# and doing the math is slower than just doing it directly.
rows = iter(data)
r_sum = r_min = r_max = rows.next()
for row in rows:
r_sum = map(operator.add, r_sum, row)
r_min = map(min, r_min, row)
r_max = map(max, r_max, row)
r_mean = [ x / len(data) for x in r_sum ]
return [ [ r_mean[0] ] + r_mean[1:] + r_min[1:] + r_max[1:] ]
def decimate(data, interval, args, insert_function, final):
"""Decimate data"""
(factor, again) = args
(n, m) = data.shape
def decimate_again(data, n):
"""Decimate already-decimated data -- result has the same number
of columns"""
rows = iter(data)
r = rows.next()
r_sum = r[0:(n+1)]
r_min = r[(n+1):(2*n+1)]
r_max = r[(2*n+1):(3*n+1)]
for r in rows:
r_sum = map(operator.add, r_sum, r[0:(n+1)])
r_min = map(min, r_min, r[(n+1):(2*n+1)])
r_max = map(max, r_max, r[(2*n+1):(3*n+1)])
r_mean = [ x / len(data) for x in r_sum ]
return [ r_mean + r_min + r_max ]
# Figure out which columns to use as the source for mean, min, and max,
# depending on whether this is the first decimation or we're decimating
# again. Note that we include the timestamp in the means.
if again:
c = (m - 1) // 3
# e.g. c = 3
# ts mean1 mean2 mean3 min1 min2 min3 max1 max2 max3
mean_col = slice(0, c + 1)
min_col = slice(c + 1, 2 * c + 1)
max_col = slice(2 * c + 1, 3 * c + 1)
else:
mean_col = slice(0, m)
min_col = slice(1, m)
max_col = slice(1, m)
# Discard extra rows that aren't a multiple of factor
n = n // factor * factor
data = data[:n,:]
# Reshape it into 3D so we can process 'factor' rows at a time
data = data.reshape(n // factor, factor, m)
# Fill the result
out = np.c_[ np.mean(data[:,:,mean_col], axis=1),
np.min(data[:,:,min_col], axis=1),
np.max(data[:,:,max_col], axis=1) ]
insert_function(out)
return n
if __name__ == "__main__":
main()
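
The new decimate() replaces the row-at-a-time Python loops with a single reshape: chopping n rows into n/factor groups of 'factor' rows lets numpy take the per-group mean, min, and max in one call each. A toy run of the same trick, with made-up [timestamp, value] rows and factor = 2:

    import numpy as np

    data = np.array([[0., 1.],
                     [1., 3.],
                     [2., 5.],
                     [3., 7.]])    # 4 rows of [timestamp, value]
    factor = 2
    n, m = data.shape
    n = n // factor * factor       # drop rows that don't fill a group
    d = data[:n, :].reshape(n // factor, factor, m)
    out = np.c_[np.mean(d[:, :, 0:m], axis=1),   # timestamp + column means
                np.min(d[:, :, 1:m], axis=1),    # column mins (no timestamp)
                np.max(d[:, :, 1:m], axis=1)]    # column maxes
    print out    # [[ 0.5  2.   1.   3. ]
                 #  [ 2.5  6.   5.   7. ]]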

View File

@@ -4,6 +4,7 @@ from __future__ import absolute_import
import nilmdb.client
from nilmdb.client import Client
from nilmdb.client.numpyclient import NumpyClient
from nilmdb.utils.printf import *
from nilmdb.utils.time import (parse_time, timestamp_to_human,
timestamp_to_seconds)
@@ -247,72 +248,7 @@ class Filter(object):
# All good -- write the metadata in case it's not already there
self._client_dest.stream_update_metadata(self.dest.path, data)
# Main processing helper
def process_python(self, function, rows, args = None, partial = False):
"""Process data in chunks of 'rows' data at a time.
This provides data as nested Python lists and expects the same
back.
function: function to process the data
rows: maximum number of rows to pass to 'function' at once
args: tuple containing extra arguments to pass to 'function'
partial: if true, less than 'rows' may be passed to 'function'.
if false, partial data at the end of an interval will
be dropped.
'function' should be defined like:
function(data, *args)
It will be passed a list containing up to 'rows' rows of
data from the source stream, and any arguments passed in
'args'. It should transform the data as desired, and return a
new list of rows, which will be inserted into the destination
stream.
"""
if args is None:
args = []
extractor = Client(self.src.url).stream_extract
inserter = Client(self.dest.url).stream_insert_context
# Parse input data. We use homogeneous types for now, which
# means the timestamp type will be either float or int.
if "int" in self.src.layout_type:
parser = lambda line: [ int(x) for x in line.split() ]
else:
parser = lambda line: [ float(x) for x in line.split() ]
# Format output data.
formatter = lambda row: " ".join([repr(x) for x in row]) + "\n"
for interval in self.intervals():
print "Processing", self.interval_string(interval)
with inserter(self.dest.path,
interval.start, interval.end) as insert_ctx:
src_array = []
for line in extractor(self.src.path,
interval.start, interval.end):
# Read in data
src_array.append([ float(x) for x in line.split() ])
if len(src_array) == rows:
# Pass through filter function
dest_array = function(src_array, *args)
# Write result to destination
out = [ formatter(row) for row in dest_array ]
insert_ctx.insert("".join(out))
# Clear source array
src_array = []
# Take care of partial chunk
if len(src_array) and partial:
dest_array = function(src_array, *args)
out = [ formatter(row) for row in dest_array ]
insert_ctx.insert("".join(out))
# Like process_python, but provides Numpy arrays and allows for
# partial processing.
# The main filter processing method.
def process_numpy(self, function, args = None, rows = 100000):
"""For all intervals that exist in self.src but don't exist in
self.dest, call 'function' with a Numpy array corresponding to
@@ -342,8 +278,8 @@ class Filter(object):
"""
if args is None:
args = []
extractor = Client(self.src.url).stream_extract
inserter = Client(self.dest.url).stream_insert_context
extractor = NumpyClient(self.src.url).stream_extract_numpy
inserter = NumpyClient(self.dest.url).stream_insert_numpy_context
# Format output data.
formatter = lambda row: " ".join([repr(x) for x in row]) + "\n"
@@ -357,19 +293,12 @@ class Filter(object):
print "Processing", self.interval_string(interval)
with inserter(self.dest.path,
interval.start, interval.end) as insert_ctx:
def insert_function(array):
s = cStringIO.StringIO()
if len(np.shape(array)) != 2:
raise Exception("array must be 2-dimensional")
np.savetxt(s, array)
insert_ctx.insert(s.getvalue())
extract = extractor(self.src.path, interval.start, interval.end)
insert_function = insert_ctx.insert
old_array = np.array([])
for batched in batch(extract, rows):
# Read in this batch of data
new_array = np.loadtxt(batched)
for new_array in extractor(self.src.path,
interval.start, interval.end,
layout = self.src.layout,
maxrows = rows):
# If we still had old data left, combine it
if old_array.shape[0] != 0:
array = np.vstack((old_array, new_array))
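
process_numpy's contract can be seen in decimate() earlier in this compare: the callback receives a 2-D array plus the interval, inserts whatever output it produces via insert_function, and returns how many input rows it consumed; leftover rows are carried into the next chunk by the vstack above. A minimal sketch of such a callback, with a made-up gain filter (the name and gain value are hypothetical):

    import numpy as np

    def apply_gain(data, interval, args, insert_function, final):
        # 'data' is rows x columns; column 0 is the timestamp.
        (gain,) = args
        out = data.copy()
        out[:, 1:] *= gain           # scale every data column
        insert_function(out)
        return data.shape[0]         # all rows consumed; none carried over

    # Hypothetical usage, mirroring decimate.py:
    #   f.process_numpy(apply_gain, args = (2.0,))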

View File

@@ -12,6 +12,7 @@ import sys
import re
import argparse
import subprocess
import textwrap
class ParseError(Exception):
def __init__(self, filename, error):
@@ -22,32 +23,103 @@ def parse_args(argv = None):
parser = argparse.ArgumentParser(
formatter_class = argparse.RawDescriptionHelpFormatter,
version = nilmtools.__version__,
description = """\
Insert data from ethstream, either live (using the system time as a
reference) or prerecorded (using comments in the file as a reference).
description = textwrap.dedent("""\
Insert large amounts of data from an external source like ethstream.
The data is assumed to have been recorded at the specified rate.
Small discrepancies between the accumulated timestamps and the
reference time are ignored; larger discrepancies cause gaps to be
created in the stream. Overlapping data returns an error.
""")
This code tracks two timestamps:
(1) The 'data' timestamp is the precise timestamp corresponding to
a particular row of data, and is the timestamp that gets
inserted into the database. It increases by 'data_delta' for
every row of input.
'data_delta' can come from one of two sources. If '--delta'
is specified, it is pulled from the first column of data. If
'--rate' is specified, 'data_delta' is set to a fixed value of
(1 / rate).
(2) The 'clock' timestamp is the less precise timestamp that gives
the absolute time. It can come from two sources. If '--live'
is specified, it is pulled directly from the system clock. If
'--file' is specified, it is extracted from the input filename
every time a new file is opened for read, and from comments
that appear in the file.
Small discrepancies between 'data' and 'clock' are ignored. If
the 'data' timestamp ever differs from the 'clock' timestamp by
more than 'max_gap' seconds:
- If 'data' is running behind, there is a gap in the data, so it
is stepped forward to match 'clock'.
- If 'data' is running ahead, there is overlap in the data, and an
error is raised.
"""))
parser.add_argument("-u", "--url", action="store",
default="http://localhost/nilmdb/",
help="NilmDB server URL (default: %(default)s)")
parser.add_argument("-r", "--rate", action="store", default=8000,
type=float,
help="Data rate in Hz (default: %(default)s)")
parser.add_argument("-l", "--live", action="store_true",
help="Live capture; use system time to verify rate")
parser.add_argument("path", action="store",
help="Path of stream, e.g. /foo/bar")
parser.add_argument("infile", type=argparse.FileType('r'), nargs='*',
default=[sys.stdin],
help="Input files (default: stdin)")
group = parser.add_argument_group("Misc options")
group.add_argument("-D", "--dry-run", action="store_true",
help="Parse files, but don't insert any data")
group.add_argument("-m", "--max-gap", action="store", default=10.0,
metavar="SEC", type=float,
help="Max discrepency between clock and data "
"timestamps (default: %(default)s)")
group = parser.add_argument_group("Data timestamp delta")
exc = group.add_mutually_exclusive_group()
exc.add_argument("-r", "--rate", action="store", default=8000.0,
type=float,
help="Data_delta is constant 1/RATE "
"(default: %(default)s Hz)")
exc.add_argument("-d", "--delta", action="store_true",
help="Data_delta is the first number in each line")
group = parser.add_argument_group("Clock timestamp source")
exc = group.add_mutually_exclusive_group()
exc.add_argument("-l", "--live", action="store_true",
help="Use live system time for clock timestamp")
exc.add_argument("-f", "--file", action="store_true", default=True,
help="Use filename or comments for clock timestamp")
group.add_argument("-o", "--offset-filename", metavar="SEC",
action="store", default=-3600.0, type=float,
help="Offset to add to filename timestamps "
"(default: %(default)s)")
group.add_argument("-O", "--offset-comment", metavar="SEC",
action="store", default=0.0, type=float,
help="Offset to add to comment timestamps "
"(default: %(default)s)")
group = parser.add_argument_group("Database path")
group.add_argument("path", action="store",
help="Path of stream, e.g. /foo/bar")
group = parser.add_argument_group("Input files")
group.add_argument("infile", type=argparse.FileType('r'), nargs='*',
default=[sys.stdin],
help="Input files (default: stdin)")
args = parser.parse_args(argv)
printf("Stream path: %s\n", args.path)
printf(" Data rate: %s Hz\n", repr(args.rate))
printf(" Stream path: %s\n", args.path)
printf(" Data timestamp: ")
if args.delta:
printf("delta on each input line\n")
else:
printf("fixed rate %s Hz\n", repr(args.rate))
printf(" Clock timestamp: ")
if args.live:
printf("live system clock\n")
else:
printf("from filenames and comments\n")
printf(" Filename offset: %s seconds\n", repr(args.offset_filename))
printf(" Comment offset: %s seconds\n", repr(args.offset_comment))
printf(" Max gap: %s seconds\n", repr(args.max_gap))
if args.dry_run:
printf("Dry run (no data will be inserted)\n")
return args
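
The max_gap check described in the docstring above reduces to two integer comparisons once everything is in timestamp units. A toy walk-through, assuming the seconds-to-microseconds scaling that seconds_to_timestamp() implies (the values are invented):

    max_gap  = 10 * 1000000     # seconds_to_timestamp(args.max_gap)
    data_ts  = 1000 * 1000000   # accumulated from --rate or --delta
    clock_ts = 985 * 1000000    # from a filename or comment

    if (data_ts - max_gap) > clock_ts:
        print "data >10 s ahead of clock: overlap, raise ParseError"
    elif (data_ts + max_gap) < clock_ts:
        print "data >10 s behind clock: finalize and jump forward (gap)"
    else:
        print "within tolerance: keep accumulating"
    # data is 15 s ahead here, so the overlap branch fires.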
@@ -56,22 +128,26 @@ def main(argv = None):
client = nilmdb.client.Client(args.url)
# Local copies to save dictionary lookups
live = args.live
# data_ts is the timestamp that we'll use for the current line
data_ts_base = 0
data_ts_inc = 0
data_ts_rate = args.rate
data_ts_delta = 0
def get_data_ts():
if args.delta:
return data_ts_base + data_ts_delta
else:
return data_ts_base + rate_to_period(data_ts_rate,
data_ts_inc)
# clock_ts is the imprecise "real" timestamp (from the filename,
# comments, or or system clock)
# comments, or system clock)
clock_ts = None
def print_clock_updated():
printf("Clock time updated to %s\n", timestamp_to_human(clock_ts))
printf("Clock timestamp updated to %s\n", timestamp_to_human(clock_ts))
if data_ts_base != 0:
diff = data_ts - clock_ts
diff = get_data_ts() - clock_ts
if diff >= 0:
printf(" (data timestamp ahead by %.6f sec)\n",
timestamp_to_seconds(diff))
@@ -79,12 +155,17 @@ def main(argv = None):
printf(" (data timestamp behind by %.6f sec)\n",
timestamp_to_seconds(-diff))
offset_filename = seconds_to_timestamp(args.offset_filename)
offset_comment = seconds_to_timestamp(args.offset_comment)
max_gap = seconds_to_timestamp(args.max_gap)
with client.stream_insert_context(args.path) as stream:
for f in args.infile:
filename = f.name
printf("Processing %s\n", filename)
# If the filename ends in .gz, open it with gzcat instead.
# If the filename ends in .gz, re-open it with gzip to
# decompress.
if filename.endswith(".gz"):
p = subprocess.Popen(["gzip", "-dc"],
stdin = f, stdout = subprocess.PIPE)
@@ -95,7 +176,7 @@ def main(argv = None):
# Subtract 1 hour because files are created at the end
# of the hour. Hopefully, we'll be able to use
# internal comments and this value won't matter anyway.
clock_ts = parse_time(filename) - seconds_to_timestamp(3600)
clock_ts = parse_time(filename) + offset_filename
print_clock_updated()
except ValueError:
pass
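
The .gz handling above leans on an external gzip process rather than Python's gzip module, so the already-open file object can be wrapped in place. The pattern in isolation (the filename is hypothetical; assumes a gzip binary on PATH):

    import subprocess

    f = open("prep-20130601.txt.gz", "rb")   # hypothetical capture file
    if f.name.endswith(".gz"):
        # Feed the open file to `gzip -dc` and read decompressed
        # lines from the pipe as if it were the original file.
        p = subprocess.Popen(["gzip", "-dc"],
                             stdin = f, stdout = subprocess.PIPE)
        f = p.stdout
    for line in f:
        pass   # timestamp/comment parsing continues as in main()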
@@ -104,8 +185,15 @@ def main(argv = None):
# Read each line
for line in f:
data_ts = data_ts_base + rate_to_period(data_ts_rate,
data_ts_inc)
# Once in a while a line might be truncated, if we're
# at the end of a file. Ignore it, but if we ignore
# too many, bail out.
if line[-1] != '\n':
truncated_lines += 1
if truncated_lines > 3:
raise ParseError(filename, "too many short lines")
printf("Ignoring short line in %s\n", filename)
continue
# If no content other than the newline, skip it
if len(line) <= 1:
@@ -114,20 +202,32 @@ def main(argv = None):
# If line starts with a comment, look for a timestamp
if line[0] == '#':
try:
clock_ts = parse_time(line[1:])
clock_ts = parse_time(line[1:]) + offset_comment
print_clock_updated()
except ValueError:
pass
continue
# If --delta mode, increment data_ts_delta by the
# delta from the file.
if args.delta:
try:
(delta, line) = line.split(None, 1)
data_ts_delta += float(delta)
except ValueError:
raise ParseError(filename, "can't parse delta")
# Calculate data_ts for this row
data_ts = get_data_ts()
# If inserting live, use clock timestamp
if live:
if args.live:
clock_ts = time_now()
# If we have a real timestamp, compare it to the data
# timestamp, and make sure things match up.
if clock_ts is not None:
if (data_ts - seconds_to_timestamp(10)) > clock_ts:
if (data_ts - max_gap) > clock_ts:
# Accumulated line timestamps are in the future.
# If we were to set data_ts=clock_ts, we'd create
# an overlap, so we have to just bail out here.
@@ -137,7 +237,7 @@ def main(argv = None):
timestamp_to_human(clock_ts))
raise ParseError(filename, err)
if (data_ts + seconds_to_timestamp(10)) < clock_ts:
if (data_ts + max_gap) < clock_ts:
# Accumulated line timestamps are in the past. We
# can just skip some time and leave a gap in the
# data.
@@ -148,7 +248,7 @@ def main(argv = None):
timestamp_to_human(clock_ts))
stream.finalize()
data_ts_base = data_ts = clock_ts
data_ts_inc = 0
data_ts_inc = data_ts_delta = 0
# Don't use this clock time anymore until we update it
clock_ts = None
@@ -156,21 +256,12 @@ def main(argv = None):
if data_ts_base == 0:
raise ParseError(filename, "No idea what timestamp to use")
# This line is legit, so increment timestamp
# This line is legit, so increment timestamp (for --rate)
data_ts_inc += 1
# Once in a while a line might be truncated, if we're at
# the end of a file. Ignore it, but if we ignore too many,
# bail out.
if line[-1] != '\n':
truncated_lines += 1
if truncated_lines > 3:
raise ParseError(filename, "too many short lines")
printf("Ignoring short line in %s\n", filename)
continue
# Insert it
stream.insert("%d %s" % (data_ts, line))
if not args.dry_run:
stream.insert("%d %s" % (data_ts, line))
print "Done"
if __name__ == "__main__":

View File

@@ -8,7 +8,7 @@ import nilmdb.client
from numpy import *
import scipy.fftpack
import scipy.signal
from matplotlib import pyplot as p
#from matplotlib import pyplot as p
import bisect
def main(argv = None):
@@ -111,7 +111,7 @@ def process(data, interval, args, insert_function, final):
# Fill output data
out[0, 0] = t_min
for k in range(nharm):
Fk = F[2 * k + 1] * e**(rotation * 1j * k)
Fk = F[2 * k + 1] * e**(rotation * 1j * (k+1))
out[0, 2 * k + 1] = -imag(Fk) # Pk
out[0, 2 * k + 2] = real(Fk) # Qk
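
The one-character fix above matters because loop index k = 0 is the fundamental: rotating the fundamental by 'rotation' radians must rotate harmonic h by h * rotation, so the exponent needs the harmonic number (k+1) rather than k, which left the fundamental unrotated. A quick check with a made-up unit coefficient at the 2nd harmonic:

    from numpy import exp, pi, angle

    rotation = pi / 6            # phase shift applied at the fundamental
    h = 2                        # harmonic number (loop index k = 1)
    Fk = 1.0 + 0j                # hypothetical unit harmonic coefficient
    print angle(Fk * exp(rotation * 1j * h))   # pi/3: twice the shift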

View File

@@ -23,7 +23,7 @@ def main(argv = None):
try:
args = f.parse_args(argv)
except nilmtools.filter.MissingDestination as e:
rec = "float32_4"
rec = "float32_3"
print "Source is %s (%s)" % (e.src.path, e.src.layout)
print "Destination %s doesn't exist" % (e.dest.path)
print "You could make it with a command like:"