Compare commits

..

71 Commits

Author SHA1 Message Date
f4fda837ae Bump required nilmdb version to 1.6.0 2013-04-11 11:55:11 -04:00
5547d266d0 filter: Don't include trailing unprocessed data in the inserted intervals 2013-04-11 11:53:17 -04:00
372e977e4a Reverse cleanup order to handle interruptions better 2013-04-10 18:38:41 -04:00
640a680704 Increase default min amplitude in sinefit 2013-04-10 17:09:52 -04:00
2e74e6cd63 Skip over data if we aren't able to process any. Change output format 2013-04-10 17:01:07 -04:00
de2a794e00 Support wildcards in nilm-decimate-auto 2013-04-10 16:05:16 -04:00
065a40f265 sinefit: add minimum amplitude check 2013-04-10 15:33:51 -04:00
65fa43aff1 sinefit: catch all errors in sfit4 2013-04-10 14:36:50 -04:00
57c23c3792 sinefit: allow user to override min/max frequency detection 2013-04-10 14:36:40 -04:00
d4c8e4acb4 Include rotation in metadata 2013-04-10 14:36:05 -04:00
fd1b33401f Require a --yes argument before actually cleaning data 2013-04-09 20:13:38 -04:00
4c748ec00c Fix minor bugs 2013-04-09 20:08:25 -04:00
b72d6b6908 Warn if column count is wrong for this nharm value 2013-04-09 19:59:59 -04:00
80d642e52e Change nilm-cleanup config file format, tweak output 2013-04-09 19:43:41 -04:00
001b89b1d2 Support multiple shifted FFTs per period in nilm-prep.
New option --nshift controls how many shifted FFT windows to perform
per period.  "nilm-prep -N 2" is similar to old prep behavior.  Note
that this is redundant information and takes up extra storage space,
though.
2013-04-09 18:53:27 -04:00
f978823505 Fix prep scaling and fix comments 2013-04-09 17:44:13 -04:00
ffd6675979 Remove outdated code 2013-04-08 19:46:16 -04:00
5b67b68fd2 Don't import matplotlib if we don't need it 2013-04-08 18:59:23 -04:00
97503b73b9 Fix dependencies 2013-04-08 18:50:27 -04:00
4e64c804bf Merge branch 'binary' 2013-04-08 18:45:21 -04:00
189fb9df3a Use binary interface for copy_one too 2013-04-08 18:45:16 -04:00
3323c997a7 Use the new stream_insert_numpy_context function 2013-04-08 18:39:14 -04:00
e09153e34b Use the new NumpyClient for extracting data in filter 2013-04-07 18:14:35 -04:00
5c56e9d075 Remove ounused process_python function 2013-04-06 16:39:39 -04:00
60f09427cf Update decimate to use process_numpy 2013-04-06 15:56:36 -04:00
d6d31190eb Fix fromstring usage 2013-04-06 13:40:09 -04:00
2ec574c59d Use np.fromstring instead of np.loadtxt 2013-04-06 13:32:16 -04:00
1988955671 Accumulate delta separately from data timestamp 2013-04-05 17:41:48 -04:00
36e5af4be1 Fix data_ts when clock is updated 2013-04-05 16:40:04 -04:00
ca175bd9dd Improve nilm-insert to support deltas, etc, for accelerometer data 2013-04-05 16:13:56 -04:00
aa9656bc10 Fix off-by-one error in prep rotation 2013-04-04 19:23:12 -04:00
10ab2cc2de Build nilm-prep tool 2013-04-04 19:07:18 -04:00
eb6d7a8809 Fix recommended layout 2013-04-04 16:32:38 -04:00
c8be6755ae Update README dependencies 2013-04-04 15:36:30 -04:00
9e321d9e41 Add --force-metadata option; fix bugs 2013-04-04 15:33:57 -04:00
f2bebea5d0 Add copy_wildcard script 2013-04-04 15:33:57 -04:00
d919a73387 Use parser error rather than systemexit exception 2013-04-04 15:33:57 -04:00
17fa79a5dc Makefile updates 2013-04-04 15:33:57 -04:00
ca970fa1fd Rename copy_ to copy_one 2013-04-04 15:33:57 -04:00
805d8fb24f Add prep 2013-04-03 21:04:03 -04:00
05da75e34a Allow processing 0 lines; warn if data is building up 2013-04-03 21:03:44 -04:00
56e778df71 Add skip_paths option to filter.setup_parser 2013-04-03 21:03:29 -04:00
87178e9599 self.args is used for other stuff in an exception 2013-04-03 21:03:17 -04:00
f8b1a001c3 makefile updates 2013-04-03 21:02:42 -04:00
7e88da3c26 Bring get_stream_info into nilmtools.filter module; use it 2013-04-02 18:04:28 -04:00
b637f17887 Add nilm-decimate-auto script 2013-04-02 16:13:44 -04:00
9a7a1df537 Simplify StreamInfo constructor 2013-04-02 15:47:43 -04:00
101b701882 Fix sys.argv stuff; remove outdated shell scripts 2013-04-01 18:40:48 -04:00
457c518809 Add some docs about building new tools 2013-04-01 18:14:59 -04:00
3eff3d81fe Change default URL to http://localhost/nilmdb/ 2013-04-01 18:03:43 -04:00
a56dc22030 Allow arguments to be passed in as a list to main() functions 2013-04-01 18:03:22 -04:00
9b770cd28f Note speed 2013-04-01 16:39:23 -04:00
348c435d1e Catch arctan2 errors 2013-04-01 16:38:50 -04:00
7f1c1a6c32 Add nilm-sinefit script link 2013-03-30 22:53:29 -04:00
bdfc29887b Proper window advancement & insertion for sine fit; comment out plots 2013-03-30 22:51:54 -04:00
4e5907f381 Send pending data after each block 2013-03-30 22:51:37 -04:00
9078a014ae Require 2-dimensional array in insert_function 2013-03-30 22:51:20 -04:00
533892e624 Update Makefile and setup.py dependencies 2013-03-30 22:50:55 -04:00
e0f973b449 Update makefile and setup.py requirements 2013-03-30 17:30:10 -04:00
698cb6ef26 More work on sinefit 2013-03-28 14:08:50 -04:00
1db38cc5da Port sfit4 function to Python, start writing sinefit filter 2013-03-26 19:38:53 -04:00
a984e54f23 Add process_numpy function to filter.py
This converts data into big Numpy arrays and lets the user-provided
function process as much as they'd like at a time.
2013-03-26 19:38:05 -04:00
974c9a3050 Fix error message when decimate target is missing 2013-03-22 14:59:08 -04:00
320c32cfdc Sample script for copying wildcard paths between hosts 2013-03-19 17:55:18 -04:00
0f1e442cd4 Support filtering (or copying) between two different hosts 2013-03-19 17:54:59 -04:00
3e78da12dc Rename copy.py to avoid stupid Python conflicts
Any other module (in this case, nilmdb -> datetime_tz -> pytz -> gettext)
that does an "import copy" would pull in the copy.py, if it's in the
current working directory.  Python is dumb...
2013-03-19 16:19:14 -04:00
ef9277cbff Rename nilmtools directory to just src 2013-03-19 15:42:29 -04:00
de68956f76 Update copy tool 2013-03-16 23:13:45 -04:00
e73dd313d5 Reworked things to match nilmdb updates; a bit faster 2013-03-16 14:46:49 -04:00
d23fa9ee78 Remove unnecessary option group 2013-03-12 19:02:29 -04:00
2b9ecc6697 Add nilm-copy tool 2013-03-12 18:52:39 -04:00
19 changed files with 1648 additions and 472 deletions

3
.gitignore vendored
View File

@@ -1,3 +1,6 @@
oldprep
newprep
*.dat
build/
*.pyc
dist/

View File

@@ -1,11 +1,47 @@
test:
nilmtool remove /lees-compressor/noleak/raw~4 -s 2000 -e 2020
nilmtool remove /lees-compressor/noleak/raw~16 -s 2000 -e 2020
python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/noleak/raw /lees-compressor/noleak/raw~4
python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/noleak/raw~4 /lees-compressor/noleak/raw~16
#URL="http://bucket.mit.edu:8080/nilmdb"
URL="http://localhost/nilmdb"
all:
ifeq ($(INSIDE_EMACS), t)
@make test
else
@echo "Try 'make install'"
endif
test: test_cleanup
test_cleanup:
src/cleanup.py -e extras/cleanup.cfg
src/cleanup.py extras/cleanup.cfg
test_insert:
@make install >/dev/null
src/insert.py --file --dry-run /test/foo </dev/null
test_copy:
@make install >/dev/null
src/copy_wildcard.py -U "http://nilmdb.com/bucket/" -D /lees*
test_prep:
@make install >/dev/null
-nilmtool destroy -R /test/raw
-nilmtool destroy -R /test/sinefit
-nilmtool destroy -R /test/prep
nilmtool create /test/raw float32_2
nilmtool create /test/sinefit float32_3
nilmtool create /test/prep float32_8
nilmtool insert -s '@0' -t -r 8000 /test/raw /tmp/raw.dat
src/sinefit.py -c 1 /test/raw /test/sinefit
src/prep.py -c 2 /test/raw /test/sinefit /test/prep
nilmtool extract -s min -e max /test/prep | head -20
test_decimate:
-@nilmtool destroy /lees-compressor/no-leak/raw/4 || true
-@nilmtool destroy /lees-compressor/no-leak/raw/16 || true
-@nilmtool create /lees-compressor/no-leak/raw/4 float32_18 || true
-@nilmtool create /lees-compressor/no-leak/raw/16 float32_18 || true
time python src/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/1 /lees-compressor/no-leak/raw/4
python src/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/4 /lees-compressor/no-leak/raw/16
version:
python setup.py version

