Compare commits

..

117 Commits

Author SHA1 Message Date
dc26e32b6e Make interhost, force_metadata private to Filter 2013-08-02 23:14:19 -04:00
981f23ff14 Better documentation for callback function 2013-08-02 23:14:19 -04:00
492445a469 Split off useful math functions to math.py 2013-08-02 17:27:39 -04:00
33c3586bea trainola: suppress peaks if larger ones are nearby
Might fix the problem Mark noticed where turn-off transients
are erroneously matching the drop that follows startup transients.
2013-07-31 19:12:16 -04:00
c1e0f8ffbc Fix bug in copy_one 2013-07-31 14:47:16 -04:00
d2853bdb0e Add test case for bad trainola detections 2013-07-30 20:35:54 -04:00
a4d4bc22fc Add --skip option to nilm-insert 2013-07-30 18:25:47 -04:00
6090dd6112 prep: only process intervals present in both raw & sinefit 2013-07-30 14:55:06 -04:00
Sharon NILM
9c0d9ad324 Sample scripts from Sharon 2013-07-29 18:37:55 -04:00
Sharon NILM
8b9c5d4898 Fix daemon dependency 2013-07-29 17:40:51 -04:00
cf2c28b0fb Add --daemon flag 2013-07-29 17:16:18 -04:00
87a26c907b Watch for process termination too 2013-07-29 15:08:49 -04:00
def465b57c Improve pipewatch; add nilm-pipewatch script 2013-07-29 14:58:15 -04:00
0589b8d316 start of pipewatch util 2013-07-29 14:10:56 -04:00
9c5f07106d Don't need python-pip 2013-07-20 16:15:29 -04:00
62e11a11c0 Fix issue with column ordering in the exemplars
If the max scale in the exemplar was a column we weren't using, it
would bail out when looking for that correlation later.  Change things
around so exemplars in RAM only keep around the columns we care about.
2013-07-18 22:51:27 -04:00
2bdcee2c36 More helpful error if exemplar stream doesn't exist 2013-07-15 15:19:52 -04:00
6dce8c5296 More output 2013-07-11 18:56:53 -04:00
25c35a56f6 Trainola inserts into the destination stream now 2013-07-10 12:59:39 -04:00
d610deaef0 More trainola work 2013-07-10 11:38:32 -04:00
d7d5ccc9a7 More filter cleanup 2013-07-09 19:27:20 -04:00
f28753ff5c Move process_numpy_interval outside the class 2013-07-09 18:40:49 -04:00
c9c2e0d5a8 Improve split between process_numpy and process_numpy_interval 2013-07-09 18:09:05 -04:00
5a2a32bec5 WIP on trainola improvements 2013-07-09 17:56:26 -04:00
706c3933f9 Add trainola from nilmrun 2013-07-09 17:55:57 -04:00
cfd1719152 Use nilmdb.utils.interval.optimize; bump nilmdb min version 2013-07-09 17:53:04 -04:00
c62fb45980 Makefile cleanup; add nilm-trainola binary 2013-07-09 16:53:47 -04:00
57d856f2fa Split filter.py internals up a little more
This makes it easier to use the filter stuff from other code, but it's
also turning it into more of a spaghetti nightmare.  Might not be
worth continuing down this path.
2013-07-09 16:52:00 -04:00
5d83d93019 Rename src/ directory to nilmtools/ 2013-07-08 11:54:13 -04:00
5f847a0513 Split process_numpy innards process_numpy_interval 2013-07-03 12:07:22 -04:00
29cd7eb6c7 Improve test_prep target in Makefile 2013-07-03 12:06:50 -04:00
62c8af41ea Cleanup comments 2013-06-06 15:34:23 -04:00
4f6bc48619 sinefit: include timestamps on marking output too 2013-05-11 11:00:31 -04:00
cf9eb0ed48 Improve sinefit resiliancy 2013-05-10 14:19:55 -04:00
32066fc260 Remove hard matplotlib dependency 2013-05-09 13:17:36 -04:00
739da3f973 Add median filter 2013-05-08 23:36:50 -04:00
83ad18ebf6 Fix non-string arguments to metadata_check 2013-05-08 12:49:38 -04:00
c76d527f95 Fix unicode handling in filter metadata match 2013-05-07 12:40:53 -04:00
b8a73278e7 Always store metadata rotation as a string 2013-04-29 14:25:11 -04:00
ce0691d6c4 sineefit: Change sfit4 to fit to \sin instead of \cos
And adjust the period locator accordingly.
Fitting \sin is the same mathematically, it's just conceptually more
straightforward since we're locating zero crossings anyway.
2013-04-27 18:12:20 -04:00
4da658e960 sinefit: move initial estimate into the main iteration loop
Just a little less code.  Same results.
2013-04-27 17:50:23 -04:00
8ab31eafc2 Allow shorthand method for creating an option-less parser.
This is mostly just intended to make a simple filter example shorter.
2013-04-21 16:53:28 -04:00
979ab13bff Force fs to be a float in sfit4 2013-04-17 17:58:15 -04:00
f4fda837ae Bump required nilmdb version to 1.6.0 2013-04-11 11:55:11 -04:00
5547d266d0 filter: Don't include trailing unprocessed data in the inserted intervals 2013-04-11 11:53:17 -04:00
372e977e4a Reverse cleanup order to handle interruptions better 2013-04-10 18:38:41 -04:00
640a680704 Increase default min amplitude in sinefit 2013-04-10 17:09:52 -04:00
2e74e6cd63 Skip over data if we aren't able to process any. Change output format 2013-04-10 17:01:07 -04:00
de2a794e00 Support wildcards in nilm-decimate-auto 2013-04-10 16:05:16 -04:00
065a40f265 sinefit: add minimum amplitude check 2013-04-10 15:33:51 -04:00
65fa43aff1 sinefit: catch all errors in sfit4 2013-04-10 14:36:50 -04:00
57c23c3792 sinefit: allow user to override min/max frequency detection 2013-04-10 14:36:40 -04:00
d4c8e4acb4 Include rotation in metadata 2013-04-10 14:36:05 -04:00
fd1b33401f Require a --yes argument before actually cleaning data 2013-04-09 20:13:38 -04:00
4c748ec00c Fix minor bugs 2013-04-09 20:08:25 -04:00
b72d6b6908 Warn if column count is wrong for this nharm value 2013-04-09 19:59:59 -04:00
80d642e52e Change nilm-cleanup config file format, tweak output 2013-04-09 19:43:41 -04:00
001b89b1d2 Support multiple shifted FFTs per period in nilm-prep.
New option --nshift controls how many shifted FFT windows to perform
per period.  "nilm-prep -N 2" is similar to old prep behavior.  Note
that this is redundant information and takes up extra storage space,
though.
2013-04-09 18:53:27 -04:00
f978823505 Fix prep scaling and fix comments 2013-04-09 17:44:13 -04:00
ffd6675979 Remove outdated code 2013-04-08 19:46:16 -04:00
5b67b68fd2 Don't import matplotlib if we don't need it 2013-04-08 18:59:23 -04:00
97503b73b9 Fix dependencies 2013-04-08 18:50:27 -04:00
4e64c804bf Merge branch 'binary' 2013-04-08 18:45:21 -04:00
189fb9df3a Use binary interface for copy_one too 2013-04-08 18:45:16 -04:00
3323c997a7 Use the new stream_insert_numpy_context function 2013-04-08 18:39:14 -04:00
e09153e34b Use the new NumpyClient for extracting data in filter 2013-04-07 18:14:35 -04:00
5c56e9d075 Remove ounused process_python function 2013-04-06 16:39:39 -04:00
60f09427cf Update decimate to use process_numpy 2013-04-06 15:56:36 -04:00
d6d31190eb Fix fromstring usage 2013-04-06 13:40:09 -04:00
2ec574c59d Use np.fromstring instead of np.loadtxt 2013-04-06 13:32:16 -04:00
1988955671 Accumulate delta separately from data timestamp 2013-04-05 17:41:48 -04:00
36e5af4be1 Fix data_ts when clock is updated 2013-04-05 16:40:04 -04:00
ca175bd9dd Improve nilm-insert to support deltas, etc, for accelerometer data 2013-04-05 16:13:56 -04:00
aa9656bc10 Fix off-by-one error in prep rotation 2013-04-04 19:23:12 -04:00
10ab2cc2de Build nilm-prep tool 2013-04-04 19:07:18 -04:00
eb6d7a8809 Fix recommended layout 2013-04-04 16:32:38 -04:00
c8be6755ae Update README dependencies 2013-04-04 15:36:30 -04:00
9e321d9e41 Add --force-metadata option; fix bugs 2013-04-04 15:33:57 -04:00
f2bebea5d0 Add copy_wildcard script 2013-04-04 15:33:57 -04:00
d919a73387 Use parser error rather than systemexit exception 2013-04-04 15:33:57 -04:00
17fa79a5dc Makefile updates 2013-04-04 15:33:57 -04:00
ca970fa1fd Rename copy_ to copy_one 2013-04-04 15:33:57 -04:00
805d8fb24f Add prep 2013-04-03 21:04:03 -04:00
05da75e34a Allow processing 0 lines; warn if data is building up 2013-04-03 21:03:44 -04:00
56e778df71 Add skip_paths option to filter.setup_parser 2013-04-03 21:03:29 -04:00
87178e9599 self.args is used for other stuff in an exception 2013-04-03 21:03:17 -04:00
f8b1a001c3 makefile updates 2013-04-03 21:02:42 -04:00
7e88da3c26 Bring get_stream_info into nilmtools.filter module; use it 2013-04-02 18:04:28 -04:00
b637f17887 Add nilm-decimate-auto script 2013-04-02 16:13:44 -04:00
9a7a1df537 Simplify StreamInfo constructor 2013-04-02 15:47:43 -04:00
101b701882 Fix sys.argv stuff; remove outdated shell scripts 2013-04-01 18:40:48 -04:00
457c518809 Add some docs about building new tools 2013-04-01 18:14:59 -04:00
3eff3d81fe Change default URL to http://localhost/nilmdb/ 2013-04-01 18:03:43 -04:00
a56dc22030 Allow arguments to be passed in as a list to main() functions 2013-04-01 18:03:22 -04:00
9b770cd28f Note speed 2013-04-01 16:39:23 -04:00
348c435d1e Catch arctan2 errors 2013-04-01 16:38:50 -04:00
7f1c1a6c32 Add nilm-sinefit script link 2013-03-30 22:53:29 -04:00
bdfc29887b Proper window advancement & insertion for sine fit; comment out plots 2013-03-30 22:51:54 -04:00
4e5907f381 Send pending data after each block 2013-03-30 22:51:37 -04:00
9078a014ae Require 2-dimensional array in insert_function 2013-03-30 22:51:20 -04:00
533892e624 Update Makefile and setup.py dependencies 2013-03-30 22:50:55 -04:00
e0f973b449 Update makefile and setup.py requirements 2013-03-30 17:30:10 -04:00
698cb6ef26 More work on sinefit 2013-03-28 14:08:50 -04:00
1db38cc5da Port sfit4 function to Python, start writing sinefit filter 2013-03-26 19:38:53 -04:00
a984e54f23 Add process_numpy function to filter.py
This converts data into big Numpy arrays and lets the user-provided
function process as much as they'd like at a time.
2013-03-26 19:38:05 -04:00
974c9a3050 Fix error message when decimate target is missing 2013-03-22 14:59:08 -04:00
320c32cfdc Sample script for copying wildcard paths between hosts 2013-03-19 17:55:18 -04:00
0f1e442cd4 Support filtering (or copying) between two different hosts 2013-03-19 17:54:59 -04:00
3e78da12dc Rename copy.py to avoid stupid Python conflicts
Any other module (in this case, nilmdb -> datetime_tz -> pytz -> gettext)
that does an "import copy" would pull in the copy.py, if it's in the
current working directory.  Python is dumb...
2013-03-19 16:19:14 -04:00
ef9277cbff Rename nilmtools directory to just src 2013-03-19 15:42:29 -04:00
de68956f76 Update copy tool 2013-03-16 23:13:45 -04:00
e73dd313d5 Reworked things to match nilmdb updates; a bit faster 2013-03-16 14:46:49 -04:00
d23fa9ee78 Remove unnecessary option group 2013-03-12 19:02:29 -04:00
2b9ecc6697 Add nilm-copy tool 2013-03-12 18:52:39 -04:00
54f8c34f8e Decimate seems to work pretty well right now 2013-03-12 18:09:39 -04:00
9d38d6c21b work in progress 2013-03-11 20:38:33 -04:00
4243301434 Lots of updates to generic filter, specific decimate module 2013-03-11 19:48:09 -04:00
25 changed files with 2358 additions and 101 deletions

