Compare commits

...

70 Commits

Author SHA1 Message Date
f530edd8a0 sfit4: if interpolated DFT fails, use peak 2013-08-16 15:36:39 -04:00
4d946bee79 Set shell and path in sample cron script 2013-08-16 15:36:20 -04:00
13ceb91999 Add test_sinefit makefile target 2013-08-16 15:36:11 -04:00
dab9625296 Run fsck at startup 2013-08-09 16:03:14 -04:00
3e7527ab57 Support -h and -v in nilm-trainola 2013-08-08 16:30:08 -04:00
31b6d82dfc Make 'make test' do nothing from command line 2013-08-07 20:19:39 -04:00
077010ba3a Store nshift in prep metadata 2013-08-07 20:19:28 -04:00
77751a8529 Fix typo in help text 2013-08-07 18:39:19 -04:00
9c711300a2 Add short form of --force-metadata, -F 2013-08-06 23:07:36 -04:00
74cf34e2ad Update sharon cleanup.cfg 2013-08-06 22:48:06 -04:00
120bf58b85 Support --nometa option for copy_one and copy_wildcard 2013-08-06 22:47:16 -04:00
c26daa9a3b Update crontab 2013-08-03 11:23:43 -04:00
6993f5c886 Fix process termination in nilm-pipewatch 2013-08-03 11:13:30 -04:00
Sharon NILM
dd69f3e51d Update process.sh 2013-08-02 23:19:14 -04:00
dc26e32b6e Make interhost, force_metadata private to Filter 2013-08-02 23:14:19 -04:00
981f23ff14 Better documentation for callback function 2013-08-02 23:14:19 -04:00
492445a469 Split off useful math functions to math.py 2013-08-02 17:27:39 -04:00
33c3586bea trainola: suppress peaks if larger ones are nearby
Might fix the problem Mark noticed where turn-off transients
are erroneously matching the drop that follows startup transients.
2013-07-31 19:12:16 -04:00
c1e0f8ffbc Fix bug in copy_one 2013-07-31 14:47:16 -04:00
d2853bdb0e Add test case for bad trainola detections 2013-07-30 20:35:54 -04:00
a4d4bc22fc Add --skip option to nilm-insert 2013-07-30 18:25:47 -04:00
6090dd6112 prep: only process intervals present in both raw & sinefit 2013-07-30 14:55:06 -04:00
Sharon NILM
9c0d9ad324 Sample scripts from Sharon 2013-07-29 18:37:55 -04:00
Sharon NILM
8b9c5d4898 Fix daemon dependency 2013-07-29 17:40:51 -04:00
cf2c28b0fb Add --daemon flag 2013-07-29 17:16:18 -04:00
87a26c907b Watch for process termination too 2013-07-29 15:08:49 -04:00
def465b57c Improve pipewatch; add nilm-pipewatch script 2013-07-29 14:58:15 -04:00
0589b8d316 start of pipewatch util 2013-07-29 14:10:56 -04:00
9c5f07106d Don't need python-pip 2013-07-20 16:15:29 -04:00
62e11a11c0 Fix issue with column ordering in the exemplars
If the max scale in the exemplar was a column we weren't using, it
would bail out when looking for that correlation later.  Change things
around so exemplars in RAM only keep around the columns we care about.
2013-07-18 22:51:27 -04:00
2bdcee2c36 More helpful error if exemplar stream doesn't exist 2013-07-15 15:19:52 -04:00
6dce8c5296 More output 2013-07-11 18:56:53 -04:00
25c35a56f6 Trainola inserts into the destination stream now 2013-07-10 12:59:39 -04:00
d610deaef0 More trainola work 2013-07-10 11:38:32 -04:00
d7d5ccc9a7 More filter cleanup 2013-07-09 19:27:20 -04:00
f28753ff5c Move process_numpy_interval outside the class 2013-07-09 18:40:49 -04:00
c9c2e0d5a8 Improve split between process_numpy and process_numpy_interval 2013-07-09 18:09:05 -04:00
5a2a32bec5 WIP on trainola improvements 2013-07-09 17:56:26 -04:00
706c3933f9 Add trainola from nilmrun 2013-07-09 17:55:57 -04:00
cfd1719152 Use nilmdb.utils.interval.optimize; bump nilmdb min version 2013-07-09 17:53:04 -04:00
c62fb45980 Makefile cleanup; add nilm-trainola binary 2013-07-09 16:53:47 -04:00
57d856f2fa Split filter.py internals up a little more
This makes it easier to use the filter stuff from other code, but it's
also turning it into more of a spaghetti nightmare.  Might not be
worth continuing down this path.
2013-07-09 16:52:00 -04:00
5d83d93019 Rename src/ directory to nilmtools/ 2013-07-08 11:54:13 -04:00
5f847a0513 Split process_numpy innards process_numpy_interval 2013-07-03 12:07:22 -04:00
29cd7eb6c7 Improve test_prep target in Makefile 2013-07-03 12:06:50 -04:00
62c8af41ea Cleanup comments 2013-06-06 15:34:23 -04:00
4f6bc48619 sinefit: include timestamps on marking output too 2013-05-11 11:00:31 -04:00
cf9eb0ed48 Improve sinefit resiliancy 2013-05-10 14:19:55 -04:00
32066fc260 Remove hard matplotlib dependency 2013-05-09 13:17:36 -04:00
739da3f973 Add median filter 2013-05-08 23:36:50 -04:00
83ad18ebf6 Fix non-string arguments to metadata_check 2013-05-08 12:49:38 -04:00
c76d527f95 Fix unicode handling in filter metadata match 2013-05-07 12:40:53 -04:00
b8a73278e7 Always store metadata rotation as a string 2013-04-29 14:25:11 -04:00
ce0691d6c4 sineefit: Change sfit4 to fit to \sin instead of \cos
And adjust the period locator accordingly.
Fitting \sin is the same mathematically, it's just conceptually more
straightforward since we're locating zero crossings anyway.
2013-04-27 18:12:20 -04:00
4da658e960 sinefit: move initial estimate into the main iteration loop
Just a little less code.  Same results.
2013-04-27 17:50:23 -04:00
8ab31eafc2 Allow shorthand method for creating an option-less parser.
This is mostly just intended to make a simple filter example shorter.
2013-04-21 16:53:28 -04:00
979ab13bff Force fs to be a float in sfit4 2013-04-17 17:58:15 -04:00
f4fda837ae Bump required nilmdb version to 1.6.0 2013-04-11 11:55:11 -04:00
5547d266d0 filter: Don't include trailing unprocessed data in the inserted intervals 2013-04-11 11:53:17 -04:00
372e977e4a Reverse cleanup order to handle interruptions better 2013-04-10 18:38:41 -04:00
640a680704 Increase default min amplitude in sinefit 2013-04-10 17:09:52 -04:00
2e74e6cd63 Skip over data if we aren't able to process any. Change output format 2013-04-10 17:01:07 -04:00
de2a794e00 Support wildcards in nilm-decimate-auto 2013-04-10 16:05:16 -04:00
065a40f265 sinefit: add minimum amplitude check 2013-04-10 15:33:51 -04:00
65fa43aff1 sinefit: catch all errors in sfit4 2013-04-10 14:36:50 -04:00
57c23c3792 sinefit: allow user to override min/max frequency detection 2013-04-10 14:36:40 -04:00
d4c8e4acb4 Include rotation in metadata 2013-04-10 14:36:05 -04:00
fd1b33401f Require a --yes argument before actually cleaning data 2013-04-09 20:13:38 -04:00
4c748ec00c Fix minor bugs 2013-04-09 20:08:25 -04:00
b72d6b6908 Warn if column count is wrong for this nharm value 2013-04-09 19:59:59 -04:00
26 changed files with 1335 additions and 361 deletions

View File

