Compare commits

...

49 Commits

Author SHA1 Message Date
97503b73b9 Fix dependencies 2013-04-08 18:50:27 -04:00
4e64c804bf Merge branch 'binary' 2013-04-08 18:45:21 -04:00
189fb9df3a Use binary interface for copy_one too 2013-04-08 18:45:16 -04:00
3323c997a7 Use the new stream_insert_numpy_context function 2013-04-08 18:39:14 -04:00
e09153e34b Use the new NumpyClient for extracting data in filter 2013-04-07 18:14:35 -04:00
5c56e9d075 Remove ounused process_python function 2013-04-06 16:39:39 -04:00
60f09427cf Update decimate to use process_numpy 2013-04-06 15:56:36 -04:00
d6d31190eb Fix fromstring usage 2013-04-06 13:40:09 -04:00
2ec574c59d Use np.fromstring instead of np.loadtxt 2013-04-06 13:32:16 -04:00
1988955671 Accumulate delta separately from data timestamp 2013-04-05 17:41:48 -04:00
36e5af4be1 Fix data_ts when clock is updated 2013-04-05 16:40:04 -04:00
ca175bd9dd Improve nilm-insert to support deltas, etc, for accelerometer data 2013-04-05 16:13:56 -04:00
aa9656bc10 Fix off-by-one error in prep rotation 2013-04-04 19:23:12 -04:00
10ab2cc2de Build nilm-prep tool 2013-04-04 19:07:18 -04:00
eb6d7a8809 Fix recommended layout 2013-04-04 16:32:38 -04:00
c8be6755ae Update README dependencies 2013-04-04 15:36:30 -04:00
9e321d9e41 Add --force-metadata option; fix bugs 2013-04-04 15:33:57 -04:00
f2bebea5d0 Add copy_wildcard script 2013-04-04 15:33:57 -04:00
d919a73387 Use parser error rather than systemexit exception 2013-04-04 15:33:57 -04:00
17fa79a5dc Makefile updates 2013-04-04 15:33:57 -04:00
ca970fa1fd Rename copy_ to copy_one 2013-04-04 15:33:57 -04:00
805d8fb24f Add prep 2013-04-03 21:04:03 -04:00
05da75e34a Allow processing 0 lines; warn if data is building up 2013-04-03 21:03:44 -04:00
56e778df71 Add skip_paths option to filter.setup_parser 2013-04-03 21:03:29 -04:00
87178e9599 self.args is used for other stuff in an exception 2013-04-03 21:03:17 -04:00
f8b1a001c3 makefile updates 2013-04-03 21:02:42 -04:00
7e88da3c26 Bring get_stream_info into nilmtools.filter module; use it 2013-04-02 18:04:28 -04:00
b637f17887 Add nilm-decimate-auto script 2013-04-02 16:13:44 -04:00
9a7a1df537 Simplify StreamInfo constructor 2013-04-02 15:47:43 -04:00
101b701882 Fix sys.argv stuff; remove outdated shell scripts 2013-04-01 18:40:48 -04:00
457c518809 Add some docs about building new tools 2013-04-01 18:14:59 -04:00
3eff3d81fe Change default URL to http://localhost/nilmdb/ 2013-04-01 18:03:43 -04:00
a56dc22030 Allow arguments to be passed in as a list to main() functions 2013-04-01 18:03:22 -04:00
9b770cd28f Note speed 2013-04-01 16:39:23 -04:00
348c435d1e Catch arctan2 errors 2013-04-01 16:38:50 -04:00
7f1c1a6c32 Add nilm-sinefit script link 2013-03-30 22:53:29 -04:00
bdfc29887b Proper window advancement & insertion for sine fit; comment out plots 2013-03-30 22:51:54 -04:00
4e5907f381 Send pending data after each block 2013-03-30 22:51:37 -04:00
9078a014ae Require 2-dimensional array in insert_function 2013-03-30 22:51:20 -04:00
533892e624 Update Makefile and setup.py dependencies 2013-03-30 22:50:55 -04:00
e0f973b449 Update makefile and setup.py requirements 2013-03-30 17:30:10 -04:00
698cb6ef26 More work on sinefit 2013-03-28 14:08:50 -04:00
1db38cc5da Port sfit4 function to Python, start writing sinefit filter 2013-03-26 19:38:53 -04:00
a984e54f23 Add process_numpy function to filter.py
This converts data into big Numpy arrays and lets the user-provided
function process as much as they'd like at a time.
2013-03-26 19:38:05 -04:00
974c9a3050 Fix error message when decimate target is missing 2013-03-22 14:59:08 -04:00
320c32cfdc Sample script for copying wildcard paths between hosts 2013-03-19 17:55:18 -04:00
0f1e442cd4 Support filtering (or copying) between two different hosts 2013-03-19 17:54:59 -04:00
3e78da12dc Rename copy.py to avoid stupid Python conflicts
Any other module (in this case, nilmdb -> datetime_tz -> pytz -> gettext)
that does an "import copy" would pull in the copy.py, if it's in the
current working directory.  Python is dumb...
2013-03-19 16:19:14 -04:00
ef9277cbff Rename nilmtools directory to just src 2013-03-19 15:42:29 -04:00
18 changed files with 1229 additions and 527 deletions

3
.gitignore vendored
View File

@@ -1,3 +1,6 @@
oldprep
newprep
*.dat
build/
*.pyc
dist/

View File

@@ -1,13 +1,40 @@
#URL="http://bucket.mit.edu:8080/nilmdb"
URL="http://localhost/nilmdb"
all:
ifeq ($(INSIDE_EMACS), t)
@make test
else
@echo "Try 'make install'"
endif
test:
src/decimate.py
test_insert:
@make install >/dev/null
src/insert.py --file --dry-run /test/foo </dev/null
test_copy:
@make install >/dev/null
src/copy_wildcard.py -U "http://nilmdb.com/bucket/" -D /lees*
test_prep:
@make install >/dev/null
src/prep.py -c 3 \
/lees-compressor/no-leak/raw \
/lees-compressor/no-leak/sinefit \
/lees-compressor/no-leak/prep \
-s '2013-02-19 18:00:00' \
-r 0
test_decimate:
-@nilmtool destroy /lees-compressor/no-leak/raw/4 || true
-@nilmtool destroy /lees-compressor/no-leak/raw/16 || true
-@nilmtool create /lees-compressor/no-leak/raw/4 float32_18 || true
-@nilmtool create /lees-compressor/no-leak/raw/16 float32_18 || true
time python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/1 /lees-compressor/no-leak/raw/4
python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/4 /lees-compressor/no-leak/raw/16
all:
@echo "Try 'make install'"
time python src/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/1 /lees-compressor/no-leak/raw/4
python src/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/4 /lees-compressor/no-leak/raw/16
version:
python setup.py version

View File

@@ -6,9 +6,22 @@ Prerequisites:
# Runtime and build environments
sudo apt-get install python2.7 python2.7-dev python-setuptools
sudo apt-get install python-numpy python-scipy python-matplotlib
nilmdb (1.3.1+)
nilmdb (1.5.0+)
Install:
python setup.py install
Building new tools:
The tools in this package are meant to be installed with
"python setup.py install". If you want to make a new one,
an easier way to develop would be to first install this package,
and then copy a specific script like "src/sinefit.py" to a new
location, and modify it as desired.
To add a tool to the package, place it in "src/" and add the
appropriate configuration to "setup.py".

View File