3
.gitignore vendored
View File

@@ -1,3 +1,6 @@
oldprep
newprep
*.dat
build/
*.pyc
dist/

View File

@@ -1,5 +1,70 @@
#URL="http://bucket.mit.edu:8080/nilmdb"
URL="http://localhost/nilmdb"
all:
ifeq ($(INSIDE_EMACS), t)
@make test
else
@echo "Try 'make install'"
endif
test: test_trainola3
test_pipewatch:
nilmtools/pipewatch.py -t 3 "seq 10 20" "seq 20 30"
test_trainola:
-nilmtool -u http://bucket/nilmdb remove -s min -e max \
/sharon/prep-a-matches
nilmtools/trainola.py "$$(cat extras/trainola-test-param.js)"
test_trainola2:
-nilmtool -u http://bucket/nilmdb remove -s min -e max \
/sharon/prep-a-matches
nilmtools/trainola.py "$$(cat extras/trainola-test-param-2.js)"
test_trainola3:
-nilmtool -u "http://bucket/nilmdb" destroy -R /test/jim
nilmtool -u "http://bucket/nilmdb" create /test/jim uint8_3
nilmtools/trainola.py "$$(cat extras/trainola-test-param-3.js)"
nilmtool -u "http://bucket/nilmdb" extract /test/jim -s min -e max
test_cleanup:
nilmtools/cleanup.py -e extras/cleanup.cfg
nilmtools/cleanup.py extras/cleanup.cfg
test_insert:
nilmtools/insert.py --skip --file --dry-run /foo/bar ~/data/20130311T2100.prep1.gz ~/data/20130311T2100.prep1.gz ~/data/20130311T2200.prep1.gz
test_copy:
nilmtools/copy_wildcard.py -U "http://nilmdb.com/bucket/" -D /lees*
/tmp/raw.dat:
octave --eval 'fs = 8000;' \
--eval 't = (0:fs*10)*2*pi*60/fs;' \
--eval 'raw = transpose([sin(t); 0.3*sin(3*t)+sin(t)]);' \
--eval 'save("-ascii","/tmp/raw.dat","raw");'
test_prep: /tmp/raw.dat
-nilmtool destroy -R /test/raw
-nilmtool destroy -R /test/sinefit
-nilmtool destroy -R /test/prep
nilmtool create /test/raw float32_2
nilmtool create /test/sinefit float32_3
nilmtool create /test/prep float32_8
nilmtool insert -s '@0' -t -r 8000 /test/raw /tmp/raw.dat
nilmtools/sinefit.py -a 0.5 -c 1 -s '@0' -e '@5000000' /test/raw /test/sinefit
nilmtools/prep.py -c 2 /test/raw /test/sinefit /test/prep
nilmtools/prep.py -c 2 /test/raw /test/sinefit /test/prep
nilmtool extract -s min -e max /test/prep | head -20
test_decimate:
-@nilmtool destroy /lees-compressor/no-leak/raw/4 || true
-@nilmtool destroy /lees-compressor/no-leak/raw/16 || true
-@nilmtool create /lees-compressor/no-leak/raw/4 float32_18 || true
-@nilmtool create /lees-compressor/no-leak/raw/16 float32_18 || true
time python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/1 /lees-compressor/no-leak/raw/4
python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/4 /lees-compressor/no-leak/raw/16
version:
python setup.py version
@@ -21,4 +86,4 @@ clean::
gitclean::
git clean -dXf
.PHONY: all version dist sdist install clean gitclean
.PHONY: all version dist sdist install clean gitclean test

View File

@@ -6,9 +6,22 @@ Prerequisites:
# Runtime and build environments
sudo apt-get install python2.7 python2.7-dev python-setuptools
sudo apt-get install python-numpy python-scipy python-daemon
nilmdb (1.3.1+)
nilmdb (1.8.5+)
Install:
python setup.py install
Building new tools:
The tools in this package are meant to be installed with
"python setup.py install". If you want to make a new one,
an easier way to develop would be to first install this package,
and then copy a specific script like "src/sinefit.py" to a new
location, and modify it as desired.
To add a tool to the package, place it in "src/" and add the
appropriate configuration to "setup.py".

22
extras/cleanup.cfg Normal file
View File

@@ -0,0 +1,22 @@
[/lees-compressor/no-leak/prep]
keep = 2d
rate = 60
[*/raw]
keep = 2d
[*/something]
rate = 10
[*/sinefit]
keep = 1w
decimated = False
[/test/raw]
keep = 0.01d
[/test/sinefit]
keep = 0.01d
[/test/prep]
keep = 0.01d

View File

@@ -0,0 +1,10 @@
#!/bin/bash
# Start the ethstream capture using nilm-pipewatch
# Bail out on errors
set -e
nilm-pipewatch --daemon --lock "/tmp/nilmdb-capture.lock" --timeout 30 \
"ethstream -a 192.168.1.209 -n 9 -r 8000 -N" \
"nilm-insert -m 10 -r 8000 --live /sharon/raw"

View File

@@ -0,0 +1,8 @@
[/sharon/prep-*]
keep = 1y
[/sharon/raw]
keep = 2w
[/sharon/sinefit]
keep = 1y

View File

@@ -0,0 +1,9 @@
# Install this by running "crontab crontab" (will replace existing crontab)
# m h dom mon dow cmd
# Run NilmDB processing every 5 minutes
*/5 * * * * chronic /home/nilm/data/process.sh
# Check the capture process every minute
*/1 * * * * chronic /home/nilm/data/capture.sh

View File

@@ -0,0 +1,28 @@
#!/bin/bash
# Run all necessary processing on NilmDB data.
# Bail out on errors
set -e
# Ensure only one copy of this code runs at a time:
LOCKFILE="/tmp/nilmdb-process.lock"
exec 99>"$LOCKFILE"
if ! flock -n -x 99 ; then
echo "NilmDB processing already running, giving up..."
exit 0
fi
trap 'rm -f "$LOCKFILE"' 0
# sinefit on phase A voltage
nilm-sinefit -c 5 /sharon/raw /sharon/sinefit
# prep on A, B, C with appropriate rotations
nilm-prep -c 1 -r 0 /sharon/raw /sharon/sinefit /sharon/prep-a
nilm-prep -c 2 -r 120 /sharon/raw /sharon/sinefit /sharon/prep-b
nilm-prep -c 3 -r 240 /sharon/raw /sharon/sinefit /sharon/prep-c
# decimate raw and prep data
nilm-decimate-auto /sharon/raw /sharon/prep*
# run cleanup
nilm-cleanup --yes /home/nilm/data/cleanup.cfg

View File

@@ -0,0 +1,29 @@
{ "columns" : [ { "index" : 0, "name" : "P1" },
{ "index" : 1, "name" : "Q1" },
{ "index" : 2, "name" : "P3" } ],
"stream" : "/sharon/prep-a",
"url" : "http://bucket.mit.edu/nilmdb",
"dest_stream" : "/sharon/prep-a-matches",
"start" : 1365153062643133.5,
"end" : 1365168814443575.5,
"exemplars" : [ { "columns" : [ { "index" : 0,
"name" : "P1"
} ],
"dest_column" : 0,
"end" : 1365073657682000,
"name" : "Turn ON",
"start" : 1365073654321000,
"stream" : "/sharon/prep-a",
"url" : "http://bucket.mit.edu/nilmdb"
},
{ "columns" : [ { "index" : 2, "name" : "P3" },
{ "index" : 0, "name" : "P1" } ],
"dest_column" : 1,
"end" : 1365176528818000,
"name" : "Type 2 turn ON",
"start" : 1365176520030000,
"stream" : "/sharon/prep-a",
"url" : "http://bucket.mit.edu/nilmdb"
}
]
}

View File

@@ -0,0 +1,40 @@
{
"url": "http://bucket/nilmdb",
"stream": "/sharon/prep-a",
"dest_stream": "/test/jim",
"start": 1364184839901599,
"end": 1364184942407610.2,
"columns": [ { "index": 0, "name": "P1" } ],
"exemplars": [
{
"name": "A - True DBL Freezer ON",
"dest_column": 0,
"url": "http://bucket/nilmdb",
"stream": "/sharon/prep-a",
"columns": [ { "index": 0, "name": "P1" } ],
"start": 1365277707649000,
"end": 1365277710705000
},
{
"name": "A - Boiler 1 Fan OFF",
"dest_column": 1,
"url": "http://bucket/nilmdb",
"stream": "/sharon/prep-a",
"columns": [ { "index": 0, "name": "P1" } ],
"start": 1364188370735000,
"end": 1364188373819000
},
{
"name": "A - True DBL Freezer OFF",
"dest_column": 2,
"url": "http://bucket/nilmdb",
"stream": "/sharon/prep-a",
"columns": [ { "index": 0, "name": "P1" } ],
"start": 1365278087982000,
"end": 1365278089340000
}
]
}

View File

@@ -0,0 +1,31 @@
{ "url": "http://bucket.mit.edu/nilmdb",
"dest_stream": "/sharon/prep-a-matches",
"stream": "/sharon/prep-a",
"start": 1366111383280463,
"end": 1366126163457797,
"columns": [ { "name": "P1", "index": 0 },
{ "name": "Q1", "index": 1 },
{ "name": "P3", "index": 2 } ],
"exemplars": [
{ "name": "Boiler Pump ON",
"url": "http://bucket.mit.edu/nilmdb",
"stream": "/sharon/prep-a",
"start": 1366260494269078,
"end": 1366260608185031,
"dest_column": 0,
"columns": [ { "name": "P1", "index": 0 },
{ "name": "Q1", "index": 1 }
]
},
{ "name": "Boiler Pump OFF",
"url": "http://bucket.mit.edu/nilmdb",
"stream": "/sharon/prep-a",
"start": 1366260864215764,
"end": 1366260870882998,
"dest_column": 1,
"columns": [ { "name": "P1", "index": 0 },
{ "name": "Q1", "index": 1 }
]
}
]
}

257
nilmtools/cleanup.py Executable file
View File

