Compare commits

...

37 Commits

Author SHA1 Message Date
9e321d9e41 Add --force-metadata option; fix bugs 2013-04-04 15:33:57 -04:00
f2bebea5d0 Add copy_wildcard script 2013-04-04 15:33:57 -04:00
d919a73387 Use parser error rather than systemexit exception 2013-04-04 15:33:57 -04:00
17fa79a5dc Makefile updates 2013-04-04 15:33:57 -04:00
ca970fa1fd Rename copy_ to copy_one 2013-04-04 15:33:57 -04:00
805d8fb24f Add prep 2013-04-03 21:04:03 -04:00
05da75e34a Allow processing 0 lines; warn if data is building up 2013-04-03 21:03:44 -04:00
56e778df71 Add skip_paths option to filter.setup_parser 2013-04-03 21:03:29 -04:00
87178e9599 self.args is used for other stuff in an exception 2013-04-03 21:03:17 -04:00
f8b1a001c3 makefile updates 2013-04-03 21:02:42 -04:00
7e88da3c26 Bring get_stream_info into nilmtools.filter module; use it 2013-04-02 18:04:28 -04:00
b637f17887 Add nilm-decimate-auto script 2013-04-02 16:13:44 -04:00
9a7a1df537 Simplify StreamInfo constructor 2013-04-02 15:47:43 -04:00
101b701882 Fix sys.argv stuff; remove outdated shell scripts 2013-04-01 18:40:48 -04:00
457c518809 Add some docs about building new tools 2013-04-01 18:14:59 -04:00
3eff3d81fe Change default URL to http://localhost/nilmdb/ 2013-04-01 18:03:43 -04:00
a56dc22030 Allow arguments to be passed in as a list to main() functions 2013-04-01 18:03:22 -04:00
9b770cd28f Note speed 2013-04-01 16:39:23 -04:00
348c435d1e Catch arctan2 errors 2013-04-01 16:38:50 -04:00
7f1c1a6c32 Add nilm-sinefit script link 2013-03-30 22:53:29 -04:00
bdfc29887b Proper window advancement & insertion for sine fit; comment out plots 2013-03-30 22:51:54 -04:00
4e5907f381 Send pending data after each block 2013-03-30 22:51:37 -04:00
9078a014ae Require 2-dimensional array in insert_function 2013-03-30 22:51:20 -04:00
533892e624 Update Makefile and setup.py dependencies 2013-03-30 22:50:55 -04:00
e0f973b449 Update makefile and setup.py requirements 2013-03-30 17:30:10 -04:00
698cb6ef26 More work on sinefit 2013-03-28 14:08:50 -04:00
1db38cc5da Port sfit4 function to Python, start writing sinefit filter 2013-03-26 19:38:53 -04:00
a984e54f23 Add process_numpy function to filter.py
This converts data into big Numpy arrays and lets the user-provided
function process as much as they'd like at a time.
2013-03-26 19:38:05 -04:00
974c9a3050 Fix error message when decimate target is missing 2013-03-22 14:59:08 -04:00
320c32cfdc Sample script for copying wildcard paths between hosts 2013-03-19 17:55:18 -04:00
0f1e442cd4 Support filtering (or copying) between two different hosts 2013-03-19 17:54:59 -04:00
3e78da12dc Rename copy.py to avoid stupid Python conflicts
Any other module (in this case, nilmdb -> datetime_tz -> pytz -> gettext)
that does an "import copy" would pull in the copy.py, if it's in the
current working directory.  Python is dumb...
2013-03-19 16:19:14 -04:00
ef9277cbff Rename nilmtools directory to just src 2013-03-19 15:42:29 -04:00
de68956f76 Update copy tool 2013-03-16 23:13:45 -04:00
e73dd313d5 Reworked things to match nilmdb updates; a bit faster 2013-03-16 14:46:49 -04:00
d23fa9ee78 Remove unnecessary option group 2013-03-12 19:02:29 -04:00
2b9ecc6697 Add nilm-copy tool 2013-03-12 18:52:39 -04:00
16 changed files with 1072 additions and 320 deletions

3
.gitignore vendored
View File

@@ -1,3 +1,6 @@
oldprep
newprep
*.dat
build/
*.pyc
dist/

View File

@@ -1,11 +1,33 @@
test:
nilmtool remove /lees-compressor/noleak/raw~4 -s 2000 -e 2020
nilmtool remove /lees-compressor/noleak/raw~16 -s 2000 -e 2020
python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/noleak/raw /lees-compressor/noleak/raw~4
python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/noleak/raw~4 /lees-compressor/noleak/raw~16
#URL="http://bucket.mit.edu:8080/nilmdb"
URL="http://localhost/nilmdb"
all:
ifeq ($(INSIDE_EMACS), t)
@make test
else
@echo "Try 'make install'"
endif
test:
@make install >/dev/null
src/copy_wildcard.py -U "http://nilmdb.com/bucket/" -D /lees*
test_prep:
@make install >/dev/null
src/prep.py -c 3 \
/lees-compressor/no-leak/raw \
/lees-compressor/no-leak/sinefit \
/lees-compressor/no-leak/prep \
-s '2013-02-19 18:00:00' \
-r 0
test_decimate:
-@nilmtool destroy /lees-compressor/no-leak/raw/4 || true
-@nilmtool destroy /lees-compressor/no-leak/raw/16 || true
-@nilmtool create /lees-compressor/no-leak/raw/4 float32_18 || true
-@nilmtool create /lees-compressor/no-leak/raw/16 float32_18 || true
time python src/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/1 /lees-compressor/no-leak/raw/4
python src/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/4 /lees-compressor/no-leak/raw/16
version:
python setup.py version

View File

@@ -12,3 +12,15 @@ Prerequisites:
Install:
python setup.py install
Building new tools:
The tools in this package are meant to be installed with
"python setup.py install". If you want to make a new one,
an easier way to develop would be to first install this package,
and then copy a specific script like "src/sinefit.py" to a new
location, and modify it as desired.
To add a tool to the package, place it in "src/" and add the
appropriate configuration to "setup.py".

View File