@@ -1,79 +0,0 @@
#!/usr/bin/python
import nilmtools.filter
import nilmdb.client
import numpy as np
import operator
def main():
f = nilmtools.filter.Filter()
parser = f.setup_parser("Decimate a stream")
group = parser.add_argument_group("Decimate options")
group.add_argument('-f', '--factor', action='store', default=4, type=int,
help='Decimation factor (default: %(default)s)')
# Parse arguments
try:
args = f.parse_args()
except nilmtools.filter.MissingDestination as e:
# If no destination, suggest how to create it by figuring out
# a recommended layout.
src = e.src
dest = e.dest
print "Source is %s (%s)" % (src.path, src.layout)
print "Destination %s doesn't exist" % (dest.path)
if "decimate_source" in f.client.stream_get_metadata(src.path):
rec = src.layout
elif 'int32' in src.layout_type or 'float64' in src.layout_type:
rec = 'float64_' + str(src.layout_count * 3)
else:
rec = 'float32_' + str(src.layout_count * 3)
print "You could make it with a command like:"
print " nilmtool create", dest.path, rec
raise SystemExit(1)
if not (args.factor >= 2):
raise Exception("factor needs to be 2 or more")
f.check_dest_metadata({ "decimate_source": args.srcpath,
"decimate_factor": args.factor })
# If source is decimated, we have to decimate a bit differently
if "decimate_source" in f.client.stream_get_metadata(args.srcpath):
n = f.src.layout_count // 3
f.process_python(function = decimate_again, rows = args.factor,
args = (n,))
else:
n = f.src.layout_count
f.process_python(function = decimate_first, rows = args.factor,
args = (n,))
def decimate_first(data, n):
"""Decimate original data -- result has 3 times as many columns"""
# For this simple calculation, converting to a Numpy array
# and doing the math is slower than just doing it directly.
rows = iter(data)
r_sum = r_min = r_max = rows.next()
for row in rows:
r_sum = map(operator.add, r_sum, row)
r_min = map(min, r_min, row)
r_max = map(max, r_max, row)
r_mean = [ x / len(data) for x in r_sum ]
return [ [ r_mean[0] ] + r_mean[1:] + r_min[1:] + r_max[1:] ]
def decimate_again(data, n):
"""Decimate already-decimated data -- result has the same number
of columns"""
rows = iter(data)
r = rows.next()
r_sum = r[0:(n+1)]
r_min = r[(n+1):(2*n+1)]
r_max = r[(2*n+1):(3*n+1)]
for r in rows:
r_sum = map(operator.add, r_sum, r[0:(n+1)])
r_min = map(min, r_min, r[(n+1):(2*n+1)])
r_max = map(max, r_max, r[(2*n+1):(3*n+1)])
r_mean = [ x / len(data) for x in r_sum ]
return [ r_mean + r_min + r_max ]
if __name__ == "__main__":
main()

View File

@@ -1,245 +0,0 @@
#!/usr/bin/python
import nilmdb.client
from nilmdb.utils.printf import *
from nilmdb.utils.time import (parse_time, timestamp_to_human,
timestamp_to_seconds)
from nilmdb.utils.interval import Interval
import nilmtools
import itertools
import time
import sys
import re
import argparse
class MissingDestination(Exception):
def __init__(self, src, dest):
self.src = src
self.dest = dest
Exception.__init__(self, "destination path " + dest.path + " not found")
class StreamInfo(object):
def __init__(self, info):
self.info = info
try:
self.path = info[0]
self.layout = info[1]
self.layout_type = self.layout.split('_')[0]
self.layout_count = int(self.layout.split('_')[1])
self.total_count = self.layout_count + 1
self.timestamp_min = info[2]
self.timestamp_max = info[3]
self.rows = info[4]
self.seconds = nilmdb.utils.time.timestamp_to_seconds(info[5])
except IndexError, TypeError:
pass
def __str__(self):
"""Print stream info as a string"""
return sprintf("%s (%s), %.2fM rows, %.2f hours",
self.path, self.layout, self.rows / 1e6,
self.seconds / 3600.0)
class Filter(object):
def __init__(self):
self._parser = None
self._args = None
self._client = None
self._using_client = False
self.src = None
self.dest = None
@property
def client(self):
if self._using_client:
raise Exception("Filter client is in use; make another")
return self._client
def setup_parser(self, description = "Filter data"):
parser = argparse.ArgumentParser(
formatter_class = argparse.RawDescriptionHelpFormatter,
version = nilmtools.__version__,
description = description)
group = parser.add_argument_group("General filter arguments")
group.add_argument("-u", "--url", action="store",
default="http://localhost:12380/",
help="Server URL (default: %(default)s)")
group.add_argument("-D", "--dry-run", action="store_true",
default = False,
help="Just print intervals that would be "
"processed")
group.add_argument("-s", "--start",
metavar="TIME", type=self.arg_time,
help="Starting timestamp for intervals "
"(free-form, inclusive)")
group.add_argument("-e", "--end",
metavar="TIME", type=self.arg_time,
help="Ending timestamp for intervals "
"(free-form, noninclusive)")
group.add_argument("srcpath", action="store",
help="Path of source stream, e.g. /foo/bar")
group.add_argument("destpath", action="store",
help="Path of destination stream, e.g. /foo/bar")
self._parser = parser
return parser
def interval_string(self, interval):
return sprintf("[ %s -> %s ]",
timestamp_to_human(interval.start),
timestamp_to_human(interval.end))
def parse_args(self):
args = self._parser.parse_args()
self._args = args
self._client = nilmdb.client.Client(args.url)
if args.srcpath == args.destpath:
raise Exception("source and destination path must be different")
# Open and print info about the streams
src = self._client.stream_list(args.srcpath, extended = True)
if len(src) != 1:
raise Exception("source path " + args.srcpath + " not found")
self.src = StreamInfo(src[0])
dest = self._client.stream_list(args.destpath, extended = True)
if len(dest) != 1:
raise MissingDestination(self.src, StreamInfo([args.destpath]))
self.dest = StreamInfo(dest[0])
print "Source:", self.src
print " Dest:", self.dest
if args.dry_run:
for interval in self.intervals():
print self.interval_string(interval)
raise SystemExit(0)
return args
def intervals(self):
"""Generate all the intervals that this filter should process"""
self._using_client = True
saved_int = None
for (start, end) in self._client.stream_intervals(
self._args.srcpath, diffpath = self._args.destpath,
start = self._args.start, end = self._args.end):
# Join adjacent intervals
if saved_int is not None:
if saved_int.end == start:
start = saved_int.start
else:
yield saved_int
saved_int = Interval(start, end)
if saved_int is not None:
yield saved_int
self._using_client = False
# Misc helpers
def arg_time(self, toparse):
"""Parse a time string argument"""
try:
return nilmdb.utils.time.parse_time(toparse)
except ValueError as e:
raise argparse.ArgumentTypeError(sprintf("%s \"%s\"",
str(e), toparse))
def check_dest_metadata(self, data):
"""See if the metadata jives, and complain if it doesn't. If
there's no conflict, update the metadata to match 'data'."""
metadata = self._client.stream_get_metadata(self._args.destpath)
for key in data:
wanted = str(data[key])
val = metadata.get(key, wanted)
if val != wanted and self.dest.rows > 0:
m = "Metadata in destination stream:\n"
m += " %s = %s\n" % (key, val)
m += "doesn't match desired data:\n"
m += " %s = %s\n" % (key, wanted)
m += "Refusing to change it. You can change the stream's "
m += "metadata manually, or\n"
m += "remove existing data from the stream, to prevent "
m += "this error.\n"
raise Exception(m)
# All good -- write the metadata in case it's not already there
self._client.stream_update_metadata(self._args.destpath, data)
# Main processing helper
def process_python(self, function, rows, args = None, partial = False):
"""Process data in chunks of 'rows' data at a time.
This provides data as nested Python lists and expects the same
back.
function: function to process the data
rows: maximum number of rows to pass to 'function' at once
args: tuple containing extra arguments to pass to 'function'
partial: if true, less than 'rows' may be passed to 'function'.
if false, partial data at the end of an interval will
be dropped.
'function' should be defined like:
function(data, *args)
It will be passed a list containing up to 'rows' rows of
data from the source stream, and any arguments passed in
'args'. It should transform the data as desired, and return a
new list of rdata, which will be inserted into the destination
stream.
"""
if args is None:
args = []
extractor = nilmdb.client.Client(self._args.url).stream_extract
inserter = nilmdb.client.Client(self._args.url).stream_insert_context
src = self._args.srcpath
dest = self._args.destpath
# Parse input data. We use homogenous types for now, which
# means the timestamp type will be either float or int.
if "int" in self.src.layout_type:
parser = lambda line: [ int(x) for x in line.split() ]
else:
parser = lambda line: [ float(x) for x in line.split() ]
# Format output data.
formatter = lambda row: " ".join([repr(x) for x in row]) + "\n"
for interval in self.intervals():
print "Processing", self.interval_string(interval)
with inserter(dest, interval.start, interval.end) as insert_ctx:
src_array = []
for line in extractor(src, interval.start, interval.end):
# Read in data
src_array.append([ float(x) for x in line.split() ])
if len(src_array) == rows:
# Pass through filter function
dest_array = function(src_array, *args)
# Write result to destination
out = [ formatter(row) for row in dest_array ]
insert_ctx.insert("".join(out))
# Clear source array
src_array = []
# Take care of partial chunk
if len(src_array) and partial:
dest_array = function(src_array, *args)
out = [ formatter(row) for row in dest_array ]
insert_ctx.insert("".join(out))
def main():
# This is just a dummy function; actual filters can use the other
# functions to prepare stuff, and then do something with the data.
f = Filter()
parser = f.setup_parser()
args = f.parse_args()
for i in f.intervals():
print "Generic filter: need to handle", f.interval_string(i)
if __name__ == "__main__":
main()