@@ -0,0 +1,257 @@
#!/usr/bin/python
from nilmdb.utils.printf import *
from nilmdb.utils.time import (parse_time, timestamp_to_human,
timestamp_to_seconds, seconds_to_timestamp)
from nilmdb.utils.diskusage import human_size
from nilmdb.utils.interval import Interval
import nilmdb.client
import nilmdb.client.numpyclient
import nilmtools
import argparse
import ConfigParser
import sys
import collections
import fnmatch
import re
def warn(msg, *args):
fprintf(sys.stderr, "warning: " + msg + "\n", *args)
class TimePeriod(object):
_units = { 'h': ('hour', 60*60),
'd': ('day', 60*60*24),
'w': ('week', 60*60*24*7),
'm': ('month', 60*60*24*30),
'y': ('year', 60*60*24*365) }
def __init__(self, val):
for u in self._units:
if val.endswith(u):
self.unit = self._units[u][0]
self.scale = self._units[u][1]
self.count = float(val[:-len(u)])
break
else:
raise ValueError("unknown units: " + units)
def seconds(self):
return self.count * self.scale
def describe_seconds(self, seconds):
count = seconds / self.scale
units = self.unit if count == 1 else (self.unit + "s")
if count == int(count):
return sprintf("%d %s", count, units)
else:
return sprintf("%.2f %s", count, units)
def __str__(self):
return self.describe_seconds(self.seconds())
class StreamCleanupConfig(object):
def __init__(self, info):
self.path = info[0]
self.layout = info[1]
if info[4] != 0 and info[5] != 0:
self.rate = info[4] / timestamp_to_seconds(info[5])
else:
self.rate = None
self.keep = None
self.clean_decimated = True
self.decimated_from = None
self.also_clean_paths = []
def main(argv = None):
parser = argparse.ArgumentParser(
formatter_class = argparse.RawDescriptionHelpFormatter,
version = nilmtools.__version__,
description = """\
Clean up old data from streams using a configuration file to specify
which data to remove.
The format of the config file is as follows:
[/stream/path]
keep = 3w # keep up to 3 weeks of data
rate = 8000 # optional, used for the --estimate option
decimated = false # whether to delete decimated data too (default true)
[*/prep]
keep = 3.5m # or 2520h or 105d or 15w or 0.29y
The suffix for 'keep' is 'h' for hours, 'd' for days, 'w' for weeks,
'm' for months, or 'y' for years.
Streams paths may include wildcards. If a path is matched by more than
one config section, data from the last config section counts.
Decimated streams (paths containing '~decim-') are treated specially:
- They don't match wildcards
- When deleting data from a parent stream, data is also deleted
from its decimated streams, unless decimated=false
Rate is optional and is only used for the --estimate option.
""")
parser.add_argument("-u", "--url", action="store",
default="http://localhost/nilmdb/",
help="NilmDB server URL (default: %(default)s)")
parser.add_argument("-y", "--yes", action="store_true",
default = False,
help="Actually remove the data (default: no)")
parser.add_argument("-e", "--estimate", action="store_true",
default = False,
help="Estimate how much disk space will be used")
parser.add_argument("configfile", type=argparse.FileType('r'),
help="Configuration file")
args = parser.parse_args(argv)
# Parse config file
config = ConfigParser.RawConfigParser()
config.readfp(args.configfile)
# List all streams
client = nilmdb.client.Client(args.url)
streamlist = client.stream_list(extended = True)
# Create config objects
streams = collections.OrderedDict()
for s in streamlist:
streams[s[0]] = StreamCleanupConfig(s)
m = re.search(r"^(.*)~decim-[0-9]+$", s[0])
if m:
streams[s[0]].decimated_from = m.group(1)
# Build up configuration
for section in config.sections():
matched = False
for path in streams.iterkeys():
# Decimated streams only allow exact matches
if streams[path].decimated_from and path != section:
continue
if not fnmatch.fnmatch(path, section):
continue
matched = True
options = config.options(section)
# Keep period (days, weeks, months, years)
if 'keep' in options:
streams[path].keep = TimePeriod(config.get(section, 'keep'))
options.remove('keep')
# Rate
if 'rate' in options:
streams[path].rate = config.getfloat(section, 'rate')
options.remove('rate')
# Decimated
if 'decimated' in options:
val = config.getboolean(section, 'decimated')
streams[path].clean_decimated = val
options.remove('decimated')
for leftover in options:
warn("option '%s' for '%s' is unknown", leftover, section)
if not matched:
warn("config for '%s' did not match any existing streams", section)
# List all decimated streams in the parent stream's info
for path in streams.keys():
src = streams[path].decimated_from
if src and src in streams:
if streams[src].clean_decimated:
streams[src].also_clean_paths.append(path)
del streams[path]
# Warn about streams that aren't getting cleaned up
for path in streams.keys():
if streams[path].keep is None or streams[path].keep.seconds() < 0:
warn("no config for existing stream '%s'", path)
del streams[path]
if args.estimate:
# Estimate disk usage
total = 0
for path in streams.keys():
rate = streams[path].rate
if not rate or rate < 0:
warn("unable to estimate disk usage for stream '%s' because "
"the data rate is unknown", path)
continue
printf("%s:\n", path)
layout = streams[path].layout
dtype = nilmdb.client.numpyclient.layout_to_dtype(layout)
per_row = dtype.itemsize
per_sec = per_row * rate
printf("%17s: %s per row, %s rows per second\n",
"base rate",
human_size(per_row),
round(rate,1))
printf("%17s: %s per hour, %s per day\n",
"base size",
human_size(per_sec * 3600),
human_size(per_sec * 3600 * 24))
# If we'll be cleaning up decimated data, add an
# estimation for how much room decimated data takes up.
if streams[path].clean_decimated:
d_layout = "float32_" + str(3*(int(layout.split('_')[1])))
d_dtype = nilmdb.client.numpyclient.layout_to_dtype(d_layout)
# Assume the decimations will be a factor of 4
# sum_{k=0..inf} (rate / (n^k)) * d_dtype.itemsize
d_per_row = d_dtype.itemsize
factor = 4.0
d_per_sec = d_per_row * (rate / factor) * (1 / (1 - (1/factor)))
per_sec += d_per_sec
printf("%17s: %s per hour, %s per day\n",
"with decimation",
human_size(per_sec * 3600),
human_size(per_sec * 3600 * 24))
keep = per_sec * streams[path].keep.seconds()
printf("%17s: %s\n\n",
"keep " + str(streams[path].keep), human_size(keep))
total += keep
printf("Total estimated disk usage for these streams:\n")
printf(" %s\n", human_size(total))
raise SystemExit(0)
# Do the cleanup
for path in streams:
printf("%s: keep %s\n", path, streams[path].keep)
# Figure out the earliest timestamp we should keep.
intervals = [ Interval(start, end) for (start, end) in
reversed(list(client.stream_intervals(path))) ]
total = 0
keep = seconds_to_timestamp(streams[path].keep.seconds())
for i in intervals:
total += i.end - i.start
if total <= keep:
continue
remove_before = i.start + (total - keep)
break
else:
printf(" nothing to do (only %s of data present)\n",
streams[path].keep.describe_seconds(
timestamp_to_seconds(total)))
continue
printf(" removing data before %s\n", timestamp_to_human(remove_before))
# Clean in reverse order. Since we only use the primary stream and not
# the decimated streams to figure out which data to remove, removing
# the primary stream last means that we might recover more nicely if
# we are interrupted and restarted.
clean_paths = list(reversed(streams[path].also_clean_paths)) + [ path ]
for p in clean_paths:
printf(" removing from %s\n", p)
if args.yes:
client.stream_remove(p, None, remove_before)
# All done
if not args.yes:
printf("Note: specify --yes to actually perform removals\n")
return
if __name__ == "__main__":
main()

41
nilmtools/copy_one.py Executable file
View File

@@ -0,0 +1,41 @@
#!/usr/bin/python
# This is called copy_one instead of copy to avoid name conflicts with
# the Python standard library.
import nilmtools.filter
import nilmdb.client
from nilmdb.client.numpyclient import NumpyClient
import numpy as np
import sys
def main(argv = None):
f = nilmtools.filter.Filter()
parser = f.setup_parser("Copy a stream")
# Parse arguments
try:
args = f.parse_args(argv)
except nilmtools.filter.MissingDestination as e:
print "Source is %s (%s)" % (e.src.path, e.src.layout)
print "Destination %s doesn't exist" % (e.dest.path)
print "You could make it with a command like:"
print " nilmtool -u %s create %s %s" % (e.dest.url,
e.dest.path, e.src.layout)
raise SystemExit(1)
# Copy metadata
meta = f.client_src.stream_get_metadata(f.src.path)
f.check_dest_metadata(meta)
# Copy all rows of data using the faster Numpy interfaces
extractor = NumpyClient(f.src.url).stream_extract_numpy
inserter = NumpyClient(f.dest.url).stream_insert_numpy_context
for i in f.intervals():
print "Processing", i.human_string()
with inserter(f.dest.path, i.start, i.end) as insert_ctx:
for data in extractor(f.src.path, i.start, i.end):
insert_ctx.insert(data)
if __name__ == "__main__":
main()

70
nilmtools/copy_wildcard.py Executable file
View File

@@ -0,0 +1,70 @@
#!/usr/bin/python
# Copy streams between NilmDB servers with wildcards
import nilmtools.filter
import nilmtools.copy_one
import nilmdb.client
import argparse
import fnmatch
def main(argv = None):
f = nilmtools.filter.Filter()
# Reuse filter's parser, since it handles most options we need.
parser = f.setup_parser(description = """\
Copy all streams matching the given wildcard from one host to another.
Example: %(prog)s -u http://host1/nilmdb -U http://host2/nilmdb /sharon/*
""", skip_paths = True)
parser.add_argument("path", action="store", nargs="+",
help='Wildcard paths to copy')
args = parser.parse_args(argv)
# Verify arguments
if args.dest_url is None:
parser.error("must provide both source and destination URL")
client_src = nilmdb.client.Client(args.url)
client_dest = nilmdb.client.Client(args.dest_url)
if client_src.geturl() == client_dest.geturl():
parser.error("source and destination URL must be different")
print "Source URL:", client_src.geturl()
print " Dest URL:", client_dest.geturl()
# Find matching streams
matched = []
for path in args.path:
matched.extend([s for s in client_src.stream_list(extended = True)
if fnmatch.fnmatch(s[0], path)
and s not in matched])
# Create destination streams if they don't exist
for stream in matched:
src = nilmtools.filter.StreamInfo(client_src.geturl(), stream)
dest = nilmtools.filter.get_stream_info(client_dest, src.path)
if not dest:
print "Creating destination stream", src.path
client_dest.stream_create(src.path, src.layout)
# Copy them all by running the "copy" tool as if it were
# invoked from the command line.
for stream in matched:
new_argv = ["--url", client_src.geturl(),
"--dest-url", client_dest.geturl() ]
if args.start:
new_argv.extend(["--start", "@" + repr(args.start)])
if args.end:
new_argv.extend(["--end", "@" + repr(args.end)])
if args.dry_run:
new_argv.extend(["--dry-run"])
if args.force_metadata:
new_argv.extend(["--force-metadata"])
new_argv.extend([stream[0], stream[0]])
try:
nilmtools.copy_one.main(new_argv)
except SystemExit as e:
# Ignore SystemExit which could be raised on --dry-run
if e.code != 0:
raise
if __name__ == "__main__":
main()

85
nilmtools/decimate.py Executable file
View File

