Compare commits

...

17 Commits

14 changed files with 140 additions and 76 deletions

View File

@@ -1,14 +1,14 @@
#URL="http://bucket.mit.edu:8080/nilmdb" #URL="http://bucket.mit.edu:8080/nilmdb"
URL="http://localhost/nilmdb" URL="http://localhost/nilmdb"
all: all: test
ifeq ($(INSIDE_EMACS), t)
@make test
else
@echo "Try 'make install'"
endif
test: test_trainola3 test:
ifeq ($(INSIDE_EMACS), t)
@make test_sinefit
else
@echo 'No test suite for nilmtools. Try "make install"'
endif
test_pipewatch: test_pipewatch:
nilmtools/pipewatch.py -t 3 "seq 10 20" "seq 20 30" nilmtools/pipewatch.py -t 3 "seq 10 20" "seq 20 30"
@@ -58,6 +58,14 @@ test_prep: /tmp/raw.dat
nilmtools/prep.py -c 2 /test/raw /test/sinefit /test/prep nilmtools/prep.py -c 2 /test/raw /test/sinefit /test/prep
nilmtool extract -s min -e max /test/prep | head -20 nilmtool extract -s min -e max /test/prep | head -20
test_sinefit:
make install >/dev/null 2>&1
-nilmtool destroy -R /test/sinefit
nilmtool create /test/sinefit float32_3
nilmtools/sinefit.py -c 5 -s '2013/03/25 09:11:00' \
-e '2013/03/25 10:11:00' /sharon/raw /test/sinefit
nilmtool extract -s min -e max /test/sinefit | head -20
test_decimate: test_decimate:
-@nilmtool destroy /lees-compressor/no-leak/raw/4 || true -@nilmtool destroy /lees-compressor/no-leak/raw/4 || true
-@nilmtool destroy /lees-compressor/no-leak/raw/16 || true -@nilmtool destroy /lees-compressor/no-leak/raw/16 || true

View File

@@ -6,3 +6,4 @@ keep = 2w
[/sharon/sinefit] [/sharon/sinefit]
keep = 1y keep = 1y
decimated = false

View File

@@ -1,9 +1,15 @@
# Install this by running "crontab crontab" (will replace existing crontab) # Install this by running "crontab crontab" (will replace existing crontab)
SHELL=/bin/bash
PATH=/usr/local/bin:/usr/local/sbin:/usr/bin:/usr/sbin:/bin:/sbin
# m h dom mon dow cmd # m h dom mon dow cmd
# Run NilmDB processing every 5 minutes # Run NilmDB processing every 5 minutes
*/5 * * * * chronic /home/nilm/data/process.sh */5 * * * * chronic /home/nilm/data/process.sh
# Check the capture process every minute # Try frequently restarting the capture process in case it died
*/1 * * * * chronic /home/nilm/data/capture.sh */5 * * * * chronic /home/nilm/data/capture.sh
# Run fsck at startup
@reboot chronic nilmdb-fsck --fix --no-data /home/nilm/data/db/

View File

@@ -13,16 +13,20 @@ if ! flock -n -x 99 ; then
fi fi
trap 'rm -f "$LOCKFILE"' 0 trap 'rm -f "$LOCKFILE"' 0
# sinefit on phase A voltage # redirect stdout/stderr to log, but keep it on the console too
exec > >(tee /home/nilm/data/process.log)
exec 2> >(tee -a /home/nilm/data/process.log >&2)
echo "sinefit on phase A voltage"
nilm-sinefit -c 5 /sharon/raw /sharon/sinefit nilm-sinefit -c 5 /sharon/raw /sharon/sinefit
# prep on A, B, C with appropriate rotations echo "prep on A, B, C with appropriate rotations"
nilm-prep -c 1 -r 0 /sharon/raw /sharon/sinefit /sharon/prep-a nilm-prep -c 1 -r 0 /sharon/raw /sharon/sinefit /sharon/prep-a
nilm-prep -c 2 -r 120 /sharon/raw /sharon/sinefit /sharon/prep-b nilm-prep -c 2 -r 120 /sharon/raw /sharon/sinefit /sharon/prep-b
nilm-prep -c 3 -r 240 /sharon/raw /sharon/sinefit /sharon/prep-c nilm-prep -c 3 -r 240 /sharon/raw /sharon/sinefit /sharon/prep-c
# decimate raw and prep data echo "decimate raw and prep data"
nilm-decimate-auto /sharon/raw /sharon/prep* nilm-decimate-auto /sharon/raw /sharon/prep*
# run cleanup echo "run cleanup"
nilm-cleanup --yes /home/nilm/data/cleanup.cfg nilm-cleanup --yes /home/nilm/data/cleanup.cfg