View File

@@ -6,9 +6,22 @@ Prerequisites:
# Runtime and build environments
sudo apt-get install python2.7 python2.7-dev python-setuptools
sudo apt-get install python-numpy python-scipy python-matplotlib
nilmdb (1.3.1+)
nilmdb (1.5.0+)
Install:
python setup.py install
Building new tools:
The tools in this package are meant to be installed with
"python setup.py install". If you want to make a new one,
an easier way to develop would be to first install this package,
and then copy a specific script like "src/sinefit.py" to a new
location, and modify it as desired.
To add a tool to the package, place it in "src/" and add the
appropriate configuration to "setup.py".

22
extras/cleanup.cfg Normal file
View File

@@ -0,0 +1,22 @@
[/lees-compressor/no-leak/prep]
keep = 2d
rate = 60
[*/raw]
keep = 2d
[*/something]
rate = 10
[*/sinefit]
keep = 1w
decimated = False
[/test/raw]
keep = 0.01d
[/test/sinefit]
keep = 0.01d
[/test/prep]
keep = 0.01d

View File

@@ -1,70 +0,0 @@
#!/usr/bin/python
import nilmtools.filter
import nilmdb.client
import numpy as np
def main():
f = nilmtools.filter.Filter()
parser = f.setup_parser("Decimate a stream")
group = parser.add_argument_group("Decimate options")
group.add_argument('-f', '--factor', action='store', default=4, type=int,
help='Decimation factor (default: %(default)s)')
# Parse arguments
try:
args = f.parse_args()
except nilmtools.filter.MissingDestination as e:
# If no destination, suggest how to create it by figuring out
# a recommended layout.
print "Source is %s (%s)" % (e.src, e.layout)
print "Destination %s doesn't exist" % (e.dest)
if "decimate_source" in f.client.stream_get_metadata(e.src):
rec = e.layout
elif 'int32' in e.layout_type or 'float64' in e.layout_type:
rec = 'float64_' + str(e.layout_count * 3)
else:
rec = 'float32_' + str(e.layout_count * 3)
print "You could make it with a command like:"
print " nilmtool create", e.dest, rec
raise SystemExit(1)
f.check_dest_metadata({ "decimate_source": args.srcpath,
"decimate_factor": args.factor })
# If source is decimated, we have to decimate a bit differently
if "decimate_source" in f.client.stream_get_metadata(args.srcpath):
f.process(function = decimate_again, rows = args.factor)
else:
f.process(function = decimate_first, rows = args.factor)
def decimate_first(data):
"""Decimate original data -- result has 3 times as many columns"""
data = np.array(data)
rows, cols = data.shape
n = cols - 1
out = np.zeros(1 + 3 * n)
out[0] = np.mean(data[:, 0], 0)
out[ 1 : n+1 ] = np.mean(data[:, 1 : n+1], 0)
out[ n+1 : 2*n+1] = np.min( data[:, 1 : n+1], 0)
out[2*n+1 : 3*n+1] = np.max( data[:, 1 : n+1], 0)
return [out]
def decimate_again(data):
"""Decimate already-decimated data -- result has the same number
of columns"""
data = np.array(data)
rows, cols = data.shape
n = (cols - 1) // 3
out = np.zeros(1 + 3 * n)
out[0] = np.mean(data[:, 0], 0)
out[ 1 : n+1 ] = np.mean(data[:, 1 : n+1], 0)
out[ n+1 : 2*n+1] = np.min( data[:, n+1 : 2*n+1], 0)
out[2*n+1 : 3*n+1] = np.max( data[:, 2*n+1 : 3*n+1], 0)
return [out]
if __name__ == "__main__":
main()

View File

@@ -1,220 +0,0 @@
#!/usr/bin/python
import nilmdb.client
from nilmdb.utils.printf import *
from nilmdb.utils.time import parse_time, format_time
import nilmtools
import itertools
import time
import sys
import re
import argparse
class MissingDestination(Exception):
def __init__(self, src, layout, dest):
self.src = src
self.layout = layout
self.layout_type = layout.split('_')[0]
self.layout_count = int(layout.split('_')[1])
self.dest = dest
Exception.__init__(self, "destination path " + dest + " not found")
class Filter(object):
def __init__(self):
self._parser = None
self._args = None
self._client = None
self._using_client = False
self.srcinfo = None
self.destinfo = None
@property
def client(self):
if self._using_client:
raise Exception("Filter client is in use; make another")
return self._client
def setup_parser(self, description = "Filter data"):
parser = argparse.ArgumentParser(
formatter_class = argparse.RawDescriptionHelpFormatter,
version = nilmtools.__version__,
description = description)
group = parser.add_argument_group("General filter arguments")
group.add_argument("-u", "--url", action="store",
default="http://localhost:12380/",
help="Server URL (default: %(default)s)")
group.add_argument("-D", "--dry-run", action="store_true",
default = False,
help="Just print intervals that would be "
"processed")
group.add_argument("-s", "--start",
metavar="TIME", type=self.arg_time,
help="Starting timestamp for intervals "
"(free-form, inclusive)")
group.add_argument("-e", "--end",
metavar="TIME", type=self.arg_time,
help="Ending timestamp for intervals "
"(free-form, noninclusive)")
group.add_argument("srcpath", action="store",
help="Path of source stream, e.g. /foo/bar")
group.add_argument("destpath", action="store",
help="Path of destination stream, e.g. /foo/bar")
self._parser = parser
return parser
def parse_args(self):
args = self._parser.parse_args()
self._args = args
self._client = nilmdb.client.Client(args.url)
if args.srcpath == args.destpath:
raise Exception("source and destination path must be different")
# Open and print info about the streams
src = self._client.stream_list(args.srcpath, extended = True)
if len(src) != 1:
raise Exception("source path " + args.srcpath + " not found")
self.srcinfo = src[0]
dest = self._client.stream_list(args.destpath, extended = True)
if len(dest) != 1:
raise MissingDestination(self.srcinfo[0], self.srcinfo[1],
args.destpath)
self.destinfo = dest[0]
print "Source:", self.stream_info_string(self.srcinfo)
print " Dest:", self.stream_info_string(self.destinfo)
if args.dry_run:
for interval in self.intervals():
print self.interval_string(interval)
raise SystemExit(0)
return args
def intervals(self):
"""Generate all the intervals that this filter should process"""
self._using_client = True
for i in self._client.stream_intervals(
self._args.srcpath, diffpath = self._args.destpath,
start = self._args.start, end = self._args.end):
yield i
self._using_client = False
# Misc helpers
def arg_time(self, toparse):
"""Parse a time string argument"""
try:
return nilmdb.utils.time.parse_time(toparse).totimestamp()
except ValueError as e:
raise argparse.ArgumentTypeError(sprintf("%s \"%s\"",
str(e), toparse))
def stream_info_string(self, info):
"""Print stream info as a string"""
return sprintf("%s (%s), %.2fM rows, %.2f hours",
info[0], info[1], info[4] / 1e6, info[5] / 3600)
def interval_string(self, interval):
"""Print interval as a string"""
return sprintf("[ %s -> %s ]", format_time(interval[0]),
format_time(interval[1]))
def check_dest_metadata(self, data):
"""See if the metadata jives, and complain if it doesn't. If
there's no conflict, update the metadata to match 'data'."""
metadata = self._client.stream_get_metadata(self._args.destpath)
rows = self.destinfo[4]
for key in data:
wanted = str(data[key])
val = metadata.get(key, wanted)
if val != wanted and rows > 0:
m = "Metadata in destination stream:\n"
m += " %s = %s\n" % (key, val)
m += "doesn't match desired data:\n"
m += " %s = %s\n" % (key, wanted)
m += "Refusing to change it. You can change the stream's "
m += "metadata manually, or\n"
m += "remove existing data from the stream, to prevent "
m += "this error.\n"
raise Exception(m)
# All good -- write the metadata in case it's not already there
self._client.stream_update_metadata(self._args.destpath, data)
# Main processing helper
def process(self, function, rows, partial = True, args = None):
"""Process data in chunks of 'rows' data at a time.
function: function to process the data
rows: maximum number of rows to pass to 'function' at once
args: tuple containing extra arguments to pass to 'function'
partial: if true, less than 'rows' may be passed to 'function'.
if false, partial data at the end of an interval will
be dropped.
'function' should be defined like:
function(data, *args)
It will be passed an array containing up to 'rows' rows of
data from the source stream, and any arguments passed in
'args'. It should transform the data as desired, and return a
new array of data, which will be inserted into the destination
stream.
"""
if args is None:
args = []
extractor = nilmdb.client.Client(self._args.url).stream_extract
inserter = nilmdb.client.Client(self._args.url).stream_insert_context
src = self._args.srcpath
dest = self._args.destpath
islice = itertools.islice
# Figure out how to format output data
dest_layout = self.destinfo[1].split('_')[1]
def int_formatter(row):
return ("%.6f " % row[0]) + " ".join(str(int(x)) for x in row[1:])
def float_formatter(row):
return ("%.6f " % row[0]) + " ".join(repr(x) for x in row[1:])
if "int" in dest_layout:
formatter = int_formatter
else:
formatter = float_formatter
for (start, end) in self.intervals():
print "Processing", self.interval_string((start, end))
with inserter(dest, start, end) as insert_ctx:
src_array = []
for line in extractor(src, start, end):
# Read in data
src_array.append([ float(x) for x in line.split() ])
if len(src_array) == rows:
# Pass through filter function
dest_array = function(src_array, *args)
# Write result to destination
out = [ formatter(row) for row in dest_array ]
insert_ctx.insert("\n".join(out) + "\n")
# Clear source array
src_array = []
# Take care of partial chunk
if len(src_array) and partial:
dest_array = function(src_array, *args)
out = [ formatter(row) for row in dest_array ]
insert_ctx.insert("\n".join(out) + "\n")
def main():
# This is just a dummy function; actual filters can use the other
# functions to prepare stuff, and then do something with the data.
f = Filter()
parser = f.setup_parser()
args = f.parse_args()
for (start, end) in f.intervals():
print "Generic filter: need to handle", start, " to ", end
if __name__ == "__main__":
main()