View File

@@ -1,178 +0,0 @@
#!/usr/bin/python
import nilmdb.client
from nilmdb.utils.printf import *
from nilmdb.utils.time import (parse_time, timestamp_to_human,
timestamp_to_seconds, seconds_to_timestamp,
rate_to_period, now as time_now)
import nilmtools
import time
import sys
import re
import argparse
import subprocess
class ParseError(Exception):
def __init__(self, filename, error):
msg = filename + ": " + error
super(ParseError, self).__init__(msg)
def parse_args():
parser = argparse.ArgumentParser(
formatter_class = argparse.RawDescriptionHelpFormatter,
version = nilmtools.__version__,
description = """\
Insert data from ethstream, either live (using the system time as a
reference) or prerecorded (using comments in the file as a reference).
The data is assumed to have been recorded at the specified rate.
Small discrepencies between the accumulated timestamps and the
reference time are ignored; larger discrepencies cause gaps to be
created in the stream. Overlapping data returns an error.
""")
parser.add_argument("-u", "--url", action="store",
default="http://localhost:12380/",
help="NilmDB server URL (default: %(default)s)")
parser.add_argument("-r", "--rate", action="store", default=8000,
type=float,
help="Data rate in Hz (default: %(default)s)")
parser.add_argument("-l", "--live", action="store_true",
help="Live capture; use system time to verify rate")
parser.add_argument("path", action="store",
help="Path of stream, e.g. /foo/bar")
parser.add_argument("infile", type=argparse.FileType('r'), nargs='*',
default=[sys.stdin],
help="Input files (default: stdin)")
args = parser.parse_args()
printf("Stream path: %s\n", args.path)
printf(" Data rate: %s Hz\n", repr(args.rate))
return args
def main(args = None):
if args is None:
args = parse_args()
client = nilmdb.client.Client(args.url)
# Local copies to save dictionary lookups
live = args.live
# data_ts is the timestamp that we'll use for the current line
data_ts_base = 0
data_ts_inc = 0
data_ts_rate = args.rate
# clock_ts is the imprecise "real" timestamp (from the filename,
# comments, or or system clock)
clock_ts = None
def print_clock_updated():
printf("Clock time updated to %s\n", timestamp_to_human(clock_ts))
if data_ts_base != 0:
diff = data_ts - clock_ts
if diff >= 0:
printf(" (data timestamp ahead by %.6f sec)\n",
timestamp_to_seconds(diff))
else:
printf(" (data timestamp behind by %.6f sec)\n",
timestamp_to_seconds(-diff))
with client.stream_insert_context(args.path) as stream:
for f in args.infile:
filename = f.name
printf("Processing %s\n", filename)
# If the filename ends in .gz, open it with gzcat instead.
if filename.endswith(".gz"):
p = subprocess.Popen(["gzip", "-dc"],
stdin = f, stdout = subprocess.PIPE)
f = p.stdout
# Try to get a real timestamp from the filename
try:
# Subtract 1 hour because files are created at the end
# of the hour. Hopefully, we'll be able to use
# internal comments and this value won't matter anyway.
clock_ts = parse_time(filename) - seconds_to_timestamp(3600)
print_clock_updated()
except ValueError:
pass
truncated_lines = 0
# Read each line
for line in f:
data_ts = data_ts_base + rate_to_period(data_ts_rate,
data_ts_inc)
# If no content other than the newline, skip it
if len(line) <= 1:
continue
# If line starts with a comment, look for a timestamp
if line[0] == '#':
try:
clock_ts = parse_time(line[1:])
print_clock_updated()
except ValueError:
pass
continue
# If inserting live, use clock timestamp
if live:
clock_ts = time_now()
# If we have a real timestamp, compare it to the data
# timestamp, and make sure things match up.
if clock_ts is not None:
if (data_ts - seconds_to_timestamp(10)) > clock_ts:
# Accumulated line timestamps are in the future.
# If we were to set data_ts=clock_ts, we'd create
# an overlap, so we have to just bail out here.
err = sprintf("Data is coming in too fast: data time "
"is %s but clock time is only %s",
timestamp_to_human(data_ts),
timestamp_to_human(clock_ts))
raise ParseError(filename, err)
if (data_ts + seconds_to_timestamp(10)) < clock_ts:
# Accumulated line timetamps are in the past. We
# can just skip some time and leave a gap in the
# data.
if data_ts_base != 0:
printf("Skipping data timestamp forward from "
"%s to %s to match clock time\n",
timestamp_to_human(data_ts),
timestamp_to_human(clock_ts))
stream.finalize()
data_ts_base = data_ts = clock_ts
data_ts_inc = 0
# Don't use this clock time anymore until we update it
clock_ts = None
if data_ts_base == 0:
raise ParseError(filename, "No idea what timestamp to use")
# This line is legit, so increment timestamp
data_ts_inc += 1
# Once in a while a line might be truncated, if we're at
# the end of a file. Ignore it, but if we ignore too many,
# bail out.
if line[-1] != '\n':
truncated_lines += 1
if truncated_lines > 3:
raise ParseError(filename, "too many short lines")
printf("Ignoring short line in %s\n", filename)
continue
# Insert it
stream.insert("%d %s" % (data_ts, line))
print "Done"
if __name__ == "__main__":
main()

View File

@@ -1,6 +0,0 @@
#!/bin/bash
for i in "$@"; do
nilmtool create /lees-compressor/no-leak/raw/$((4**$i)) float32_18
/usr/bin/time -v nilm-decimate /lees-compressor/no-leak/raw/$((4**($i-1))) /lees-compressor/no-leak/raw/$((4**$i))
done

View File