@@ -1,70 +0,0 @@
#!/usr/bin/python
import nilmtools.filter
import nilmdb.client
import numpy as np
def main():
f = nilmtools.filter.Filter()
parser = f.setup_parser("Decimate a stream")
group = parser.add_argument_group("Decimate options")
group.add_argument('-f', '--factor', action='store', default=4, type=int,
help='Decimation factor (default: %(default)s)')
# Parse arguments
try:
args = f.parse_args()
except nilmtools.filter.MissingDestination as e:
# If no destination, suggest how to create it by figuring out
# a recommended layout.
print "Source is %s (%s)" % (e.src, e.layout)
print "Destination %s doesn't exist" % (e.dest)
if "decimate_source" in f.client.stream_get_metadata(e.src):
rec = e.layout
elif 'int32' in e.layout_type or 'float64' in e.layout_type:
rec = 'float64_' + str(e.layout_count * 3)
else:
rec = 'float32_' + str(e.layout_count * 3)
print "You could make it with a command like:"
print " nilmtool create", e.dest, rec
raise SystemExit(1)
f.check_dest_metadata({ "decimate_source": args.srcpath,
"decimate_factor": args.factor })
# If source is decimated, we have to decimate a bit differently
if "decimate_source" in f.client.stream_get_metadata(args.srcpath):
f.process(function = decimate_again, rows = args.factor)
else:
f.process(function = decimate_first, rows = args.factor)
def decimate_first(data):
"""Decimate original data -- result has 3 times as many columns"""
data = np.array(data)
rows, cols = data.shape
n = cols - 1
out = np.zeros(1 + 3 * n)
out[0] = np.mean(data[:, 0], 0)
out[ 1 : n+1 ] = np.mean(data[:, 1 : n+1], 0)
out[ n+1 : 2*n+1] = np.min( data[:, 1 : n+1], 0)
out[2*n+1 : 3*n+1] = np.max( data[:, 1 : n+1], 0)
return [out]
def decimate_again(data):
"""Decimate already-decimated data -- result has the same number
of columns"""
data = np.array(data)
rows, cols = data.shape
n = (cols - 1) // 3
out = np.zeros(1 + 3 * n)
out[0] = np.mean(data[:, 0], 0)
out[ 1 : n+1 ] = np.mean(data[:, 1 : n+1], 0)
out[ n+1 : 2*n+1] = np.min( data[:, n+1 : 2*n+1], 0)
out[2*n+1 : 3*n+1] = np.max( data[:, 2*n+1 : 3*n+1], 0)
return [out]
if __name__ == "__main__":
main()

View File

@@ -1,220 +0,0 @@
#!/usr/bin/python
import nilmdb.client
from nilmdb.utils.printf import *
from nilmdb.utils.time import parse_time, format_time
import nilmtools
import itertools
import time
import sys
import re
import argparse
class MissingDestination(Exception):
def __init__(self, src, layout, dest):
self.src = src
self.layout = layout
self.layout_type = layout.split('_')[0]
self.layout_count = int(layout.split('_')[1])
self.dest = dest
Exception.__init__(self, "destination path " + dest + " not found")
class Filter(object):
def __init__(self):
self._parser = None
self._args = None
self._client = None
self._using_client = False
self.srcinfo = None
self.destinfo = None
@property
def client(self):
if self._using_client:
raise Exception("Filter client is in use; make another")
return self._client
def setup_parser(self, description = "Filter data"):
parser = argparse.ArgumentParser(
formatter_class = argparse.RawDescriptionHelpFormatter,
version = nilmtools.__version__,
description = description)
group = parser.add_argument_group("General filter arguments")
group.add_argument("-u", "--url", action="store",
default="http://localhost:12380/",
help="Server URL (default: %(default)s)")
group.add_argument("-D", "--dry-run", action="store_true",
default = False,
help="Just print intervals that would be "
"processed")
group.add_argument("-s", "--start",
metavar="TIME", type=self.arg_time,
help="Starting timestamp for intervals "
"(free-form, inclusive)")
group.add_argument("-e", "--end",
metavar="TIME", type=self.arg_time,
help="Ending timestamp for intervals "
"(free-form, noninclusive)")
group.add_argument("srcpath", action="store",
help="Path of source stream, e.g. /foo/bar")
group.add_argument("destpath", action="store",
help="Path of destination stream, e.g. /foo/bar")
self._parser = parser
return parser
def parse_args(self):
args = self._parser.parse_args()
self._args = args
self._client = nilmdb.client.Client(args.url)
if args.srcpath == args.destpath:
raise Exception("source and destination path must be different")
# Open and print info about the streams
src = self._client.stream_list(args.srcpath, extended = True)
if len(src) != 1:
raise Exception("source path " + args.srcpath + " not found")
self.srcinfo = src[0]
dest = self._client.stream_list(args.destpath, extended = True)
if len(dest) != 1:
raise MissingDestination(self.srcinfo[0], self.srcinfo[1],
args.destpath)
self.destinfo = dest[0]
print "Source:", self.stream_info_string(self.srcinfo)
print " Dest:", self.stream_info_string(self.destinfo)
if args.dry_run:
for interval in self.intervals():
print self.interval_string(interval)
raise SystemExit(0)
return args
def intervals(self):
"""Generate all the intervals that this filter should process"""
self._using_client = True
for i in self._client.stream_intervals(
self._args.srcpath, diffpath = self._args.destpath,
start = self._args.start, end = self._args.end):
yield i
self._using_client = False
# Misc helpers
def arg_time(self, toparse):
"""Parse a time string argument"""
try:
return nilmdb.utils.time.parse_time(toparse).totimestamp()
except ValueError as e:
raise argparse.ArgumentTypeError(sprintf("%s \"%s\"",
str(e), toparse))
def stream_info_string(self, info):
"""Print stream info as a string"""
return sprintf("%s (%s), %.2fM rows, %.2f hours",
info[0], info[1], info[4] / 1e6, info[5] / 3600)
def interval_string(self, interval):
"""Print interval as a string"""
return sprintf("[ %s -> %s ]", format_time(interval[0]),
format_time(interval[1]))
def check_dest_metadata(self, data):
"""See if the metadata jives, and complain if it doesn't. If
there's no conflict, update the metadata to match 'data'."""
metadata = self._client.stream_get_metadata(self._args.destpath)
rows = self.destinfo[4]
for key in data:
wanted = str(data[key])
val = metadata.get(key, wanted)
if val != wanted and rows > 0:
m = "Metadata in destination stream:\n"
m += " %s = %s\n" % (key, val)
m += "doesn't match desired data:\n"
m += " %s = %s\n" % (key, wanted)
m += "Refusing to change it. You can change the stream's "
m += "metadata manually, or\n"
m += "remove existing data from the stream, to prevent "
m += "this error.\n"
raise Exception(m)
# All good -- write the metadata in case it's not already there
self._client.stream_update_metadata(self._args.destpath, data)
# Main processing helper
def process(self, function, rows, partial = True, args = None):
"""Process data in chunks of 'rows' data at a time.
function: function to process the data
rows: maximum number of rows to pass to 'function' at once
args: tuple containing extra arguments to pass to 'function'
partial: if true, less than 'rows' may be passed to 'function'.
if false, partial data at the end of an interval will
be dropped.
'function' should be defined like:
function(data, *args)
It will be passed an array containing up to 'rows' rows of
data from the source stream, and any arguments passed in
'args'. It should transform the data as desired, and return a
new array of data, which will be inserted into the destination
stream.
"""
if args is None:
args = []
extractor = nilmdb.client.Client(self._args.url).stream_extract
inserter = nilmdb.client.Client(self._args.url).stream_insert_context
src = self._args.srcpath
dest = self._args.destpath
islice = itertools.islice
# Figure out how to format output data
dest_layout = self.destinfo[1].split('_')[1]
def int_formatter(row):
return ("%.6f " % row[0]) + " ".join(str(int(x)) for x in row[1:])
def float_formatter(row):
return ("%.6f " % row[0]) + " ".join(repr(x) for x in row[1:])
if "int" in dest_layout:
formatter = int_formatter
else:
formatter = float_formatter
for (start, end) in self.intervals():
print "Processing", self.interval_string((start, end))
with inserter(dest, start, end) as insert_ctx:
src_array = []
for line in extractor(src, start, end):
# Read in data
src_array.append([ float(x) for x in line.split() ])
if len(src_array) == rows:
# Pass through filter function
dest_array = function(src_array, *args)
# Write result to destination
out = [ formatter(row) for row in dest_array ]
insert_ctx.insert("\n".join(out) + "\n")
# Clear source array
src_array = []
# Take care of partial chunk
if len(src_array) and partial:
dest_array = function(src_array, *args)
out = [ formatter(row) for row in dest_array ]
insert_ctx.insert("\n".join(out) + "\n")
def main():
# This is just a dummy function; actual filters can use the other
# functions to prepare stuff, and then do something with the data.
f = Filter()
parser = f.setup_parser()
args = f.parse_args()
for (start, end) in f.intervals():
print "Generic filter: need to handle", start, " to ", end
if __name__ == "__main__":
main()