View File

@@ -12,6 +12,8 @@ import sys
def main(argv = None): def main(argv = None):
f = nilmtools.filter.Filter() f = nilmtools.filter.Filter()
parser = f.setup_parser("Copy a stream") parser = f.setup_parser("Copy a stream")
parser.add_argument('-n', '--nometa', action='store_true',
help="Don't copy or check metadata")
# Parse arguments # Parse arguments
try: try:
@@ -25,8 +27,9 @@ def main(argv = None):
raise SystemExit(1) raise SystemExit(1)
# Copy metadata # Copy metadata
meta = f.client_src.stream_get_metadata(f.src.path) if not args.nometa:
f.check_dest_metadata(meta) meta = f.client_src.stream_get_metadata(f.src.path)
f.check_dest_metadata(meta)
# Copy all rows of data using the faster Numpy interfaces # Copy all rows of data using the faster Numpy interfaces
extractor = NumpyClient(f.src.url).stream_extract_numpy extractor = NumpyClient(f.src.url).stream_extract_numpy

View File

@@ -16,6 +16,8 @@ def main(argv = None):
Example: %(prog)s -u http://host1/nilmdb -U http://host2/nilmdb /sharon/* Example: %(prog)s -u http://host1/nilmdb -U http://host2/nilmdb /sharon/*
""", skip_paths = True) """, skip_paths = True)
parser.add_argument('-n', '--nometa', action='store_true',
help="Don't copy or check metadata")
parser.add_argument("path", action="store", nargs="+", parser.add_argument("path", action="store", nargs="+",
help='Wildcard paths to copy') help='Wildcard paths to copy')
args = parser.parse_args(argv) args = parser.parse_args(argv)
@@ -56,6 +58,8 @@ def main(argv = None):
new_argv.extend(["--end", "@" + repr(args.end)]) new_argv.extend(["--end", "@" + repr(args.end)])
if args.dry_run: if args.dry_run:
new_argv.extend(["--dry-run"]) new_argv.extend(["--dry-run"])
if args.nometa:
new_argv.extend(["--nometa"])
if args.force_metadata: if args.force_metadata:
new_argv.extend(["--force-metadata"]) new_argv.extend(["--force-metadata"])
new_argv.extend([stream[0], stream[0]]) new_argv.extend([stream[0], stream[0]])

View File

@@ -21,9 +21,9 @@ def main(argv = None):
parser.add_argument("-u", "--url", action="store", parser.add_argument("-u", "--url", action="store",
default="http://localhost/nilmdb/", default="http://localhost/nilmdb/",
help="NilmDB server URL (default: %(default)s)") help="NilmDB server URL (default: %(default)s)")
parser.add_argument('-f', '--factor', action='store', default=4, type=int, parser.add_argument("-f", "--factor", action="store", default=4, type=int,
help='Decimation factor (default: %(default)s)') help='Decimation factor (default: %(default)s)')
parser.add_argument("--force-metadata", action="store_true", parser.add_argument("-F", "--force-metadata", action="store_true",
default = False, default = False,
help="Force metadata changes if the dest " help="Force metadata changes if the dest "
"doesn't match") "doesn't match")

View File