View File

@@ -1,173 +0,0 @@
#!/usr/bin/python
import nilmdb.client
from nilmdb.utils.printf import *
from nilmdb.utils.time import parse_time, format_time
import nilmtools
import time
import sys
import re
import argparse
import subprocess
class ParseError(Exception):
def __init__(self, filename, error):
msg = filename + ": " + error
super(ParseError, self).__init__(msg)
def parse_args():
parser = argparse.ArgumentParser(
formatter_class = argparse.RawDescriptionHelpFormatter,
version = nilmtools.__version__,
description = """\
Insert data from ethstream, either live (using the system time as a
reference) or prerecorded (using comments in the file as a reference).
The data is assumed to have been recorded at the specified rate.
Small discrepencies between the accumulated timestamps and the
reference time are ignored; larger discrepencies cause gaps to be
created in the stream. Overlapping data returns an error.
""")
parser.add_argument("-u", "--url", action="store",
default="http://localhost:12380/",
help="NilmDB server URL (default: %(default)s)")
parser.add_argument("-r", "--rate", action="store", default=8000,
type=float,
help="Data rate in Hz (default: %(default)s)")
parser.add_argument("-l", "--live", action="store_true",
help="Live capture; use system time to verify rate")
parser.add_argument("path", action="store",
help="Path of stream, e.g. /foo/bar")
parser.add_argument("infile", type=argparse.FileType('r'), nargs='*',
default=[sys.stdin],
help="Input files (default: stdin)")
args = parser.parse_args()
printf("Stream path: %s\n", args.path)
printf(" Data rate: %s Hz\n", repr(args.rate))
return args
def main(args = None):
if args is None:
args = parse_args()
client = nilmdb.client.Client(args.url)
# Local copies to save dictionary lookups
live = args.live
# data_ts is the timestamp that we'll use for the current line
data_ts_base = 0
data_ts_inc = 0
data_ts_step = 1.0 / args.rate
# clock_ts is the imprecise "real" timestamp (from the filename,
# comments, or or system clock)
clock_ts = None
def print_clock_updated():
printf("Clock time updated to %s\n", format_time(clock_ts))
if data_ts_base != 0:
diff = data_ts - clock_ts
if diff >= 0:
printf(" (data timestamp ahead by %.6f sec)\n", diff)
else:
printf(" (data timestamp behind by %.6f sec)\n", -diff)
with client.stream_insert_context(args.path) as stream:
for f in args.infile:
filename = f.name
printf("Processing %s\n", filename)
# If the filename ends in .gz, open it with gzcat instead.
if filename.endswith(".gz"):
p = subprocess.Popen(["gzip", "-dc"],
stdin = f, stdout = subprocess.PIPE)
f = p.stdout
# Try to get a real timestamp from the filename
try:
# Subtract 1 hour because files are created at the end
# of the hour. Hopefully, we'll be able to use
# internal comments and this value won't matter anyway.
clock_ts = parse_time(filename).totimestamp() - 3600
print_clock_updated()
except ValueError:
pass
truncated_lines = 0
# Read each line
for line in f:
data_ts = data_ts_base + data_ts_inc * data_ts_step
# If no content other than the newline, skip it
if len(line) <= 1:
continue
# If line starts with a comment, look for a timestamp
if line[0] == '#':
try:
clock_ts = parse_time(line[1:]).totimestamp()
print_clock_updated()
except ValueError:
pass
continue
# If inserting live, use clock timestamp
if live:
clock_ts = time.time()
# If we have a real timestamp, compare it to the data
# timestamp, and make sure things match up.
if clock_ts is not None:
if (data_ts - 10) > clock_ts:
# Accumulated line timestamps are in the future.
# If we were to set data_ts=clock_ts, we'd create
# an overlap, so we have to just bail out here.
err = sprintf("Data is coming in too fast: data time "
"is %s but clock time is only %s",
format_time(data_ts),
format_time(clock_ts))
raise ParseError(filename, err)
if (data_ts + 10) < clock_ts:
# Accumulated line timetamps are in the past. We
# can just skip some time and leave a gap in the
# data.
if data_ts_base != 0:
printf("Skipping data timestamp forward from "
"%s to %s to match clock time\n",
format_time(data_ts),
format_time(clock_ts))
stream.finalize()
data_ts_base = data_ts = clock_ts
data_ts_inc = 0
# Don't use this clock time anymore until we update it
clock_ts = None
if data_ts_base == 0:
raise ParseError(filename, "No idea what timestamp to use")
# This line is legit, so increment timestamp
data_ts_inc += 1
# Once in a while a line might be truncated, if we're at
# the end of a file. Ignore it, but if we ignore too many,
# bail out.
if line[-1] != '\n':
truncated_lines += 1
if truncated_lines > 3:
raise ParseError(filename, "too many short lines")
printf("Ignoring short line in %s\n", filename)
continue
# Insert it
stream.insert("%.6f %s" % (data_ts, line))
print "Done"
if __name__ == "__main__":
main()

View File