@@ -1,29 +1,51 @@
#URL="http://bucket.mit.edu:8080/nilmdb"
URL="http://localhost/nilmdb"
all:
all: test
test:
ifeq ($(INSIDE_EMACS), t)
@make test
@make test_sinefit
else
@echo "Try 'make install'"
@echo 'No test suite for nilmtools. Try "make install"'
endif
test: test_cleanup
test_pipewatch:
nilmtools/pipewatch.py -t 3 "seq 10 20" "seq 20 30"
test_trainola:
-nilmtool -u http://bucket/nilmdb remove -s min -e max \
/sharon/prep-a-matches
nilmtools/trainola.py "$$(cat extras/trainola-test-param.js)"
test_trainola2:
-nilmtool -u http://bucket/nilmdb remove -s min -e max \
/sharon/prep-a-matches
nilmtools/trainola.py "$$(cat extras/trainola-test-param-2.js)"
test_trainola3:
-nilmtool -u "http://bucket/nilmdb" destroy -R /test/jim
nilmtool -u "http://bucket/nilmdb" create /test/jim uint8_3
nilmtools/trainola.py "$$(cat extras/trainola-test-param-3.js)"
nilmtool -u "http://bucket/nilmdb" extract /test/jim -s min -e max
test_cleanup:
src/cleanup.py -e extras/cleanup.cfg
src/cleanup.py -D extras/cleanup.cfg
nilmtools/cleanup.py -e extras/cleanup.cfg
nilmtools/cleanup.py extras/cleanup.cfg
test_insert:
@make install >/dev/null
src/insert.py --file --dry-run /test/foo </dev/null
nilmtools/insert.py --skip --file --dry-run /foo/bar ~/data/20130311T2100.prep1.gz ~/data/20130311T2100.prep1.gz ~/data/20130311T2200.prep1.gz
test_copy:
@make install >/dev/null
src/copy_wildcard.py -U "http://nilmdb.com/bucket/" -D /lees*
nilmtools/copy_wildcard.py -U "http://nilmdb.com/bucket/" -D /lees*
test_prep:
@make install >/dev/null
/tmp/raw.dat:
octave --eval 'fs = 8000;' \
--eval 't = (0:fs*10)*2*pi*60/fs;' \
--eval 'raw = transpose([sin(t); 0.3*sin(3*t)+sin(t)]);' \
--eval 'save("-ascii","/tmp/raw.dat","raw");'
test_prep: /tmp/raw.dat
-nilmtool destroy -R /test/raw
-nilmtool destroy -R /test/sinefit
-nilmtool destroy -R /test/prep
@@ -31,17 +53,26 @@ test_prep:
nilmtool create /test/sinefit float32_3
nilmtool create /test/prep float32_8
nilmtool insert -s '@0' -t -r 8000 /test/raw /tmp/raw.dat
src/sinefit.py -c 1 /test/raw /test/sinefit
src/prep.py -c 2 /test/raw /test/sinefit /test/prep
nilmtools/sinefit.py -a 0.5 -c 1 -s '@0' -e '@5000000' /test/raw /test/sinefit
nilmtools/prep.py -c 2 /test/raw /test/sinefit /test/prep
nilmtools/prep.py -c 2 /test/raw /test/sinefit /test/prep
nilmtool extract -s min -e max /test/prep | head -20
test_sinefit:
make install >/dev/null 2>&1
-nilmtool destroy -R /test/sinefit
nilmtool create /test/sinefit float32_3
nilmtools/sinefit.py -c 5 -s '2013/03/25 09:11:00' \
-e '2013/03/25 10:11:00' /sharon/raw /test/sinefit
nilmtool extract -s min -e max /test/sinefit | head -20
test_decimate:
-@nilmtool destroy /lees-compressor/no-leak/raw/4 || true
-@nilmtool destroy /lees-compressor/no-leak/raw/16 || true
-@nilmtool create /lees-compressor/no-leak/raw/4 float32_18 || true
-@nilmtool create /lees-compressor/no-leak/raw/16 float32_18 || true
time python src/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/1 /lees-compressor/no-leak/raw/4
python src/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/4 /lees-compressor/no-leak/raw/16
time python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/1 /lees-compressor/no-leak/raw/4
python nilmtools/decimate.py -s '2013-02-04 18:10:00' -e '2013-02-04 18:11:00' /lees-compressor/no-leak/raw/4 /lees-compressor/no-leak/raw/16
version:
python setup.py version
@@ -63,4 +94,4 @@ clean::
gitclean::
git clean -dXf
.PHONY: all version dist sdist install clean gitclean
.PHONY: all version dist sdist install clean gitclean test

View File

@@ -6,9 +6,9 @@ Prerequisites:
# Runtime and build environments
sudo apt-get install python2.7 python2.7-dev python-setuptools
sudo apt-get install python-numpy python-scipy python-matplotlib
sudo apt-get install python-numpy python-scipy python-daemon
nilmdb (1.5.0+)
nilmdb (1.8.5+)
Install:

View File

@@ -0,0 +1,10 @@
#!/bin/bash
# Start the ethstream capture using nilm-pipewatch
# Bail out on errors
set -e
nilm-pipewatch --daemon --lock "/tmp/nilmdb-capture.lock" --timeout 30 \
"ethstream -a 192.168.1.209 -n 9 -r 8000 -N" \
"nilm-insert -m 10 -r 8000 --live /sharon/raw"

View File

@@ -0,0 +1,9 @@
[/sharon/prep-*]
keep = 1y
[/sharon/raw]
keep = 2w
[/sharon/sinefit]
keep = 1y
decimated = false

View File

@@ -0,0 +1,15 @@
# Install this by running "crontab crontab" (will replace existing crontab)
SHELL=/bin/bash
PATH=/usr/local/bin:/usr/local/sbin:/usr/bin:/usr/sbin:/bin:/sbin
# m h dom mon dow cmd
# Run NilmDB processing every 5 minutes
*/5 * * * * chronic /home/nilm/data/process.sh
# Try frequently restarting the capture process in case it died
*/5 * * * * chronic /home/nilm/data/capture.sh
# Run fsck at startup
@reboot chronic nilmdb-fsck --fix --no-data /home/nilm/data/db/

View File

@@ -0,0 +1,32 @@
#!/bin/bash
# Run all necessary processing on NilmDB data.
# Bail out on errors
set -e
# Ensure only one copy of this code runs at a time:
LOCKFILE="/tmp/nilmdb-process.lock"
exec 99>"$LOCKFILE"
if ! flock -n -x 99 ; then
echo "NilmDB processing already running, giving up..."
exit 0
fi
trap 'rm -f "$LOCKFILE"' 0
# redirect stdout/stderr to log, but keep it on the console too
exec > >(tee /home/nilm/data/process.log)
exec 2> >(tee -a /home/nilm/data/process.log >&2)
echo "sinefit on phase A voltage"
nilm-sinefit -c 5 /sharon/raw /sharon/sinefit
echo "prep on A, B, C with appropriate rotations"
nilm-prep -c 1 -r 0 /sharon/raw /sharon/sinefit /sharon/prep-a
nilm-prep -c 2 -r 120 /sharon/raw /sharon/sinefit /sharon/prep-b
nilm-prep -c 3 -r 240 /sharon/raw /sharon/sinefit /sharon/prep-c
echo "decimate raw and prep data"
nilm-decimate-auto /sharon/raw /sharon/prep*
echo "run cleanup"
nilm-cleanup --yes /home/nilm/data/cleanup.cfg

View File

@@ -0,0 +1,29 @@
{ "columns" : [ { "index" : 0, "name" : "P1" },
{ "index" : 1, "name" : "Q1" },
{ "index" : 2, "name" : "P3" } ],
"stream" : "/sharon/prep-a",
"url" : "http://bucket.mit.edu/nilmdb",
"dest_stream" : "/sharon/prep-a-matches",
"start" : 1365153062643133.5,
"end" : 1365168814443575.5,
"exemplars" : [ { "columns" : [ { "index" : 0,
"name" : "P1"
} ],
"dest_column" : 0,
"end" : 1365073657682000,
"name" : "Turn ON",
"start" : 1365073654321000,
"stream" : "/sharon/prep-a",
"url" : "http://bucket.mit.edu/nilmdb"
},
{ "columns" : [ { "index" : 2, "name" : "P3" },
{ "index" : 0, "name" : "P1" } ],
"dest_column" : 1,
"end" : 1365176528818000,
"name" : "Type 2 turn ON",
"start" : 1365176520030000,
"stream" : "/sharon/prep-a",
"url" : "http://bucket.mit.edu/nilmdb"
}
]
}

View File

@@ -0,0 +1,40 @@
{
"url": "http://bucket/nilmdb",
"stream": "/sharon/prep-a",
"dest_stream": "/test/jim",
"start": 1364184839901599,
"end": 1364184942407610.2,
"columns": [ { "index": 0, "name": "P1" } ],
"exemplars": [
{
"name": "A - True DBL Freezer ON",
"dest_column": 0,
"url": "http://bucket/nilmdb",
"stream": "/sharon/prep-a",
"columns": [ { "index": 0, "name": "P1" } ],
"start": 1365277707649000,
"end": 1365277710705000
},
{
"name": "A - Boiler 1 Fan OFF",
"dest_column": 1,
"url": "http://bucket/nilmdb",
"stream": "/sharon/prep-a",
"columns": [ { "index": 0, "name": "P1" } ],
"start": 1364188370735000,
"end": 1364188373819000
},
{
"name": "A - True DBL Freezer OFF",
"dest_column": 2,
"url": "http://bucket/nilmdb",
"stream": "/sharon/prep-a",
"columns": [ { "index": 0, "name": "P1" } ],
"start": 1365278087982000,
"end": 1365278089340000
}
]
}

View File

@@ -0,0 +1,31 @@
{ "url": "http://bucket.mit.edu/nilmdb",
"dest_stream": "/sharon/prep-a-matches",
"stream": "/sharon/prep-a",
"start": 1366111383280463,
"end": 1366126163457797,
"columns": [ { "name": "P1", "index": 0 },
{ "name": "Q1", "index": 1 },
{ "name": "P3", "index": 2 } ],
"exemplars": [
{ "name": "Boiler Pump ON",
"url": "http://bucket.mit.edu/nilmdb",
"stream": "/sharon/prep-a",
"start": 1366260494269078,
"end": 1366260608185031,
"dest_column": 0,
"columns": [ { "name": "P1", "index": 0 },
{ "name": "Q1", "index": 1 }
]
},
{ "name": "Boiler Pump OFF",
"url": "http://bucket.mit.edu/nilmdb",
"stream": "/sharon/prep-a",
"start": 1366260864215764,
"end": 1366260870882998,
"dest_column": 1,
"columns": [ { "name": "P1", "index": 0 },
{ "name": "Q1", "index": 1 }
]
}
]
}

View File

View File

@@ -181,7 +181,7 @@ def versions_from_parentdir(parentdir_prefix, versionfile_source, verbose=False)
tag_prefix = "nilmtools-"
parentdir_prefix = "nilmtools-"
versionfile_source = "src/_version.py"
versionfile_source = "nilmtools/_version.py"
def get_versions(default={"version": "unknown", "full": ""}, verbose=False):
variables = { "refnames": git_refnames, "full": git_full }