@@ -0,0 +1,85 @@
#!/usr/bin/python
import nilmtools.filter
import nilmdb.client
import numpy as np
import operator
def main(argv = None):
f = nilmtools.filter.Filter()
parser = f.setup_parser("Decimate a stream")
group = parser.add_argument_group("Decimate options")
group.add_argument('-f', '--factor', action='store', default=4, type=int,
help='Decimation factor (default: %(default)s)')
# Parse arguments
try:
args = f.parse_args(argv)
except nilmtools.filter.MissingDestination as e:
# If no destination, suggest how to create it by figuring out
# a recommended layout.
src = e.src
dest = e.dest
print "Source is %s (%s)" % (src.path, src.layout)
print "Destination %s doesn't exist" % (dest.path)
if "decimate_source" in f.client_src.stream_get_metadata(src.path):
rec = src.layout
elif 'int32' in src.layout_type or 'float64' in src.layout_type:
rec = 'float64_' + str(src.layout_count * 3)
else:
rec = 'float32_' + str(src.layout_count * 3)
print "You could make it with a command like:"
print " nilmtool -u %s create %s %s" % (e.dest.url,
e.dest.path, rec)
raise SystemExit(1)
if not (args.factor >= 2):
raise Exception("factor needs to be 2 or more")
f.check_dest_metadata({ "decimate_source": f.src.path,
"decimate_factor": args.factor })
# If source is decimated, we have to decimate a bit differently
if "decimate_source" in f.client_src.stream_get_metadata(args.srcpath):
again = True
else:
again = False
f.process_numpy(decimate, args = (args.factor, again))
def decimate(data, interval, args, insert_function, final):
"""Decimate data"""
(factor, again) = args
(n, m) = data.shape
# Figure out which columns to use as the source for mean, min, and max,
# depending on whether this is the first decimation or we're decimating
# again. Note that we include the timestamp in the means.
if again:
c = (m - 1) // 3
# e.g. c = 3
# ts mean1 mean2 mean3 min1 min2 min3 max1 max2 max3
mean_col = slice(0, c + 1)
min_col = slice(c + 1, 2 * c + 1)
max_col = slice(2 * c + 1, 3 * c + 1)
else:
mean_col = slice(0, m)
min_col = slice(1, m)
max_col = slice(1, m)
# Discard extra rows that aren't a multiple of factor
n = n // factor * factor
data = data[:n,:]
# Reshape it into 3D so we can process 'factor' rows at a time
data = data.reshape(n // factor, factor, m)
# Fill the result
out = np.c_[ np.mean(data[:,:,mean_col], axis=1),
np.min(data[:,:,min_col], axis=1),
np.max(data[:,:,max_col], axis=1) ]
insert_function(out)
return n
if __name__ == "__main__":
main()

98
nilmtools/decimate_auto.py Executable file
View File

@@ -0,0 +1,98 @@
#!/usr/bin/python
import nilmtools.filter
import nilmtools.decimate
import nilmdb.client
import argparse
import fnmatch
def main(argv = None):
parser = argparse.ArgumentParser(
formatter_class = argparse.RawDescriptionHelpFormatter,
version = nilmtools.__version__,
description = """\
Automatically create multiple decimations from a single source
stream, continuing until the last decimated level contains fewer
than 500 points total.
Wildcards and multiple paths are accepted. Decimated paths are
ignored when matching wildcards.
""")
parser.add_argument("-u", "--url", action="store",
default="http://localhost/nilmdb/",
help="NilmDB server URL (default: %(default)s)")
parser.add_argument('-f', '--factor', action='store', default=4, type=int,
help='Decimation factor (default: %(default)s)')
parser.add_argument("--force-metadata", action="store_true",
default = False,
help="Force metadata changes if the dest "
"doesn't match")
parser.add_argument("path", action="store", nargs='+',
help='Path of base stream')
args = parser.parse_args(argv)
# Pull out info about the base stream
client = nilmdb.client.Client(args.url)
# Find list of paths to process
streams = [ unicode(s[0]) for s in client.stream_list() ]
streams = [ s for s in streams if "~decim-" not in s ]
paths = []
for path in args.path:
new = fnmatch.filter(streams, unicode(path))
if not new:
print "error: no stream matched path:", path
raise SystemExit(1)
paths.extend(new)
for path in paths:
do_decimation(client, args, path)
def do_decimation(client, args, path):
print "Decimating", path
info = nilmtools.filter.get_stream_info(client, path)
if not info:
raise Exception("path " + path + " not found")
meta = client.stream_get_metadata(path)
if "decimate_source" in meta:
print "Stream", path, "was decimated from", meta["decimate_source"]
print "You need to pass the base stream instead"
raise SystemExit(1)
# Figure out the type we should use for decimated streams
if 'int32' in info.layout_type or 'float64' in info.layout_type:
decimated_type = 'float64_' + str(info.layout_count * 3)
else:
decimated_type = 'float32_' + str(info.layout_count * 3)
# Now do the decimations until we have few enough points
factor = 1
while True:
print "Level", factor, "decimation has", info.rows, "rows"
if info.rows <= 500:
break
factor *= args.factor
new_path = "%s~decim-%d" % (path, factor)
# Create the stream if needed
new_info = nilmtools.filter.get_stream_info(client, new_path)
if not new_info:
print "Creating stream", new_path
client.stream_create(new_path, decimated_type)
# Run the decimation as if it were run from the commandline
new_argv = [ "-u", args.url,
"-f", str(args.factor) ]
if args.force_metadata:
new_argv.extend([ "--force-metadata" ])
new_argv.extend([info.path, new_path])
nilmtools.decimate.main(new_argv)
# Update info using the newly decimated stream
info = nilmtools.filter.get_stream_info(client, new_path)
return
if __name__ == "__main__":
main()

395
nilmtools/filter.py Executable file → Normal file
View File

@@ -1,66 +1,389 @@
#!/usr/bin/python
import nilmdb.client
from nilmdb.utils.printf import *
from nilmdb.utils.time import parse_time, format_time
from __future__ import absolute_import
import nilmdb.client
from nilmdb.client import Client
from nilmdb.client.numpyclient import NumpyClient
from nilmdb.utils.printf import *
from nilmdb.utils.time import (parse_time, timestamp_to_human,
timestamp_to_seconds)
from nilmdb.utils.interval import Interval
import nilmtools
import itertools
import time
import sys
import re
import argparse
import numpy as np
import cStringIO
import functools
class ArgumentError(Exception):
pass
class MissingDestination(Exception):
def __init__(self, args, src, dest):
self.parsed_args = args
self.src = src
self.dest = dest
Exception.__init__(self, "destination path " + dest.path + " not found")
class StreamInfo(object):
def __init__(self, url, info):
self.url = url
self.info = info
try:
self.path = info[0]
self.layout = info[1]
self.layout_type = self.layout.split('_')[0]
self.layout_count = int(self.layout.split('_')[1])
self.total_count = self.layout_count + 1
self.timestamp_min = info[2]
self.timestamp_max = info[3]
self.rows = info[4]
self.seconds = nilmdb.utils.time.timestamp_to_seconds(info[5])
except IndexError, TypeError:
pass
def string(self, interhost):
"""Return stream info as a string. If interhost is true,
include the host URL."""
if interhost:
return sprintf("[%s] ", self.url) + str(self)
return str(self)
def __str__(self):
"""Return stream info as a string."""
return sprintf("%s (%s), %.2fM rows, %.2f hours",
self.path, self.layout, self.rows / 1e6,
self.seconds / 3600.0)
def get_stream_info(client, path):
"""Return a StreamInfo object about the given path, or None if it
doesn't exist"""
streams = client.stream_list(path, extended = True)
if len(streams) != 1:
return None
return StreamInfo(client.geturl(), streams[0])
# Filter processing for a single interval of data.
def process_numpy_interval(interval, extractor, inserter, warn_rows,
function, args = None):
"""For the given 'interval' of data, extract data, process it
through 'function', and insert the result.
'extractor' should be a function like NumpyClient.stream_extract_numpy
but with the the interval 'start' and 'end' as the only parameters,
e.g.:
extractor = functools.partial(NumpyClient.stream_extract_numpy,
src_path, layout = l, maxrows = m)
'inserter' should be a function like NumpyClient.stream_insert_context
but with the interval 'start' and 'end' as the only parameters, e.g.:
inserter = functools.partial(NumpyClient.stream_insert_context,
dest_path)
If 'warn_rows' is not None, print a warning to stdout when the
number of unprocessed rows exceeds this amount.
See process_numpy for details on 'function' and 'args'.
"""
if args is None:
args = []
with inserter(interval.start, interval.end) as insert_ctx:
insert_func = insert_ctx.insert
old_array = np.array([])
for new_array in extractor(interval.start, interval.end):
# If we still had old data left, combine it
if old_array.shape[0] != 0:
array = np.vstack((old_array, new_array))
else:
array = new_array
# Pass the data to the user provided function
processed = function(array, interval, args, insert_func, False)
# Send any pending data that the user function inserted
insert_ctx.send()
# Save the unprocessed parts
if processed >= 0:
old_array = array[processed:]
else:
raise Exception(
sprintf("%s return value %s must be >= 0",
str(function), str(processed)))
# Warn if there's too much data remaining
if warn_rows is not None and old_array.shape[0] > warn_rows:
printf("warning: %d unprocessed rows in buffer\n",
old_array.shape[0])
# Last call for this contiguous interval
if old_array.shape[0] != 0:
processed = function(old_array, interval, args,
insert_func, True)
if processed != old_array.shape[0]:
# Truncate the interval we're inserting at the first
# unprocessed data point. This ensures that
# we'll not miss any data when we run again later.
insert_ctx.update_end(old_array[processed][0])
def example_callback_function(data, interval, args, insert_func, final):
"""Example of the signature for the function that gets passed
to process_numpy_interval.
'data': array of data to process -- may be empty
'interval': overall interval we're processing (but not necessarily
the interval of this particular chunk of data)
'args': opaque arguments passed to process_numpy
'insert_func': function to call in order to insert array of data.
Should be passed a 2-dimensional array of data to insert.
Data timestamps must be within the provided interval.
'final': True if this is the last bit of data for this
contiguous interval, False otherwise.
Return value of 'function' is the number of data rows processed.
Unprocessed data will be provided again in a subsequent call
(unless 'final' is True).
If unprocessed data remains after 'final' is True, the interval
being inserted will be ended at the timestamp of the first
unprocessed data point.
"""
raise NotImplementedError("example_callback_function does nothing")
class Filter(object):
def __init__(self, description = "Filter data"):
self.args = None
self._client = None
self.parse_args(description)
def __init__(self, parser_description = None):
self._parser = None
self._client_src = None
self._client_dest = None
self._using_client = False
self.src = None
self.dest = None
self.start = None
self.end = None
self._interhost = False
self._force_metadata = False
if parser_description is not None:
self.setup_parser(parser_description)
self.parse_args()
def parse_args(self, description):
@property
def client_src(self):
if self._using_client:
raise Exception("Filter client is in use; make another")
return self._client_src
@property
def client_dest(self):
if self._using_client:
raise Exception("Filter client is in use; make another")
return self._client_dest
def setup_parser(self, description = "Filter data", skip_paths = False):
parser = argparse.ArgumentParser(
description = description,
formatter_class = argparse.RawDescriptionHelpFormatter)
parser.add_argument("-u", "--url", action="store",
default="http://localhost:12380/",
formatter_class = argparse.RawDescriptionHelpFormatter,
version = nilmtools.__version__,
description = description)
group = parser.add_argument_group("General filter arguments")
group.add_argument("-u", "--url", action="store",
default="http://localhost/nilmdb/",
help="Server URL (default: %(default)s)")
parser.add_argument("srcpath", action="store",
group.add_argument("-U", "--dest-url", action="store",
help="Destination server URL "
"(default: same as source)")
group.add_argument("-D", "--dry-run", action="store_true",
default = False,
help="Just print intervals that would be "
"processed")
group.add_argument("--force-metadata", action="store_true",
default = False,
help="Force metadata changes if the dest "
"doesn't match")
group.add_argument("-s", "--start",
metavar="TIME", type=self.arg_time,
help="Starting timestamp for intervals "
"(free-form, inclusive)")
group.add_argument("-e", "--end",
metavar="TIME", type=self.arg_time,
help="Ending timestamp for intervals "
"(free-form, noninclusive)")
if not skip_paths:
# Individual filter scripts might want to add these arguments
# themselves, to include multiple sources in a different order
# (for example). "srcpath" and "destpath" arguments must exist,
# though.
group.add_argument("srcpath", action="store",
help="Path of source stream, e.g. /foo/bar")
parser.add_argument("destpath", action="store",
group.add_argument("destpath", action="store",
help="Path of destination stream, e.g. /foo/bar")
self.args = parser.parse_args()
self._parser = parser
return parser
self._client = nilmdb.client.Client(args.url)
def set_args(self, url, dest_url, srcpath, destpath, start, end,
parsed_args = None, quiet = True):
"""Set arguments directly from parameters"""
if dest_url is None:
dest_url = url
if url != dest_url:
self._interhost = True
if args.srcpath == args.destpath:
raise Exception("source and destination path must be different")
self._client_src = Client(url)
self._client_dest = Client(dest_url)
# Open and print info about the streams
def stream_info_string(info):
return sprintf("%s (%s), %.2fM rows, %.2f hours\n",
info[0], info[1], info[4] / 1e6, info[5] / 3600)
if (not self._interhost) and (srcpath == destpath):
raise ArgumentError("source and destination path must be different")
src = self._client.stream_list(args.srcpath, extended = True)
if len(src) != 1:
raise Exception("source path " + args.srcpath + " not found")
print "Source:", stream_info_string(src[0])
# Open the streams
self.src = get_stream_info(self._client_src, srcpath)
if not self.src:
raise ArgumentError("source path " + srcpath + " not found")
dest = self._client.stream_list(args.destpath, extended = True)
if len(dest) != 1:
raise Exception("destination path " + args.destpath + " not found")
print " Dest:", stream_info_string(dest[0])
self.dest = get_stream_info(self._client_dest, destpath)
if not self.dest:
raise MissingDestination(parsed_args, self.src,
StreamInfo(dest_url, [destpath]))
self.start = start
self.end = end
# Print info
if not quiet:
print "Source:", self.src.string(self._interhost)
print " Dest:", self.dest.string(self._interhost)
def parse_args(self, argv = None):
"""Parse arguments from a command line"""
args = self._parser.parse_args(argv)
self.set_args(args.url, args.dest_url, args.srcpath, args.destpath,
args.start, args.end, quiet = False, parsed_args = args)
self._force_metadata = args.force_metadata
if args.dry_run:
for interval in self.intervals():
print interval.human_string()
raise SystemExit(0)
return args
def intervals(self):
"""Generate all the intervals that this filter should process"""
for i in self._client.stream_intervals(
args.srcpath, diffpath = args.destpath):
yield i
self._using_client = True
def main():
if self._interhost:
# Do the difference ourselves
s_intervals = ( Interval(start, end)
for (start, end) in
self._client_src.stream_intervals(
self.src.path,
start = self.start, end = self.end) )
d_intervals = ( Interval(start, end)
for (start, end) in
self._client_dest.stream_intervals(
self.dest.path,
start = self.start, end = self.end) )
intervals = nilmdb.utils.interval.set_difference(s_intervals,
d_intervals)
else:
# Let the server do the difference for us
intervals = ( Interval(start, end)
for (start, end) in
self._client_src.stream_intervals(
self.src.path, diffpath = self.dest.path,
start = self.start, end = self.end) )
# Optimize intervals: join intervals that are adjacent
for interval in nilmdb.utils.interval.optimize(intervals):
yield interval
self._using_client = False
# Misc helpers
@staticmethod
def arg_time(toparse):
"""Parse a time string argument"""
try:
return nilmdb.utils.time.parse_time(toparse)
except ValueError as e:
raise argparse.ArgumentTypeError(sprintf("%s \"%s\"",
str(e), toparse))
def check_dest_metadata(self, data):
"""See if the metadata jives, and complain if it doesn't. For
each key in data, if the stream contains the key, it must match
values. If the stream does not contain the key, it is created."""
metadata = self._client_dest.stream_get_metadata(self.dest.path)
if not self._force_metadata:
for key in data:
wanted = data[key]
if not isinstance(wanted, basestring):
wanted = str(wanted)
val = metadata.get(key, wanted)
# Force UTF-8 encoding for comparison and display
wanted = wanted.encode('utf-8')
val = val.encode('utf-8')
key = key.encode('utf-8')
if val != wanted and self.dest.rows > 0:
m = "Metadata in destination stream:\n"
m += " %s = %s\n" % (key, val)
m += "doesn't match desired data:\n"
m += " %s = %s\n" % (key, wanted)
m += "Refusing to change it. To prevent this error, "
m += "change or delete the metadata with nilmtool,\n"
m += "remove existing data from the stream, or "
m += "retry with --force-metadata."
raise Exception(m)
# All good -- write the metadata in case it's not already there
self._client_dest.stream_update_metadata(self.dest.path, data)
# The main filter processing method.
def process_numpy(self, function, args = None, rows = 100000,
intervals = None):
"""Calls process_numpy_interval for each interval that currently
exists in self.src, but doesn't exist in self.dest. It will
process the data in chunks as follows:
For each chunk of data, call 'function' with a Numpy array
corresponding to the data. The data is converted to a Numpy
array in chunks of 'rows' rows at a time.
If 'intervals' is not None, process those intervals instead of
the default list.
'function' should be defined with the same interface as
nilmtools.filter.example_callback_function. See the
documentation of that for details. 'args' are passed to
'function'.
"""
extractor = NumpyClient(self.src.url).stream_extract_numpy
inserter = NumpyClient(self.dest.url).stream_insert_numpy_context
extractor_func = functools.partial(extractor, self.src.path,
layout = self.src.layout,
maxrows = rows)
inserter_func = functools.partial(inserter, self.dest.path)
for interval in (intervals or self.intervals()):
print "Processing", interval.human_string()
process_numpy_interval(interval, extractor_func, inserter_func,
rows * 3, function, args)
def main(argv = None):
# This is just a dummy function; actual filters can use the other
# functions to prepare stuff, and then do something with the data.
f = Filter()
for interval in f.intervals():
print "Generic filter: need to handle interval:", interval
parser = f.setup_parser()
args = f.parse_args(argv)
for i in f.intervals():
print "Generic filter: need to handle", i.human_string()
if __name__ == "__main__":
main()

View File

@@ -2,80 +2,173 @@
import nilmdb.client
from nilmdb.utils.printf import *
from nilmdb.utils.time import parse_time, format_time
from nilmdb.utils.time import (parse_time, timestamp_to_human,
timestamp_to_seconds, seconds_to_timestamp,
rate_to_period, now as time_now)
import nilmtools
import time
import sys
import re
import argparse
import subprocess
import textwrap
class ParseError(Exception):
def __init__(self, filename, error):
msg = filename + ": " + error
super(ParseError, self).__init__(msg)
def parse_args():
parser = argparse.ArgumentParser(description = """\
Insert data from ethstream, either live (using the system time as a
reference) or prerecorded (using comments in the file as a reference).
def parse_args(argv = None):
parser = argparse.ArgumentParser(
formatter_class = argparse.RawDescriptionHelpFormatter,
version = nilmtools.__version__,
description = textwrap.dedent("""\
Insert large amount of data from an external source like ethstream.
The data is assumed to have been recorded at the specified rate.
Small discrepencies between the accumulated timestamps and the
reference time are ignored; larger discrepencies cause gaps to be
created in the stream. Overlapping data returns an error.
""", formatter_class = argparse.RawDescriptionHelpFormatter)
This code tracks two timestamps:
(1) The 'data' timestamp is the precise timestamp corresponding to
a particular row of data, and is the timestamp that gets
inserted into the database. It increases by 'data_delta' for
every row of input.
'data_delta' can come from one of two sources. If '--delta'
is specified, it is pulled from the first column of data. If
'--rate' is specified, 'data_delta' is set to a fixed value of
(1 / rate).
(2) The 'clock' timestamp is the less precise timestamp that gives
the absolute time. It can come from two sources. If '--live'
is specified, it is pulled directly from the system clock. If
'--file' is specified, it is extracted from the input filename
every time a new file is opened for read, and from comments
that appear in the file.
Small discrepencies between 'data' and 'clock' are ignored. If
the 'data' timestamp ever differs from the 'clock' timestamp by
more than 'max_gap' seconds:
- If 'data' is running behind, there is a gap in the data, so it
is stepped forward to match 'clock'.
- If 'data' is running ahead, there is overlap in the data, and an
error is raised. If '--ignore' is specified, the current file
is skipped instead of raising an error.
"""))
parser.add_argument("-u", "--url", action="store",
default="http://localhost:12380/",
default="http://localhost/nilmdb/",
help="NilmDB server URL (default: %(default)s)")
parser.add_argument("-r", "--rate", action="store", default=8000, type=float,
help="Data rate in Hz (default: %(default)s)")
parser.add_argument("-l", "--live", action="store_true",
help="Live capture; use system time to verify rate")
parser.add_argument("path", action="store",
group = parser.add_argument_group("Misc options")
group.add_argument("-D", "--dry-run", action="store_true",
help="Parse files, but don't insert any data")
group.add_argument("-s", "--skip", action="store_true",
help="Skip files if the data would overlap")
group.add_argument("-m", "--max-gap", action="store", default=10.0,
metavar="SEC", type=float,
help="Max discrepency between clock and data "
"timestamps (default: %(default)s)")
group = parser.add_argument_group("Data timestamp delta")
exc = group.add_mutually_exclusive_group()
exc.add_argument("-r", "--rate", action="store", default=8000.0,
type=float,
help="Data_delta is constant 1/RATE "
"(default: %(default)s Hz)")
exc.add_argument("-d", "--delta", action="store_true",
help="Data_delta is the first number in each line")
group = parser.add_argument_group("Clock timestamp source")
exc = group.add_mutually_exclusive_group()
exc.add_argument("-l", "--live", action="store_true",
help="Use live system time for clock timestamp")
exc.add_argument("-f", "--file", action="store_true", default=True,
help="Use filename or comments for clock timestamp")
group.add_argument("-o", "--offset-filename", metavar="SEC",
action="store", default=-3600.0, type=float,
help="Offset to add to filename timestamps "
"(default: %(default)s)")
group.add_argument("-O", "--offset-comment", metavar="SEC",
action="store", default=0.0, type=float,
help="Offset to add to comment timestamps "
"(default: %(default)s)")
group = parser.add_argument_group("Database path")
group.add_argument("path", action="store",
help="Path of stream, e.g. /foo/bar")
parser.add_argument("infile", type=argparse.FileType('r'), nargs='*',
default=[sys.stdin], help="Input files (default: stdin)")
args = parser.parse_args()
group = parser.add_argument_group("Input files")
group.add_argument("infile", type=argparse.FileType('r'), nargs='*',
default=[sys.stdin],
help="Input files (default: stdin)")
args = parser.parse_args(argv)
printf(" Stream path: %s\n", args.path)
printf(" Data rate: %s Hz\n", repr(args.rate))
printf(" Data timestamp: ")
if args.delta:
printf("delta on each input line\n")
else:
printf("fixed rate %s Hz\n", repr(args.rate))
printf(" Clock timestamp: ")
if args.live:
printf("live system clock\n")
else:
printf("from filenames and comments\n")
printf(" Filename offset: %s seconds\n", repr(args.offset_filename))
printf(" Comment offset: %s seconds\n", repr(args.offset_comment))
printf(" Max gap: %s seconds\n", repr(args.max_gap))
if args.dry_run:
printf("Dry run (no data will be inserted)\n")
return args
def main(args = None):
if args is None:
args = parse_args()
def main(argv = None):
args = parse_args(argv)
client = nilmdb.client.Client(args.url)
# Local copies to save dictionary lookups
live = args.live
# data_ts is the timestamp that we'll use for the current line
data_ts_base = 0
data_ts_inc = 0
data_ts_step = 1.0 / args.rate
data_ts_rate = args.rate
data_ts_delta = 0
def get_data_ts():
if args.delta:
return data_ts_base + data_ts_delta
else:
return data_ts_base + rate_to_period(data_ts_rate,
data_ts_inc)
# clock_ts is the imprecise "real" timestamp (from the filename,
# comments, or or system clock)
# comments, or system clock)
clock_ts = None
def print_clock_updated():
printf("Clock time updated to %s\n", format_time(clock_ts))
printf("Clock timestamp updated to %s\n", timestamp_to_human(clock_ts))
if data_ts_base != 0:
diff = data_ts - clock_ts
diff = get_data_ts() - clock_ts
if diff >= 0:
printf(" (data timestamp ahead by %.6f sec)\n", diff)
printf(" (data timestamp ahead by %.6f sec)\n",
timestamp_to_seconds(diff))
else:
printf(" (data timestamp behind by %.6f sec)\n", -diff)
printf(" (data timestamp behind by %.6f sec)\n",
timestamp_to_seconds(-diff))
offset_filename = seconds_to_timestamp(args.offset_filename)
offset_comment = seconds_to_timestamp(args.offset_comment)
max_gap = seconds_to_timestamp(args.max_gap)
with client.stream_insert_context(args.path) as stream:
for f in args.infile:
filename = f.name
printf("Processing %s\n", filename)
# If the filename ends in .gz, open it with gzcat instead.
# If the filename ends in .gz, re-open it with gzip to
# decompress.
if filename.endswith(".gz"):
p = subprocess.Popen(["gzip", "-dc"],
stdin = f, stdout = subprocess.PIPE)
@@ -86,7 +179,7 @@ def main(args = None):
# Subtract 1 hour because files are created at the end
# of the hour. Hopefully, we'll be able to use
# internal comments and this value won't matter anyway.
clock_ts = parse_time(filename).totimestamp() - 3600
clock_ts = parse_time(filename) + offset_filename
print_clock_updated()
except ValueError:
pass
@@ -95,7 +188,15 @@ def main(args = None):
# Read each line
for line in f:
data_ts = data_ts_base + data_ts_inc * data_ts_step
# Once in a while a line might be truncated, if we're
# at the end of a file. Ignore it, but if we ignore
# too many, bail out.
if line[-1] != '\n':
truncated_lines += 1
if truncated_lines > 3:
raise ParseError(filename, "too many short lines")
printf("Ignoring short line in %s\n", filename)
continue
# If no content other than the newline, skip it
if len(line) <= 1:
@@ -104,41 +205,57 @@ def main(args = None):
# If line starts with a comment, look for a timestamp
if line[0] == '#':
try:
clock_ts = parse_time(line[1:]).totimestamp()
clock_ts = parse_time(line[1:]) + offset_comment
print_clock_updated()
except ValueError:
pass
continue
# If --delta mode, increment data_ts_delta by the
# delta from the file.
if args.delta:
try:
(delta, line) = line.split(None, 1)
data_ts_delta += float(delta)
except ValueError:
raise ParseError(filename, "can't parse delta")
# Calculate data_ts for this row
data_ts = get_data_ts()
# If inserting live, use clock timestamp
if live:
clock_ts = time.time()
if args.live:
clock_ts = time_now()
# If we have a real timestamp, compare it to the data
# timestamp, and make sure things match up.
if clock_ts is not None:
if (data_ts - 10) > clock_ts:
if (data_ts - max_gap) > clock_ts:
# Accumulated line timestamps are in the future.
# If we were to set data_ts=clock_ts, we'd create
# an overlap, so we have to just bail out here.
err = sprintf("Data is coming in too fast: data time "
"is %s but clock time is only %s",
format_time(data_ts),
format_time(clock_ts))
timestamp_to_human(data_ts),
timestamp_to_human(clock_ts))
if args.skip:
printf("%s\n", err)
printf("Skipping the remainder of this file\n")
break
raise ParseError(filename, err)
if (data_ts + 10) < clock_ts:
if (data_ts + max_gap) < clock_ts:
# Accumulated line timetamps are in the past. We
# can just skip some time and leave a gap in the
# data.
if data_ts_base != 0:
printf("Skipping data timestamp forward from "
"%s to %s to match clock time\n",
format_time(data_ts),
format_time(clock_ts))
timestamp_to_human(data_ts),
timestamp_to_human(clock_ts))
stream.finalize()
data_ts_base = data_ts = clock_ts
data_ts_inc = 0
data_ts_inc = data_ts_delta = 0
# Don't use this clock time anymore until we update it
clock_ts = None
@@ -146,21 +263,12 @@ def main(args = None):
if data_ts_base == 0:
raise ParseError(filename, "No idea what timestamp to use")
# This line is legit, so increment timestamp
# This line is legit, so increment timestamp (for --rate)
data_ts_inc += 1
# Once in a while a line might be truncated, if we're at
# the end of a file. Ignore it, but if we ignore too many,
# bail out.
if line[-1] != '\n':
truncated_lines += 1
if truncated_lines > 3:
raise ParseError(filename, "too many short lines")
printf("Ignoring short line in %s\n", filename)
continue
# Insert it
stream.insert("%.6f %s" % (data_ts, line))
if not args.dry_run:
stream.insert("%d %s" % (data_ts, line))
print "Done"
if __name__ == "__main__":

107
nilmtools/math.py Normal file
View File

@@ -0,0 +1,107 @@
#!/usr/bin/python
# Miscellaenous useful mathematical functions
from nilmdb.utils.printf import *
from numpy import *
from scipy import *
def sfit4(data, fs):
"""(A, f0, phi, C) = sfit4(data, fs)
Compute 4-parameter (unknown-frequency) least-squares fit to
sine-wave data, according to IEEE Std 1241-2010 Annex B
Input:
data vector of input samples
fs sampling rate (Hz)
Output:
Parameters [A, f0, phi, C] to fit the equation
x[n] = A * sin(f0/fs * 2 * pi * n + phi) + C
where n is sample number. Or, as a function of time:
x(t) = A * sin(f0 * 2 * pi * t + phi) + C
by Jim Paris
(Verified to match sfit4.m)
"""
N = len(data)
t = linspace(0, (N-1) / float(fs), N)
## Estimate frequency using FFT (step b)
Fc = fft(data)
F = abs(Fc)
F[0] = 0 # eliminate DC
# Find pair of spectral lines with largest amplitude:
# resulting values are in F(i) and F(i+1)
i = argmax(F[0:int(N/2)] + F[1:int(N/2+1)])
# Interpolate FFT to get a better result (from Markus [B37])
U1 = real(Fc[i])
U2 = real(Fc[i+1])
V1 = imag(Fc[i])
V2 = imag(Fc[i+1])
n = 2 * pi / N
ni1 = n * i
ni2 = n * (i+1)
K = ((V2-V1)*sin(ni1) + (U2-U1)*cos(ni1)) / (U2-U1)
Z1 = V1 * (K - cos(ni1)) / sin(ni1) + U1
Z2 = V2 * (K - cos(ni2)) / sin(ni2) + U2
i = arccos((Z2*cos(ni2) - Z1*cos(ni1)) / (Z2-Z1)) / n
# Convert to Hz
f0 = i * float(fs) / N
# Fit it. We'll catch exceptions here and just returns zeros
# if something fails with the least squares fit, etc.
try:
# first guess for A0, B0 using 3-parameter fit (step c)
s = zeros(3)
w = 2*pi*f0
# Now iterate 7 times (step b, plus 6 iterations of step i)
for idx in range(7):
D = c_[cos(w*t), sin(w*t), ones(N),
-s[0] * t * sin(w*t) + s[1] * t * cos(w*t) ] # eqn B.16
s = linalg.lstsq(D, data)[0] # eqn B.18
w = w + s[3] # update frequency estimate
## Extract results
A = sqrt(s[0]*s[0] + s[1]*s[1]) # eqn B.21
f0 = w / (2*pi)
phi = arctan2(s[0], s[1]) # eqn B.22 (flipped for sin instead of cos)
C = s[2]
return (A, f0, phi, C)
except Exception as e:
# something broke down; just return zeros
return (0, 0, 0, 0)
def peak_detect(data, delta = 0.1):
"""Simple min/max peak detection algorithm, taken from my code
in the disagg.m from the 10-8-5 paper.
Returns an array of peaks: each peak is a tuple
(n, p, is_max)
where n is the row number in 'data', and p is 'data[n]',
and is_max is True if this is a maximum, False if it's a minimum,
"""
peaks = [];
cur_min = (None, inf)
cur_max = (None, -inf)
lookformax = False
for (n, p) in enumerate(data):
if p > cur_max[1]:
cur_max = (n, p)
if p < cur_min[1]:
cur_min = (n, p)
if lookformax:
if p < (cur_max[1] - delta):
peaks.append((cur_max[0], cur_max[1], True))
cur_min = (n, p)
lookformax = False
else:
if p > (cur_min[1] + delta):
peaks.append((cur_min[0], cur_min[1], False))
cur_max = (n, p)
lookformax = True
return peaks

43
nilmtools/median.py Executable file
View File

@@ -0,0 +1,43 @@
#!/usr/bin/python
import nilmtools.filter, scipy.signal
def main(argv = None):
f = nilmtools.filter.Filter()
parser = f.setup_parser("Median Filter")
group = parser.add_argument_group("Median filter options")
group.add_argument("-z", "--size", action="store", type=int, default=25,
help = "median filter size (default %(default)s)")
group.add_argument("-d", "--difference", action="store_true",
help = "store difference rather than filtered values")
try:
args = f.parse_args(argv)
except nilmtools.filter.MissingDestination as e:
print "Source is %s (%s)" % (e.src.path, e.src.layout)
print "Destination %s doesn't exist" % (e.dest.path)
print "You could make it with a command like:"
print " nilmtool -u %s create %s %s" % (e.dest.url,
e.dest.path, e.src.layout)
raise SystemExit(1)
meta = f.client_src.stream_get_metadata(f.src.path)
f.check_dest_metadata({ "median_filter_source": f.src.path,
"median_filter_size": args.size,
"median_filter_difference": repr(args.difference) })
f.process_numpy(median_filter, args = (args.size, args.difference))
def median_filter(data, interval, args, insert, final):
(size, diff) = args
(rows, cols) = data.shape
for i in range(cols - 1):
filtered = scipy.signal.medfilt(data[:, i+1], size)
if diff:
data[:, i+1] -= filtered
else:
data[:, i+1] = filtered
insert(data)
return rows
if __name__ == "__main__":
main()

168
nilmtools/pipewatch.py Executable file
View File

@@ -0,0 +1,168 @@
#!/usr/bin/python
import nilmdb.client
from nilmdb.utils.printf import *
import nilmdb.utils.lock
import nilmtools
import time
import sys
import os
import argparse
import subprocess
import tempfile
import threading
import select
import signal
import Queue
import daemon
def parse_args(argv = None):
parser = argparse.ArgumentParser(
formatter_class = argparse.ArgumentDefaultsHelpFormatter,
version = nilmtools.__version__,
description = """\
Pipe data from 'generator' to 'consumer'. This is intended to be
executed frequently from cron, and will exit if another copy is
already running. If 'generator' or 'consumer' returns an error,
or if 'generator' stops sending data for a while, it will exit.
Intended for use with ethstream (generator) and nilm-insert
(consumer). Commands are executed through the shell.
""")
parser.add_argument("-d", "--daemon", action="store_true",
help="Run in background")
parser.add_argument("-l", "--lock", metavar="FILENAME", action="store",
default=tempfile.gettempdir() +
"/nilm-pipewatch.lock",
help="Lock file for detecting running instance")
parser.add_argument("-t", "--timeout", metavar="SECONDS", action="store",
type=float, default=30,
help="Restart if no output from " +
"generator for this long")
group = parser.add_argument_group("commands to execute")
group.add_argument("generator", action="store",
help="Data generator (e.g. \"ethstream -r 8000\")")
group.add_argument("consumer", action="store",
help="Data consumer (e.g. \"nilm-insert /foo/bar\")")
args = parser.parse_args(argv)
return args
def reader_thread(queue, fd):
# Read from a file descriptor, write to queue.
try:
while True:
(r, w, x) = select.select([fd], [], [fd], 0.25)
if x:
raise Exception # generator died?
if not r:
# short timeout -- just try again. This is to catch the
# fd being closed elsewhere, which is only detected
# when select restarts.
continue
data = os.read(fd, 65536)
if data == "": # generator EOF
raise Exception
queue.put(data)
except Exception:
queue.put(None)
def watcher_thread(queue, procs):
# Put None in the queue if either process dies
while True:
for p in procs:
if p.poll() is not None:
queue.put(None)
return
time.sleep(0.25)
def pipewatch(args):
# Run the processes, etc
with open(os.devnull, "r") as devnull:
generator = subprocess.Popen(args.generator, shell = True,
bufsize = -1, close_fds = True,
stdin = devnull,
stdout = subprocess.PIPE,
stderr = None)
consumer = subprocess.Popen(args.consumer, shell = True,
bufsize = -11, close_fds = True,
stdin = subprocess.PIPE,
stdout = None, stderr = None)
queue = Queue.Queue(maxsize = 32)
reader = threading.Thread(target = reader_thread,
args = (queue, generator.stdout.fileno()))
reader.start()
watcher = threading.Thread(target = watcher_thread,
args = (queue, [generator, consumer]))
watcher.start()
try:
while True:
try:
data = queue.get(True, args.timeout)
if data is None:
break
consumer.stdin.write(data)
except Queue.Empty:
# Timeout: kill the generator
fprintf(sys.stderr, "pipewatch: timeout\n")
generator.terminate()
break
generator.stdout.close()
consumer.stdin.close()
except IOError:
fprintf(sys.stderr, "pipewatch: I/O error\n")
def kill(proc):
# Wait for a process to end, or kill it
def poll_timeout(proc, timeout):
for x in range(1+int(timeout / 0.1)):
if proc.poll() is not None:
break
time.sleep(0.1)
return proc.poll()
try:
if poll_timeout(proc, 0.5) is None:
proc.terminate()
if poll_timeout(proc, 0.5) is None:
proc.kill()
except OSError:
pass
return poll_timeout(proc, 0.5)
# Wait for them to die, or kill them
gret = kill(generator)
cret = kill(consumer)
fprintf(sys.stderr, "pipewatch: generator returned %d, " +
"consumer returned %d\n", gret, cret)
if gret == 0 and cret == 0:
sys.exit(0)
sys.exit(1)
def main(argv = None):
args = parse_args(argv)
lockfile = open(args.lock, "w")
if not nilmdb.utils.lock.exclusive_lock(lockfile):
printf("pipewatch process already running (according to %s)\n",
args.lock)
sys.exit(0)
try:
# Run as a daemon if requested, otherwise run directly.
if args.daemon:
with daemon.DaemonContext(files_preserve = [ lockfile ]):
pipewatch(args)
else:
pipewatch(args)
finally:
# Clean up lockfile
try:
os.unlink(args.lock)
except OSError:
pass
if __name__ == "__main__":
main()

191
nilmtools/prep.py Executable file
View File

@@ -0,0 +1,191 @@
#!/usr/bin/python
# Spectral envelope preprocessor.
# Requires two streams as input: the original raw data, and sinefit data.
from nilmdb.utils.printf import *
from nilmdb.utils.time import timestamp_to_human
import nilmtools.filter
import nilmdb.client
from numpy import *
import scipy.fftpack
import scipy.signal
#from matplotlib import pyplot as p
import bisect
from nilmdb.utils.interval import Interval
def main(argv = None):
# Set up argument parser
f = nilmtools.filter.Filter()
parser = f.setup_parser("Spectral Envelope Preprocessor", skip_paths = True)
group = parser.add_argument_group("Prep options")
group.add_argument("-c", "--column", action="store", type=int,
help="Column number (first data column is 1)")
group.add_argument("-n", "--nharm", action="store", type=int, default=4,
help="number of odd harmonics to compute (default 4)")
group.add_argument("-N", "--nshift", action="store", type=int, default=1,
help="number of shifted FFTs per period (default 1)")
exc = group.add_mutually_exclusive_group()
exc.add_argument("-r", "--rotate", action="store", type=float,
help="rotate FFT output by this many degrees (default 0)")
exc.add_argument("-R", "--rotate-rad", action="store", type=float,
help="rotate FFT output by this many radians (default 0)")
group.add_argument("srcpath", action="store",
help="Path of raw input, e.g. /foo/raw")
group.add_argument("sinepath", action="store",
help="Path of sinefit input, e.g. /foo/sinefit")
group.add_argument("destpath", action="store",
help="Path of prep output, e.g. /foo/prep")
# Parse arguments
try:
args = f.parse_args(argv)
except nilmtools.filter.MissingDestination as e:
rec = "float32_%d" % (e.parsed_args.nharm * 2)
print "Source is %s (%s)" % (e.src.path, e.src.layout)
print "Destination %s doesn't exist" % (e.dest.path)
print "You could make it with a command like:"
print " nilmtool -u %s create %s %s" % (e.dest.url, e.dest.path, rec)
raise SystemExit(1)
if f.dest.layout_count != args.nharm * 2:
print "error: need", args.nharm*2, "columns in destination stream"
raise SystemExit(1)
# Check arguments
if args.column is None or args.column < 1:
parser.error("need a column number >= 1")
if args.nharm < 1 or args.nharm > 32:
parser.error("number of odd harmonics must be 1-32")
if args.nshift < 1:
parser.error("number of shifted FFTs must be >= 1")
if args.rotate is not None:
rotation = args.rotate * 2.0 * pi / 360.0
else:
rotation = args.rotate_rad or 0.0
# Check the sine fit stream
client_sinefit = nilmdb.client.Client(args.url)
sinefit = nilmtools.filter.get_stream_info(client_sinefit, args.sinepath)
if not sinefit:
raise Exception("sinefit data not found")
if sinefit.layout != "float32_3":
raise Exception("sinefit data type is " + sinefit.layout
+ "; expected float32_3")
# Check and set metadata in prep stream
f.check_dest_metadata({ "prep_raw_source": f.src.path,
"prep_sinefit_source": sinefit.path,
"prep_column": args.column,
"prep_rotation": repr(rotation) })
# Find the intersection of the usual set of intervals we'd filter,
# and the intervals actually present in sinefit data. This is
# what we will process.
filter_int = f.intervals()
sinefit_int = ( Interval(start, end) for (start, end) in
client_sinefit.stream_intervals(
args.sinepath, start = f.start, end = f.end) )
intervals = nilmdb.utils.interval.intersection(filter_int, sinefit_int)
# Run the process (using the helper in the filter module)
f.process_numpy(process, args = (client_sinefit, sinefit.path, args.column,
args.nharm, rotation, args.nshift),
intervals = intervals)
def process(data, interval, args, insert_function, final):
(client, sinefit_path, column, nharm, rotation, nshift) = args
rows = data.shape[0]
data_timestamps = data[:,0]
if rows < 2:
return 0
last_inserted = [nilmdb.utils.time.min_timestamp]
def insert_if_nonoverlapping(data):
"""Call insert_function to insert data, but only if this
data doesn't overlap with other data that we inserted."""
if data[0][0] <= last_inserted[0]:
return
last_inserted[0] = data[-1][0]
insert_function(data)
processed = 0
out = zeros((1, nharm * 2 + 1))
# Pull out sinefit data for the entire time range of this block
for sinefit_line in client.stream_extract(sinefit_path,
data[0, 0], data[rows-1, 0]):
def prep_period(t_min, t_max, rot):
"""
Compute prep coefficients from time t_min to t_max, which
are the timestamps of the start and end of one period.
Results are rotated by an additional extra_rot before
being inserted into the database. Returns the maximum
index processed, or None if the period couldn't be
processed.
"""
# Find the indices of data that correspond to (t_min, t_max)
idx_min = bisect.bisect_left(data_timestamps, t_min)
idx_max = bisect.bisect_left(data_timestamps, t_max)
if idx_min >= idx_max or idx_max >= len(data_timestamps):
return None
# Perform FFT over those indices
N = idx_max - idx_min
d = data[idx_min:idx_max, column]
F = scipy.fftpack.fft(d) * 2.0 / N
# If we wanted more harmonics than the FFT gave us, pad with zeros
if N < (nharm * 2):
F = r_[F, zeros(nharm * 2 - N)]
# Fill output data.
out[0, 0] = round(t_min)
for k in range(nharm):
Fk = F[2 * k + 1] * e**(rot * 1j * (k+1))
out[0, 2 * k + 1] = -imag(Fk) # Pk
out[0, 2 * k + 2] = real(Fk) # Qk
insert_if_nonoverlapping(out)
return idx_max
# Extract sinefit data to get zero crossing timestamps.
# t_min = beginning of period
# t_max = end of period
(t_min, f0, A, C) = [ float(x) for x in sinefit_line.split() ]
t_max = t_min + 1e6 / f0
# Compute prep over shifted windows of the period
# (nshift is typically 1)
for n in range(nshift):
# Compute timestamps and rotations for shifted window
time_shift = n * (t_max - t_min) / nshift
shifted_min = t_min + time_shift
shifted_max = t_max + time_shift
angle_shift = n * 2 * pi / nshift
shifted_rot = rotation - angle_shift
# Run prep computation
idx_max = prep_period(shifted_min, shifted_max, shifted_rot)
if not idx_max:
break
processed = idx_max
# If we processed no data but there's lots in here, pretend we
# processed half of it.
if processed == 0 and rows > 10000:
processed = rows / 2
printf("%s: warning: no periods found; skipping %d rows\n",
timestamp_to_human(data[0][0]), processed)
else:
printf("%s: processed %d of %d rows\n",
timestamp_to_human(data[0][0]), processed, rows)
return processed
if __name__ == "__main__":
main()

191
nilmtools/sinefit.py Executable file
View File

@@ -0,0 +1,191 @@
#!/usr/bin/python
# Sine wave fitting.
from nilmdb.utils.printf import *
import nilmtools.filter
import nilmtools.math
import nilmdb.client
from nilmdb.utils.time import (timestamp_to_human,
timestamp_to_seconds,
seconds_to_timestamp)
from numpy import *
from scipy import *
#import pylab as p
import sys
def main(argv = None):
f = nilmtools.filter.Filter()
parser = f.setup_parser("Sine wave fitting")
group = parser.add_argument_group("Sine fit options")
group.add_argument('-c', '--column', action='store', type=int,
help='Column number (first data column is 1)')
group.add_argument('-f', '--frequency', action='store', type=float,
default=60.0,
help='Approximate frequency (default: %(default)s)')
group.add_argument('-m', '--min-freq', action='store', type=float,
help='Minimum valid frequency '
'(default: approximate frequency / 2))')
group.add_argument('-M', '--max-freq', action='store', type=float,
help='Maximum valid frequency '
'(default: approximate frequency * 2))')
group.add_argument('-a', '--min-amp', action='store', type=float,
default=20.0,
help='Minimum signal amplitude (default: %(default)s)')
# Parse arguments
try:
args = f.parse_args(argv)
except nilmtools.filter.MissingDestination as e:
rec = "float32_3"
print "Source is %s (%s)" % (e.src.path, e.src.layout)
print "Destination %s doesn't exist" % (e.dest.path)
print "You could make it with a command like:"
print " nilmtool -u %s create %s %s" % (e.dest.url, e.dest.path, rec)
raise SystemExit(1)
if args.column is None or args.column < 1:
parser.error("need a column number >= 1")
if args.frequency < 0.1:
parser.error("frequency must be >= 0.1")
if args.min_freq is None:
args.min_freq = args.frequency / 2
if args.max_freq is None:
args.max_freq = args.frequency * 2
if (args.min_freq > args.max_freq or
args.min_freq > args.frequency or
args.max_freq < args.frequency):
parser.error("invalid min or max frequency")
if args.min_amp < 0:
parser.error("min amplitude must be >= 0")
f.check_dest_metadata({ "sinefit_source": f.src.path,
"sinefit_column": args.column })
f.process_numpy(process, args = (args.column, args.frequency, args.min_amp,
args.min_freq, args.max_freq))
class SuppressibleWarning(object):
def __init__(self, maxcount = 10, maxsuppress = 100):
self.maxcount = maxcount
self.maxsuppress = maxsuppress
self.count = 0
self.last_msg = ""
def _write(self, sec, msg):
if sec:
now = timestamp_to_human(seconds_to_timestamp(sec)) + ": "
else:
now = ""
sys.stderr.write(now + msg)
def warn(self, msg, seconds = None):
self.count += 1
if self.count <= self.maxcount:
self._write(seconds, msg)
if (self.count - self.maxcount) >= self.maxsuppress:
self.reset(seconds)
def reset(self, seconds = None):
if self.count > self.maxcount:
self._write(seconds, sprintf("(%d warnings suppressed)\n",
self.count - self.maxcount))
self.count = 0
def process(data, interval, args, insert_function, final):
(column, f_expected, a_min, f_min, f_max) = args
rows = data.shape[0]
# Estimate sampling frequency from timestamps
fs = (rows-1) / (timestamp_to_seconds(data[-1][0]) -
timestamp_to_seconds(data[0][0]))
# Pull out about 3.5 periods of data at once;
# we'll expect to match 3 zero crossings in each window
N = max(int(3.5 * fs / f_expected), 10)
# If we don't have enough data, don't bother processing it
if rows < N:
return 0
warn = SuppressibleWarning(3, 1000)
# Process overlapping windows
start = 0
num_zc = 0
last_inserted_timestamp = None
while start < (rows - N):
this = data[start:start+N, column]
t_min = timestamp_to_seconds(data[start, 0])
t_max = timestamp_to_seconds(data[start+N-1, 0])
# Do 4-parameter sine wave fit
(A, f0, phi, C) = nilmtools.math.sfit4(this, fs)
# Check bounds. If frequency is too crazy, ignore this window
if f0 < f_min or f0 > f_max:
warn.warn(sprintf("frequency %s outside valid range %s - %s\n",
str(f0), str(f_min), str(f_max)), t_min)
start += N
continue
# If amplitude is too low, results are probably just noise
if A < a_min:
warn.warn(sprintf("amplitude %s below minimum threshold %s\n",
str(A), str(a_min)), t_min)
start += N
continue
#p.plot(arange(N), this)
#p.plot(arange(N), A * sin(f0/fs * 2 * pi * arange(N) + phi) + C, 'g')
# Period starts when the argument of sine is 0 degrees,
# so we're looking for sample number:
# n = (0 - phi) / (f0/fs * 2 * pi)
zc_n = (0 - phi) / (f0 / fs * 2 * pi)
period_n = fs/f0
# Add periods to make N positive
while zc_n < 0:
zc_n += period_n
last_zc = None
# Mark the zero crossings until we're a half period away
# from the end of the window
while zc_n < (N - period_n/2):
#p.plot(zc_n, C, 'ro')
t = t_min + zc_n / fs
if (last_inserted_timestamp is None or
t > last_inserted_timestamp):
insert_function([[seconds_to_timestamp(t), f0, A, C]])
last_inserted_timestamp = t
warn.reset(t)
else:
warn.warn("timestamp overlap\n", t)
num_zc += 1
last_zc = zc_n
zc_n += period_n
# Advance the window one quarter period past the last marked
# zero crossing, or advance the window by half its size if we
# didn't mark any.
if last_zc is not None:
advance = min(last_zc + period_n/4, N)
else:
advance = N/2
#p.plot(advance, C, 'go')
#p.show()
start = int(round(start + advance))
# Return the number of rows we've processed
warn.reset(last_inserted_timestamp)
if last_inserted_timestamp:
now = timestamp_to_human(seconds_to_timestamp(
last_inserted_timestamp)) + ": "
else:
now = ""
printf("%sMarked %d zero-crossings in %d rows\n", now, num_zc, start)
return start
if __name__ == "__main__":
main()

304
nilmtools/trainola.py Executable file
View File

@@ -0,0 +1,304 @@
#!/usr/bin/python
from nilmdb.utils.printf import *
import nilmdb.client
import nilmtools.filter
import nilmtools.math
from nilmdb.utils.time import (timestamp_to_human,
timestamp_to_seconds,
seconds_to_timestamp)
from nilmdb.utils import datetime_tz
from nilmdb.utils.interval import Interval
import numpy as np
import scipy
import scipy.signal
from numpy.core.umath_tests import inner1d
import nilmrun
from collections import OrderedDict
import sys
import time
import functools
import collections
class DataError(ValueError):
pass
def build_column_mapping(colinfo, streaminfo):
"""Given the 'columns' list from the JSON data, verify and
pull out a dictionary mapping for the column names/numbers."""
columns = OrderedDict()
for c in colinfo:
col_num = c['index'] + 1 # skip timestamp
if (c['name'] in columns.keys() or col_num in columns.values()):
raise DataError("duplicated columns")
if (c['index'] < 0 or c['index'] >= streaminfo.layout_count):
raise DataError("bad column number")
columns[c['name']] = col_num
if not len(columns):
raise DataError("no columns")
return columns
class Exemplar(object):
def __init__(self, exinfo, min_rows = 10, max_rows = 100000):
"""Given a dictionary entry from the 'exemplars' input JSON,
verify the stream, columns, etc. Then, fetch all the data
into self.data."""
self.name = exinfo['name']
self.url = exinfo['url']
self.stream = exinfo['stream']
self.start = exinfo['start']
self.end = exinfo['end']
self.dest_column = exinfo['dest_column']
# Get stream info
self.client = nilmdb.client.numpyclient.NumpyClient(self.url)
self.info = nilmtools.filter.get_stream_info(self.client, self.stream)
if not self.info:
raise DataError(sprintf("exemplar stream '%s' does not exist " +
"on server '%s'", self.stream, self.url))
# Build up name => index mapping for the columns
self.columns = build_column_mapping(exinfo['columns'], self.info)
# Count points
self.count = self.client.stream_count(self.stream, self.start, self.end)
# Verify count
if self.count == 0:
raise DataError("No data in this exemplar!")
if self.count < min_rows:
raise DataError("Too few data points: " + str(self.count))
if self.count > max_rows:
raise DataError("Too many data points: " + str(self.count))
# Extract the data
datagen = self.client.stream_extract_numpy(self.stream,
self.start, self.end,
self.info.layout,
maxrows = self.count)
self.data = list(datagen)[0]
# Extract just the columns that were specified in self.columns,
# skipping the timestamp.
extract_columns = [ value for (key, value) in self.columns.items() ]
self.data = self.data[:,extract_columns]
# Fix the column indices in e.columns, since we removed/reordered
# columns in self.data
for n, k in enumerate(self.columns):
self.columns[k] = n
# Subtract the means from each column
self.data = self.data - self.data.mean(axis=0)
# Get scale factors for each column by computing dot product
# of each column with itself.
self.scale = inner1d(self.data.T, self.data.T)
# Ensure a minimum (nonzero) scale and convert to list
self.scale = np.maximum(self.scale, [1e-9]).tolist()
def __str__(self):
return sprintf("\"%s\" %s [%s] %s rows",
self.name, self.stream, ",".join(self.columns.keys()),
self.count)
def timestamp_to_short_human(timestamp):
dt = datetime_tz.datetime_tz.fromtimestamp(timestamp_to_seconds(timestamp))
return dt.strftime("%H:%M:%S")
def trainola_matcher(data, interval, args, insert_func, final_chunk):
"""Perform cross-correlation match"""
( src_columns, dest_count, exemplars ) = args
nrows = data.shape[0]
# We want at least 10% more points than the widest exemplar.
widest = max([ x.count for x in exemplars ])
if (widest * 1.1) > nrows:
return 0
# This is how many points we'll consider valid in the
# cross-correlation.
valid = nrows + 1 - widest
matches = collections.defaultdict(list)
# Try matching against each of the exemplars
for e in exemplars:
corrs = []
# Compute cross-correlation for each column
for col_name in e.columns:
a = data[:, src_columns[col_name]]
b = e.data[:, e.columns[col_name]]
corr = scipy.signal.fftconvolve(a, np.flipud(b), 'valid')[0:valid]
# Scale by the norm of the exemplar
corr = corr / e.scale[e.columns[col_name]]
corrs.append(corr)
# Find the peaks using the column with the largest amplitude
biggest = e.scale.index(max(e.scale))
peaks = nilmtools.math.peak_detect(corrs[biggest], 0.1)
# To try to reduce false positives, discard peaks where
# there's a higher-magnitude peak (either min or max) within
# one exemplar width nearby.
good_peak_locations = []
for (i, (n, p, is_max)) in enumerate(peaks):
if not is_max:
continue
ok = True
# check up to 'e.count' rows before this one
j = i-1
while ok and j >= 0 and peaks[j][0] > (n - e.count):
if abs(peaks[j][1]) > abs(p):
ok = False
j -= 1
# check up to 'e.count' rows after this one
j = i+1
while ok and j < len(peaks) and peaks[j][0] < (n + e.count):
if abs(peaks[j][1]) > abs(p):
ok = False
j += 1
if ok:
good_peak_locations.append(n)
# Now look at all good peaks
for row in good_peak_locations:
# Correlation for each column must be close enough to 1.
for (corr, scale) in zip(corrs, e.scale):
# The accepted distance from 1 is based on the relative
# amplitude of the column. Use a linear mapping:
# scale 1.0 -> distance 0.1
# scale 0.0 -> distance 1.0
distance = 1 - 0.9 * (scale / e.scale[biggest])
if abs(corr[row] - 1) > distance:
# No match
break
else:
# Successful match
matches[row].append(e)
# Insert matches into destination stream.
matched_rows = sorted(matches.keys())
out = np.zeros((len(matched_rows), dest_count + 1))
for n, row in enumerate(matched_rows):
# Fill timestamp
out[n][0] = data[row, 0]
# Mark matched exemplars
for exemplar in matches[row]:
out[n, exemplar.dest_column + 1] = 1.0
# Insert it
insert_func(out)
# Return how many rows we processed
valid = max(valid, 0)
printf(" [%s] matched %d exemplars in %d rows\n",
timestamp_to_short_human(data[0][0]), np.sum(out[:,1:]), valid)
return valid
def trainola(conf):
print "Trainola", nilmtools.__version__
# Load main stream data
url = conf['url']
src_path = conf['stream']
dest_path = conf['dest_stream']
start = conf['start']
end = conf['end']
# Get info for the src and dest streams
src_client = nilmdb.client.numpyclient.NumpyClient(url)
src = nilmtools.filter.get_stream_info(src_client, src_path)
if not src:
raise DataError("source path '" + src_path + "' does not exist")
src_columns = build_column_mapping(conf['columns'], src)
dest_client = nilmdb.client.numpyclient.NumpyClient(url)
dest = nilmtools.filter.get_stream_info(dest_client, dest_path)
if not dest:
raise DataError("destination path '" + dest_path + "' does not exist")
printf("Source:\n")
printf(" %s [%s]\n", src.path, ",".join(src_columns.keys()))
printf("Destination:\n")
printf(" %s (%s columns)\n", dest.path, dest.layout_count)
# Pull in the exemplar data
exemplars = []
for n, exinfo in enumerate(conf['exemplars']):
printf("Loading exemplar %d:\n", n)
e = Exemplar(exinfo)
col = e.dest_column
if col < 0 or col >= dest.layout_count:
raise DataError(sprintf("bad destination column number %d\n" +
"dest stream only has 0 through %d",
col, dest.layout_count - 1))
printf(" %s, output column %d\n", str(e), col)
exemplars.append(e)
if len(exemplars) == 0:
raise DataError("missing exemplars")
# Verify that the exemplar columns are all represented in the main data
for n, ex in enumerate(exemplars):
for col in ex.columns:
if col not in src_columns:
raise DataError(sprintf("Exemplar %d column %s is not "
"available in source data", n, col))
# Figure out which intervals we should process
intervals = ( Interval(s, e) for (s, e) in
src_client.stream_intervals(src_path,
diffpath = dest_path,
start = start, end = end) )
intervals = nilmdb.utils.interval.optimize(intervals)
# Do the processing
rows = 100000
extractor = functools.partial(src_client.stream_extract_numpy,
src.path, layout = src.layout, maxrows = rows)
inserter = functools.partial(dest_client.stream_insert_numpy_context,
dest.path)
start = time.time()
processed_time = 0
printf("Processing intervals:\n")
for interval in intervals:
printf("%s\n", interval.human_string())
nilmtools.filter.process_numpy_interval(
interval, extractor, inserter, rows * 3,
trainola_matcher, (src_columns, dest.layout_count, exemplars))
processed_time += (timestamp_to_seconds(interval.end) -
timestamp_to_seconds(interval.start))
elapsed = max(time.time() - start, 1e-3)
printf("Done. Processed %.2f seconds per second.\n",
processed_time / elapsed)
def main(argv = None):
import simplejson as json
import sys
if argv is None:
argv = sys.argv[1:]
if len(argv) != 1:
raise DataError("need one argument, either a dictionary or JSON string")
try:
# Passed in a JSON string (e.g. on the command line)
conf = json.loads(argv[0])
except TypeError as e:
# Passed in the config dictionary (e.g. from NilmRun)
conf = argv[0]
return trainola(conf)
if __name__ == "__main__":
main()

View File

@@ -61,14 +61,27 @@ setup(name='nilmtools',
long_description = "NILM Database Tools",
license = "Proprietary",
author_email = 'jim@jtan.com',
install_requires = [ 'nilmdb >= 1.3.0',
install_requires = [ 'nilmdb >= 1.8.5',
'numpy',
'scipy',
'python-daemon >= 1.5',
#'matplotlib',
],
packages = [ 'nilmtools',
],
entry_points = {
'console_scripts': [
'nilm-decimate = nilmtools.decimate:main',
'nilm-decimate-auto = nilmtools.decimate_auto:main',
'nilm-insert = nilmtools.insert:main',
'nilm-copy = nilmtools.copy_one:main',
'nilm-prep = nilmtools.prep:main',
'nilm-copy-wildcard = nilmtools.copy_wildcard:main',
'nilm-sinefit = nilmtools.sinefit:main',
'nilm-cleanup = nilmtools.cleanup:main',
'nilm-median = nilmtools.median:main',
'nilm-trainola = nilmtools.trainola:main',
'nilm-pipewatch = nilmtools.pipewatch:main',
],
},
zip_safe = False,