@@ -30,7 +30,7 @@ except ImportError:
# Versioneer manages version numbers from git tags.
# https://github.com/warner/python-versioneer
import versioneer
versioneer.versionfile_source = 'nilmtools/_version.py'
versioneer.versionfile_source = 'src/_version.py'
versioneer.versionfile_build = 'nilmtools/_version.py'
versioneer.tag_prefix = 'nilmtools-'
versioneer.parentdir_prefix = 'nilmtools-'
@@ -61,14 +61,24 @@ setup(name='nilmtools',
long_description = "NILM Database Tools",
license = "Proprietary",
author_email = 'jim@jtan.com',
install_requires = [ 'nilmdb >= 1.3.0',
install_requires = [ 'nilmdb >= 1.6.0',
'numpy',
'scipy',
'matplotlib',
],
packages = [ 'nilmtools',
],
package_dir = { 'nilmtools': 'src' },
entry_points = {
'console_scripts': [
'nilm-decimate = nilmtools.decimate:main',
'nilm-decimate-auto = nilmtools.decimate_auto:main',
'nilm-insert = nilmtools.insert:main',
'nilm-copy = nilmtools.copy_one:main',
'nilm-prep = nilmtools.prep:main',
'nilm-copy-wildcard = nilmtools.copy_wildcard:main',
'nilm-sinefit = nilmtools.sinefit:main',
'nilm-cleanup = nilmtools.cleanup:main',
],
},
zip_safe = False,

View File

View File

@@ -181,7 +181,7 @@ def versions_from_parentdir(parentdir_prefix, versionfile_source, verbose=False)
tag_prefix = "nilmtools-"
parentdir_prefix = "nilmtools-"
versionfile_source = "nilmtools/_version.py"
versionfile_source = "src/_version.py"
def get_versions(default={"version": "unknown", "full": ""}, verbose=False):
variables = { "refnames": git_refnames, "full": git_full }

257
src/cleanup.py Executable file
View File

@@ -0,0 +1,257 @@
#!/usr/bin/python
from nilmdb.utils.printf import *
from nilmdb.utils.time import (parse_time, timestamp_to_human,
timestamp_to_seconds, seconds_to_timestamp)
from nilmdb.utils.diskusage import human_size
from nilmdb.utils.interval import Interval
import nilmdb.client
import nilmdb.client.numpyclient
import nilmtools
import argparse
import ConfigParser
import sys
import collections
import fnmatch
import re
def warn(msg, *args):
fprintf(sys.stderr, "warning: " + msg + "\n", *args)
class TimePeriod(object):
_units = { 'h': ('hour', 60*60),
'd': ('day', 60*60*24),
'w': ('week', 60*60*24*7),
'm': ('month', 60*60*24*30),
'y': ('year', 60*60*24*365) }
def __init__(self, val):
for u in self._units:
if val.endswith(u):
self.unit = self._units[u][0]
self.scale = self._units[u][1]
self.count = float(val[:-len(u)])
break
else:
raise ValueError("unknown units: " + units)
def seconds(self):
return self.count * self.scale
def describe_seconds(self, seconds):
count = seconds / self.scale
units = self.unit if count == 1 else (self.unit + "s")
if count == int(count):
return sprintf("%d %s", count, units)
else:
return sprintf("%.2f %s", count, units)
def __str__(self):
return self.describe_seconds(self.seconds())
class StreamCleanupConfig(object):
def __init__(self, info):
self.path = info[0]
self.layout = info[1]
if info[4] != 0 and info[5] != 0:
self.rate = info[4] / timestamp_to_seconds(info[5])
else:
self.rate = None
self.keep = None
self.clean_decimated = True
self.decimated_from = None
self.also_clean_paths = []
def main(argv = None):
parser = argparse.ArgumentParser(
formatter_class = argparse.RawDescriptionHelpFormatter,
version = nilmtools.__version__,
description = """\
Clean up old data from streams using a configuration file to specify
which data to remove.
The format of the config file is as follows:
[/stream/path]
keep = 3w # keep up to 3 weeks of data
rate = 8000 # optional, used for the --estimate option
decimated = false # whether to delete decimated data too (default true)
[*/prep]
keep = 3.5m # or 2520h or 105d or 15w or 0.29y
The suffix for 'keep' is 'h' for hours, 'd' for days, 'w' for weeks,
'm' for months, or 'y' for years.
Streams paths may include wildcards. If a path is matched by more than
one config section, data from the last config section counts.
Decimated streams (paths containing '~decim-') are treated specially:
- They don't match wildcards
- When deleting data from a parent stream, data is also deleted
from its decimated streams, unless decimated=false
Rate is optional and is only used for the --estimate option.
""")
parser.add_argument("-u", "--url", action="store",
default="http://localhost/nilmdb/",
help="NilmDB server URL (default: %(default)s)")
parser.add_argument("-y", "--yes", action="store_true",
default = False,
help="Actually remove the data (default: no)")
parser.add_argument("-e", "--estimate", action="store_true",
default = False,
help="Estimate how much disk space will be used")
parser.add_argument("configfile", type=argparse.FileType('r'),
help="Configuration file")
args = parser.parse_args(argv)
# Parse config file
config = ConfigParser.RawConfigParser()
config.readfp(args.configfile)
# List all streams
client = nilmdb.client.Client(args.url)
streamlist = client.stream_list(extended = True)
# Create config objects
streams = collections.OrderedDict()
for s in streamlist:
streams[s[0]] = StreamCleanupConfig(s)
m = re.search(r"^(.*)~decim-[0-9]+$", s[0])
if m:
streams[s[0]].decimated_from = m.group(1)
# Build up configuration
for section in config.sections():
matched = False
for path in streams.iterkeys():
# Decimated streams only allow exact matches
if streams[path].decimated_from and path != section:
continue
if not fnmatch.fnmatch(path, section):
continue
matched = True
options = config.options(section)
# Keep period (days, weeks, months, years)
if 'keep' in options:
streams[path].keep = TimePeriod(config.get(section, 'keep'))
options.remove('keep')
# Rate
if 'rate' in options:
streams[path].rate = config.getfloat(section, 'rate')
options.remove('rate')
# Decimated
if 'decimated' in options:
val = config.getboolean(section, 'decimated')
streams[path].clean_decimated = val
options.remove('decimated')
for leftover in options:
warn("option '%s' for '%s' is unknown", leftover, section)
if not matched:
warn("config for '%s' did not match any existing streams", section)
# List all decimated streams in the parent stream's info
for path in streams.keys():
src = streams[path].decimated_from
if src and src in streams:
if streams[src].clean_decimated:
streams[src].also_clean_paths.append(path)
del streams[path]
# Warn about streams that aren't getting cleaned up
for path in streams.keys():
if streams[path].keep is None or streams[path].keep.seconds() < 0:
warn("no config for existing stream '%s'", path)
del streams[path]
if args.estimate:
# Estimate disk usage
total = 0
for path in streams.keys():
rate = streams[path].rate
if not rate or rate < 0:
warn("unable to estimate disk usage for stream '%s' because "
"the data rate is unknown", path)
continue
printf("%s:\n", path)
layout = streams[path].layout
dtype = nilmdb.client.numpyclient.layout_to_dtype(layout)
per_row = dtype.itemsize
per_sec = per_row * rate
printf("%17s: %s per row, %s rows per second\n",
"base rate",
human_size(per_row),
round(rate,1))
printf("%17s: %s per hour, %s per day\n",
"base size",
human_size(per_sec * 3600),
human_size(per_sec * 3600 * 24))
# If we'll be cleaning up decimated data, add an
# estimation for how much room decimated data takes up.
if streams[path].clean_decimated:
d_layout = "float32_" + str(3*(int(layout.split('_')[1])))
d_dtype = nilmdb.client.numpyclient.layout_to_dtype(d_layout)
# Assume the decimations will be a factor of 4
# sum_{k=0..inf} (rate / (n^k)) * d_dtype.itemsize
d_per_row = d_dtype.itemsize
factor = 4.0
d_per_sec = d_per_row * (rate / factor) * (1 / (1 - (1/factor)))
per_sec += d_per_sec
printf("%17s: %s per hour, %s per day\n",
"with decimation",
human_size(per_sec * 3600),
human_size(per_sec * 3600 * 24))
keep = per_sec * streams[path].keep.seconds()
printf("%17s: %s\n\n",
"keep " + str(streams[path].keep), human_size(keep))
total += keep
printf("Total estimated disk usage for these streams:\n")
printf(" %s\n", human_size(total))
raise SystemExit(0)
# Do the cleanup
for path in streams:
printf("%s: keep %s\n", path, streams[path].keep)
# Figure out the earliest timestamp we should keep.
intervals = [ Interval(start, end) for (start, end) in
reversed(list(client.stream_intervals(path))) ]
total = 0
keep = seconds_to_timestamp(streams[path].keep.seconds())
for i in intervals:
total += i.end - i.start
if total <= keep:
continue
remove_before = i.start + (total - keep)
break
else:
printf(" nothing to do (only %s of data present)\n",
streams[path].keep.describe_seconds(
timestamp_to_seconds(total)))
continue
printf(" removing data before %s\n", timestamp_to_human(remove_before))
# Clean in reverse order. Since we only use the primary stream and not
# the decimated streams to figure out which data to remove, removing
# the primary stream last means that we might recover more nicely if
# we are interrupted and restarted.
clean_paths = list(reversed(streams[path].also_clean_paths)) + [ path ]
for p in clean_paths:
printf(" removing from %s\n", p)
if args.yes:
client.stream_remove(p, None, remove_before)
# All done
if not args.yes:
printf("Note: specify --yes to actually perform removals\n")
return
if __name__ == "__main__":
main()

41
src/copy_one.py Executable file
View File

@@ -0,0 +1,41 @@
#!/usr/bin/python
# This is called copy_one instead of copy to avoid name conflicts with
# the Python standard library.
import nilmtools.filter
import nilmdb.client
from nilmdb.client.numpyclient import NumpyClient
import numpy as np
import sys
def main(argv = None):
f = nilmtools.filter.Filter()
parser = f.setup_parser("Copy a stream")
# Parse arguments
try:
args = f.parse_args(argv)
except nilmtools.filter.MissingDestination as e:
print "Source is %s (%s)" % (e.src.path, e.src.layout)
print "Destination %s doesn't exist" % (e.dest.path)
print "You could make it with a command like:"
print " nilmtool -u %s create %s %s" % (e.dest.url,
e.dest.path, e.src.layout)
raise SystemExit(1)
# Copy metadata
meta = f.client_src.stream_get_metadata(f.src.path)
f.check_dest_metadata(meta)
# Copy all rows of data using the faster Numpy interfaces
extractor = NumpyClient(f.src.url).stream_extract_numpy
inserter = NumpyClient(f.dest.url).stream_insert_numpy_context
for i in f.intervals():
print "Processing", f.interval_string(i)
with inserter(f.dest.path, i.start, i.end) as insert_ctx:
for data in extractor(f.src.path, i.start, i.end):
insert_ctx.insert(data)
if __name__ == "__main__":
main()

70
src/copy_wildcard.py Executable file
View File

@@ -0,0 +1,70 @@
#!/usr/bin/python
# Copy streams between NilmDB servers with wildcards
import nilmtools.filter
import nilmtools.copy_one
import nilmdb.client
import argparse
import fnmatch
def main(argv = None):
f = nilmtools.filter.Filter()
# Reuse filter's parser, since it handles most options we need.
parser = f.setup_parser(description = """\
Copy all streams matching the given wildcard from one host to another.
Example: %(prog)s -u http://host1/nilmdb -U http://host2/nilmdb /sharon/*
""", skip_paths = True)
parser.add_argument("path", action="store", nargs="+",
help='Wildcard paths to copy')
args = parser.parse_args(argv)
# Verify arguments
if args.dest_url is None:
parser.error("must provide both source and destination URL")
client_src = nilmdb.client.Client(args.url)
client_dest = nilmdb.client.Client(args.dest_url)
if client_src.geturl() == client_dest.geturl():
parser.error("source and destination URL must be different")
print "Source URL:", client_src.geturl()
print " Dest URL:", client_dest.geturl()
# Find matching streams
matched = []
for path in args.path:
matched.extend([s for s in client_src.stream_list(extended = True)
if fnmatch.fnmatch(s[0], path)
and s not in matched])
# Create destination streams if they don't exist
for stream in matched:
src = nilmtools.filter.StreamInfo(client_src.geturl(), stream)
dest = nilmtools.filter.get_stream_info(client_dest, src.path)
if not dest:
print "Creating destination stream", src.path
client_dest.stream_create(src.path, src.layout)
# Copy them all by running the "copy" tool as if it were
# invoked from the command line.
for stream in matched:
new_argv = ["--url", client_src.geturl(),
"--dest-url", client_dest.geturl() ]
if args.start:
new_argv.extend(["--start", "@" + repr(args.start)])
if args.end:
new_argv.extend(["--end", "@" + repr(args.end)])
if args.dry_run:
new_argv.extend(["--dry-run"])
if args.force_metadata:
new_argv.extend(["--force-metadata"])
new_argv.extend([stream[0], stream[0]])
try:
nilmtools.copy_one.main(new_argv)
except SystemExit as e:
# Ignore SystemExit which could be raised on --dry-run
if e.code != 0:
raise
if __name__ == "__main__":
main()

85
src/decimate.py Executable file
View File

@@ -0,0 +1,85 @@
#!/usr/bin/python
import nilmtools.filter
import nilmdb.client
import numpy as np
import operator
def main(argv = None):
f = nilmtools.filter.Filter()
parser = f.setup_parser("Decimate a stream")
group = parser.add_argument_group("Decimate options")
group.add_argument('-f', '--factor', action='store', default=4, type=int,
help='Decimation factor (default: %(default)s)')
# Parse arguments
try:
args = f.parse_args(argv)
except nilmtools.filter.MissingDestination as e:
# If no destination, suggest how to create it by figuring out
# a recommended layout.
src = e.src
dest = e.dest
print "Source is %s (%s)" % (src.path, src.layout)
print "Destination %s doesn't exist" % (dest.path)
if "decimate_source" in f.client_src.stream_get_metadata(src.path):
rec = src.layout
elif 'int32' in src.layout_type or 'float64' in src.layout_type:
rec = 'float64_' + str(src.layout_count * 3)
else:
rec = 'float32_' + str(src.layout_count * 3)
print "You could make it with a command like:"
print " nilmtool -u %s create %s %s" % (e.dest.url,
e.dest.path, rec)
raise SystemExit(1)
if not (args.factor >= 2):
raise Exception("factor needs to be 2 or more")
f.check_dest_metadata({ "decimate_source": f.src.path,
"decimate_factor": args.factor })
# If source is decimated, we have to decimate a bit differently
if "decimate_source" in f.client_src.stream_get_metadata(args.srcpath):
again = True
else:
again = False
f.process_numpy(decimate, args = (args.factor, again))
def decimate(data, interval, args, insert_function, final):
"""Decimate data"""
(factor, again) = args
(n, m) = data.shape
# Figure out which columns to use as the source for mean, min, and max,
# depending on whether this is the first decimation or we're decimating
# again. Note that we include the timestamp in the means.
if again:
c = (m - 1) // 3
# e.g. c = 3
# ts mean1 mean2 mean3 min1 min2 min3 max1 max2 max3
mean_col = slice(0, c + 1)
min_col = slice(c + 1, 2 * c + 1)
max_col = slice(2 * c + 1, 3 * c + 1)
else:
mean_col = slice(0, m)
min_col = slice(1, m)
max_col = slice(1, m)
# Discard extra rows that aren't a multiple of factor
n = n // factor * factor
data = data[:n,:]
# Reshape it into 3D so we can process 'factor' rows at a time
data = data.reshape(n // factor, factor, m)
# Fill the result
out = np.c_[ np.mean(data[:,:,mean_col], axis=1),
np.min(data[:,:,min_col], axis=1),
np.max(data[:,:,max_col], axis=1) ]
insert_function(out)
return n
if __name__ == "__main__":
main()

98
src/decimate_auto.py Executable file
View File

@@ -0,0 +1,98 @@
#!/usr/bin/python
import nilmtools.filter
import nilmtools.decimate
import nilmdb.client
import argparse
import fnmatch
def main(argv = None):
parser = argparse.ArgumentParser(
formatter_class = argparse.RawDescriptionHelpFormatter,
version = nilmtools.__version__,
description = """\
Automatically create multiple decimations from a single source
stream, continuing until the last decimated level contains fewer
than 500 points total.
Wildcards and multiple paths are accepted. Decimated paths are
ignored when matching wildcards.
""")
parser.add_argument("-u", "--url", action="store",
default="http://localhost/nilmdb/",
help="NilmDB server URL (default: %(default)s)")
parser.add_argument('-f', '--factor', action='store', default=4, type=int,
help='Decimation factor (default: %(default)s)')
parser.add_argument("--force-metadata", action="store_true",
default = False,
help="Force metadata changes if the dest "
"doesn't match")
parser.add_argument("path", action="store", nargs='+',
help='Path of base stream')
args = parser.parse_args(argv)
# Pull out info about the base stream
client = nilmdb.client.Client(args.url)
# Find list of paths to process
streams = [ unicode(s[0]) for s in client.stream_list() ]
streams = [ s for s in streams if "~decim-" not in s ]
paths = []
for path in args.path:
new = fnmatch.filter(streams, unicode(path))
if not new:
print "error: no stream matched path:", path
raise SystemExit(1)
paths.extend(new)
for path in paths:
do_decimation(client, args, path)
def do_decimation(client, args, path):
print "Decimating", path
info = nilmtools.filter.get_stream_info(client, path)
if not info:
raise Exception("path " + path + " not found")
meta = client.stream_get_metadata(path)
if "decimate_source" in meta:
print "Stream", path, "was decimated from", meta["decimate_source"]
print "You need to pass the base stream instead"
raise SystemExit(1)
# Figure out the type we should use for decimated streams
if 'int32' in info.layout_type or 'float64' in info.layout_type:
decimated_type = 'float64_' + str(info.layout_count * 3)
else:
decimated_type = 'float32_' + str(info.layout_count * 3)
# Now do the decimations until we have few enough points
factor = 1
while True:
print "Level", factor, "decimation has", info.rows, "rows"
if info.rows <= 500:
break
factor *= args.factor
new_path = "%s~decim-%d" % (path, factor)
# Create the stream if needed
new_info = nilmtools.filter.get_stream_info(client, new_path)
if not new_info:
print "Creating stream", new_path
client.stream_create(new_path, decimated_type)
# Run the decimation as if it were run from the commandline
new_argv = [ "-u", args.url,
"-f", str(args.factor) ]
if args.force_metadata:
new_argv.extend([ "--force-metadata" ])
new_argv.extend([info.path, new_path])
nilmtools.decimate.main(new_argv)
# Update info using the newly decimated stream
info = nilmtools.filter.get_stream_info(client, new_path)
return
if __name__ == "__main__":
main()

344
src/filter.py Normal file
View File

@@ -0,0 +1,344 @@
#!/usr/bin/python
from __future__ import absolute_import
import nilmdb.client
from nilmdb.client import Client
from nilmdb.client.numpyclient import NumpyClient
from nilmdb.utils.printf import *
from nilmdb.utils.time import (parse_time, timestamp_to_human,
timestamp_to_seconds)
from nilmdb.utils.interval import Interval
import nilmtools
import itertools
import time
import sys
import re
import argparse
import numpy as np
import cStringIO
class MissingDestination(Exception):
def __init__(self, args, src, dest):
self.parsed_args = args
self.src = src
self.dest = dest
Exception.__init__(self, "destination path " + dest.path + " not found")
class StreamInfo(object):
def __init__(self, url, info):
self.url = url
self.info = info
try:
self.path = info[0]
self.layout = info[1]
self.layout_type = self.layout.split('_')[0]
self.layout_count = int(self.layout.split('_')[1])
self.total_count = self.layout_count + 1
self.timestamp_min = info[2]
self.timestamp_max = info[3]
self.rows = info[4]
self.seconds = nilmdb.utils.time.timestamp_to_seconds(info[5])
except IndexError, TypeError:
pass
def string(self, interhost):
"""Return stream info as a string. If interhost is true,
include the host URL."""
if interhost:
return sprintf("[%s] ", self.url) + str(self)
return str(self)
def __str__(self):
"""Return stream info as a string."""
return sprintf("%s (%s), %.2fM rows, %.2f hours",
self.path, self.layout, self.rows / 1e6,
self.seconds / 3600.0)
def get_stream_info(client, path):
"""Return a StreamInfo object about the given path, or None if it
doesn't exist"""
streams = client.stream_list(path, extended = True)
if len(streams) != 1:
return None
return StreamInfo(client.geturl(), streams[0])
class Filter(object):
def __init__(self):
self._parser = None
self._client_src = None
self._client_dest = None
self._using_client = False
self.src = None
self.dest = None
self.start = None
self.end = None
self.interhost = False
self.force_metadata = False
@property
def client_src(self):
if self._using_client:
raise Exception("Filter client is in use; make another")
return self._client_src
@property
def client_dest(self):
if self._using_client:
raise Exception("Filter client is in use; make another")
return self._client_dest
def setup_parser(self, description = "Filter data", skip_paths = False):
parser = argparse.ArgumentParser(
formatter_class = argparse.RawDescriptionHelpFormatter,
version = nilmtools.__version__,
description = description)
group = parser.add_argument_group("General filter arguments")
group.add_argument("-u", "--url", action="store",
default="http://localhost/nilmdb/",
help="Server URL (default: %(default)s)")
group.add_argument("-U", "--dest-url", action="store",
help="Destination server URL "
"(default: same as source)")
group.add_argument("-D", "--dry-run", action="store_true",
default = False,
help="Just print intervals that would be "
"processed")
group.add_argument("--force-metadata", action="store_true",
default = False,
help="Force metadata changes if the dest "
"doesn't match")
group.add_argument("-s", "--start",
metavar="TIME", type=self.arg_time,
help="Starting timestamp for intervals "
"(free-form, inclusive)")
group.add_argument("-e", "--end",
metavar="TIME", type=self.arg_time,
help="Ending timestamp for intervals "
"(free-form, noninclusive)")
if not skip_paths:
# Individual filter scripts might want to add these arguments
# themselves, to include multiple sources in a different order
# (for example). "srcpath" and "destpath" arguments must exist,
# though.
group.add_argument("srcpath", action="store",
help="Path of source stream, e.g. /foo/bar")
group.add_argument("destpath", action="store",
help="Path of destination stream, e.g. /foo/bar")
self._parser = parser
return parser
def interval_string(self, interval):
return sprintf("[ %s -> %s ]",
timestamp_to_human(interval.start),
timestamp_to_human(interval.end))
def parse_args(self, argv = None):
args = self._parser.parse_args(argv)
if args.dest_url is None:
args.dest_url = args.url
if args.url != args.dest_url:
self.interhost = True
self._client_src = Client(args.url)
self._client_dest = Client(args.dest_url)
if (not self.interhost) and (args.srcpath == args.destpath):
self._parser.error("source and destination path must be different")
# Open and print info about the streams
self.src = get_stream_info(self._client_src, args.srcpath)
if not self.src:
self._parser.error("source path " + args.srcpath + " not found")
self.dest = get_stream_info(self._client_dest, args.destpath)
if not self.dest:
raise MissingDestination(args, self.src,
StreamInfo(args.dest_url, [args.destpath]))
print "Source:", self.src.string(self.interhost)
print " Dest:", self.dest.string(self.interhost)
if args.dry_run:
for interval in self.intervals():
print self.interval_string(interval)
raise SystemExit(0)
self.force_metadata = args.force_metadata
self.start = args.start
self.end = args.end
return args
def _optimize_int(self, it):
"""Join and yield adjacent intervals from the iterator 'it'"""
saved_int = None
for interval in it:
if saved_int is not None:
if saved_int.end == interval.start:
interval.start = saved_int.start
else:
yield saved_int
saved_int = interval
if saved_int is not None:
yield saved_int
def intervals(self):
"""Generate all the intervals that this filter should process"""
self._using_client = True
if self.interhost:
# Do the difference ourselves
s_intervals = ( Interval(start, end)
for (start, end) in
self._client_src.stream_intervals(
self.src.path,
start = self.start, end = self.end) )
d_intervals = ( Interval(start, end)
for (start, end) in
self._client_dest.stream_intervals(
self.dest.path,
start = self.start, end = self.end) )
intervals = nilmdb.utils.interval.set_difference(s_intervals,
d_intervals)
else:
# Let the server do the difference for us
intervals = ( Interval(start, end)
for (start, end) in
self._client_src.stream_intervals(
self.src.path, diffpath = self.dest.path,
start = self.start, end = self.end) )
# Optimize intervals: join intervals that are adjacent
for interval in self._optimize_int(intervals):
yield interval
self._using_client = False
# Misc helpers
def arg_time(self, toparse):
"""Parse a time string argument"""
try:
return nilmdb.utils.time.parse_time(toparse)
except ValueError as e:
raise argparse.ArgumentTypeError(sprintf("%s \"%s\"",
str(e), toparse))
def check_dest_metadata(self, data):
"""See if the metadata jives, and complain if it doesn't. If
there's no conflict, update the metadata to match 'data'."""
metadata = self._client_dest.stream_get_metadata(self.dest.path)
if not self.force_metadata:
for key in data:
wanted = str(data[key])
val = metadata.get(key, wanted)
if val != wanted and self.dest.rows > 0:
m = "Metadata in destination stream:\n"
m += " %s = %s\n" % (key, val)
m += "doesn't match desired data:\n"
m += " %s = %s\n" % (key, wanted)
m += "Refusing to change it. To prevent this error, "
m += "change or delete the metadata with nilmtool,\n"
m += "remove existing data from the stream, or "
m += "retry with --force-metadata."
raise Exception(m)
# All good -- write the metadata in case it's not already there
self._client_dest.stream_update_metadata(self.dest.path, data)
# The main filter processing method.
def process_numpy(self, function, args = None, rows = 100000):
"""For all intervals that exist in self.src but don't exist in
self.dest, call 'function' with a Numpy array corresponding to
the data. The data is converted to a Numpy array in chunks of
'rows' rows at a time.
'function' should be defined as:
def function(data, interval, args, insert_func, final)
'data': array of data to process -- may be empty
'interval': overall interval we're processing (but not necessarily
the interval of this particular chunk of data)
'args': opaque arguments passed to process_numpy
'insert_func': function to call in order to insert array of data.
Should be passed a 2-dimensional array of data to insert.
Data timestamps must be within the provided interval.
'final': True if this is the last bit of data for this
contiguous interval, False otherwise.
Return value of 'function' is the number of data rows processed.
Unprocessed data will be provided again in a subsequent call
(unless 'final' is True).
If unprocessed data remains after 'final' is True, the interval
being inserted will be ended at the timestamp of the first
unprocessed data point.
"""
if args is None:
args = []
extractor = NumpyClient(self.src.url).stream_extract_numpy
inserter = NumpyClient(self.dest.url).stream_insert_numpy_context
for interval in self.intervals():
print "Processing", self.interval_string(interval)
with inserter(self.dest.path,
interval.start, interval.end) as insert_ctx:
insert_function = insert_ctx.insert
old_array = np.array([])
for new_array in extractor(self.src.path,
interval.start, interval.end,
layout = self.src.layout,
maxrows = rows):
# If we still had old data left, combine it
if old_array.shape[0] != 0:
array = np.vstack((old_array, new_array))
else:
array = new_array
# Pass it to the process function
processed = function(array, interval, args,
insert_function, False)
# Send any pending data
insert_ctx.send()
# Save the unprocessed parts
if processed >= 0:
old_array = array[processed:]
else:
raise Exception(
sprintf("%s return value %s must be >= 0",
str(function), str(processed)))
# Warn if there's too much data remaining
if old_array.shape[0] > 3 * rows:
printf("warning: %d unprocessed rows in buffer\n",
old_array.shape[0])
# Last call for this contiguous interval
if old_array.shape[0] != 0:
processed = function(old_array, interval, args,
insert_function, True)
if processed != old_array.shape[0]:
# Truncate the interval we're inserting at the first
# unprocessed data point. This ensures that
# we'll not miss any data when we run again later.
insert_ctx.update_end(old_array[processed][0])
def main(argv = None):
# This is just a dummy function; actual filters can use the other
# functions to prepare stuff, and then do something with the data.
f = Filter()
parser = f.setup_parser()
args = f.parse_args(argv)
for i in f.intervals():
print "Generic filter: need to handle", f.interval_string(i)
if __name__ == "__main__":
main()

268
src/insert.py Executable file
View File

@@ -0,0 +1,268 @@
#!/usr/bin/python
import nilmdb.client
from nilmdb.utils.printf import *
from nilmdb.utils.time import (parse_time, timestamp_to_human,
timestamp_to_seconds, seconds_to_timestamp,
rate_to_period, now as time_now)
import nilmtools
import time
import sys
import re
import argparse
import subprocess
import textwrap
class ParseError(Exception):
def __init__(self, filename, error):
msg = filename + ": " + error
super(ParseError, self).__init__(msg)
def parse_args(argv = None):
parser = argparse.ArgumentParser(
formatter_class = argparse.RawDescriptionHelpFormatter,
version = nilmtools.__version__,
description = textwrap.dedent("""\
Insert large amount of data from an external source like ethstream.
This code tracks two timestamps:
(1) The 'data' timestamp is the precise timestamp corresponding to
a particular row of data, and is the timestamp that gets
inserted into the database. It increases by 'data_delta' for
every row of input.
'data_delta' can come from one of two sources. If '--delta'
is specified, it is pulled from the first column of data. If
'--rate' is specified, 'data_delta' is set to a fixed value of
(1 / rate).
(2) The 'clock' timestamp is the less precise timestamp that gives
the absolute time. It can come from two sources. If '--live'
is specified, it is pulled directly from the system clock. If
'--file' is specified, it is extracted from the input filename
every time a new file is opened for read, and from comments
that appear in the file.
Small discrepencies between 'data' and 'clock' are ignored. If
the 'data' timestamp ever differs from the 'clock' timestamp by
more than 'max_gap' seconds:
- If 'data' is running behind, there is a gap in the data, so it
is stepped forward to match 'clock'.
- If 'data' is running ahead, there is overlap in the data, and an
error is raised.
"""))
parser.add_argument("-u", "--url", action="store",
default="http://localhost/nilmdb/",
help="NilmDB server URL (default: %(default)s)")
group = parser.add_argument_group("Misc options")
group.add_argument("-D", "--dry-run", action="store_true",
help="Parse files, but don't insert any data")
group.add_argument("-m", "--max-gap", action="store", default=10.0,
metavar="SEC", type=float,
help="Max discrepency between clock and data "
"timestamps (default: %(default)s)")
group = parser.add_argument_group("Data timestamp delta")
exc = group.add_mutually_exclusive_group()
exc.add_argument("-r", "--rate", action="store", default=8000.0,
type=float,
help="Data_delta is constant 1/RATE "
"(default: %(default)s Hz)")
exc.add_argument("-d", "--delta", action="store_true",
help="Data_delta is the first number in each line")
group = parser.add_argument_group("Clock timestamp source")
exc = group.add_mutually_exclusive_group()
exc.add_argument("-l", "--live", action="store_true",
help="Use live system time for clock timestamp")
exc.add_argument("-f", "--file", action="store_true", default=True,
help="Use filename or comments for clock timestamp")
group.add_argument("-o", "--offset-filename", metavar="SEC",
action="store", default=-3600.0, type=float,
help="Offset to add to filename timestamps "
"(default: %(default)s)")
group.add_argument("-O", "--offset-comment", metavar="SEC",
action="store", default=0.0, type=float,
help="Offset to add to comment timestamps "
"(default: %(default)s)")
group = parser.add_argument_group("Database path")
group.add_argument("path", action="store",
help="Path of stream, e.g. /foo/bar")
group = parser.add_argument_group("Input files")
group.add_argument("infile", type=argparse.FileType('r'), nargs='*',
default=[sys.stdin],
help="Input files (default: stdin)")
args = parser.parse_args(argv)
printf(" Stream path: %s\n", args.path)
printf(" Data timestamp: ")
if args.delta:
printf("delta on each input line\n")
else:
printf("fixed rate %s Hz\n", repr(args.rate))
printf(" Clock timestamp: ")
if args.live:
printf("live system clock\n")
else:
printf("from filenames and comments\n")
printf(" Filename offset: %s seconds\n", repr(args.offset_filename))
printf(" Comment offset: %s seconds\n", repr(args.offset_comment))
printf(" Max gap: %s seconds\n", repr(args.max_gap))
if args.dry_run:
printf("Dry run (no data will be inserted)\n")
return args
def main(argv = None):
args = parse_args(argv)
client = nilmdb.client.Client(args.url)
# data_ts is the timestamp that we'll use for the current line
data_ts_base = 0
data_ts_inc = 0
data_ts_rate = args.rate
data_ts_delta = 0
def get_data_ts():
if args.delta:
return data_ts_base + data_ts_delta
else:
return data_ts_base + rate_to_period(data_ts_rate,
data_ts_inc)
# clock_ts is the imprecise "real" timestamp (from the filename,
# comments, or system clock)
clock_ts = None
def print_clock_updated():
printf("Clock timestamp updated to %s\n", timestamp_to_human(clock_ts))
if data_ts_base != 0:
diff = get_data_ts() - clock_ts
if diff >= 0:
printf(" (data timestamp ahead by %.6f sec)\n",
timestamp_to_seconds(diff))
else:
printf(" (data timestamp behind by %.6f sec)\n",
timestamp_to_seconds(-diff))
offset_filename = seconds_to_timestamp(args.offset_filename)
offset_comment = seconds_to_timestamp(args.offset_comment)
max_gap = seconds_to_timestamp(args.max_gap)
with client.stream_insert_context(args.path) as stream:
for f in args.infile:
filename = f.name
printf("Processing %s\n", filename)
# If the filename ends in .gz, re-open it with gzip to
# decompress.
if filename.endswith(".gz"):
p = subprocess.Popen(["gzip", "-dc"],
stdin = f, stdout = subprocess.PIPE)
f = p.stdout
# Try to get a real timestamp from the filename
try:
# Subtract 1 hour because files are created at the end
# of the hour. Hopefully, we'll be able to use
# internal comments and this value won't matter anyway.
clock_ts = parse_time(filename) + offset_filename
print_clock_updated()
except ValueError:
pass
truncated_lines = 0
# Read each line
for line in f:
# Once in a while a line might be truncated, if we're
# at the end of a file. Ignore it, but if we ignore
# too many, bail out.
if line[-1] != '\n':
truncated_lines += 1
if truncated_lines > 3:
raise ParseError(filename, "too many short lines")
printf("Ignoring short line in %s\n", filename)
continue
# If no content other than the newline, skip it
if len(line) <= 1:
continue
# If line starts with a comment, look for a timestamp
if line[0] == '#':
try:
clock_ts = parse_time(line[1:]) + offset_comment
print_clock_updated()
except ValueError:
pass
continue
# If --delta mode, increment data_ts_delta by the
# delta from the file.
if args.delta:
try:
(delta, line) = line.split(None, 1)
data_ts_delta += float(delta)
except ValueError:
raise ParseError(filename, "can't parse delta")
# Calculate data_ts for this row
data_ts = get_data_ts()
# If inserting live, use clock timestamp
if args.live:
clock_ts = time_now()
# If we have a real timestamp, compare it to the data
# timestamp, and make sure things match up.
if clock_ts is not None:
if (data_ts - max_gap) > clock_ts:
# Accumulated line timestamps are in the future.
# If we were to set data_ts=clock_ts, we'd create
# an overlap, so we have to just bail out here.
err = sprintf("Data is coming in too fast: data time "
"is %s but clock time is only %s",
timestamp_to_human(data_ts),
timestamp_to_human(clock_ts))
raise ParseError(filename, err)
if (data_ts + max_gap) < clock_ts:
# Accumulated line timetamps are in the past. We
# can just skip some time and leave a gap in the
# data.
if data_ts_base != 0:
printf("Skipping data timestamp forward from "
"%s to %s to match clock time\n",
timestamp_to_human(data_ts),
timestamp_to_human(clock_ts))
stream.finalize()
data_ts_base = data_ts = clock_ts
data_ts_inc = data_ts_delta = 0
# Don't use this clock time anymore until we update it
clock_ts = None
if data_ts_base == 0:
raise ParseError(filename, "No idea what timestamp to use")
# This line is legit, so increment timestamp (for --rate)
data_ts_inc += 1
# Insert it
if not args.dry_run:
stream.insert("%d %s" % (data_ts, line))
print "Done"
if __name__ == "__main__":
main()

179
src/prep.py Executable file
View File

@@ -0,0 +1,179 @@
#!/usr/bin/python
# Spectral envelope preprocessor.
# Requires two streams as input: the original raw data, and sinefit data.
from nilmdb.utils.printf import *
from nilmdb.utils.time import timestamp_to_human
import nilmtools.filter
import nilmdb.client
from numpy import *
import scipy.fftpack
import scipy.signal
#from matplotlib import pyplot as p
import bisect
def main(argv = None):
# Set up argument parser
f = nilmtools.filter.Filter()
parser = f.setup_parser("Spectral Envelope Preprocessor", skip_paths = True)
group = parser.add_argument_group("Prep options")
group.add_argument("-c", "--column", action="store", type=int,
help="Column number (first data column is 1)")
group.add_argument("-n", "--nharm", action="store", type=int, default=4,
help="number of odd harmonics to compute (default 4)")
group.add_argument("-N", "--nshift", action="store", type=int, default=1,
help="number of shifted FFTs per period (default 1)")
exc = group.add_mutually_exclusive_group()
exc.add_argument("-r", "--rotate", action="store", type=float,
help="rotate FFT output by this many degrees (default 0)")
exc.add_argument("-R", "--rotate-rad", action="store", type=float,
help="rotate FFT output by this many radians (default 0)")
group.add_argument("srcpath", action="store",
help="Path of raw input, e.g. /foo/raw")
group.add_argument("sinepath", action="store",
help="Path of sinefit input, e.g. /foo/sinefit")
group.add_argument("destpath", action="store",
help="Path of prep output, e.g. /foo/prep")
# Parse arguments
try:
args = f.parse_args(argv)
except nilmtools.filter.MissingDestination as e:
rec = "float32_%d" % (e.parsed_args.nharm * 2)
print "Source is %s (%s)" % (e.src.path, e.src.layout)
print "Destination %s doesn't exist" % (e.dest.path)
print "You could make it with a command like:"
print " nilmtool -u %s create %s %s" % (e.dest.url, e.dest.path, rec)
raise SystemExit(1)
if f.dest.layout_count != args.nharm * 2:
print "error: need", args.nharm*2, "columns in destination stream"
raise SystemExit(1)
# Check arguments
if args.column is None or args.column < 1:
parser.error("need a column number >= 1")
if args.nharm < 1 or args.nharm > 32:
parser.error("number of odd harmonics must be 1-32")
if args.nshift < 1:
parser.error("number of shifted FFTs must be >= 1")
if args.rotate is not None:
rotation = args.rotate * 2.0 * pi / 360.0
else:
rotation = args.rotate_rad or 0.0
# Check the sine fit stream
client_sinefit = nilmdb.client.Client(args.url)
sinefit = nilmtools.filter.get_stream_info(client_sinefit, args.sinepath)
if not sinefit:
raise Exception("sinefit data not found")
if sinefit.layout != "float32_3":
raise Exception("sinefit data type is " + sinefit.layout
+ "; expected float32_3")
# Check and set metadata in prep stream
f.check_dest_metadata({ "prep_raw_source": f.src.path,
"prep_sinefit_source": sinefit.path,
"prep_column": args.column,
"prep_rotation": rotation })
# Run the processing function on all data
f.process_numpy(process, args = (client_sinefit, sinefit.path, args.column,
args.nharm, rotation, args.nshift))
def process(data, interval, args, insert_function, final):
(client, sinefit_path, column, nharm, rotation, nshift) = args
rows = data.shape[0]
data_timestamps = data[:,0]
if rows < 2:
return 0
last_inserted = [nilmdb.utils.time.min_timestamp]
def insert_if_nonoverlapping(data):
"""Call insert_function to insert data, but only if this
data doesn't overlap with other data that we inserted."""
if data[0][0] <= last_inserted[0]:
return
last_inserted[0] = data[-1][0]
insert_function(data)
processed = 0
out = zeros((1, nharm * 2 + 1))
# Pull out sinefit data for the entire time range of this block
for sinefit_line in client.stream_extract(sinefit_path,
data[0, 0], data[rows-1, 0]):
def prep_period(t_min, t_max, rot):
"""
Compute prep coefficients from time t_min to t_max, which
are the timestamps of the start and end of one period.
Results are rotated by an additional extra_rot before
being inserted into the database. Returns the maximum
index processed, or None if the period couldn't be
processed.
"""
# Find the indices of data that correspond to (t_min, t_max)
idx_min = bisect.bisect_left(data_timestamps, t_min)
idx_max = bisect.bisect_left(data_timestamps, t_max)
if idx_min >= idx_max or idx_max >= len(data_timestamps):
return None
# Perform FFT over those indices
N = idx_max - idx_min
d = data[idx_min:idx_max, column]
F = scipy.fftpack.fft(d) * 2.0 / N
# If we wanted more harmonics than the FFT gave us, pad with zeros
if N < (nharm * 2):
F = r_[F, zeros(nharm * 2 - N)]
# Fill output data.
out[0, 0] = round(t_min)
for k in range(nharm):
Fk = F[2 * k + 1] * e**(rot * 1j * (k+1))
out[0, 2 * k + 1] = -imag(Fk) # Pk
out[0, 2 * k + 2] = real(Fk) # Qk
insert_if_nonoverlapping(out)
return idx_max
# Extract sinefit data to get zero crossing timestamps.
# t_min = beginning of period
# t_max = end of period
(t_min, f0, A, C) = [ float(x) for x in sinefit_line.split() ]
t_max = t_min + 1e6 / f0
# Compute prep over shifted windows of the period
# (nshift is typically 1)
for n in range(nshift):
# Compute timestamps and rotations for shifted window
time_shift = n * (t_max - t_min) / nshift
shifted_min = t_min + time_shift
shifted_max = t_max + time_shift
angle_shift = n * 2 * pi / nshift
shifted_rot = rotation - angle_shift
# Run prep computation
idx_max = prep_period(shifted_min, shifted_max, shifted_rot)
if not idx_max:
break
processed = idx_max
# If we processed no data but there's lots in here, pretend we
# processed half of it.
if processed == 0 and rows > 10000:
processed = rows / 2
printf("%s: warning: no periods found; skipping %d rows\n",
timestamp_to_human(data[0][0]), processed)
else:
printf("%s: processed %d of %d rows\n",
timestamp_to_human(data[0][0]), processed, rows)
return processed
if __name__ == "__main__":
main()

213
src/sinefit.py Executable file
View File

@@ -0,0 +1,213 @@
#!/usr/bin/python
# Sine wave fitting. This runs about 5x faster than realtime on raw data.
import nilmtools.filter
import nilmdb.client
from numpy import *
from scipy import *
#import pylab as p
import operator
def main(argv = None):
f = nilmtools.filter.Filter()
parser = f.setup_parser("Sine wave fitting")
group = parser.add_argument_group("Sine fit options")
group.add_argument('-c', '--column', action='store', type=int,
help='Column number (first data column is 1)')
group.add_argument('-f', '--frequency', action='store', type=float,
default=60.0,
help='Approximate frequency (default: %(default)s)')
group.add_argument('-m', '--min-freq', action='store', type=float,
help='Minimum valid frequency '
'(default: approximate frequency / 2))')
group.add_argument('-M', '--max-freq', action='store', type=float,
help='Maximum valid frequency '
'(default: approximate frequency * 2))')
group.add_argument('-a', '--min-amp', action='store', type=float,
default=20.0,
help='Minimum signal amplitude (default: %(default)s)')
# Parse arguments
try:
args = f.parse_args(argv)
except nilmtools.filter.MissingDestination as e:
rec = "float32_3"
print "Source is %s (%s)" % (e.src.path, e.src.layout)
print "Destination %s doesn't exist" % (e.dest.path)
print "You could make it with a command like:"
print " nilmtool -u %s create %s %s" % (e.dest.url, e.dest.path, rec)
raise SystemExit(1)
if args.column is None or args.column < 1:
parser.error("need a column number >= 1")
if args.frequency < 0.1:
parser.error("frequency must be >= 0.1")
if args.min_freq is None:
args.min_freq = args.frequency / 2
if args.max_freq is None:
args.max_freq = args.frequency * 2
if (args.min_freq > args.max_freq or
args.min_freq > args.frequency or
args.max_freq < args.frequency):
parser.error("invalid min or max frequency")
if args.min_amp < 0:
parser.error("min amplitude must be >= 0")
f.check_dest_metadata({ "sinefit_source": f.src.path,
"sinefit_column": args.column })
f.process_numpy(process, args = (args.column, args.frequency, args.min_amp,
args.min_freq, args.max_freq))
def process(data, interval, args, insert_function, final):
(column, f_expected, a_min, f_min, f_max) = args
rows = data.shape[0]
# Estimate sampling frequency from timestamps
fs = 1e6 * (rows-1) / (data[-1][0] - data[0][0])
# Pull out about 3.5 periods of data at once;
# we'll expect to match 3 zero crossings in each window
N = max(int(3.5 * fs / f_expected), 10)
# If we don't have enough data, don't bother processing it
if rows < N:
return 0
# Process overlapping windows
start = 0
num_zc = 0
while start < (rows - N):
this = data[start:start+N, column]
t_min = data[start, 0]/1e6
t_max = data[start+N-1, 0]/1e6
# Do 4-parameter sine wave fit
(A, f0, phi, C) = sfit4(this, fs)
# Check bounds. If frequency is too crazy, ignore this window
if f0 < f_min or f0 > f_max:
print "frequency", f0, "outside valid range", f_min, "-", f_max
start += N
continue
# If amplitude is too low, results are probably just noise
if A < a_min:
print "amplitude", A, "below minimum threshold", a_min
start += N
continue
#p.plot(arange(N), this)
#p.plot(arange(N), A * cos(f0/fs * 2 * pi * arange(N) + phi) + C, 'g')
# Period starts when the argument of cosine is 3*pi/2 degrees,
# so we're looking for sample number:
# n = (3 * pi / 2 - phi) / (f0/fs * 2 * pi)
zc_n = (3 * pi / 2 - phi) / (f0 / fs * 2 * pi)
period_n = fs/f0
# Add periods to make N positive
while zc_n < 0:
zc_n += period_n
last_zc = None
# Mark the zero crossings until we're a half period away
# from the end of the window
while zc_n < (N - period_n/2):
#p.plot(zc_n, C, 'ro')
t = t_min + zc_n / fs
insert_function([[t * 1e6, f0, A, C]])
num_zc += 1
last_zc = zc_n
zc_n += period_n
# Advance the window one quarter period past the last marked
# zero crossing, or advance the window by half its size if we
# didn't mark any.
if last_zc is not None:
advance = min(last_zc + period_n/4, N)
else:
advance = N/2
#p.plot(advance, C, 'go')
#p.show()
start = int(round(start + advance))
# Return the number of rows we've processed
print "Marked", num_zc, "zero-crossings in", start, "rows"
return start
def sfit4(data, fs):
"""(A, f0, phi, C) = sfit4(data, fs)
Compute 4-parameter (unknown-frequency) least-squares fit to
sine-wave data, according to IEEE Std 1241-2010 Annex B
Input:
data vector of input samples
fs sampling rate (Hz)
Output:
Parameters [A, f0, phi, C] to fit the equation
x[n] = A * cos(f0/fs * 2 * pi * n + phi) + C
where n is sample number. Or, as a function of time:
x(t) = A * cos(f0 * 2 * pi * t + phi) + C
by Jim Paris
(Verified to match sfit4.m)
"""
N = len(data)
t = linspace(0, (N-1) / fs, N)
## Estimate frequency using FFT (step b)
Fc = fft(data)
F = abs(Fc)
F[0] = 0 # eliminate DC
# Find pair of spectral lines with largest amplitude:
# resulting values are in F(i) and F(i+1)
i = argmax(F[0:int(N/2)] + F[1:int(N/2+1)])
# Interpolate FFT to get a better result (from Markus [B37])
U1 = real(Fc[i])
U2 = real(Fc[i+1])
V1 = imag(Fc[i])
V2 = imag(Fc[i+1])
n = 2 * pi / N
ni1 = n * i
ni2 = n * (i+1)
K = ((V2-V1)*sin(ni1) + (U2-U1)*cos(ni1)) / (U2-U1)
Z1 = V1 * (K - cos(ni1)) / sin(ni1) + U1
Z2 = V2 * (K - cos(ni2)) / sin(ni2) + U2
i = arccos((Z2*cos(ni2) - Z1*cos(ni1)) / (Z2-Z1)) / n
# Convert to Hz
f0 = i * fs / N
# Fit it. We'll catch exceptions here and just returns zeros
# if something fails with the least squares fit, etc.
try:
# first guess for A0, B0 using 3-parameter fit (step c)
w = 2*pi*f0
D = c_[cos(w*t), sin(w*t), ones(N)]
s = linalg.lstsq(D, data)[0]
# Now iterate 6 times (step i)
for idx in range(6):
D = c_[cos(w*t), sin(w*t), ones(N),
-s[0] * t * sin(w*t) + s[1] * t * cos(w*t) ] # eqn B.16
s = linalg.lstsq(D, data)[0] # eqn B.18
w = w + s[3] # update frequency estimate
## Extract results
A = sqrt(s[0]*s[0] + s[1]*s[1]) # eqn B.21
f0 = w / (2*pi)
phi = -arctan2(s[1], s[0]) # eqn B.22
C = s[2]
return (A, f0, phi, C)
except Exception as e:
# something broke down, just return zeros
return (0, 0, 0, 0)
if __name__ == "__main__":
main()