View File

@@ -19,7 +19,7 @@ def warn(msg, *args):
fprintf(sys.stderr, "warning: " + msg + "\n", *args)
class TimePeriod(object):
_units = { 'h': ('hour', 60*60*24),
_units = { 'h': ('hour', 60*60),
'd': ('day', 60*60*24),
'w': ('week', 60*60*24*7),
'm': ('month', 60*60*24*30),
@@ -96,9 +96,9 @@ def main(argv = None):
parser.add_argument("-u", "--url", action="store",
default="http://localhost/nilmdb/",
help="NilmDB server URL (default: %(default)s)")
parser.add_argument("-D", "--dry-run", action="store_true",
parser.add_argument("-y", "--yes", action="store_true",
default = False,
help="Don't actually remove any data")
help="Actually remove the data (default: no)")
parser.add_argument("-e", "--estimate", action="store_true",
default = False,
help="Estimate how much disk space will be used")
@@ -228,7 +228,7 @@ def main(argv = None):
keep = seconds_to_timestamp(streams[path].keep.seconds())
for i in intervals:
total += i.end - i.start
if total < keep:
if total <= keep:
continue
remove_before = i.start + (total - keep)
break
@@ -238,14 +238,19 @@ def main(argv = None):
timestamp_to_seconds(total)))
continue
printf(" removing data before %s\n", timestamp_to_human(remove_before))
if not args.dry_run:
client.stream_remove(path, None, remove_before)
for ap in streams[path].also_clean_paths:
printf(" also removing from %s\n", ap)
if not args.dry_run:
client.stream_remove(ap, None, remove_before)
# Clean in reverse order. Since we only use the primary stream and not
# the decimated streams to figure out which data to remove, removing
# the primary stream last means that we might recover more nicely if
# we are interrupted and restarted.
clean_paths = list(reversed(streams[path].also_clean_paths)) + [ path ]
for p in clean_paths:
printf(" removing from %s\n", p)
if args.yes:
client.stream_remove(p, None, remove_before)
# All done
if not args.yes:
printf("Note: specify --yes to actually perform removals\n")
return
if __name__ == "__main__":

View File

@@ -12,6 +12,8 @@ import sys
def main(argv = None):
f = nilmtools.filter.Filter()
parser = f.setup_parser("Copy a stream")
parser.add_argument('-n', '--nometa', action='store_true',
help="Don't copy or check metadata")
# Parse arguments
try:
@@ -25,6 +27,7 @@ def main(argv = None):
raise SystemExit(1)
# Copy metadata
if not args.nometa:
meta = f.client_src.stream_get_metadata(f.src.path)
f.check_dest_metadata(meta)
@@ -32,7 +35,7 @@ def main(argv = None):
extractor = NumpyClient(f.src.url).stream_extract_numpy
inserter = NumpyClient(f.dest.url).stream_insert_numpy_context
for i in f.intervals():
print "Processing", f.interval_string(i)
print "Processing", i.human_string()
with inserter(f.dest.path, i.start, i.end) as insert_ctx:
for data in extractor(f.src.path, i.start, i.end):
insert_ctx.insert(data)

View File