@@ -30,7 +30,7 @@ except ImportError:
# Versioneer manages version numbers from git tags.
# https://github.com/warner/python-versioneer
import versioneer
versioneer.versionfile_source = 'nilmtools/_version.py'
versioneer.versionfile_source = 'src/_version.py'
versioneer.versionfile_build = 'nilmtools/_version.py'
versioneer.tag_prefix = 'nilmtools-'
versioneer.parentdir_prefix = 'nilmtools-'
@@ -61,15 +61,23 @@ setup(name='nilmtools',
long_description = "NILM Database Tools",
license = "Proprietary",
author_email = 'jim@jtan.com',
install_requires = [ 'nilmdb >= 1.3.0',
install_requires = [ 'nilmdb >= 1.5.0',
'numpy',
'scipy',
'matplotlib',
],
packages = [ 'nilmtools',
],
package_dir = { 'nilmtools': 'src' },
entry_points = {
'console_scripts': [
'nilm-decimate = nilmtools.decimate:main',
'nilm-decimate-auto = nilmtools.decimate_auto:main',
'nilm-insert = nilmtools.insert:main',
'nilm-copy = nilmtools.copy:main',
'nilm-copy = nilmtools.copy_one:main',
'nilm-prep = nilmtools.prep:main',
'nilm-copy-wildcard = nilmtools.copy_wildcard:main',
'nilm-sinefit = nilmtools.sinefit:main',
],
},
zip_safe = False,

View File

@@ -181,7 +181,7 @@ def versions_from_parentdir(parentdir_prefix, versionfile_source, verbose=False)
tag_prefix = "nilmtools-"
parentdir_prefix = "nilmtools-"
versionfile_source = "nilmtools/_version.py"
versionfile_source = "src/_version.py"
def get_versions(default={"version": "unknown", "full": ""}, verbose=False):
variables = { "refnames": git_refnames, "full": git_full }

View File

@@ -1,36 +1,41 @@
#!/usr/bin/python
# This is called copy_one instead of copy to avoid name conflicts with
# the Python standard library.
import nilmtools.filter
import nilmdb.client
from nilmdb.client.numpyclient import NumpyClient
import numpy as np
import sys
def main():
def main(argv = None):
f = nilmtools.filter.Filter()
parser = f.setup_parser("Copy a stream")
# Parse arguments
try:
args = f.parse_args()
args = f.parse_args(argv)
except nilmtools.filter.MissingDestination as e:
print "Source is %s (%s)" % (e.src.path, e.src.layout)
print "Destination %s doesn't exist" % (e.dest.path)
print "You could make it with a command like:"
print " nilmtool create", e.dest.path, e.src.layout
print " nilmtool -u %s create %s %s" % (e.dest.url,
e.dest.path, e.src.layout)
raise SystemExit(1)
# Copy metadata
meta = f.client.stream_get_metadata(f.src.path)
meta = f.client_src.stream_get_metadata(f.src.path)
f.check_dest_metadata(meta)
# Copy all rows of data as ASCII strings
extractor = nilmdb.client.Client(args.url).stream_extract
inserter = nilmdb.client.Client(args.url).stream_insert_context
# Copy all rows of data using the faster Numpy interfaces
extractor = NumpyClient(f.src.url).stream_extract_numpy
inserter = NumpyClient(f.dest.url).stream_insert_numpy_context
for i in f.intervals():
print "Processing", f.interval_string(i)
with inserter(f.dest.path, i.start, i.end) as insert_ctx:
for row in extractor(f.src.path, i.start, i.end):
insert_ctx.insert(row + "\n")
for data in extractor(f.src.path, i.start, i.end):
insert_ctx.insert(data)
if __name__ == "__main__":
main()

70
src/copy_wildcard.py Executable file
View File

@@ -0,0 +1,70 @@
#!/usr/bin/python
# Copy streams between NilmDB servers with wildcards
import nilmtools.filter
import nilmtools.copy_one
import nilmdb.client
import argparse
import fnmatch
def main(argv = None):
f = nilmtools.filter.Filter()
# Reuse filter's parser, since it handles most options we need.
parser = f.setup_parser(description = """\
Copy all streams matching the given wildcard from one host to another.
Example: %(prog)s -u http://host1/nilmdb -U http://host2/nilmdb /sharon/*
""", skip_paths = True)
parser.add_argument("path", action="store", nargs="+",
help='Wildcard paths to copy')
args = parser.parse_args(argv)
# Verify arguments
if args.dest_url is None:
parser.error("must provide both source and destination URL")
client_src = nilmdb.client.Client(args.url)
client_dest = nilmdb.client.Client(args.dest_url)
if client_src.geturl() == client_dest.geturl():
parser.error("source and destination URL must be different")
print "Source URL:", client_src.geturl()
print " Dest URL:", client_dest.geturl()
# Find matching streams
matched = []
for path in args.path:
matched.extend([s for s in client_src.stream_list(extended = True)
if fnmatch.fnmatch(s[0], path)
and s not in matched])
# Create destination streams if they don't exist
for stream in matched:
src = nilmtools.filter.StreamInfo(client_src.geturl(), stream)
dest = nilmtools.filter.get_stream_info(client_dest, src.path)
if not dest:
print "Creating destination stream", src.path
client_dest.stream_create(src.path, src.layout)
# Copy them all by running the "copy" tool as if it were
# invoked from the command line.
for stream in matched:
new_argv = ["--url", client_src.geturl(),
"--dest-url", client_dest.geturl() ]
if args.start:
new_argv.extend(["--start", "@" + repr(args.start)])
if args.end:
new_argv.extend(["--end", "@" + repr(args.end)])
if args.dry_run:
new_argv.extend(["--dry-run"])
if args.force_metadata:
new_argv.extend(["--force-metadata"])
new_argv.extend([stream[0], stream[0]])
try:
nilmtools.copy_one.main(new_argv)
except SystemExit as e:
# Ignore SystemExit which could be raised on --dry-run
if e.code != 0:
raise
if __name__ == "__main__":
main()

85
src/decimate.py Executable file
View File

@@ -0,0 +1,85 @@
#!/usr/bin/python
import nilmtools.filter
import nilmdb.client
import numpy as np
import operator
def main(argv = None):
f = nilmtools.filter.Filter()
parser = f.setup_parser("Decimate a stream")
group = parser.add_argument_group("Decimate options")
group.add_argument('-f', '--factor', action='store', default=4, type=int,
help='Decimation factor (default: %(default)s)')
# Parse arguments
try:
args = f.parse_args(argv)
except nilmtools.filter.MissingDestination as e:
# If no destination, suggest how to create it by figuring out
# a recommended layout.
src = e.src
dest = e.dest
print "Source is %s (%s)" % (src.path, src.layout)
print "Destination %s doesn't exist" % (dest.path)
if "decimate_source" in f.client_src.stream_get_metadata(src.path):
rec = src.layout
elif 'int32' in src.layout_type or 'float64' in src.layout_type:
rec = 'float64_' + str(src.layout_count * 3)
else:
rec = 'float32_' + str(src.layout_count * 3)
print "You could make it with a command like:"
print " nilmtool -u %s create %s %s" % (e.dest.url,
e.dest.path, rec)
raise SystemExit(1)
if not (args.factor >= 2):
raise Exception("factor needs to be 2 or more")
f.check_dest_metadata({ "decimate_source": f.src.path,
"decimate_factor": args.factor })
# If source is decimated, we have to decimate a bit differently
if "decimate_source" in f.client_src.stream_get_metadata(args.srcpath):
again = True
else:
again = False
f.process_numpy(decimate, args = (args.factor, again))
def decimate(data, interval, args, insert_function, final):
"""Decimate data"""
(factor, again) = args
(n, m) = data.shape
# Figure out which columns to use as the source for mean, min, and max,
# depending on whether this is the first decimation or we're decimating
# again. Note that we include the timestamp in the means.
if again:
c = (m - 1) // 3
# e.g. c = 3
# ts mean1 mean2 mean3 min1 min2 min3 max1 max2 max3
mean_col = slice(0, c + 1)
min_col = slice(c + 1, 2 * c + 1)
max_col = slice(2 * c + 1, 3 * c + 1)
else:
mean_col = slice(0, m)
min_col = slice(1, m)
max_col = slice(1, m)
# Discard extra rows that aren't a multiple of factor
n = n // factor * factor
data = data[:n,:]
# Reshape it into 3D so we can process 'factor' rows at a time
data = data.reshape(n // factor, factor, m)
# Fill the result
out = np.c_[ np.mean(data[:,:,mean_col], axis=1),
np.min(data[:,:,min_col], axis=1),
np.max(data[:,:,max_col], axis=1) ]
insert_function(out)
return n
if __name__ == "__main__":
main()

76
src/decimate_auto.py Executable file
View File

@@ -0,0 +1,76 @@
#!/usr/bin/python
import nilmtools.filter
import nilmtools.decimate
import nilmdb.client
import argparse
def main(argv = None):
parser = argparse.ArgumentParser(
formatter_class = argparse.RawDescriptionHelpFormatter,
version = "1.0",
description = """\
Automatically create multiple decimations from a single source
stream, continuing until the last decimated level contains fewer
than 500 points total.
""")
parser.add_argument("-u", "--url", action="store",
default="http://localhost/nilmdb/",
help="NilmDB server URL (default: %(default)s)")
parser.add_argument('-f', '--factor', action='store', default=4, type=int,
help='Decimation factor (default: %(default)s)')
parser.add_argument("--force-metadata", action="store_true",
default = False,
help="Force metadata changes if the dest "
"doesn't match")
parser.add_argument("path", action="store",
help='Path of base stream')
args = parser.parse_args(argv)
# Pull out info about the base stream
client = nilmdb.client.Client(args.url)
info = nilmtools.filter.get_stream_info(client, args.path)
if not info:
raise Exception("path " + args.path + " not found")
meta = client.stream_get_metadata(args.path)
if "decimate_source" in meta:
print "Stream", args.path, "was decimated from", meta["decimate_source"]
print "You need to pass the base stream instead"
raise SystemExit(1)
# Figure out the type we should use for decimated streams
if 'int32' in info.layout_type or 'float64' in info.layout_type:
decimated_type = 'float64_' + str(info.layout_count * 3)
else:
decimated_type = 'float32_' + str(info.layout_count * 3)
# Now do the decimations until we have few enough points
factor = 1
while True:
print "Level", factor, "decimation has", info.rows, "rows"
if info.rows <= 500:
break
factor *= args.factor
new_path = "%s~decim-%d" % (args.path, factor)
# Create the stream if needed
new_info = nilmtools.filter.get_stream_info(client, new_path)
if not new_info:
print "Creating stream", new_path
client.stream_create(new_path, decimated_type)
# Run the decimation as if it were run from the commandline
new_argv = [ "-u", args.url,
"-f", str(args.factor) ]
if args.force_metadata:
new_argv.extend([ "--force-metadata" ])
new_argv.extend([info.path, new_path])
nilmtools.decimate.main(new_argv)
# Update info using the newly decimated stream
info = nilmtools.filter.get_stream_info(client, new_path)
if __name__ == "__main__":
main()

342
src/filter.py Normal file
View File

@@ -0,0 +1,342 @@
#!/usr/bin/python
from __future__ import absolute_import
import nilmdb.client
from nilmdb.client import Client
from nilmdb.client.numpyclient import NumpyClient
from nilmdb.utils.printf import *
from nilmdb.utils.time import (parse_time, timestamp_to_human,
timestamp_to_seconds)
from nilmdb.utils.interval import Interval
import nilmtools
import itertools
import time
import sys
import re
import argparse
import numpy as np
import cStringIO
class MissingDestination(Exception):
def __init__(self, args, src, dest):
self.parsed_args = args
self.src = src
self.dest = dest
Exception.__init__(self, "destination path " + dest.path + " not found")
class StreamInfo(object):
def __init__(self, url, info):
self.url = url
self.info = info
try:
self.path = info[0]
self.layout = info[1]
self.layout_type = self.layout.split('_')[0]
self.layout_count = int(self.layout.split('_')[1])
self.total_count = self.layout_count + 1
self.timestamp_min = info[2]
self.timestamp_max = info[3]
self.rows = info[4]
self.seconds = nilmdb.utils.time.timestamp_to_seconds(info[5])
except IndexError, TypeError:
pass
def string(self, interhost):
"""Return stream info as a string. If interhost is true,
include the host URL."""
if interhost:
return sprintf("[%s] ", self.url) + str(self)
return str(self)
def __str__(self):
"""Return stream info as a string."""
return sprintf("%s (%s), %.2fM rows, %.2f hours",
self.path, self.layout, self.rows / 1e6,
self.seconds / 3600.0)
def get_stream_info(client, path):
"""Return a StreamInfo object about the given path, or None if it
doesn't exist"""
streams = client.stream_list(path, extended = True)
if len(streams) != 1:
return None
return StreamInfo(client.geturl(), streams[0])
class Filter(object):
def __init__(self):
self._parser = None
self._client_src = None
self._client_dest = None
self._using_client = False
self.src = None
self.dest = None
self.start = None
self.end = None
self.interhost = False
self.force_metadata = False
@property
def client_src(self):
if self._using_client:
raise Exception("Filter client is in use; make another")
return self._client_src
@property
def client_dest(self):
if self._using_client:
raise Exception("Filter client is in use; make another")
return self._client_dest
def setup_parser(self, description = "Filter data", skip_paths = False):
parser = argparse.ArgumentParser(
formatter_class = argparse.RawDescriptionHelpFormatter,
version = nilmtools.__version__,
description = description)
group = parser.add_argument_group("General filter arguments")
group.add_argument("-u", "--url", action="store",
default="http://localhost/nilmdb/",
help="Server URL (default: %(default)s)")
group.add_argument("-U", "--dest-url", action="store",
help="Destination server URL "
"(default: same as source)")
group.add_argument("-D", "--dry-run", action="store_true",
default = False,
help="Just print intervals that would be "
"processed")
group.add_argument("--force-metadata", action="store_true",
default = False,
help="Force metadata changes if the dest "
"doesn't match")
group.add_argument("-s", "--start",
metavar="TIME", type=self.arg_time,
help="Starting timestamp for intervals "
"(free-form, inclusive)")
group.add_argument("-e", "--end",
metavar="TIME", type=self.arg_time,
help="Ending timestamp for intervals "
"(free-form, noninclusive)")
if not skip_paths:
# Individual filter scripts might want to add these arguments
# themselves, to include multiple sources in a different order
# (for example). "srcpath" and "destpath" arguments must exist,
# though.
group.add_argument("srcpath", action="store",
help="Path of source stream, e.g. /foo/bar")
group.add_argument("destpath", action="store",
help="Path of destination stream, e.g. /foo/bar")
self._parser = parser
return parser
def interval_string(self, interval):
return sprintf("[ %s -> %s ]",
timestamp_to_human(interval.start),
timestamp_to_human(interval.end))
def parse_args(self, argv = None):
args = self._parser.parse_args(argv)
if args.dest_url is None:
args.dest_url = args.url
if args.url != args.dest_url:
self.interhost = True
self._client_src = Client(args.url)
self._client_dest = Client(args.dest_url)
if (not self.interhost) and (args.srcpath == args.destpath):
self._parser.error("source and destination path must be different")
# Open and print info about the streams
self.src = get_stream_info(self._client_src, args.srcpath)
if not self.src:
self._parser.error("source path " + args.srcpath + " not found")
self.dest = get_stream_info(self._client_dest, args.destpath)
if not self.dest:
raise MissingDestination(args, self.src,
StreamInfo(args.dest_url, [args.destpath]))
print "Source:", self.src.string(self.interhost)
print " Dest:", self.dest.string(self.interhost)
if args.dry_run:
for interval in self.intervals():
print self.interval_string(interval)
raise SystemExit(0)
self.force_metadata = args.force_metadata
self.start = args.start
self.end = args.end
return args
def _optimize_int(self, it):
"""Join and yield adjacent intervals from the iterator 'it'"""
saved_int = None
for interval in it:
if saved_int is not None:
if saved_int.end == interval.start:
interval.start = saved_int.start
else:
yield saved_int
saved_int = interval
if saved_int is not None:
yield saved_int
def intervals(self):
"""Generate all the intervals that this filter should process"""
self._using_client = True
if self.interhost:
# Do the difference ourselves
s_intervals = ( Interval(start, end)
for (start, end) in
self._client_src.stream_intervals(
self.src.path,
start = self.start, end = self.end) )
d_intervals = ( Interval(start, end)
for (start, end) in
self._client_dest.stream_intervals(
self.dest.path,
start = self.start, end = self.end) )
intervals = nilmdb.utils.interval.set_difference(s_intervals,
d_intervals)
else:
# Let the server do the difference for us
intervals = ( Interval(start, end)
for (start, end) in
self._client_src.stream_intervals(
self.src.path, diffpath = self.dest.path,
start = self.start, end = self.end) )
# Optimize intervals: join intervals that are adjacent
for interval in self._optimize_int(intervals):
yield interval
self._using_client = False
# Misc helpers
def arg_time(self, toparse):
"""Parse a time string argument"""
try:
return nilmdb.utils.time.parse_time(toparse)
except ValueError as e:
raise argparse.ArgumentTypeError(sprintf("%s \"%s\"",
str(e), toparse))
def check_dest_metadata(self, data):
"""See if the metadata jives, and complain if it doesn't. If
there's no conflict, update the metadata to match 'data'."""
metadata = self._client_dest.stream_get_metadata(self.dest.path)
if not self.force_metadata:
for key in data:
wanted = str(data[key])
val = metadata.get(key, wanted)
if val != wanted and self.dest.rows > 0:
m = "Metadata in destination stream:\n"
m += " %s = %s\n" % (key, val)
m += "doesn't match desired data:\n"
m += " %s = %s\n" % (key, wanted)
m += "Refusing to change it. To prevent this error, "
m += "change or delete the metadata with nilmtool,\n"
m += "remove existing data from the stream, or "
m += "retry with --force-metadata."
raise Exception(m)
# All good -- write the metadata in case it's not already there
self._client_dest.stream_update_metadata(self.dest.path, data)
# The main filter processing method.
def process_numpy(self, function, args = None, rows = 100000):
"""For all intervals that exist in self.src but don't exist in
self.dest, call 'function' with a Numpy array corresponding to
the data. The data is converted to a Numpy array in chunks of
'rows' rows at a time.
'function' should be defined as:
def function(data, interval, args, insert_func, final)
'data': array of data to process -- may be empty
'interval': overall interval we're processing (but not necessarily
the interval of this particular chunk of data)
'args': opaque arguments passed to process_numpy
'insert_func': function to call in order to insert array of data.
Should be passed a 2-dimensional array of data to insert.
Data timestamps must be within the provided interval.
'final': True if this is the last bit of data for this
contiguous interval, False otherwise.
Return value of 'function' is the number of data rows processed.
Unprocessed data will be provided again in a subsequent call
(unless 'final' is True).
"""
if args is None:
args = []
extractor = NumpyClient(self.src.url).stream_extract_numpy
inserter = NumpyClient(self.dest.url).stream_insert_numpy_context
# Format output data.
formatter = lambda row: " ".join([repr(x) for x in row]) + "\n"
def batch(iterable, size):
c = itertools.count()
for k, g in itertools.groupby(iterable, lambda x: c.next() // size):
yield g
for interval in self.intervals():
print "Processing", self.interval_string(interval)
with inserter(self.dest.path,
interval.start, interval.end) as insert_ctx:
insert_function = insert_ctx.insert
old_array = np.array([])
for new_array in extractor(self.src.path,
interval.start, interval.end,
layout = self.src.layout,
maxrows = rows):
# If we still had old data left, combine it
if old_array.shape[0] != 0:
array = np.vstack((old_array, new_array))
else:
array = new_array
# Pass it to the process function
processed = function(array, interval, args,
insert_function, False)
# Send any pending data
insert_ctx.send()
# Save the unprocessed parts
if processed >= 0:
old_array = array[processed:]
else:
raise Exception(
sprintf("%s return value %s must be >= 0",
str(function), str(processed)))
# Warn if there's too much data remaining
if old_array.shape[0] > 3 * rows:
printf("warning: %d unprocessed rows in buffer\n",
old_array.shape[0])
# Last call for this contiguous interval
if old_array.shape[0] != 0:
function(old_array, interval, args, insert_function, True)
def main(argv = None):
# This is just a dummy function; actual filters can use the other
# functions to prepare stuff, and then do something with the data.
f = Filter()
parser = f.setup_parser()
args = f.parse_args(argv)
for i in f.intervals():
print "Generic filter: need to handle", f.interval_string(i)
if __name__ == "__main__":
main()

268
src/insert.py Executable file
View File

@@ -0,0 +1,268 @@
#!/usr/bin/python
import nilmdb.client
from nilmdb.utils.printf import *
from nilmdb.utils.time import (parse_time, timestamp_to_human,
timestamp_to_seconds, seconds_to_timestamp,
rate_to_period, now as time_now)
import nilmtools
import time
import sys
import re
import argparse
import subprocess
import textwrap
class ParseError(Exception):
def __init__(self, filename, error):
msg = filename + ": " + error
super(ParseError, self).__init__(msg)
def parse_args(argv = None):
parser = argparse.ArgumentParser(
formatter_class = argparse.RawDescriptionHelpFormatter,
version = nilmtools.__version__,
description = textwrap.dedent("""\
Insert large amount of data from an external source like ethstream.
This code tracks two timestamps:
(1) The 'data' timestamp is the precise timestamp corresponding to
a particular row of data, and is the timestamp that gets
inserted into the database. It increases by 'data_delta' for
every row of input.
'data_delta' can come from one of two sources. If '--delta'
is specified, it is pulled from the first column of data. If
'--rate' is specified, 'data_delta' is set to a fixed value of
(1 / rate).
(2) The 'clock' timestamp is the less precise timestamp that gives
the absolute time. It can come from two sources. If '--live'
is specified, it is pulled directly from the system clock. If
'--file' is specified, it is extracted from the input filename
every time a new file is opened for read, and from comments
that appear in the file.
Small discrepencies between 'data' and 'clock' are ignored. If
the 'data' timestamp ever differs from the 'clock' timestamp by
more than 'max_gap' seconds:
- If 'data' is running behind, there is a gap in the data, so it
is stepped forward to match 'clock'.
- If 'data' is running ahead, there is overlap in the data, and an
error is raised.
"""))
parser.add_argument("-u", "--url", action="store",
default="http://localhost/nilmdb/",
help="NilmDB server URL (default: %(default)s)")
group = parser.add_argument_group("Misc options")
group.add_argument("-D", "--dry-run", action="store_true",
help="Parse files, but don't insert any data")
group.add_argument("-m", "--max-gap", action="store", default=10.0,
metavar="SEC", type=float,
help="Max discrepency between clock and data "
"timestamps (default: %(default)s)")
group = parser.add_argument_group("Data timestamp delta")
exc = group.add_mutually_exclusive_group()
exc.add_argument("-r", "--rate", action="store", default=8000.0,
type=float,
help="Data_delta is constant 1/RATE "
"(default: %(default)s Hz)")
exc.add_argument("-d", "--delta", action="store_true",
help="Data_delta is the first number in each line")
group = parser.add_argument_group("Clock timestamp source")
exc = group.add_mutually_exclusive_group()
exc.add_argument("-l", "--live", action="store_true",
help="Use live system time for clock timestamp")
exc.add_argument("-f", "--file", action="store_true", default=True,
help="Use filename or comments for clock timestamp")
group.add_argument("-o", "--offset-filename", metavar="SEC",
action="store", default=-3600.0, type=float,
help="Offset to add to filename timestamps "
"(default: %(default)s)")
group.add_argument("-O", "--offset-comment", metavar="SEC",
action="store", default=0.0, type=float,
help="Offset to add to comment timestamps "
"(default: %(default)s)")
group = parser.add_argument_group("Database path")
group.add_argument("path", action="store",
help="Path of stream, e.g. /foo/bar")
group = parser.add_argument_group("Input files")
group.add_argument("infile", type=argparse.FileType('r'), nargs='*',
default=[sys.stdin],
help="Input files (default: stdin)")
args = parser.parse_args(argv)
printf(" Stream path: %s\n", args.path)
printf(" Data timestamp: ")
if args.delta:
printf("delta on each input line\n")
else:
printf("fixed rate %s Hz\n", repr(args.rate))
printf(" Clock timestamp: ")
if args.live:
printf("live system clock\n")
else:
printf("from filenames and comments\n")
printf(" Filename offset: %s seconds\n", repr(args.offset_filename))
printf(" Comment offset: %s seconds\n", repr(args.offset_comment))
printf(" Max gap: %s seconds\n", repr(args.max_gap))
if args.dry_run:
printf("Dry run (no data will be inserted)\n")
return args
def main(argv = None):
args = parse_args(argv)
client = nilmdb.client.Client(args.url)
# data_ts is the timestamp that we'll use for the current line
data_ts_base = 0
data_ts_inc = 0
data_ts_rate = args.rate
data_ts_delta = 0
def get_data_ts():
if args.delta:
return data_ts_base + data_ts_delta
else:
return data_ts_base + rate_to_period(data_ts_rate,
data_ts_inc)
# clock_ts is the imprecise "real" timestamp (from the filename,
# comments, or system clock)
clock_ts = None
def print_clock_updated():
printf("Clock timestamp updated to %s\n", timestamp_to_human(clock_ts))
if data_ts_base != 0:
diff = get_data_ts() - clock_ts
if diff >= 0:
printf(" (data timestamp ahead by %.6f sec)\n",
timestamp_to_seconds(diff))
else:
printf(" (data timestamp behind by %.6f sec)\n",
timestamp_to_seconds(-diff))
offset_filename = seconds_to_timestamp(args.offset_filename)
offset_comment = seconds_to_timestamp(args.offset_comment)
max_gap = seconds_to_timestamp(args.max_gap)
with client.stream_insert_context(args.path) as stream:
for f in args.infile:
filename = f.name
printf("Processing %s\n", filename)
# If the filename ends in .gz, re-open it with gzip to
# decompress.
if filename.endswith(".gz"):
p = subprocess.Popen(["gzip", "-dc"],
stdin = f, stdout = subprocess.PIPE)
f = p.stdout
# Try to get a real timestamp from the filename
try:
# Subtract 1 hour because files are created at the end
# of the hour. Hopefully, we'll be able to use
# internal comments and this value won't matter anyway.
clock_ts = parse_time(filename) + offset_filename
print_clock_updated()
except ValueError:
pass
truncated_lines = 0
# Read each line
for line in f:
# Once in a while a line might be truncated, if we're
# at the end of a file. Ignore it, but if we ignore
# too many, bail out.
if line[-1] != '\n':
truncated_lines += 1
if truncated_lines > 3:
raise ParseError(filename, "too many short lines")
printf("Ignoring short line in %s\n", filename)
continue
# If no content other than the newline, skip it
if len(line) <= 1:
continue
# If line starts with a comment, look for a timestamp
if line[0] == '#':
try:
clock_ts = parse_time(line[1:]) + offset_comment
print_clock_updated()
except ValueError:
pass
continue
# If --delta mode, increment data_ts_delta by the
# delta from the file.
if args.delta:
try:
(delta, line) = line.split(None, 1)
data_ts_delta += float(delta)
except ValueError:
raise ParseError(filename, "can't parse delta")
# Calculate data_ts for this row
data_ts = get_data_ts()
# If inserting live, use clock timestamp
if args.live:
clock_ts = time_now()
# If we have a real timestamp, compare it to the data
# timestamp, and make sure things match up.
if clock_ts is not None:
if (data_ts - max_gap) > clock_ts:
# Accumulated line timestamps are in the future.
# If we were to set data_ts=clock_ts, we'd create
# an overlap, so we have to just bail out here.
err = sprintf("Data is coming in too fast: data time "
"is %s but clock time is only %s",
timestamp_to_human(data_ts),
timestamp_to_human(clock_ts))
raise ParseError(filename, err)
if (data_ts + max_gap) < clock_ts:
# Accumulated line timetamps are in the past. We
# can just skip some time and leave a gap in the
# data.
if data_ts_base != 0:
printf("Skipping data timestamp forward from "
"%s to %s to match clock time\n",
timestamp_to_human(data_ts),
timestamp_to_human(clock_ts))
stream.finalize()
data_ts_base = data_ts = clock_ts
data_ts_inc = data_ts_delta = 0
# Don't use this clock time anymore until we update it
clock_ts = None
if data_ts_base == 0:
raise ParseError(filename, "No idea what timestamp to use")
# This line is legit, so increment timestamp (for --rate)
data_ts_inc += 1
# Insert it
if not args.dry_run:
stream.insert("%d %s" % (data_ts, line))
print "Done"
if __name__ == "__main__":
main()

126
src/prep.py Executable file
View File

@@ -0,0 +1,126 @@
#!/usr/bin/python
# Spectral envelope preprocessor.
# Requires two streams as input: the original raw data, and sinefit data.
import nilmtools.filter
import nilmdb.client
from numpy import *
import scipy.fftpack
import scipy.signal
from matplotlib import pyplot as p
import bisect
def main(argv = None):
# Set up argument parser
f = nilmtools.filter.Filter()
parser = f.setup_parser("Spectral Envelope Preprocessor", skip_paths = True)
group = parser.add_argument_group("Prep options")
group.add_argument("-c", "--column", action="store", type=int,
help="Column number (first data column is 1)")
group.add_argument("-n", "--nharm", action="store", type=int, default=4,
help="number of odd harmonics to compute")
exc = group.add_mutually_exclusive_group()
exc.add_argument("-r", "--rotate", action="store", type=float,
help="rotate FFT output by this many degrees")
exc.add_argument("-R", "--rotate-rad", action="store", type=float,
help="rotate FFT output by this many radians")
group.add_argument("srcpath", action="store",
help="Path of raw input, e.g. /foo/raw")
group.add_argument("sinepath", action="store",
help="Path of sinefit input, e.g. /foo/sinefit")
group.add_argument("destpath", action="store",
help="Path of prep output, e.g. /foo/prep")
# Parse arguments
try:
args = f.parse_args(argv)
except nilmtools.filter.MissingDestination as e:
rec = "float32_%d" % (e.parsed_args.nharm * 2)
print "Source is %s (%s)" % (e.src.path, e.src.layout)
print "Destination %s doesn't exist" % (e.dest.path)
print "You could make it with a command like:"
print " nilmtool -u %s create %s %s" % (e.dest.url, e.dest.path, rec)
raise SystemExit(1)
# Check arguments
if args.column is None or args.column < 1:
parser.error("need a column number >= 1")
if args.nharm < 1 or args.nharm > 32:
parser.error("number of odd harmonics must be 1-32")
if args.rotate is not None:
rotation = args.rotate * 2.0 * pi / 360.0
else:
rotation = args.rotate_rad or 0.0
# Check the sine fit stream
client_sinefit = nilmdb.client.Client(args.url)
sinefit = nilmtools.filter.get_stream_info(client_sinefit, args.sinepath)
if not sinefit:
raise Exception("sinefit data not found")
if sinefit.layout != "float32_3":
raise Exception("sinefit data type is " + sinefit.layout
+ "; expected float32_3")
# Check and set metadata in prep stream
f.check_dest_metadata({ "prep_raw_source": f.src.path,
"prep_sinefit_source": sinefit.path,
"prep_column": args.column })
# Run the processing function on all data
f.process_numpy(process, args = (client_sinefit, sinefit.path, args.column,
args.nharm, rotation))
def process(data, interval, args, insert_function, final):
(client, sinefit_path, column, nharm, rotation) = args
rows = data.shape[0]
data_timestamps = data[:,0]
processed = 0
out = zeros((1, nharm * 2 + 1))
# Pull out sinefit data for the entire time range of this block
for sinefit_line in client.stream_extract(sinefit_path,
data[0, 0], data[rows-1, 0]):
# Extract sinefit data to get zero crossing timestamps
(t_min, f0, A, C) = [ float(x) for x in sinefit_line.split() ]
t_max = t_min + 1e6 / f0
# Find the indices of data that correspond to (t_min, t_max)
idx_min = bisect.bisect_left(data_timestamps, t_min)
idx_max = bisect.bisect_left(data_timestamps, t_max)
if idx_min >= idx_max:
# something's wonky; ignore this period
continue
if idx_max >= len(data_timestamps):
# max is likely past the end of our chunk, so stop
# processing this chunk now.
break
# Perform FFT over those indices
N = idx_max - idx_min
d = data[idx_min:idx_max, column]
F = scipy.fftpack.fft(d) / N
# If we wanted more harmonics than we have, pad with zeros
if N < (nharm * 2):
F = r_[F, zeros(nharm * 2 - N)]
# Fill output data
out[0, 0] = t_min
for k in range(nharm):
Fk = F[2 * k + 1] * e**(rotation * 1j * (k+1))
out[0, 2 * k + 1] = -imag(Fk) # Pk
out[0, 2 * k + 2] = real(Fk) # Qk
# Insert it and continue
insert_function(out)
processed = idx_max
print "Processed", processed, "of", rows, "rows"
return processed
if __name__ == "__main__":
main()

187
src/sinefit.py Executable file
View File

@@ -0,0 +1,187 @@
#!/usr/bin/python
# Sine wave fitting. This runs about 5x faster than realtime on raw data.
import nilmtools.filter
import nilmdb.client
from numpy import *
from scipy import *
#import pylab as p
import operator
def main(argv = None):
f = nilmtools.filter.Filter()
parser = f.setup_parser("Sine wave fitting")
group = parser.add_argument_group("Sine fit options")
group.add_argument('-c', '--column', action='store', type=int,
help='Column number (first data column is 1)')
group.add_argument('-f', '--frequency', action='store', type=float,
default=60.0,
help='Approximate frequency (default: %(default)s)')
# Parse arguments
try:
args = f.parse_args(argv)
except nilmtools.filter.MissingDestination as e:
rec = "float32_3"
print "Source is %s (%s)" % (e.src.path, e.src.layout)
print "Destination %s doesn't exist" % (e.dest.path)
print "You could make it with a command like:"
print " nilmtool -u %s create %s %s" % (e.dest.url, e.dest.path, rec)
raise SystemExit(1)
if args.column is None or args.column < 1:
parser.error("need a column number >= 1")
if args.frequency < 0.1:
parser.error("frequency must be >= 0.1")
f.check_dest_metadata({ "sinefit_source": f.src.path,
"sinefit_column": args.column })
f.process_numpy(process, args = (args.column, args.frequency))
def process(data, interval, args, insert_function, final):
(column, f_expected) = args
rows = data.shape[0]
# Estimate sampling frequency from timestamps
fs = 1e6 * (rows-1) / (data[-1][0] - data[0][0])
# Pull out about 3.5 periods of data at once;
# we'll expect to match 3 zero crossings in each window
N = max(int(3.5 * fs / f_expected), 10)
# If we don't have enough data, don't bother processing it
if rows < N:
return 0
# Process overlapping windows
start = 0
num_zc = 0
while start < (rows - N):
this = data[start:start+N, column]
t_min = data[start, 0]/1e6
t_max = data[start+N-1, 0]/1e6
# Do 4-parameter sine wave fit
(A, f0, phi, C) = sfit4(this, fs)
# Check bounds. If frequency is too crazy, ignore this window
if f0 < (f_expected/2) or f0 > (f_expected*2):
print "frequency", f0, "too far from expected value", f_expected
start += N
continue
#p.plot(arange(N), this)
#p.plot(arange(N), A * cos(f0/fs * 2 * pi * arange(N) + phi) + C, 'g')
# Period starts when the argument of cosine is 3*pi/2 degrees,
# so we're looking for sample number:
# n = (3 * pi / 2 - phi) / (f0/fs * 2 * pi)
zc_n = (3 * pi / 2 - phi) / (f0 / fs * 2 * pi)
period_n = fs/f0
# Add periods to make N positive
while zc_n < 0:
zc_n += period_n
last_zc = None
# Mark the zero crossings until we're a half period away
# from the end of the window
while zc_n < (N - period_n/2):
#p.plot(zc_n, C, 'ro')
t = t_min + zc_n / fs
insert_function([[t * 1e6, f0, A, C]])
num_zc += 1
last_zc = zc_n
zc_n += period_n
# Advance the window one quarter period past the last marked
# zero crossing, or advance the window by half its size if we
# didn't mark any.
if last_zc is not None:
advance = min(last_zc + period_n/4, N)
else:
advance = N/2
#p.plot(advance, C, 'go')
#p.show()
start = int(round(start + advance))
# Return the number of rows we've processed
print "Marked", num_zc, "zero-crossings in", start, "rows"
return start
def sfit4(data, fs):
"""(A, f0, phi, C) = sfit4(data, fs)
Compute 4-parameter (unknown-frequency) least-squares fit to
sine-wave data, according to IEEE Std 1241-2010 Annex B
Input:
data vector of input samples
fs sampling rate (Hz)
Output:
Parameters [A, f0, phi, C] to fit the equation
x[n] = A * cos(f0/fs * 2 * pi * n + phi) + C
where n is sample number. Or, as a function of time:
x(t) = A * cos(f0 * 2 * pi * t + phi) + C
by Jim Paris
(Verified to match sfit4.m)
"""
N = len(data)
t = linspace(0, (N-1) / fs, N)
## Estimate frequency using FFT (step b)
Fc = fft(data)
F = abs(Fc)
F[0] = 0 # eliminate DC
# Find pair of spectral lines with largest amplitude:
# resulting values are in F(i) and F(i+1)
i = argmax(F[0:int(N/2)] + F[1:int(N/2+1)])
# Interpolate FFT to get a better result (from Markus [B37])
U1 = real(Fc[i])
U2 = real(Fc[i+1])
V1 = imag(Fc[i])
V2 = imag(Fc[i+1])
n = 2 * pi / N
ni1 = n * i
ni2 = n * (i+1)
K = ((V2-V1)*sin(ni1) + (U2-U1)*cos(ni1)) / (U2-U1)
Z1 = V1 * (K - cos(ni1)) / sin(ni1) + U1
Z2 = V2 * (K - cos(ni2)) / sin(ni2) + U2
i = arccos((Z2*cos(ni2) - Z1*cos(ni1)) / (Z2-Z1)) / n
# Convert to Hz
f0 = i * fs / N
## Fit it
# first guess for A0, B0 using 3-parameter fit (step c)
w = 2*pi*f0
D = c_[cos(w*t), sin(w*t), ones(N)]
s = linalg.lstsq(D, data)[0]
# Now iterate 6 times (step i)
for idx in range(6):
D = c_[cos(w*t), sin(w*t), ones(N),
-s[0] * t * sin(w*t) + s[1] * t * cos(w*t) ] # eqn B.16
s = linalg.lstsq(D, data)[0] # eqn B.18
w = w + s[3] # update frequency estimate
## Extract results
A = sqrt(s[0]*s[0] + s[1]*s[1]) # eqn B.21
f0 = w / (2*pi)
try:
phi = -arctan2(s[1], s[0]) # eqn B.22
except TypeError:
# something broke down, just return zeros
return (0, 0, 0, 0)
C = s[2]
return (A, f0, phi, C)
if __name__ == "__main__":
main()