View File

@@ -30,7 +30,7 @@ except ImportError:
# Versioneer manages version numbers from git tags.
# https://github.com/warner/python-versioneer
import versioneer
versioneer.versionfile_source = 'nilmtools/_version.py'
versioneer.versionfile_source = 'src/_version.py'
versioneer.versionfile_build = 'nilmtools/_version.py'
versioneer.tag_prefix = 'nilmtools-'
versioneer.parentdir_prefix = 'nilmtools-'
@@ -61,14 +61,22 @@ setup(name='nilmtools',
long_description = "NILM Database Tools",
license = "Proprietary",
author_email = 'jim@jtan.com',
install_requires = [ 'nilmdb >= 1.3.0',
install_requires = [ 'nilmdb >= 1.4.6',
'numpy',
'scipy',
'matplotlib',
],
packages = [ 'nilmtools',
],
package_dir = { 'nilmtools': 'src' },
entry_points = {
'console_scripts': [
'nilm-decimate = nilmtools.decimate:main',
'nilm-decimate-auto = nilmtools.decimate_auto:main',
'nilm-insert = nilmtools.insert:main',
'nilm-copy = nilmtools.copy_one:main',
'nilm-copy-wildcard = nilmtools.copy_wildcard:main',
'nilm-sinefit = nilmtools.sinefit:main',
],
},
zip_safe = False,

View File

@@ -181,7 +181,7 @@ def versions_from_parentdir(parentdir_prefix, versionfile_source, verbose=False)
tag_prefix = "nilmtools-"
parentdir_prefix = "nilmtools-"
versionfile_source = "nilmtools/_version.py"
versionfile_source = "src/_version.py"
def get_versions(default={"version": "unknown", "full": ""}, verbose=False):
variables = { "refnames": git_refnames, "full": git_full }

40
src/copy_one.py Executable file
View File

@@ -0,0 +1,40 @@
#!/usr/bin/python
# This is called copy_one instead of copy to avoid name conflicts with
# the Python standard library.
import nilmtools.filter
import nilmdb.client
import numpy as np
import sys
def main(argv = None):
f = nilmtools.filter.Filter()
parser = f.setup_parser("Copy a stream")
# Parse arguments
try:
args = f.parse_args(argv)
except nilmtools.filter.MissingDestination as e:
print "Source is %s (%s)" % (e.src.path, e.src.layout)
print "Destination %s doesn't exist" % (e.dest.path)
print "You could make it with a command like:"
print " nilmtool -u %s create %s %s" % (e.dest.url,
e.dest.path, e.src.layout)
raise SystemExit(1)
# Copy metadata
meta = f.client_src.stream_get_metadata(f.src.path)
f.check_dest_metadata(meta)
# Copy all rows of data as ASCII strings
extractor = nilmdb.client.Client(f.src.url).stream_extract
inserter = nilmdb.client.Client(f.dest.url).stream_insert_context
for i in f.intervals():
print "Processing", f.interval_string(i)
with inserter(f.dest.path, i.start, i.end) as insert_ctx:
for row in extractor(f.src.path, i.start, i.end):
insert_ctx.insert(row + "\n")
if __name__ == "__main__":
main()

70
src/copy_wildcard.py Executable file
View File

@@ -0,0 +1,70 @@
#!/usr/bin/python
# Copy streams between NilmDB servers with wildcards
import nilmtools.filter
import nilmtools.copy_one
import nilmdb.client
import argparse
import fnmatch
def main(argv = None):
f = nilmtools.filter.Filter()
# Reuse filter's parser, since it handles most options we need.
parser = f.setup_parser(description = """\
Copy all streams matching the given wildcard from one host to another.
Example: %(prog)s -u http://host1/nilmdb -U http://host2/nilmdb /sharon/*
""", skip_paths = True)
parser.add_argument("path", action="store", nargs="+",
help='Wildcard paths to copy')
args = parser.parse_args(argv)
# Verify arguments
if args.dest_url is None:
parser.error("must provide both source and destination URL")
client_src = nilmdb.client.Client(args.url)
client_dest = nilmdb.client.Client(args.dest_url)
if client_src.geturl() == client_dest.geturl():
parser.error("source and destination URL must be different")
print "Source URL:", client_src.geturl()
print " Dest URL:", client_dest.geturl()
# Find matching streams
matched = []
for path in args.path:
matched.extend([s for s in client_src.stream_list(extended = True)
if fnmatch.fnmatch(s[0], path)
and s not in matched])
# Create destination streams if they don't exist
for stream in matched:
src = nilmtools.filter.StreamInfo(client_src.geturl(), stream)
dest = nilmtools.filter.get_stream_info(client_dest, src.path)
if not dest:
print "Creating destination stream", src.path
client_dest.stream_create(src.path, src.layout)
# Copy them all by running the "copy" tool as if it were
# invoked from the command line.
for stream in matched:
new_argv = ["--url", client_src.geturl(),
"--dest-url", client_dest.geturl() ]
if args.start:
new_argv.extend(["--start", "@" + repr(args.start)])
if args.end:
new_argv.extend(["--end", "@" + repr(args.end)])
if args.dry_run:
new_argv.extend(["--dry-run"])
if args.force_metadata:
new_argv.extend(["--force-metadata"])
new_argv.extend([stream[0], stream[0]])
try:
nilmtools.copy_one.main(new_argv)
except SystemExit as e:
# Ignore SystemExit which could be raised on --dry-run
if e.code != 0:
raise
if __name__ == "__main__":
main()

81
src/decimate.py Executable file
View File

@@ -0,0 +1,81 @@
#!/usr/bin/python
import nilmtools.filter
import nilmdb.client
import numpy as np
import operator
def main(argv = None):
f = nilmtools.filter.Filter()
parser = f.setup_parser("Decimate a stream")
group = parser.add_argument_group("Decimate options")
group.add_argument('-f', '--factor', action='store', default=4, type=int,
help='Decimation factor (default: %(default)s)')
# Parse arguments
try:
args = f.parse_args(argv)
except nilmtools.filter.MissingDestination as e:
# If no destination, suggest how to create it by figuring out
# a recommended layout.
src = e.src
dest = e.dest
print "Source is %s (%s)" % (src.path, src.layout)
print "Destination %s doesn't exist" % (dest.path)
if "decimate_source" in f.client_src.stream_get_metadata(src.path):
rec = src.layout
elif 'int32' in src.layout_type or 'float64' in src.layout_type:
rec = 'float64_' + str(src.layout_count * 3)
else:
rec = 'float32_' + str(src.layout_count * 3)
print "You could make it with a command like:"
print " nilmtool -u %s create %s %s" % (e.dest.url,
e.dest.path, rec)
raise SystemExit(1)
if not (args.factor >= 2):
raise Exception("factor needs to be 2 or more")
f.check_dest_metadata({ "decimate_source": f.src.path,
"decimate_factor": args.factor })
# If source is decimated, we have to decimate a bit differently
if "decimate_source" in f.client_src.stream_get_metadata(args.srcpath):
n = f.src.layout_count // 3
f.process_python(function = decimate_again, rows = args.factor,
args = (n,))
else:
n = f.src.layout_count
f.process_python(function = decimate_first, rows = args.factor,
args = (n,))
def decimate_first(data, n):
"""Decimate original data -- result has 3 times as many columns"""
# For this simple calculation, converting to a Numpy array
# and doing the math is slower than just doing it directly.
rows = iter(data)
r_sum = r_min = r_max = rows.next()
for row in rows:
r_sum = map(operator.add, r_sum, row)
r_min = map(min, r_min, row)
r_max = map(max, r_max, row)
r_mean = [ x / len(data) for x in r_sum ]
return [ [ r_mean[0] ] + r_mean[1:] + r_min[1:] + r_max[1:] ]
def decimate_again(data, n):
"""Decimate already-decimated data -- result has the same number
of columns"""
rows = iter(data)
r = rows.next()
r_sum = r[0:(n+1)]
r_min = r[(n+1):(2*n+1)]
r_max = r[(2*n+1):(3*n+1)]
for r in rows:
r_sum = map(operator.add, r_sum, r[0:(n+1)])
r_min = map(min, r_min, r[(n+1):(2*n+1)])
r_max = map(max, r_max, r[(2*n+1):(3*n+1)])
r_mean = [ x / len(data) for x in r_sum ]
return [ r_mean + r_min + r_max ]
if __name__ == "__main__":
main()

76
src/decimate_auto.py Executable file
View File

@@ -0,0 +1,76 @@
#!/usr/bin/python
import nilmtools.filter
import nilmtools.decimate
import nilmdb.client
import argparse
def main(argv = None):
parser = argparse.ArgumentParser(
formatter_class = argparse.RawDescriptionHelpFormatter,
version = "1.0",
description = """\
Automatically create multiple decimations from a single source
stream, continuing until the last decimated level contains fewer
than 500 points total.
""")
parser.add_argument("-u", "--url", action="store",
default="http://localhost/nilmdb/",
help="NilmDB server URL (default: %(default)s)")
parser.add_argument('-f', '--factor', action='store', default=4, type=int,
help='Decimation factor (default: %(default)s)')
parser.add_argument("--force-metadata", action="store_true",
default = False,
help="Force metadata changes if the dest "
"doesn't match")
parser.add_argument("path", action="store",
help='Path of base stream')
args = parser.parse_args(argv)
# Pull out info about the base stream
client = nilmdb.client.Client(args.url)
info = nilmtools.filter.get_stream_info(client, args.path)
if not info:
raise Exception("path " + args.path + " not found")
meta = client.stream_get_metadata(args.path)
if "decimate_source" in meta:
print "Stream", args.path, "was decimated from", meta["decimate_source"]
print "You need to pass the base stream instead"
raise SystemExit(1)
# Figure out the type we should use for decimated streams
if 'int32' in info.layout_type or 'float64' in info.layout_type:
decimated_type = 'float64_' + str(info.layout_count * 3)
else:
decimated_type = 'float32_' + str(info.layout_count * 3)
# Now do the decimations until we have few enough points
factor = 1
while True:
print "Level", factor, "decimation has", info.rows, "rows"
if info.rows <= 500:
break
factor *= args.factor
new_path = "%s~decim-%d" % (args.path, factor)
# Create the stream if needed
new_info = nilmtools.filter.get_stream_info(client, new_path)
if not new_info:
print "Creating stream", new_path
client.stream_create(new_path, decimated_type)
# Run the decimation as if it were run from the commandline
new_argv = [ "-u", args.url,
"-f", str(args.factor) ]
if args.force_metadata:
new_argv.extend([ "--force-metadata" ])
new_argv.extend([info.path, new_path])
nilmtools.decimate.main(new_argv)
# Update info using the newly decimated stream
info = nilmtools.filter.get_stream_info(client, new_path)
if __name__ == "__main__":
main()

413
src/filter.py Normal file
View File

@@ -0,0 +1,413 @@
#!/usr/bin/python
from __future__ import absolute_import
import nilmdb.client
from nilmdb.client import Client
from nilmdb.utils.printf import *
from nilmdb.utils.time import (parse_time, timestamp_to_human,
timestamp_to_seconds)
from nilmdb.utils.interval import Interval
import nilmtools
import itertools
import time
import sys
import re
import argparse
import numpy as np
import cStringIO
class MissingDestination(Exception):
def __init__(self, args, src, dest):
self.parsed_args = args
self.src = src
self.dest = dest
Exception.__init__(self, "destination path " + dest.path + " not found")
class StreamInfo(object):
def __init__(self, url, info):
self.url = url
self.info = info
try:
self.path = info[0]
self.layout = info[1]
self.layout_type = self.layout.split('_')[0]
self.layout_count = int(self.layout.split('_')[1])
self.total_count = self.layout_count + 1
self.timestamp_min = info[2]
self.timestamp_max = info[3]
self.rows = info[4]
self.seconds = nilmdb.utils.time.timestamp_to_seconds(info[5])
except IndexError, TypeError:
pass
def string(self, interhost):
"""Return stream info as a string. If interhost is true,
include the host URL."""
if interhost:
return sprintf("[%s] ", self.url) + str(self)
return str(self)
def __str__(self):
"""Return stream info as a string."""
return sprintf("%s (%s), %.2fM rows, %.2f hours",
self.path, self.layout, self.rows / 1e6,
self.seconds / 3600.0)
def get_stream_info(client, path):
"""Return a StreamInfo object about the given path, or None if it
doesn't exist"""
streams = client.stream_list(path, extended = True)
if len(streams) != 1:
return None
return StreamInfo(client.geturl(), streams[0])
class Filter(object):
def __init__(self):
self._parser = None
self._client_src = None
self._client_dest = None
self._using_client = False
self.src = None
self.dest = None
self.start = None
self.end = None
self.interhost = False
self.force_metadata = False
@property
def client_src(self):
if self._using_client:
raise Exception("Filter client is in use; make another")
return self._client_src
@property
def client_dest(self):
if self._using_client:
raise Exception("Filter client is in use; make another")
return self._client_dest
def setup_parser(self, description = "Filter data", skip_paths = False):
parser = argparse.ArgumentParser(
formatter_class = argparse.RawDescriptionHelpFormatter,
version = nilmtools.__version__,
description = description)
group = parser.add_argument_group("General filter arguments")
group.add_argument("-u", "--url", action="store",
default="http://localhost/nilmdb/",
help="Server URL (default: %(default)s)")
group.add_argument("-U", "--dest-url", action="store",
help="Destination server URL "
"(default: same as source)")
group.add_argument("-D", "--dry-run", action="store_true",
default = False,
help="Just print intervals that would be "
"processed")
group.add_argument("--force-metadata", action="store_true",
default = False,
help="Force metadata changes if the dest "
"doesn't match")
group.add_argument("-s", "--start",
metavar="TIME", type=self.arg_time,
help="Starting timestamp for intervals "
"(free-form, inclusive)")
group.add_argument("-e", "--end",
metavar="TIME", type=self.arg_time,
help="Ending timestamp for intervals "
"(free-form, noninclusive)")
if not skip_paths:
# Individual filter scripts might want to add these arguments
# themselves, to include multiple sources in a different order
# (for example). "srcpath" and "destpath" arguments must exist,
# though.
group.add_argument("srcpath", action="store",
help="Path of source stream, e.g. /foo/bar")
group.add_argument("destpath", action="store",
help="Path of destination stream, e.g. /foo/bar")
self._parser = parser
return parser
def interval_string(self, interval):
return sprintf("[ %s -> %s ]",
timestamp_to_human(interval.start),
timestamp_to_human(interval.end))
def parse_args(self, argv = None):
args = self._parser.parse_args(argv)
if args.dest_url is None:
args.dest_url = args.url
if args.url != args.dest_url:
self.interhost = True
self._client_src = Client(args.url)
self._client_dest = Client(args.dest_url)
if (not self.interhost) and (args.srcpath == args.destpath):
self._parser.error("source and destination path must be different")
# Open and print info about the streams
self.src = get_stream_info(self._client_src, args.srcpath)
if not self.src:
self._parser.error("source path " + args.srcpath + " not found")
self.dest = get_stream_info(self._client_dest, args.destpath)
if not self.dest:
raise MissingDestination(args, self.src,
StreamInfo(args.dest_url, [args.destpath]))
print "Source:", self.src.string(self.interhost)
print " Dest:", self.dest.string(self.interhost)
if args.dry_run:
for interval in self.intervals():
print self.interval_string(interval)
raise SystemExit(0)
self.force_metadata = args.force_metadata
self.start = args.start
self.end = args.end
return args
def _optimize_int(self, it):
"""Join and yield adjacent intervals from the iterator 'it'"""
saved_int = None
for interval in it:
if saved_int is not None:
if saved_int.end == interval.start:
interval.start = saved_int.start
else:
yield saved_int
saved_int = interval
if saved_int is not None:
yield saved_int
def intervals(self):
"""Generate all the intervals that this filter should process"""
self._using_client = True
if self.interhost:
# Do the difference ourselves
s_intervals = ( Interval(start, end)
for (start, end) in
self._client_src.stream_intervals(
self.src.path,
start = self.start, end = self.end) )
d_intervals = ( Interval(start, end)
for (start, end) in
self._client_dest.stream_intervals(
self.dest.path,
start = self.start, end = self.end) )
intervals = nilmdb.utils.interval.set_difference(s_intervals,
d_intervals)
else:
# Let the server do the difference for us
intervals = ( Interval(start, end)
for (start, end) in
self._client_src.stream_intervals(
self.src.path, diffpath = self.dest.path,
start = self.start, end = self.end) )
# Optimize intervals: join intervals that are adjacent
for interval in self._optimize_int(intervals):
yield interval
self._using_client = False
# Misc helpers
def arg_time(self, toparse):
"""Parse a time string argument"""
try:
return nilmdb.utils.time.parse_time(toparse)
except ValueError as e:
raise argparse.ArgumentTypeError(sprintf("%s \"%s\"",
str(e), toparse))
def check_dest_metadata(self, data):
"""See if the metadata jives, and complain if it doesn't. If
there's no conflict, update the metadata to match 'data'."""
metadata = self._client_dest.stream_get_metadata(self.dest.path)
if not self.force_metadata:
for key in data:
wanted = str(data[key])
val = metadata.get(key, wanted)
if val != wanted and self.dest.rows > 0:
m = "Metadata in destination stream:\n"
m += " %s = %s\n" % (key, val)
m += "doesn't match desired data:\n"
m += " %s = %s\n" % (key, wanted)
m += "Refusing to change it. To prevent this error, "
m += "change or delete the metadata with nilmtool,\n"
m += "remove existing data from the stream, or "
m += "retry with --force-metadata."
raise Exception(m)
# All good -- write the metadata in case it's not already there
self._client_dest.stream_update_metadata(self.dest.path, data)
# Main processing helper
def process_python(self, function, rows, args = None, partial = False):
"""Process data in chunks of 'rows' data at a time.
This provides data as nested Python lists and expects the same
back.
function: function to process the data
rows: maximum number of rows to pass to 'function' at once
args: tuple containing extra arguments to pass to 'function'
partial: if true, less than 'rows' may be passed to 'function'.
if false, partial data at the end of an interval will
be dropped.
'function' should be defined like:
function(data, *args)
It will be passed a list containing up to 'rows' rows of
data from the source stream, and any arguments passed in
'args'. It should transform the data as desired, and return a
new list of rdata, which will be inserted into the destination
stream.
"""
if args is None:
args = []
extractor = Client(self.src.url).stream_extract
inserter = Client(self.dest.url).stream_insert_context
# Parse input data. We use homogenous types for now, which
# means the timestamp type will be either float or int.
if "int" in self.src.layout_type:
parser = lambda line: [ int(x) for x in line.split() ]
else:
parser = lambda line: [ float(x) for x in line.split() ]
# Format output data.
formatter = lambda row: " ".join([repr(x) for x in row]) + "\n"
for interval in self.intervals():
print "Processing", self.interval_string(interval)
with inserter(self.dest.path,
interval.start, interval.end) as insert_ctx:
src_array = []
for line in extractor(self.src.path,
interval.start, interval.end):
# Read in data
src_array.append([ float(x) for x in line.split() ])
if len(src_array) == rows:
# Pass through filter function
dest_array = function(src_array, *args)
# Write result to destination
out = [ formatter(row) for row in dest_array ]
insert_ctx.insert("".join(out))
# Clear source array
src_array = []
# Take care of partial chunk
if len(src_array) and partial:
dest_array = function(src_array, *args)
out = [ formatter(row) for row in dest_array ]
insert_ctx.insert("".join(out))
# Like process_python, but provides Numpy arrays and allows for
# partial processing.
def process_numpy(self, function, args = None, rows = 100000):
"""For all intervals that exist in self.src but don't exist in
self.dest, call 'function' with a Numpy array corresponding to
the data. The data is converted to a Numpy array in chunks of
'rows' rows at a time.
'function' should be defined as:
def function(data, interval, args, insert_func, final)
'data': array of data to process -- may be empty
'interval': overall interval we're processing (but not necessarily
the interval of this particular chunk of data)
'args': opaque arguments passed to process_numpy
'insert_func': function to call in order to insert array of data.
Should be passed a 2-dimensional array of data to insert.
Data timestamps must be within the provided interval.
'final': True if this is the last bit of data for this
contiguous interval, False otherwise.
Return value of 'function' is the number of data rows processed.
Unprocessed data will be provided again in a subsequent call
(unless 'final' is True).
"""
if args is None:
args = []
extractor = Client(self.src.url).stream_extract
inserter = Client(self.dest.url).stream_insert_context
# Format output data.
formatter = lambda row: " ".join([repr(x) for x in row]) + "\n"
def batch(iterable, size):
c = itertools.count()
for k, g in itertools.groupby(iterable, lambda x: c.next() // size):
yield g
for interval in self.intervals():
print "Processing", self.interval_string(interval)
with inserter(self.dest.path,
interval.start, interval.end) as insert_ctx:
def insert_function(array):
s = cStringIO.StringIO()
if len(np.shape(array)) != 2:
raise Exception("array must be 2-dimensional")
np.savetxt(s, array)
insert_ctx.insert(s.getvalue())
extract = extractor(self.src.path, interval.start, interval.end)
old_array = np.array([])
for batched in batch(extract, rows):
# Read in this batch of data
new_array = np.loadtxt(batched)
# If we still had old data left, combine it
if old_array.shape[0] != 0:
array = np.vstack((old_array, new_array))
else:
array = new_array
# Pass it to the process function
processed = function(array, interval, args,
insert_function, False)
# Send any pending data
insert_ctx.send()
# Save the unprocessed parts
if processed >= 0:
old_array = array[processed:]
else:
raise Exception(
sprintf("%s return value %s must be >= 0",
str(function), str(processed)))
# Warn if there's too much data remaining
if old_array.shape[0] > 3 * rows:
printf("warning: %d unprocessed rows in buffer\n",
old_array.shape[0])
# Last call for this contiguous interval
if old_array.shape[0] != 0:
function(old_array, interval, args, insert_function, True)
def main(argv = None):
# This is just a dummy function; actual filters can use the other
# functions to prepare stuff, and then do something with the data.
f = Filter()
parser = f.setup_parser()
args = f.parse_args(argv)
for i in f.intervals():
print "Generic filter: need to handle", f.interval_string(i)
if __name__ == "__main__":
main()

View File

@@ -2,7 +2,9 @@
import nilmdb.client
from nilmdb.utils.printf import *
from nilmdb.utils.time import parse_time, format_time
from nilmdb.utils.time import (parse_time, timestamp_to_human,
timestamp_to_seconds, seconds_to_timestamp,
rate_to_period, now as time_now)
import nilmtools
import time
@@ -16,7 +18,7 @@ class ParseError(Exception):
msg = filename + ": " + error
super(ParseError, self).__init__(msg)
def parse_args():
def parse_args(argv = None):
parser = argparse.ArgumentParser(
formatter_class = argparse.RawDescriptionHelpFormatter,
version = nilmtools.__version__,
@@ -30,7 +32,7 @@ def parse_args():
created in the stream. Overlapping data returns an error.
""")
parser.add_argument("-u", "--url", action="store",
default="http://localhost:12380/",
default="http://localhost/nilmdb/",
help="NilmDB server URL (default: %(default)s)")
parser.add_argument("-r", "--rate", action="store", default=8000,
type=float,
@@ -42,16 +44,15 @@ def parse_args():
parser.add_argument("infile", type=argparse.FileType('r'), nargs='*',
default=[sys.stdin],
help="Input files (default: stdin)")
args = parser.parse_args()
args = parser.parse_args(argv)
printf("Stream path: %s\n", args.path)
printf(" Data rate: %s Hz\n", repr(args.rate))
return args
def main(args = None):
if args is None:
args = parse_args()
def main(argv = None):
args = parse_args(argv)
client = nilmdb.client.Client(args.url)
@@ -61,20 +62,22 @@ def main(args = None):
# data_ts is the timestamp that we'll use for the current line
data_ts_base = 0
data_ts_inc = 0
data_ts_step = 1.0 / args.rate
data_ts_rate = args.rate
# clock_ts is the imprecise "real" timestamp (from the filename,
# comments, or or system clock)
clock_ts = None
def print_clock_updated():
printf("Clock time updated to %s\n", format_time(clock_ts))
printf("Clock time updated to %s\n", timestamp_to_human(clock_ts))
if data_ts_base != 0:
diff = data_ts - clock_ts
if diff >= 0:
printf(" (data timestamp ahead by %.6f sec)\n", diff)
printf(" (data timestamp ahead by %.6f sec)\n",
timestamp_to_seconds(diff))
else:
printf(" (data timestamp behind by %.6f sec)\n", -diff)
printf(" (data timestamp behind by %.6f sec)\n",
timestamp_to_seconds(-diff))
with client.stream_insert_context(args.path) as stream:
for f in args.infile:
@@ -92,7 +95,7 @@ def main(args = None):
# Subtract 1 hour because files are created at the end
# of the hour. Hopefully, we'll be able to use
# internal comments and this value won't matter anyway.
clock_ts = parse_time(filename).totimestamp() - 3600
clock_ts = parse_time(filename) - seconds_to_timestamp(3600)
print_clock_updated()
except ValueError:
pass
@@ -101,7 +104,8 @@ def main(args = None):
# Read each line
for line in f:
data_ts = data_ts_base + data_ts_inc * data_ts_step
data_ts = data_ts_base + rate_to_period(data_ts_rate,
data_ts_inc)
# If no content other than the newline, skip it
if len(line) <= 1:
@@ -110,7 +114,7 @@ def main(args = None):
# If line starts with a comment, look for a timestamp
if line[0] == '#':
try:
clock_ts = parse_time(line[1:]).totimestamp()
clock_ts = parse_time(line[1:])
print_clock_updated()
except ValueError:
pass
@@ -118,30 +122,30 @@ def main(args = None):
# If inserting live, use clock timestamp
if live:
clock_ts = time.time()
clock_ts = time_now()
# If we have a real timestamp, compare it to the data
# timestamp, and make sure things match up.
if clock_ts is not None:
if (data_ts - 10) > clock_ts:
if (data_ts - seconds_to_timestamp(10)) > clock_ts:
# Accumulated line timestamps are in the future.
# If we were to set data_ts=clock_ts, we'd create
# an overlap, so we have to just bail out here.
err = sprintf("Data is coming in too fast: data time "
"is %s but clock time is only %s",
format_time(data_ts),
format_time(clock_ts))
timestamp_to_human(data_ts),
timestamp_to_human(clock_ts))
raise ParseError(filename, err)
if (data_ts + 10) < clock_ts:
if (data_ts + seconds_to_timestamp(10)) < clock_ts:
# Accumulated line timetamps are in the past. We
# can just skip some time and leave a gap in the
# data.
if data_ts_base != 0:
printf("Skipping data timestamp forward from "
"%s to %s to match clock time\n",
format_time(data_ts),
format_time(clock_ts))
timestamp_to_human(data_ts),
timestamp_to_human(clock_ts))
stream.finalize()
data_ts_base = data_ts = clock_ts
data_ts_inc = 0
@@ -166,7 +170,7 @@ def main(args = None):
continue
# Insert it
stream.insert("%.6f %s" % (data_ts, line))
stream.insert("%d %s" % (data_ts, line))
print "Done"
if __name__ == "__main__":

126
src/prep.py Executable file
View File

@@ -0,0 +1,126 @@
#!/usr/bin/python
# Spectral envelope preprocessor.
# Requires two streams as input: the original raw data, and sinefit data.
import nilmtools.filter
import nilmdb.client
from numpy import *
import scipy.fftpack
import scipy.signal
from matplotlib import pyplot as p
import bisect
def main(argv = None):
# Set up argument parser
f = nilmtools.filter.Filter()
parser = f.setup_parser("Spectral Envelope Preprocessor", skip_paths = True)
group = parser.add_argument_group("Prep options")
group.add_argument("-c", "--column", action="store", type=int,
help="Column number (first data column is 1)")
group.add_argument("-n", "--nharm", action="store", type=int, default=4,
help="number of odd harmonics to compute")
exc = group.add_mutually_exclusive_group()
exc.add_argument("-r", "--rotate", action="store", type=float,
help="rotate FFT output by this many degrees")
exc.add_argument("-R", "--rotate-rad", action="store", type=float,
help="rotate FFT output by this many radians")
group.add_argument("srcpath", action="store",
help="Path of raw input, e.g. /foo/raw")
group.add_argument("sinepath", action="store",
help="Path of sinefit input, e.g. /foo/sinefit")
group.add_argument("destpath", action="store",
help="Path of prep output, e.g. /foo/prep")
# Parse arguments
try:
args = f.parse_args(argv)
except nilmtools.filter.MissingDestination as e:
rec = "float32_%d" % (e.parsed_args.nharm * 2)
print "Source is %s (%s)" % (e.src.path, e.src.layout)
print "Destination %s doesn't exist" % (e.dest.path)
print "You could make it with a command like:"
print " nilmtool -u %s create %s %s" % (e.dest.url, e.dest.path, rec)
raise SystemExit(1)
# Check arguments
if args.column is None or args.column < 1:
parser.error("need a column number >= 1")
if args.nharm < 1 or args.nharm > 32:
parser.error("number of odd harmonics must be 1-32")
if args.rotate is not None:
rotation = args.rotate * 2.0 * pi / 360.0
else:
rotation = args.rotate_rad or 0.0
# Check the sine fit stream
client_sinefit = nilmdb.client.Client(args.url)
sinefit = nilmtools.filter.get_stream_info(client_sinefit, args.sinepath)
if not sinefit:
raise Exception("sinefit data not found")
if sinefit.layout != "float32_3":
raise Exception("sinefit data type is " + sinefit.layout
+ "; expected float32_3")
# Check and set metadata in prep stream
f.check_dest_metadata({ "prep_raw_source": f.src.path,
"prep_sinefit_source": sinefit.path,
"prep_column": args.column })
# Run the processing function on all data
f.process_numpy(process, args = (client_sinefit, sinefit.path, args.column,
args.nharm, rotation))
def process(data, interval, args, insert_function, final):
(client, sinefit_path, column, nharm, rotation) = args
rows = data.shape[0]
data_timestamps = data[:,0]
processed = 0
out = zeros((1, nharm * 2 + 1))
# Pull out sinefit data for the entire time range of this block
for sinefit_line in client.stream_extract(sinefit_path,
data[0, 0], data[rows-1, 0]):
# Extract sinefit data to get zero crossing timestamps
(t_min, f0, A, C) = [ float(x) for x in sinefit_line.split() ]
t_max = t_min + 1e6 / f0
# Find the indices of data that correspond to (t_min, t_max)
idx_min = bisect.bisect_left(data_timestamps, t_min)
idx_max = bisect.bisect_left(data_timestamps, t_max)
if idx_min >= idx_max:
# something's wonky; ignore this period
continue
if idx_max >= len(data_timestamps):
# max is likely past the end of our chunk, so stop
# processing this chunk now.
break
# Perform FFT over those indices
N = idx_max - idx_min
d = data[idx_min:idx_max, column]
F = scipy.fftpack.fft(d) / N
# If we wanted more harmonics than we have, pad with zeros
if N < (nharm * 2):
F = r_[F, zeros(nharm * 2 - N)]
# Fill output data
out[0, 0] = t_min
for k in range(nharm):
Fk = F[2 * k + 1] * e**(rotation * 1j * k)
out[0, 2 * k + 1] = -imag(Fk) # Pk
out[0, 2 * k + 2] = real(Fk) # Qk
# Insert it and continue
insert_function(out)
processed = idx_max
print "Processed", processed, "of", rows, "rows"
return processed
if __name__ == "__main__":
main()

187
src/sinefit.py Executable file
View File

@@ -0,0 +1,187 @@
#!/usr/bin/python
# Sine wave fitting. This runs about 5x faster than realtime on raw data.
import nilmtools.filter
import nilmdb.client
from numpy import *
from scipy import *
#import pylab as p
import operator
def main(argv = None):
f = nilmtools.filter.Filter()
parser = f.setup_parser("Sine wave fitting")
group = parser.add_argument_group("Sine fit options")
group.add_argument('-c', '--column', action='store', type=int,
help='Column number (first data column is 1)')
group.add_argument('-f', '--frequency', action='store', type=float,
default=60.0,
help='Approximate frequency (default: %(default)s)')
# Parse arguments
try:
args = f.parse_args(argv)
except nilmtools.filter.MissingDestination as e:
rec = "float32_4"
print "Source is %s (%s)" % (e.src.path, e.src.layout)
print "Destination %s doesn't exist" % (e.dest.path)
print "You could make it with a command like:"
print " nilmtool -u %s create %s %s" % (e.dest.url, e.dest.path, rec)
raise SystemExit(1)
if args.column is None or args.column < 1:
parser.error("need a column number >= 1")
if args.frequency < 0.1:
parser.error("frequency must be >= 0.1")
f.check_dest_metadata({ "sinefit_source": f.src.path,
"sinefit_column": args.column })
f.process_numpy(process, args = (args.column, args.frequency))
def process(data, interval, args, insert_function, final):
(column, f_expected) = args
rows = data.shape[0]
# Estimate sampling frequency from timestamps
fs = 1e6 * (rows-1) / (data[-1][0] - data[0][0])
# Pull out about 3.5 periods of data at once;
# we'll expect to match 3 zero crossings in each window
N = max(int(3.5 * fs / f_expected), 10)
# If we don't have enough data, don't bother processing it
if rows < N:
return 0
# Process overlapping windows
start = 0
num_zc = 0
while start < (rows - N):
this = data[start:start+N, column]
t_min = data[start, 0]/1e6
t_max = data[start+N-1, 0]/1e6
# Do 4-parameter sine wave fit
(A, f0, phi, C) = sfit4(this, fs)
# Check bounds. If frequency is too crazy, ignore this window
if f0 < (f_expected/2) or f0 > (f_expected*2):
print "frequency", f0, "too far from expected value", f_expected
start += N
continue
#p.plot(arange(N), this)
#p.plot(arange(N), A * cos(f0/fs * 2 * pi * arange(N) + phi) + C, 'g')
# Period starts when the argument of cosine is 3*pi/2 degrees,
# so we're looking for sample number:
# n = (3 * pi / 2 - phi) / (f0/fs * 2 * pi)
zc_n = (3 * pi / 2 - phi) / (f0 / fs * 2 * pi)
period_n = fs/f0
# Add periods to make N positive
while zc_n < 0:
zc_n += period_n
last_zc = None
# Mark the zero crossings until we're a half period away
# from the end of the window
while zc_n < (N - period_n/2):
#p.plot(zc_n, C, 'ro')
t = t_min + zc_n / fs
insert_function([[t * 1e6, f0, A, C]])
num_zc += 1
last_zc = zc_n
zc_n += period_n
# Advance the window one quarter period past the last marked
# zero crossing, or advance the window by half its size if we
# didn't mark any.
if last_zc is not None:
advance = min(last_zc + period_n/4, N)
else:
advance = N/2
#p.plot(advance, C, 'go')
#p.show()
start = int(round(start + advance))
# Return the number of rows we've processed
print "Marked", num_zc, "zero-crossings in", start, "rows"
return start
def sfit4(data, fs):
"""(A, f0, phi, C) = sfit4(data, fs)
Compute 4-parameter (unknown-frequency) least-squares fit to
sine-wave data, according to IEEE Std 1241-2010 Annex B
Input:
data vector of input samples
fs sampling rate (Hz)
Output:
Parameters [A, f0, phi, C] to fit the equation
x[n] = A * cos(f0/fs * 2 * pi * n + phi) + C
where n is sample number. Or, as a function of time:
x(t) = A * cos(f0 * 2 * pi * t + phi) + C
by Jim Paris
(Verified to match sfit4.m)
"""
N = len(data)
t = linspace(0, (N-1) / fs, N)
## Estimate frequency using FFT (step b)
Fc = fft(data)
F = abs(Fc)
F[0] = 0 # eliminate DC
# Find pair of spectral lines with largest amplitude:
# resulting values are in F(i) and F(i+1)
i = argmax(F[0:int(N/2)] + F[1:int(N/2+1)])
# Interpolate FFT to get a better result (from Markus [B37])
U1 = real(Fc[i])
U2 = real(Fc[i+1])
V1 = imag(Fc[i])
V2 = imag(Fc[i+1])
n = 2 * pi / N
ni1 = n * i
ni2 = n * (i+1)
K = ((V2-V1)*sin(ni1) + (U2-U1)*cos(ni1)) / (U2-U1)
Z1 = V1 * (K - cos(ni1)) / sin(ni1) + U1
Z2 = V2 * (K - cos(ni2)) / sin(ni2) + U2
i = arccos((Z2*cos(ni2) - Z1*cos(ni1)) / (Z2-Z1)) / n
# Convert to Hz
f0 = i * fs / N
## Fit it
# first guess for A0, B0 using 3-parameter fit (step c)
w = 2*pi*f0
D = c_[cos(w*t), sin(w*t), ones(N)]
s = linalg.lstsq(D, data)[0]
# Now iterate 6 times (step i)
for idx in range(6):
D = c_[cos(w*t), sin(w*t), ones(N),
-s[0] * t * sin(w*t) + s[1] * t * cos(w*t) ] # eqn B.16
s = linalg.lstsq(D, data)[0] # eqn B.18
w = w + s[3] # update frequency estimate
## Extract results
A = sqrt(s[0]*s[0] + s[1]*s[1]) # eqn B.21
f0 = w / (2*pi)
try:
phi = -arctan2(s[1], s[0]) # eqn B.22
except TypeError:
# something broke down, just return zeros
return (0, 0, 0, 0)
C = s[2]
return (A, f0, phi, C)
if __name__ == "__main__":
main()