@@ -16,6 +16,8 @@ def main(argv = None):
Example: %(prog)s -u http://host1/nilmdb -U http://host2/nilmdb /sharon/*
""", skip_paths = True)
parser.add_argument('-n', '--nometa', action='store_true',
help="Don't copy or check metadata")
parser.add_argument("path", action="store", nargs="+",
help='Wildcard paths to copy')
args = parser.parse_args(argv)
@@ -56,6 +58,8 @@ def main(argv = None):
new_argv.extend(["--end", "@" + repr(args.end)])
if args.dry_run:
new_argv.extend(["--dry-run"])
if args.nometa:
new_argv.extend(["--nometa"])
if args.force_metadata:
new_argv.extend(["--force-metadata"])
new_argv.extend([stream[0], stream[0]])

View File

View File

@@ -4,39 +4,59 @@ import nilmtools.filter
import nilmtools.decimate
import nilmdb.client
import argparse
import fnmatch
def main(argv = None):
parser = argparse.ArgumentParser(
formatter_class = argparse.RawDescriptionHelpFormatter,
version = "1.0",
version = nilmtools.__version__,
description = """\
Automatically create multiple decimations from a single source
stream, continuing until the last decimated level contains fewer
than 500 points total.
Wildcards and multiple paths are accepted. Decimated paths are
ignored when matching wildcards.
""")
parser.add_argument("-u", "--url", action="store",
default="http://localhost/nilmdb/",
help="NilmDB server URL (default: %(default)s)")
parser.add_argument('-f', '--factor', action='store', default=4, type=int,
parser.add_argument("-f", "--factor", action="store", default=4, type=int,
help='Decimation factor (default: %(default)s)')
parser.add_argument("--force-metadata", action="store_true",
parser.add_argument("-F", "--force-metadata", action="store_true",
default = False,
help="Force metadata changes if the dest "
"doesn't match")
parser.add_argument("path", action="store",
parser.add_argument("path", action="store", nargs='+',
help='Path of base stream')
args = parser.parse_args(argv)
# Pull out info about the base stream
client = nilmdb.client.Client(args.url)
info = nilmtools.filter.get_stream_info(client, args.path)
if not info:
raise Exception("path " + args.path + " not found")
# Find list of paths to process
streams = [ unicode(s[0]) for s in client.stream_list() ]
streams = [ s for s in streams if "~decim-" not in s ]
paths = []
for path in args.path:
new = fnmatch.filter(streams, unicode(path))
if not new:
print "error: no stream matched path:", path
raise SystemExit(1)
paths.extend(new)
meta = client.stream_get_metadata(args.path)
for path in paths:
do_decimation(client, args, path)
def do_decimation(client, args, path):
print "Decimating", path
info = nilmtools.filter.get_stream_info(client, path)
if not info:
raise Exception("path " + path + " not found")
meta = client.stream_get_metadata(path)
if "decimate_source" in meta:
print "Stream", args.path, "was decimated from", meta["decimate_source"]
print "Stream", path, "was decimated from", meta["decimate_source"]
print "You need to pass the base stream instead"
raise SystemExit(1)
@@ -53,7 +73,7 @@ def main(argv = None):
if info.rows <= 500:
break
factor *= args.factor
new_path = "%s~decim-%d" % (args.path, factor)
new_path = "%s~decim-%d" % (path, factor)
# Create the stream if needed
new_info = nilmtools.filter.get_stream_info(client, new_path)
@@ -72,5 +92,7 @@ def main(argv = None):
# Update info using the newly decimated stream
info = nilmtools.filter.get_stream_info(client, new_path)
return
if __name__ == "__main__":
main()

View File

@@ -19,6 +19,10 @@ import re
import argparse
import numpy as np
import cStringIO
import functools
class ArgumentError(Exception):
pass
class MissingDestination(Exception):
def __init__(self, args, src, dest):
@@ -65,9 +69,101 @@ def get_stream_info(client, path):
return None
return StreamInfo(client.geturl(), streams[0])
# Filter processing for a single interval of data.
def process_numpy_interval(interval, extractor, inserter, warn_rows,
function, args = None):
"""For the given 'interval' of data, extract data, process it
through 'function', and insert the result.
'extractor' should be a function like NumpyClient.stream_extract_numpy
but with the the interval 'start' and 'end' as the only parameters,
e.g.:
extractor = functools.partial(NumpyClient.stream_extract_numpy,
src_path, layout = l, maxrows = m)
'inserter' should be a function like NumpyClient.stream_insert_context
but with the interval 'start' and 'end' as the only parameters, e.g.:
inserter = functools.partial(NumpyClient.stream_insert_context,
dest_path)
If 'warn_rows' is not None, print a warning to stdout when the
number of unprocessed rows exceeds this amount.
See process_numpy for details on 'function' and 'args'.
"""
if args is None:
args = []
with inserter(interval.start, interval.end) as insert_ctx:
insert_func = insert_ctx.insert
old_array = np.array([])
for new_array in extractor(interval.start, interval.end):
# If we still had old data left, combine it
if old_array.shape[0] != 0:
array = np.vstack((old_array, new_array))
else:
array = new_array
# Pass the data to the user provided function
processed = function(array, interval, args, insert_func, False)
# Send any pending data that the user function inserted
insert_ctx.send()
# Save the unprocessed parts
if processed >= 0:
old_array = array[processed:]
else:
raise Exception(
sprintf("%s return value %s must be >= 0",
str(function), str(processed)))
# Warn if there's too much data remaining
if warn_rows is not None and old_array.shape[0] > warn_rows:
printf("warning: %d unprocessed rows in buffer\n",
old_array.shape[0])
# Last call for this contiguous interval
if old_array.shape[0] != 0:
processed = function(old_array, interval, args,
insert_func, True)
if processed != old_array.shape[0]:
# Truncate the interval we're inserting at the first
# unprocessed data point. This ensures that
# we'll not miss any data when we run again later.
insert_ctx.update_end(old_array[processed][0])
def example_callback_function(data, interval, args, insert_func, final):
"""Example of the signature for the function that gets passed
to process_numpy_interval.
'data': array of data to process -- may be empty
'interval': overall interval we're processing (but not necessarily
the interval of this particular chunk of data)
'args': opaque arguments passed to process_numpy
'insert_func': function to call in order to insert array of data.
Should be passed a 2-dimensional array of data to insert.
Data timestamps must be within the provided interval.
'final': True if this is the last bit of data for this
contiguous interval, False otherwise.
Return value of 'function' is the number of data rows processed.
Unprocessed data will be provided again in a subsequent call
(unless 'final' is True).
If unprocessed data remains after 'final' is True, the interval
being inserted will be ended at the timestamp of the first
unprocessed data point.
"""
raise NotImplementedError("example_callback_function does nothing")
class Filter(object):
def __init__(self):
def __init__(self, parser_description = None):
self._parser = None
self._client_src = None
self._client_dest = None
@@ -76,8 +172,11 @@ class Filter(object):
self.dest = None
self.start = None
self.end = None
self.interhost = False
self.force_metadata = False
self._interhost = False
self._force_metadata = False
if parser_description is not None:
self.setup_parser(parser_description)
self.parse_args()
@property
def client_src(self):
@@ -107,7 +206,7 @@ class Filter(object):
default = False,
help="Just print intervals that would be "
"processed")
group.add_argument("--force-metadata", action="store_true",
group.add_argument("-F", "--force-metadata", action="store_true",
default = False,
help="Force metadata changes if the dest "
"doesn't match")
@@ -131,68 +230,57 @@ class Filter(object):
self._parser = parser
return parser
def interval_string(self, interval):
return sprintf("[ %s -> %s ]",
timestamp_to_human(interval.start),
timestamp_to_human(interval.end))
def set_args(self, url, dest_url, srcpath, destpath, start, end,
parsed_args = None, quiet = True):
"""Set arguments directly from parameters"""
if dest_url is None:
dest_url = url
if url != dest_url:
self._interhost = True
self._client_src = Client(url)
self._client_dest = Client(dest_url)
if (not self._interhost) and (srcpath == destpath):
raise ArgumentError("source and destination path must be different")
# Open the streams
self.src = get_stream_info(self._client_src, srcpath)
if not self.src:
raise ArgumentError("source path " + srcpath + " not found")
self.dest = get_stream_info(self._client_dest, destpath)
if not self.dest:
raise MissingDestination(parsed_args, self.src,
StreamInfo(dest_url, [destpath]))
self.start = start
self.end = end
# Print info
if not quiet:
print "Source:", self.src.string(self._interhost)
print " Dest:", self.dest.string(self._interhost)
def parse_args(self, argv = None):
"""Parse arguments from a command line"""
args = self._parser.parse_args(argv)
if args.dest_url is None:
args.dest_url = args.url
if args.url != args.dest_url:
self.interhost = True
self._client_src = Client(args.url)
self._client_dest = Client(args.dest_url)
if (not self.interhost) and (args.srcpath == args.destpath):
self._parser.error("source and destination path must be different")
# Open and print info about the streams
self.src = get_stream_info(self._client_src, args.srcpath)
if not self.src:
self._parser.error("source path " + args.srcpath + " not found")
self.dest = get_stream_info(self._client_dest, args.destpath)
if not self.dest:
raise MissingDestination(args, self.src,
StreamInfo(args.dest_url, [args.destpath]))
print "Source:", self.src.string(self.interhost)
print " Dest:", self.dest.string(self.interhost)
self.set_args(args.url, args.dest_url, args.srcpath, args.destpath,
args.start, args.end, quiet = False, parsed_args = args)
self._force_metadata = args.force_metadata
if args.dry_run:
for interval in self.intervals():
print self.interval_string(interval)
print interval.human_string()
raise SystemExit(0)
self.force_metadata = args.force_metadata
self.start = args.start
self.end = args.end
return args
def _optimize_int(self, it):
"""Join and yield adjacent intervals from the iterator 'it'"""
saved_int = None
for interval in it:
if saved_int is not None:
if saved_int.end == interval.start:
interval.start = saved_int.start
else:
yield saved_int
saved_int = interval
if saved_int is not None:
yield saved_int
def intervals(self):
"""Generate all the intervals that this filter should process"""
self._using_client = True
if self.interhost:
if self._interhost:
# Do the difference ourselves
s_intervals = ( Interval(start, end)
for (start, end) in
@@ -214,12 +302,13 @@ class Filter(object):
self.src.path, diffpath = self.dest.path,
start = self.start, end = self.end) )
# Optimize intervals: join intervals that are adjacent
for interval in self._optimize_int(intervals):
for interval in nilmdb.utils.interval.optimize(intervals):
yield interval
self._using_client = False
# Misc helpers
def arg_time(self, toparse):
@staticmethod
def arg_time(toparse):
"""Parse a time string argument"""
try:
return nilmdb.utils.time.parse_time(toparse)
@@ -228,13 +317,20 @@ class Filter(object):
str(e), toparse))
def check_dest_metadata(self, data):
"""See if the metadata jives, and complain if it doesn't. If
there's no conflict, update the metadata to match 'data'."""
"""See if the metadata jives, and complain if it doesn't. For
each key in data, if the stream contains the key, it must match
values. If the stream does not contain the key, it is created."""
metadata = self._client_dest.stream_get_metadata(self.dest.path)
if not self.force_metadata:
if not self._force_metadata:
for key in data:
wanted = str(data[key])
wanted = data[key]
if not isinstance(wanted, basestring):
wanted = str(wanted)
val = metadata.get(key, wanted)
# Force UTF-8 encoding for comparison and display
wanted = wanted.encode('utf-8')
val = val.encode('utf-8')
key = key.encode('utf-8')
if val != wanted and self.dest.rows > 0:
m = "Metadata in destination stream:\n"
m += " %s = %s\n" % (key, val)
@@ -249,77 +345,36 @@ class Filter(object):
self._client_dest.stream_update_metadata(self.dest.path, data)
# The main filter processing method.
def process_numpy(self, function, args = None, rows = 100000):
"""For all intervals that exist in self.src but don't exist in
self.dest, call 'function' with a Numpy array corresponding to
the data. The data is converted to a Numpy array in chunks of
'rows' rows at a time.
def process_numpy(self, function, args = None, rows = 100000,
intervals = None):
"""Calls process_numpy_interval for each interval that currently
exists in self.src, but doesn't exist in self.dest. It will
process the data in chunks as follows:
'function' should be defined as:
def function(data, interval, args, insert_func, final)
For each chunk of data, call 'function' with a Numpy array
corresponding to the data. The data is converted to a Numpy
array in chunks of 'rows' rows at a time.
'data': array of data to process -- may be empty
If 'intervals' is not None, process those intervals instead of
the default list.
'interval': overall interval we're processing (but not necessarily
the interval of this particular chunk of data)
'args': opaque arguments passed to process_numpy
'insert_func': function to call in order to insert array of data.
Should be passed a 2-dimensional array of data to insert.
Data timestamps must be within the provided interval.
'final': True if this is the last bit of data for this
contiguous interval, False otherwise.
Return value of 'function' is the number of data rows processed.
Unprocessed data will be provided again in a subsequent call
(unless 'final' is True).
'function' should be defined with the same interface as
nilmtools.filter.example_callback_function. See the
documentation of that for details. 'args' are passed to
'function'.
"""
if args is None:
args = []
extractor = NumpyClient(self.src.url).stream_extract_numpy
inserter = NumpyClient(self.dest.url).stream_insert_numpy_context
for interval in self.intervals():
print "Processing", self.interval_string(interval)
with inserter(self.dest.path,
interval.start, interval.end) as insert_ctx:
insert_function = insert_ctx.insert
old_array = np.array([])
for new_array in extractor(self.src.path,
interval.start, interval.end,
extractor_func = functools.partial(extractor, self.src.path,
layout = self.src.layout,
maxrows = rows):
# If we still had old data left, combine it
if old_array.shape[0] != 0:
array = np.vstack((old_array, new_array))
else:
array = new_array
maxrows = rows)
inserter_func = functools.partial(inserter, self.dest.path)
# Pass it to the process function
processed = function(array, interval, args,
insert_function, False)
# Send any pending data
insert_ctx.send()
# Save the unprocessed parts
if processed >= 0:
old_array = array[processed:]
else:
raise Exception(
sprintf("%s return value %s must be >= 0",
str(function), str(processed)))
# Warn if there's too much data remaining
if old_array.shape[0] > 3 * rows:
printf("warning: %d unprocessed rows in buffer\n",
old_array.shape[0])
# Last call for this contiguous interval
if old_array.shape[0] != 0:
function(old_array, interval, args, insert_function, True)
for interval in (intervals or self.intervals()):
print "Processing", interval.human_string()
process_numpy_interval(interval, extractor_func, inserter_func,
rows * 3, function, args)
def main(argv = None):
# This is just a dummy function; actual filters can use the other
@@ -328,7 +383,7 @@ def main(argv = None):
parser = f.setup_parser()
args = f.parse_args(argv)
for i in f.intervals():
print "Generic filter: need to handle", f.interval_string(i)
print "Generic filter: need to handle", i.human_string()
if __name__ == "__main__":
main()

View File

@@ -53,7 +53,8 @@ def parse_args(argv = None):
is stepped forward to match 'clock'.
- If 'data' is running ahead, there is overlap in the data, and an
error is raised.
error is raised. If '--skip' is specified, the current file
is skipped instead of raising an error.
"""))
parser.add_argument("-u", "--url", action="store",
default="http://localhost/nilmdb/",
@@ -61,6 +62,8 @@ def parse_args(argv = None):
group = parser.add_argument_group("Misc options")
group.add_argument("-D", "--dry-run", action="store_true",
help="Parse files, but don't insert any data")
group.add_argument("-s", "--skip", action="store_true",
help="Skip files if the data would overlap")
group.add_argument("-m", "--max-gap", action="store", default=10.0,
metavar="SEC", type=float,
help="Max discrepency between clock and data "
@@ -235,6 +238,10 @@ def main(argv = None):
"is %s but clock time is only %s",
timestamp_to_human(data_ts),
timestamp_to_human(clock_ts))
if args.skip:
printf("%s\n", err)
printf("Skipping the remainder of this file\n")
break
raise ParseError(filename, err)
if (data_ts + max_gap) < clock_ts:

111
nilmtools/math.py Normal file
View File

@@ -0,0 +1,111 @@
#!/usr/bin/python
# Miscellaenous useful mathematical functions
from nilmdb.utils.printf import *
from numpy import *
from scipy import *
def sfit4(data, fs):
"""(A, f0, phi, C) = sfit4(data, fs)
Compute 4-parameter (unknown-frequency) least-squares fit to
sine-wave data, according to IEEE Std 1241-2010 Annex B
Input:
data vector of input samples
fs sampling rate (Hz)
Output:
Parameters [A, f0, phi, C] to fit the equation
x[n] = A * sin(f0/fs * 2 * pi * n + phi) + C
where n is sample number. Or, as a function of time:
x(t) = A * sin(f0 * 2 * pi * t + phi) + C
by Jim Paris
(Verified to match sfit4.m)
"""
N = len(data)
t = linspace(0, (N-1) / float(fs), N)
## Estimate frequency using FFT (step b)
Fc = fft(data)
F = abs(Fc)
F[0] = 0 # eliminate DC
# Find pair of spectral lines with largest amplitude:
# resulting values are in F(i) and F(i+1)
i = argmax(F[0:int(N/2)] + F[1:int(N/2+1)])
# Interpolate FFT to get a better result (from Markus [B37])
try:
U1 = real(Fc[i])
U2 = real(Fc[i+1])
V1 = imag(Fc[i])
V2 = imag(Fc[i+1])
n = 2 * pi / N
ni1 = n * i
ni2 = n * (i+1)
K = ((V2-V1)*sin(ni1) + (U2-U1)*cos(ni1)) / (U2-U1)
Z1 = V1 * (K - cos(ni1)) / sin(ni1) + U1
Z2 = V2 * (K - cos(ni2)) / sin(ni2) + U2
i = arccos((Z2*cos(ni2) - Z1*cos(ni1)) / (Z2-Z1)) / n
except Exception:
# Just go with the biggest FFT peak
i = argmax(F[0:int(N/2)])
# Convert to Hz
f0 = i * float(fs) / N
# Fit it. We'll catch exceptions here and just returns zeros
# if something fails with the least squares fit, etc.
try:
# first guess for A0, B0 using 3-parameter fit (step c)
s = zeros(3)
w = 2*pi*f0
# Now iterate 7 times (step b, plus 6 iterations of step i)
for idx in range(7):
D = c_[cos(w*t), sin(w*t), ones(N),
-s[0] * t * sin(w*t) + s[1] * t * cos(w*t) ] # eqn B.16
s = linalg.lstsq(D, data)[0] # eqn B.18
w = w + s[3] # update frequency estimate
## Extract results
A = sqrt(s[0]*s[0] + s[1]*s[1]) # eqn B.21
f0 = w / (2*pi)
phi = arctan2(s[0], s[1]) # eqn B.22 (flipped for sin instead of cos)
C = s[2]
return (A, f0, phi, C)
except Exception as e:
# something broke down; just return zeros
return (0, 0, 0, 0)
def peak_detect(data, delta = 0.1):
"""Simple min/max peak detection algorithm, taken from my code
in the disagg.m from the 10-8-5 paper.
Returns an array of peaks: each peak is a tuple
(n, p, is_max)
where n is the row number in 'data', and p is 'data[n]',
and is_max is True if this is a maximum, False if it's a minimum,
"""
peaks = [];
cur_min = (None, inf)
cur_max = (None, -inf)
lookformax = False
for (n, p) in enumerate(data):
if p > cur_max[1]:
cur_max = (n, p)
if p < cur_min[1]:
cur_min = (n, p)
if lookformax:
if p < (cur_max[1] - delta):
peaks.append((cur_max[0], cur_max[1], True))
cur_min = (n, p)
lookformax = False
else:
if p > (cur_min[1] + delta):
peaks.append((cur_min[0], cur_min[1], False))
cur_max = (n, p)
lookformax = True
return peaks

43
nilmtools/median.py Executable file
View File

@@ -0,0 +1,43 @@
#!/usr/bin/python
import nilmtools.filter, scipy.signal
def main(argv = None):
f = nilmtools.filter.Filter()
parser = f.setup_parser("Median Filter")
group = parser.add_argument_group("Median filter options")
group.add_argument("-z", "--size", action="store", type=int, default=25,
help = "median filter size (default %(default)s)")
group.add_argument("-d", "--difference", action="store_true",
help = "store difference rather than filtered values")
try:
args = f.parse_args(argv)
except nilmtools.filter.MissingDestination as e:
print "Source is %s (%s)" % (e.src.path, e.src.layout)
print "Destination %s doesn't exist" % (e.dest.path)
print "You could make it with a command like:"
print " nilmtool -u %s create %s %s" % (e.dest.url,
e.dest.path, e.src.layout)
raise SystemExit(1)
meta = f.client_src.stream_get_metadata(f.src.path)
f.check_dest_metadata({ "median_filter_source": f.src.path,
"median_filter_size": args.size,
"median_filter_difference": repr(args.difference) })
f.process_numpy(median_filter, args = (args.size, args.difference))
def median_filter(data, interval, args, insert, final):
(size, diff) = args
(rows, cols) = data.shape
for i in range(cols - 1):
filtered = scipy.signal.medfilt(data[:, i+1], size)
if diff:
data[:, i+1] -= filtered
else:
data[:, i+1] = filtered
insert(data)
return rows
if __name__ == "__main__":
main()

176
nilmtools/pipewatch.py Executable file
View File

@@ -0,0 +1,176 @@
#!/usr/bin/python
import nilmdb.client
from nilmdb.utils.printf import *
import nilmdb.utils.lock
import nilmtools
import time
import sys
import os
import argparse
import subprocess
import tempfile
import threading
import select
import signal
import Queue
import daemon
def parse_args(argv = None):
parser = argparse.ArgumentParser(
formatter_class = argparse.ArgumentDefaultsHelpFormatter,
version = nilmtools.__version__,
description = """\
Pipe data from 'generator' to 'consumer'. This is intended to be
executed frequently from cron, and will exit if another copy is
already running. If 'generator' or 'consumer' returns an error,
or if 'generator' stops sending data for a while, it will exit.
Intended for use with ethstream (generator) and nilm-insert
(consumer). Commands are executed through the shell.
""")
parser.add_argument("-d", "--daemon", action="store_true",
help="Run in background")
parser.add_argument("-l", "--lock", metavar="FILENAME", action="store",
default=tempfile.gettempdir() +
"/nilm-pipewatch.lock",
help="Lock file for detecting running instance")
parser.add_argument("-t", "--timeout", metavar="SECONDS", action="store",
type=float, default=30,
help="Restart if no output from " +
"generator for this long")
group = parser.add_argument_group("commands to execute")
group.add_argument("generator", action="store",
help="Data generator (e.g. \"ethstream -r 8000\")")
group.add_argument("consumer", action="store",
help="Data consumer (e.g. \"nilm-insert /foo/bar\")")
args = parser.parse_args(argv)
return args
def reader_thread(queue, fd):
# Read from a file descriptor, write to queue.
try:
while True:
(r, w, x) = select.select([fd], [], [fd], 0.25)
if x:
raise Exception # generator died?
if not r:
# short timeout -- just try again. This is to catch the
# fd being closed elsewhere, which is only detected
# when select restarts.
continue
data = os.read(fd, 65536)
if data == "": # generator EOF
raise Exception
queue.put(data)
except Exception:
queue.put(None)
def watcher_thread(queue, procs):
# Put None in the queue if either process dies
while True:
for p in procs:
if p.poll() is not None:
queue.put(None)
return
time.sleep(0.25)
def pipewatch(args):
# Run the processes, etc
with open(os.devnull, "r") as devnull:
generator = subprocess.Popen(args.generator, shell = True,
bufsize = -1, close_fds = True,
stdin = devnull,
stdout = subprocess.PIPE,
stderr = None,
preexec_fn = os.setpgrp)
consumer = subprocess.Popen(args.consumer, shell = True,
bufsize = -11, close_fds = True,
stdin = subprocess.PIPE,
stdout = None,
stderr = None,
preexec_fn = os.setpgrp)
queue = Queue.Queue(maxsize = 4)
reader = threading.Thread(target = reader_thread,
args = (queue, generator.stdout.fileno()))
reader.start()
watcher = threading.Thread(target = watcher_thread,
args = (queue, [generator, consumer]))
watcher.start()
try:
while True:
try:
data = queue.get(True, args.timeout)
if data is None:
break
consumer.stdin.write(data)
except Queue.Empty:
# Timeout: kill the generator
fprintf(sys.stderr, "pipewatch: timeout\n")
generator.terminate()
break
generator.stdout.close()
consumer.stdin.close()
except IOError:
fprintf(sys.stderr, "pipewatch: I/O error\n")
def kill(proc):
# Wait for a process to end, or kill it
def poll_timeout(proc, timeout):
for x in range(1+int(timeout / 0.1)):
if proc.poll() is not None:
break
time.sleep(0.1)
return proc.poll()
try:
if poll_timeout(proc, 0.5) is None:
os.killpg(proc.pid, signal.SIGTERM)
if poll_timeout(proc, 0.5) is None:
os.killpg(proc.pid, signal.SIGKILL)
except OSError:
pass
return poll_timeout(proc, 0.5)
# Wait for them to die, or kill them
cret = kill(consumer)
gret = kill(generator)
# Consume all remaining data in the queue until the reader
# and watcher threads are done
while reader.is_alive() or watcher.is_alive():
queue.get(True, 0.1)
fprintf(sys.stderr, "pipewatch: generator returned %d, " +
"consumer returned %d\n", gret, cret)
if gret == 0 and cret == 0:
sys.exit(0)
sys.exit(1)
def main(argv = None):
args = parse_args(argv)
lockfile = open(args.lock, "w")
if not nilmdb.utils.lock.exclusive_lock(lockfile):
printf("pipewatch process already running (according to %s)\n",
args.lock)
sys.exit(0)
try:
# Run as a daemon if requested, otherwise run directly.
if args.daemon:
with daemon.DaemonContext(files_preserve = [ lockfile ]):
pipewatch(args)
else:
pipewatch(args)
finally:
# Clean up lockfile
try:
os.unlink(args.lock)
except OSError:
pass
if __name__ == "__main__":
main()

View File

@@ -3,6 +3,8 @@
# Spectral envelope preprocessor.
# Requires two streams as input: the original raw data, and sinefit data.
from nilmdb.utils.printf import *
from nilmdb.utils.time import timestamp_to_human
import nilmtools.filter
import nilmdb.client
from numpy import *
@@ -10,6 +12,7 @@ import scipy.fftpack
import scipy.signal
#from matplotlib import pyplot as p
import bisect
from nilmdb.utils.interval import Interval
def main(argv = None):
# Set up argument parser
@@ -46,6 +49,10 @@ def main(argv = None):
print " nilmtool -u %s create %s %s" % (e.dest.url, e.dest.path, rec)
raise SystemExit(1)
if f.dest.layout_count != args.nharm * 2:
print "error: need", args.nharm*2, "columns in destination stream"
raise SystemExit(1)
# Check arguments
if args.column is None or args.column < 1:
parser.error("need a column number >= 1")
@@ -73,11 +80,24 @@ def main(argv = None):
# Check and set metadata in prep stream
f.check_dest_metadata({ "prep_raw_source": f.src.path,
"prep_sinefit_source": sinefit.path,
"prep_column": args.column })
"prep_column": args.column,
"prep_rotation": repr(rotation),
"prep_nshift": args.nshift })
# Run the processing function on all data
# Find the intersection of the usual set of intervals we'd filter,
# and the intervals actually present in sinefit data. This is
# what we will process.
filter_int = f.intervals()
sinefit_int = ( Interval(start, end) for (start, end) in
client_sinefit.stream_intervals(
args.sinepath, start = f.start, end = f.end) )
intervals = nilmdb.utils.interval.intersection(filter_int, sinefit_int)
# Run the process (using the helper in the filter module)
f.process_numpy(process, args = (client_sinefit, sinefit.path, args.column,
args.nharm, rotation, args.nshift))
args.nharm, rotation, args.nshift),
intervals = intervals)
def process(data, interval, args, insert_function, final):
(client, sinefit_path, column, nharm, rotation, nshift) = args
@@ -101,7 +121,6 @@ def process(data, interval, args, insert_function, final):
# Pull out sinefit data for the entire time range of this block
for sinefit_line in client.stream_extract(sinefit_path,
data[0, 0], data[rows-1, 0]):
def prep_period(t_min, t_max, rot):
"""
Compute prep coefficients from time t_min to t_max, which
@@ -158,7 +177,15 @@ def process(data, interval, args, insert_function, final):
break
processed = idx_max
print "Processed", processed, "of", rows, "rows"
# If we processed no data but there's lots in here, pretend we
# processed half of it.
if processed == 0 and rows > 10000:
processed = rows / 2
printf("%s: warning: no periods found; skipping %d rows\n",
timestamp_to_human(data[0][0]), processed)
else:
printf("%s: processed %d of %d rows\n",
timestamp_to_human(data[0][0]), processed, rows)
return processed
if __name__ == "__main__":

191
nilmtools/sinefit.py Executable file
View File

@@ -0,0 +1,191 @@
#!/usr/bin/python
# Sine wave fitting.
from nilmdb.utils.printf import *
import nilmtools.filter
import nilmtools.math
import nilmdb.client
from nilmdb.utils.time import (timestamp_to_human,
timestamp_to_seconds,
seconds_to_timestamp)
from numpy import *
from scipy import *
#import pylab as p
import sys
def main(argv = None):
f = nilmtools.filter.Filter()
parser = f.setup_parser("Sine wave fitting")
group = parser.add_argument_group("Sine fit options")
group.add_argument('-c', '--column', action='store', type=int,
help='Column number (first data column is 1)')
group.add_argument('-f', '--frequency', action='store', type=float,
default=60.0,
help='Approximate frequency (default: %(default)s)')
group.add_argument('-m', '--min-freq', action='store', type=float,
help='Minimum valid frequency '
'(default: approximate frequency / 2))')
group.add_argument('-M', '--max-freq', action='store', type=float,
help='Maximum valid frequency '
'(default: approximate frequency * 2))')
group.add_argument('-a', '--min-amp', action='store', type=float,
default=20.0,
help='Minimum signal amplitude (default: %(default)s)')
# Parse arguments
try:
args = f.parse_args(argv)
except nilmtools.filter.MissingDestination as e:
rec = "float32_3"
print "Source is %s (%s)" % (e.src.path, e.src.layout)
print "Destination %s doesn't exist" % (e.dest.path)
print "You could make it with a command like:"
print " nilmtool -u %s create %s %s" % (e.dest.url, e.dest.path, rec)
raise SystemExit(1)
if args.column is None or args.column < 1:
parser.error("need a column number >= 1")
if args.frequency < 0.1:
parser.error("frequency must be >= 0.1")
if args.min_freq is None:
args.min_freq = args.frequency / 2
if args.max_freq is None:
args.max_freq = args.frequency * 2
if (args.min_freq > args.max_freq or
args.min_freq > args.frequency or
args.max_freq < args.frequency):
parser.error("invalid min or max frequency")
if args.min_amp < 0:
parser.error("min amplitude must be >= 0")
f.check_dest_metadata({ "sinefit_source": f.src.path,
"sinefit_column": args.column })
f.process_numpy(process, args = (args.column, args.frequency, args.min_amp,
args.min_freq, args.max_freq))
class SuppressibleWarning(object):
def __init__(self, maxcount = 10, maxsuppress = 100):
self.maxcount = maxcount
self.maxsuppress = maxsuppress
self.count = 0
self.last_msg = ""
def _write(self, sec, msg):
if sec:
now = timestamp_to_human(seconds_to_timestamp(sec)) + ": "
else:
now = ""
sys.stderr.write(now + msg)
def warn(self, msg, seconds = None):
self.count += 1
if self.count <= self.maxcount:
self._write(seconds, msg)
if (self.count - self.maxcount) >= self.maxsuppress:
self.reset(seconds)
def reset(self, seconds = None):
if self.count > self.maxcount:
self._write(seconds, sprintf("(%d warnings suppressed)\n",
self.count - self.maxcount))
self.count = 0
def process(data, interval, args, insert_function, final):
(column, f_expected, a_min, f_min, f_max) = args
rows = data.shape[0]
# Estimate sampling frequency from timestamps
fs = (rows-1) / (timestamp_to_seconds(data[-1][0]) -
timestamp_to_seconds(data[0][0]))
# Pull out about 3.5 periods of data at once;
# we'll expect to match 3 zero crossings in each window
N = max(int(3.5 * fs / f_expected), 10)
# If we don't have enough data, don't bother processing it
if rows < N:
return 0
warn = SuppressibleWarning(3, 1000)
# Process overlapping windows
start = 0
num_zc = 0
last_inserted_timestamp = None
while start < (rows - N):
this = data[start:start+N, column]
t_min = timestamp_to_seconds(data[start, 0])
t_max = timestamp_to_seconds(data[start+N-1, 0])
# Do 4-parameter sine wave fit
(A, f0, phi, C) = nilmtools.math.sfit4(this, fs)
# Check bounds. If frequency is too crazy, ignore this window
if f0 < f_min or f0 > f_max:
warn.warn(sprintf("frequency %s outside valid range %s - %s\n",
str(f0), str(f_min), str(f_max)), t_min)
start += N
continue
# If amplitude is too low, results are probably just noise
if A < a_min:
warn.warn(sprintf("amplitude %s below minimum threshold %s\n",
str(A), str(a_min)), t_min)
start += N
continue
#p.plot(arange(N), this)
#p.plot(arange(N), A * sin(f0/fs * 2 * pi * arange(N) + phi) + C, 'g')
# Period starts when the argument of sine is 0 degrees,
# so we're looking for sample number:
# n = (0 - phi) / (f0/fs * 2 * pi)
zc_n = (0 - phi) / (f0 / fs * 2 * pi)
period_n = fs/f0
# Add periods to make N positive
while zc_n < 0:
zc_n += period_n
last_zc = None
# Mark the zero crossings until we're a half period away
# from the end of the window
while zc_n < (N - period_n/2):
#p.plot(zc_n, C, 'ro')
t = t_min + zc_n / fs
if (last_inserted_timestamp is None or
t > last_inserted_timestamp):
insert_function([[seconds_to_timestamp(t), f0, A, C]])
last_inserted_timestamp = t
warn.reset(t)
else:
warn.warn("timestamp overlap\n", t)
num_zc += 1
last_zc = zc_n
zc_n += period_n
# Advance the window one quarter period past the last marked
# zero crossing, or advance the window by half its size if we
# didn't mark any.
if last_zc is not None:
advance = min(last_zc + period_n/4, N)
else:
advance = N/2
#p.plot(advance, C, 'go')
#p.show()
start = int(round(start + advance))
# Return the number of rows we've processed
warn.reset(last_inserted_timestamp)
if last_inserted_timestamp:
now = timestamp_to_human(seconds_to_timestamp(
last_inserted_timestamp)) + ": "
else:
now = ""
printf("%sMarked %d zero-crossings in %d rows\n", now, num_zc, start)
return start
if __name__ == "__main__":
main()

317
nilmtools/trainola.py Executable file
View File

@@ -0,0 +1,317 @@
#!/usr/bin/python
from nilmdb.utils.printf import *
import nilmdb.client
import nilmtools.filter
import nilmtools.math
from nilmdb.utils.time import (timestamp_to_human,
timestamp_to_seconds,
seconds_to_timestamp)
from nilmdb.utils import datetime_tz
from nilmdb.utils.interval import Interval
import numpy as np
import scipy
import scipy.signal
from numpy.core.umath_tests import inner1d
import nilmrun
from collections import OrderedDict
import sys
import time
import functools
import collections
class DataError(ValueError):
pass
def build_column_mapping(colinfo, streaminfo):
"""Given the 'columns' list from the JSON data, verify and
pull out a dictionary mapping for the column names/numbers."""
columns = OrderedDict()
for c in colinfo:
col_num = c['index'] + 1 # skip timestamp
if (c['name'] in columns.keys() or col_num in columns.values()):
raise DataError("duplicated columns")
if (c['index'] < 0 or c['index'] >= streaminfo.layout_count):
raise DataError("bad column number")
columns[c['name']] = col_num
if not len(columns):
raise DataError("no columns")
return columns
class Exemplar(object):
def __init__(self, exinfo, min_rows = 10, max_rows = 100000):
"""Given a dictionary entry from the 'exemplars' input JSON,
verify the stream, columns, etc. Then, fetch all the data
into self.data."""
self.name = exinfo['name']
self.url = exinfo['url']
self.stream = exinfo['stream']
self.start = exinfo['start']
self.end = exinfo['end']
self.dest_column = exinfo['dest_column']
# Get stream info
self.client = nilmdb.client.numpyclient.NumpyClient(self.url)
self.info = nilmtools.filter.get_stream_info(self.client, self.stream)
if not self.info:
raise DataError(sprintf("exemplar stream '%s' does not exist " +
"on server '%s'", self.stream, self.url))
# Build up name => index mapping for the columns
self.columns = build_column_mapping(exinfo['columns'], self.info)
# Count points
self.count = self.client.stream_count(self.stream, self.start, self.end)
# Verify count
if self.count == 0:
raise DataError("No data in this exemplar!")
if self.count < min_rows:
raise DataError("Too few data points: " + str(self.count))
if self.count > max_rows:
raise DataError("Too many data points: " + str(self.count))
# Extract the data
datagen = self.client.stream_extract_numpy(self.stream,
self.start, self.end,
self.info.layout,
maxrows = self.count)
self.data = list(datagen)[0]
# Extract just the columns that were specified in self.columns,
# skipping the timestamp.
extract_columns = [ value for (key, value) in self.columns.items() ]
self.data = self.data[:,extract_columns]
# Fix the column indices in e.columns, since we removed/reordered
# columns in self.data
for n, k in enumerate(self.columns):
self.columns[k] = n
# Subtract the means from each column
self.data = self.data - self.data.mean(axis=0)
# Get scale factors for each column by computing dot product
# of each column with itself.
self.scale = inner1d(self.data.T, self.data.T)
# Ensure a minimum (nonzero) scale and convert to list
self.scale = np.maximum(self.scale, [1e-9]).tolist()
def __str__(self):
return sprintf("\"%s\" %s [%s] %s rows",
self.name, self.stream, ",".join(self.columns.keys()),
self.count)
def timestamp_to_short_human(timestamp):
dt = datetime_tz.datetime_tz.fromtimestamp(timestamp_to_seconds(timestamp))
return dt.strftime("%H:%M:%S")
def trainola_matcher(data, interval, args, insert_func, final_chunk):
"""Perform cross-correlation match"""
( src_columns, dest_count, exemplars ) = args
nrows = data.shape[0]
# We want at least 10% more points than the widest exemplar.
widest = max([ x.count for x in exemplars ])
if (widest * 1.1) > nrows:
return 0
# This is how many points we'll consider valid in the
# cross-correlation.
valid = nrows + 1 - widest
matches = collections.defaultdict(list)
# Try matching against each of the exemplars
for e in exemplars:
corrs = []
# Compute cross-correlation for each column
for col_name in e.columns:
a = data[:, src_columns[col_name]]
b = e.data[:, e.columns[col_name]]
corr = scipy.signal.fftconvolve(a, np.flipud(b), 'valid')[0:valid]
# Scale by the norm of the exemplar
corr = corr / e.scale[e.columns[col_name]]
corrs.append(corr)
# Find the peaks using the column with the largest amplitude
biggest = e.scale.index(max(e.scale))
peaks = nilmtools.math.peak_detect(corrs[biggest], 0.1)
# To try to reduce false positives, discard peaks where
# there's a higher-magnitude peak (either min or max) within
# one exemplar width nearby.
good_peak_locations = []
for (i, (n, p, is_max)) in enumerate(peaks):
if not is_max:
continue
ok = True
# check up to 'e.count' rows before this one
j = i-1
while ok and j >= 0 and peaks[j][0] > (n - e.count):
if abs(peaks[j][1]) > abs(p):
ok = False
j -= 1
# check up to 'e.count' rows after this one
j = i+1
while ok and j < len(peaks) and peaks[j][0] < (n + e.count):
if abs(peaks[j][1]) > abs(p):
ok = False
j += 1
if ok:
good_peak_locations.append(n)
# Now look at all good peaks
for row in good_peak_locations:
# Correlation for each column must be close enough to 1.
for (corr, scale) in zip(corrs, e.scale):
# The accepted distance from 1 is based on the relative
# amplitude of the column. Use a linear mapping:
# scale 1.0 -> distance 0.1
# scale 0.0 -> distance 1.0
distance = 1 - 0.9 * (scale / e.scale[biggest])
if abs(corr[row] - 1) > distance:
# No match
break
else:
# Successful match
matches[row].append(e)
# Insert matches into destination stream.
matched_rows = sorted(matches.keys())
out = np.zeros((len(matched_rows), dest_count + 1))
for n, row in enumerate(matched_rows):
# Fill timestamp
out[n][0] = data[row, 0]
# Mark matched exemplars
for exemplar in matches[row]:
out[n, exemplar.dest_column + 1] = 1.0
# Insert it
insert_func(out)
# Return how many rows we processed
valid = max(valid, 0)
printf(" [%s] matched %d exemplars in %d rows\n",
timestamp_to_short_human(data[0][0]), np.sum(out[:,1:]), valid)
return valid
def trainola(conf):
print "Trainola", nilmtools.__version__
# Load main stream data
url = conf['url']
src_path = conf['stream']
dest_path = conf['dest_stream']
start = conf['start']
end = conf['end']
# Get info for the src and dest streams
src_client = nilmdb.client.numpyclient.NumpyClient(url)
src = nilmtools.filter.get_stream_info(src_client, src_path)
if not src:
raise DataError("source path '" + src_path + "' does not exist")
src_columns = build_column_mapping(conf['columns'], src)
dest_client = nilmdb.client.numpyclient.NumpyClient(url)
dest = nilmtools.filter.get_stream_info(dest_client, dest_path)
if not dest:
raise DataError("destination path '" + dest_path + "' does not exist")
printf("Source:\n")
printf(" %s [%s]\n", src.path, ",".join(src_columns.keys()))
printf("Destination:\n")
printf(" %s (%s columns)\n", dest.path, dest.layout_count)
# Pull in the exemplar data
exemplars = []
for n, exinfo in enumerate(conf['exemplars']):
printf("Loading exemplar %d:\n", n)
e = Exemplar(exinfo)
col = e.dest_column
if col < 0 or col >= dest.layout_count:
raise DataError(sprintf("bad destination column number %d\n" +
"dest stream only has 0 through %d",
col, dest.layout_count - 1))
printf(" %s, output column %d\n", str(e), col)
exemplars.append(e)
if len(exemplars) == 0:
raise DataError("missing exemplars")
# Verify that the exemplar columns are all represented in the main data
for n, ex in enumerate(exemplars):
for col in ex.columns:
if col not in src_columns:
raise DataError(sprintf("Exemplar %d column %s is not "
"available in source data", n, col))
# Figure out which intervals we should process
intervals = ( Interval(s, e) for (s, e) in
src_client.stream_intervals(src_path,
diffpath = dest_path,
start = start, end = end) )
intervals = nilmdb.utils.interval.optimize(intervals)
# Do the processing
rows = 100000
extractor = functools.partial(src_client.stream_extract_numpy,
src.path, layout = src.layout, maxrows = rows)
inserter = functools.partial(dest_client.stream_insert_numpy_context,
dest.path)
start = time.time()
processed_time = 0
printf("Processing intervals:\n")
for interval in intervals:
printf("%s\n", interval.human_string())
nilmtools.filter.process_numpy_interval(
interval, extractor, inserter, rows * 3,
trainola_matcher, (src_columns, dest.layout_count, exemplars))
processed_time += (timestamp_to_seconds(interval.end) -
timestamp_to_seconds(interval.start))
elapsed = max(time.time() - start, 1e-3)
printf("Done. Processed %.2f seconds per second.\n",
processed_time / elapsed)
def main(argv = None):
import simplejson as json
import sys
if argv is None:
argv = sys.argv[1:]
if len(argv) != 1 or argv[0] == '-h' or argv[0] == '--help':
printf("usage: %s [-h] [-v] <json-config-dictionary>\n\n", sys.argv[0])
printf(" Where <json-config-dictionary> is a JSON-encoded " +
"dictionary string\n")
printf(" with exemplar and stream data.\n\n")
printf(" See extras/trainola-test-param*.js in the nilmtools " +
"repository\n")
printf(" for examples.\n")
if len(argv) != 1:
raise SystemExit(1)
raise SystemExit(0)
if argv[0] == '-v' or argv[0] == '--version':
printf("%s\n", nilmtools.__version__)
raise SystemExit(0)
try:
# Passed in a JSON string (e.g. on the command line)
conf = json.loads(argv[0])
except TypeError as e:
# Passed in the config dictionary (e.g. from NilmRun)
conf = argv[0]
return trainola(conf)
if __name__ == "__main__":
main()

View File

@@ -30,7 +30,7 @@ except ImportError:
# Versioneer manages version numbers from git tags.
# https://github.com/warner/python-versioneer
import versioneer
versioneer.versionfile_source = 'src/_version.py'
versioneer.versionfile_source = 'nilmtools/_version.py'
versioneer.versionfile_build = 'nilmtools/_version.py'
versioneer.tag_prefix = 'nilmtools-'
versioneer.parentdir_prefix = 'nilmtools-'
@@ -61,14 +61,14 @@ setup(name='nilmtools',
long_description = "NILM Database Tools",
license = "Proprietary",
author_email = 'jim@jtan.com',
install_requires = [ 'nilmdb >= 1.5.0',
install_requires = [ 'nilmdb >= 1.8.5',
'numpy',
'scipy',
'matplotlib',
'python-daemon >= 1.5',
#'matplotlib',
],
packages = [ 'nilmtools',
],
package_dir = { 'nilmtools': 'src' },
entry_points = {
'console_scripts': [
'nilm-decimate = nilmtools.decimate:main',
@@ -79,6 +79,9 @@ setup(name='nilmtools',
'nilm-copy-wildcard = nilmtools.copy_wildcard:main',
'nilm-sinefit = nilmtools.sinefit:main',
'nilm-cleanup = nilmtools.cleanup:main',
'nilm-median = nilmtools.median:main',
'nilm-trainola = nilmtools.trainola:main',
'nilm-pipewatch = nilmtools.pipewatch:main',
],
},
zip_safe = False,

View File

@@ -1,187 +0,0 @@
#!/usr/bin/python
# Sine wave fitting. This runs about 5x faster than realtime on raw data.
import nilmtools.filter
import nilmdb.client
from numpy import *
from scipy import *
#import pylab as p
import operator
def main(argv = None):
f = nilmtools.filter.Filter()
parser = f.setup_parser("Sine wave fitting")
group = parser.add_argument_group("Sine fit options")
group.add_argument('-c', '--column', action='store', type=int,
help='Column number (first data column is 1)')
group.add_argument('-f', '--frequency', action='store', type=float,
default=60.0,
help='Approximate frequency (default: %(default)s)')
# Parse arguments
try:
args = f.parse_args(argv)
except nilmtools.filter.MissingDestination as e:
rec = "float32_3"
print "Source is %s (%s)" % (e.src.path, e.src.layout)
print "Destination %s doesn't exist" % (e.dest.path)
print "You could make it with a command like:"
print " nilmtool -u %s create %s %s" % (e.dest.url, e.dest.path, rec)
raise SystemExit(1)
if args.column is None or args.column < 1:
parser.error("need a column number >= 1")
if args.frequency < 0.1:
parser.error("frequency must be >= 0.1")
f.check_dest_metadata({ "sinefit_source": f.src.path,
"sinefit_column": args.column })
f.process_numpy(process, args = (args.column, args.frequency))
def process(data, interval, args, insert_function, final):
(column, f_expected) = args
rows = data.shape[0]
# Estimate sampling frequency from timestamps
fs = 1e6 * (rows-1) / (data[-1][0] - data[0][0])
# Pull out about 3.5 periods of data at once;
# we'll expect to match 3 zero crossings in each window
N = max(int(3.5 * fs / f_expected), 10)
# If we don't have enough data, don't bother processing it
if rows < N:
return 0
# Process overlapping windows
start = 0
num_zc = 0
while start < (rows - N):
this = data[start:start+N, column]
t_min = data[start, 0]/1e6
t_max = data[start+N-1, 0]/1e6
# Do 4-parameter sine wave fit
(A, f0, phi, C) = sfit4(this, fs)
# Check bounds. If frequency is too crazy, ignore this window
if f0 < (f_expected/2) or f0 > (f_expected*2):
print "frequency", f0, "too far from expected value", f_expected
start += N
continue
#p.plot(arange(N), this)
#p.plot(arange(N), A * cos(f0/fs * 2 * pi * arange(N) + phi) + C, 'g')
# Period starts when the argument of cosine is 3*pi/2 degrees,
# so we're looking for sample number:
# n = (3 * pi / 2 - phi) / (f0/fs * 2 * pi)
zc_n = (3 * pi / 2 - phi) / (f0 / fs * 2 * pi)
period_n = fs/f0
# Add periods to make N positive
while zc_n < 0:
zc_n += period_n
last_zc = None
# Mark the zero crossings until we're a half period away
# from the end of the window
while zc_n < (N - period_n/2):
#p.plot(zc_n, C, 'ro')
t = t_min + zc_n / fs
insert_function([[t * 1e6, f0, A, C]])
num_zc += 1
last_zc = zc_n
zc_n += period_n
# Advance the window one quarter period past the last marked
# zero crossing, or advance the window by half its size if we
# didn't mark any.
if last_zc is not None:
advance = min(last_zc + period_n/4, N)
else:
advance = N/2
#p.plot(advance, C, 'go')
#p.show()
start = int(round(start + advance))
# Return the number of rows we've processed
print "Marked", num_zc, "zero-crossings in", start, "rows"
return start
def sfit4(data, fs):
"""(A, f0, phi, C) = sfit4(data, fs)
Compute 4-parameter (unknown-frequency) least-squares fit to
sine-wave data, according to IEEE Std 1241-2010 Annex B
Input:
data vector of input samples
fs sampling rate (Hz)
Output:
Parameters [A, f0, phi, C] to fit the equation
x[n] = A * cos(f0/fs * 2 * pi * n + phi) + C
where n is sample number. Or, as a function of time:
x(t) = A * cos(f0 * 2 * pi * t + phi) + C
by Jim Paris
(Verified to match sfit4.m)
"""
N = len(data)
t = linspace(0, (N-1) / fs, N)
## Estimate frequency using FFT (step b)
Fc = fft(data)
F = abs(Fc)
F[0] = 0 # eliminate DC
# Find pair of spectral lines with largest amplitude:
# resulting values are in F(i) and F(i+1)
i = argmax(F[0:int(N/2)] + F[1:int(N/2+1)])
# Interpolate FFT to get a better result (from Markus [B37])
U1 = real(Fc[i])
U2 = real(Fc[i+1])
V1 = imag(Fc[i])
V2 = imag(Fc[i+1])
n = 2 * pi / N
ni1 = n * i
ni2 = n * (i+1)
K = ((V2-V1)*sin(ni1) + (U2-U1)*cos(ni1)) / (U2-U1)
Z1 = V1 * (K - cos(ni1)) / sin(ni1) + U1
Z2 = V2 * (K - cos(ni2)) / sin(ni2) + U2
i = arccos((Z2*cos(ni2) - Z1*cos(ni1)) / (Z2-Z1)) / n
# Convert to Hz
f0 = i * fs / N
## Fit it
# first guess for A0, B0 using 3-parameter fit (step c)
w = 2*pi*f0
D = c_[cos(w*t), sin(w*t), ones(N)]
s = linalg.lstsq(D, data)[0]
# Now iterate 6 times (step i)
for idx in range(6):
D = c_[cos(w*t), sin(w*t), ones(N),
-s[0] * t * sin(w*t) + s[1] * t * cos(w*t) ] # eqn B.16
s = linalg.lstsq(D, data)[0] # eqn B.18
w = w + s[3] # update frequency estimate
## Extract results
A = sqrt(s[0]*s[0] + s[1]*s[1]) # eqn B.21
f0 = w / (2*pi)
try:
phi = -arctan2(s[1], s[0]) # eqn B.22
except TypeError:
# something broke down, just return zeros
return (0, 0, 0, 0)
C = s[2]
return (A, f0, phi, C)
if __name__ == "__main__":
main()