@@ -133,6 +133,34 @@ def process_numpy_interval(interval, extractor, inserter, warn_rows,
# we'll not miss any data when we run again later. # we'll not miss any data when we run again later.
insert_ctx.update_end(old_array[processed][0]) insert_ctx.update_end(old_array[processed][0])
def example_callback_function(data, interval, args, insert_func, final):
"""Example of the signature for the function that gets passed
to process_numpy_interval.
'data': array of data to process -- may be empty
'interval': overall interval we're processing (but not necessarily
the interval of this particular chunk of data)
'args': opaque arguments passed to process_numpy
'insert_func': function to call in order to insert array of data.
Should be passed a 2-dimensional array of data to insert.
Data timestamps must be within the provided interval.
'final': True if this is the last bit of data for this
contiguous interval, False otherwise.
Return value of 'function' is the number of data rows processed.
Unprocessed data will be provided again in a subsequent call
(unless 'final' is True).
If unprocessed data remains after 'final' is True, the interval
being inserted will be ended at the timestamp of the first
unprocessed data point.
"""
raise NotImplementedError("example_callback_function does nothing")
class Filter(object): class Filter(object):
def __init__(self, parser_description = None): def __init__(self, parser_description = None):
@@ -144,8 +172,8 @@ class Filter(object):
self.dest = None self.dest = None
self.start = None self.start = None
self.end = None self.end = None
self.interhost = False self._interhost = False
self.force_metadata = False self._force_metadata = False
if parser_description is not None: if parser_description is not None:
self.setup_parser(parser_description) self.setup_parser(parser_description)
self.parse_args() self.parse_args()
@@ -178,7 +206,7 @@ class Filter(object):
default = False, default = False,
help="Just print intervals that would be " help="Just print intervals that would be "
"processed") "processed")
group.add_argument("--force-metadata", action="store_true", group.add_argument("-F", "--force-metadata", action="store_true",
default = False, default = False,
help="Force metadata changes if the dest " help="Force metadata changes if the dest "
"doesn't match") "doesn't match")
@@ -208,12 +236,12 @@ class Filter(object):
if dest_url is None: if dest_url is None:
dest_url = url dest_url = url
if url != dest_url: if url != dest_url:
self.interhost = True self._interhost = True
self._client_src = Client(url) self._client_src = Client(url)
self._client_dest = Client(dest_url) self._client_dest = Client(dest_url)
if (not self.interhost) and (srcpath == destpath): if (not self._interhost) and (srcpath == destpath):
raise ArgumentError("source and destination path must be different") raise ArgumentError("source and destination path must be different")
# Open the streams # Open the streams
@@ -231,8 +259,8 @@ class Filter(object):
# Print info # Print info
if not quiet: if not quiet:
print "Source:", self.src.string(self.interhost) print "Source:", self.src.string(self._interhost)
print " Dest:", self.dest.string(self.interhost) print " Dest:", self.dest.string(self._interhost)
def parse_args(self, argv = None): def parse_args(self, argv = None):
"""Parse arguments from a command line""" """Parse arguments from a command line"""
@@ -241,7 +269,7 @@ class Filter(object):
self.set_args(args.url, args.dest_url, args.srcpath, args.destpath, self.set_args(args.url, args.dest_url, args.srcpath, args.destpath,
args.start, args.end, quiet = False, parsed_args = args) args.start, args.end, quiet = False, parsed_args = args)
self.force_metadata = args.force_metadata self._force_metadata = args.force_metadata
if args.dry_run: if args.dry_run:
for interval in self.intervals(): for interval in self.intervals():
print interval.human_string() print interval.human_string()
@@ -252,7 +280,7 @@ class Filter(object):
"""Generate all the intervals that this filter should process""" """Generate all the intervals that this filter should process"""
self._using_client = True self._using_client = True
if self.interhost: if self._interhost:
# Do the difference ourselves # Do the difference ourselves
s_intervals = ( Interval(start, end) s_intervals = ( Interval(start, end)
for (start, end) in for (start, end) in
@@ -289,10 +317,11 @@ class Filter(object):
str(e), toparse)) str(e), toparse))
def check_dest_metadata(self, data): def check_dest_metadata(self, data):
"""See if the metadata jives, and complain if it doesn't. If """See if the metadata jives, and complain if it doesn't. For
there's no conflict, update the metadata to match 'data'.""" each key in data, if the stream contains the key, it must match
values. If the stream does not contain the key, it is created."""
metadata = self._client_dest.stream_get_metadata(self.dest.path) metadata = self._client_dest.stream_get_metadata(self.dest.path)
if not self.force_metadata: if not self._force_metadata:
for key in data: for key in data:
wanted = data[key] wanted = data[key]
if not isinstance(wanted, basestring): if not isinstance(wanted, basestring):
@@ -329,30 +358,10 @@ class Filter(object):
If 'intervals' is not None, process those intervals instead of If 'intervals' is not None, process those intervals instead of
the default list. the default list.
'function' should be defined as: 'function' should be defined with the same interface as
# def function(data, interval, args, insert_func, final) nilmtools.filter.example_callback_function. See the
documentation of that for details. 'args' are passed to
'data': array of data to process -- may be empty 'function'.
'interval': overall interval we're processing (but not necessarily
the interval of this particular chunk of data)
'args': opaque arguments passed to process_numpy
'insert_func': function to call in order to insert array of data.
Should be passed a 2-dimensional array of data to insert.
Data timestamps must be within the provided interval.
'final': True if this is the last bit of data for this
contiguous interval, False otherwise.
Return value of 'function' is the number of data rows processed.
Unprocessed data will be provided again in a subsequent call
(unless 'final' is True).
If unprocessed data remains after 'final' is True, the interval
being inserted will be ended at the timestamp of the first
unprocessed data point.
""" """
extractor = NumpyClient(self.src.url).stream_extract_numpy extractor = NumpyClient(self.src.url).stream_extract_numpy
inserter = NumpyClient(self.dest.url).stream_insert_numpy_context inserter = NumpyClient(self.dest.url).stream_insert_numpy_context

View File

@@ -53,7 +53,7 @@ def parse_args(argv = None):
is stepped forward to match 'clock'. is stepped forward to match 'clock'.
- If 'data' is running ahead, there is overlap in the data, and an - If 'data' is running ahead, there is overlap in the data, and an
error is raised. If '--ignore' is specified, the current file error is raised. If '--skip' is specified, the current file
is skipped instead of raising an error. is skipped instead of raising an error.
""")) """))
parser.add_argument("-u", "--url", action="store", parser.add_argument("-u", "--url", action="store",

View File

@@ -37,17 +37,21 @@ def sfit4(data, fs):
i = argmax(F[0:int(N/2)] + F[1:int(N/2+1)]) i = argmax(F[0:int(N/2)] + F[1:int(N/2+1)])
# Interpolate FFT to get a better result (from Markus [B37]) # Interpolate FFT to get a better result (from Markus [B37])
U1 = real(Fc[i]) try:
U2 = real(Fc[i+1]) U1 = real(Fc[i])
V1 = imag(Fc[i]) U2 = real(Fc[i+1])
V2 = imag(Fc[i+1]) V1 = imag(Fc[i])
n = 2 * pi / N V2 = imag(Fc[i+1])
ni1 = n * i n = 2 * pi / N
ni2 = n * (i+1) ni1 = n * i
K = ((V2-V1)*sin(ni1) + (U2-U1)*cos(ni1)) / (U2-U1) ni2 = n * (i+1)
Z1 = V1 * (K - cos(ni1)) / sin(ni1) + U1 K = ((V2-V1)*sin(ni1) + (U2-U1)*cos(ni1)) / (U2-U1)
Z2 = V2 * (K - cos(ni2)) / sin(ni2) + U2 Z1 = V1 * (K - cos(ni1)) / sin(ni1) + U1
i = arccos((Z2*cos(ni2) - Z1*cos(ni1)) / (Z2-Z1)) / n Z2 = V2 * (K - cos(ni2)) / sin(ni2) + U2
i = arccos((Z2*cos(ni2) - Z1*cos(ni1)) / (Z2-Z1)) / n
except Exception:
# Just go with the biggest FFT peak
i = argmax(F[0:int(N/2)])
# Convert to Hz # Convert to Hz
f0 = i * float(fs) / N f0 = i * float(fs) / N

View File

@@ -84,13 +84,16 @@ def pipewatch(args):
bufsize = -1, close_fds = True, bufsize = -1, close_fds = True,
stdin = devnull, stdin = devnull,
stdout = subprocess.PIPE, stdout = subprocess.PIPE,
stderr = None) stderr = None,
preexec_fn = os.setpgrp)
consumer = subprocess.Popen(args.consumer, shell = True, consumer = subprocess.Popen(args.consumer, shell = True,
bufsize = -11, close_fds = True, bufsize = -11, close_fds = True,
stdin = subprocess.PIPE, stdin = subprocess.PIPE,
stdout = None, stderr = None) stdout = None,
stderr = None,
preexec_fn = os.setpgrp)
queue = Queue.Queue(maxsize = 32) queue = Queue.Queue(maxsize = 4)
reader = threading.Thread(target = reader_thread, reader = threading.Thread(target = reader_thread,
args = (queue, generator.stdout.fileno())) args = (queue, generator.stdout.fileno()))
reader.start() reader.start()
@@ -125,16 +128,21 @@ def pipewatch(args):
return proc.poll() return proc.poll()
try: try:
if poll_timeout(proc, 0.5) is None: if poll_timeout(proc, 0.5) is None:
proc.terminate() os.killpg(proc.pid, signal.SIGTERM)
if poll_timeout(proc, 0.5) is None: if poll_timeout(proc, 0.5) is None:
proc.kill() os.killpg(proc.pid, signal.SIGKILL)
except OSError: except OSError:
pass pass
return poll_timeout(proc, 0.5) return poll_timeout(proc, 0.5)
# Wait for them to die, or kill them # Wait for them to die, or kill them
gret = kill(generator)
cret = kill(consumer) cret = kill(consumer)
gret = kill(generator)
# Consume all remaining data in the queue until the reader
# and watcher threads are done
while reader.is_alive() or watcher.is_alive():
queue.get(True, 0.1)
fprintf(sys.stderr, "pipewatch: generator returned %d, " + fprintf(sys.stderr, "pipewatch: generator returned %d, " +
"consumer returned %d\n", gret, cret) "consumer returned %d\n", gret, cret)

View File

@@ -81,7 +81,8 @@ def main(argv = None):
f.check_dest_metadata({ "prep_raw_source": f.src.path, f.check_dest_metadata({ "prep_raw_source": f.src.path,
"prep_sinefit_source": sinefit.path, "prep_sinefit_source": sinefit.path,
"prep_column": args.column, "prep_column": args.column,
"prep_rotation": repr(rotation) }) "prep_rotation": repr(rotation),
"prep_nshift": args.nshift })
# Find the intersection of the usual set of intervals we'd filter, # Find the intersection of the usual set of intervals we'd filter,
# and the intervals actually present in sinefit data. This is # and the intervals actually present in sinefit data. This is

View File

@@ -96,8 +96,11 @@ def process(data, interval, args, insert_function, final):
rows = data.shape[0] rows = data.shape[0]
# Estimate sampling frequency from timestamps # Estimate sampling frequency from timestamps
fs = (rows-1) / (timestamp_to_seconds(data[-1][0]) - ts_min = timestamp_to_seconds(data[0][0])
timestamp_to_seconds(data[0][0])) ts_max = timestamp_to_seconds(data[-1][0])
if ts_min >= ts_max:
return 0
fs = (rows-1) / (ts_max - ts_min)
# Pull out about 3.5 periods of data at once; # Pull out about 3.5 periods of data at once;
# we'll expect to match 3 zero crossings in each window # we'll expect to match 3 zero crossings in each window

View File

@@ -287,8 +287,21 @@ def main(argv = None):
if argv is None: if argv is None:
argv = sys.argv[1:] argv = sys.argv[1:]
if len(argv) != 1: if len(argv) != 1 or argv[0] == '-h' or argv[0] == '--help':
raise DataError("need one argument, either a dictionary or JSON string") printf("usage: %s [-h] [-v] <json-config-dictionary>\n\n", sys.argv[0])
printf(" Where <json-config-dictionary> is a JSON-encoded " +
"dictionary string\n")
printf(" with exemplar and stream data.\n\n")
printf(" See extras/trainola-test-param*.js in the nilmtools " +
"repository\n")
printf(" for examples.\n")
if len(argv) != 1:
raise SystemExit(1)
raise SystemExit(0)
if argv[0] == '-v' or argv[0] == '--version':
printf("%s\n", nilmtools.__version__)
raise SystemExit(0)
try: try:
# Passed in a JSON string (e.g. on the command line) # Passed in a JSON string (e.g. on the command line)