Compare commits

...

17 Commits

14 changed files with 140 additions and 76 deletions

View File

@@ -1,14 +1,14 @@
#URL="http://bucket.mit.edu:8080/nilmdb"
URL="http://localhost/nilmdb"
all:
ifeq ($(INSIDE_EMACS), t)
	@make test
else
	@echo "Try 'make install'"
endif
all: test
test: test_trainola3
test:
ifeq ($(INSIDE_EMACS), t)
	@make test_sinefit
else
	@echo 'No test suite for nilmtools. Try "make install"'
endif
test_pipewatch:
	nilmtools/pipewatch.py -t 3 "seq 10 20" "seq 20 30"
@@ -58,6 +58,14 @@ test_prep: /tmp/raw.dat
	nilmtools/prep.py -c 2 /test/raw /test/sinefit /test/prep
	nilmtool extract -s min -e max /test/prep | head -20
test_sinefit:
	make install >/dev/null 2>&1
	-nilmtool destroy -R /test/sinefit
	nilmtool create /test/sinefit float32_3
	nilmtools/sinefit.py -c 5 -s '2013/03/25 09:11:00' \
		-e '2013/03/25 10:11:00' /sharon/raw /test/sinefit
	nilmtool extract -s min -e max /test/sinefit | head -20
test_decimate:
	-@nilmtool destroy /lees-compressor/no-leak/raw/4 || true
	-@nilmtool destroy /lees-compressor/no-leak/raw/16 || true

View File

@@ -6,3 +6,4 @@ keep = 2w
[/sharon/sinefit]
keep = 1y
decimated = false

View File

@@ -1,9 +1,15 @@
# Install this by running "crontab crontab" (will replace existing crontab)
SHELL=/bin/bash
PATH=/usr/local/bin:/usr/local/sbin:/usr/bin:/usr/sbin:/bin:/sbin
# m h dom mon dow cmd
# Run NilmDB processing every 5 minutes
*/5 * * * * chronic /home/nilm/data/process.sh
# Check the capture process every minute
*/1 * * * * chronic /home/nilm/data/capture.sh
# Try frequently restarting the capture process in case it died
*/5 * * * * chronic /home/nilm/data/capture.sh
# Run fsck at startup
@reboot chronic nilmdb-fsck --fix --no-data /home/nilm/data/db/

View File

@@ -13,16 +13,20 @@ if ! flock -n -x 99 ; then
fi
trap 'rm -f "$LOCKFILE"' 0
# sinefit on phase A voltage
# redirect stdout/stderr to log, but keep it on the console too
exec > >(tee /home/nilm/data/process.log)
exec 2> >(tee -a /home/nilm/data/process.log >&2)
echo "sinefit on phase A voltage"
nilm-sinefit -c 5 /sharon/raw /sharon/sinefit
# prep on A, B, C with appropriate rotations
echo "prep on A, B, C with appropriate rotations"
nilm-prep -c 1 -r 0 /sharon/raw /sharon/sinefit /sharon/prep-a
nilm-prep -c 2 -r 120 /sharon/raw /sharon/sinefit /sharon/prep-b
nilm-prep -c 3 -r 240 /sharon/raw /sharon/sinefit /sharon/prep-c
# decimate raw and prep data
echo "decimate raw and prep data"
nilm-decimate-auto /sharon/raw /sharon/prep*
# run cleanup
echo "run cleanup"
nilm-cleanup --yes /home/nilm/data/cleanup.cfg

View File

@@ -12,6 +12,8 @@ import sys
def main(argv = None):
    f = nilmtools.filter.Filter()
    parser = f.setup_parser("Copy a stream")
    parser.add_argument('-n', '--nometa', action='store_true',
                        help="Don't copy or check metadata")
    # Parse arguments
    try:
@@ -25,8 +27,9 @@ def main(argv = None):
        raise SystemExit(1)
    # Copy metadata
    meta = f.client_src.stream_get_metadata(f.src.path)
    f.check_dest_metadata(meta)
    if not args.nometa:
        meta = f.client_src.stream_get_metadata(f.src.path)
        f.check_dest_metadata(meta)
    # Copy all rows of data using the faster Numpy interfaces
    extractor = NumpyClient(f.src.url).stream_extract_numpy
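With the new flag, metadata handling can be skipped entirely. One way to exercise it is to call main() with an explicit argument list; the URL and stream paths below are placeholders and a live NilmDB server is assumed, so treat this as an illustrative sketch rather than a recorded invocation:

    import nilmtools.copy_one

    # Hypothetical invocation of nilm-copy with --nometa: copy the rows
    # but neither copy nor check stream metadata. Paths/URL are made up.
    nilmtools.copy_one.main(["-u", "http://localhost/nilmdb/",
                             "--nometa", "/sharon/raw", "/backup/raw"])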

View File

@@ -16,6 +16,8 @@ def main(argv = None):
    Example: %(prog)s -u http://host1/nilmdb -U http://host2/nilmdb /sharon/*
    """, skip_paths = True)
    parser.add_argument('-n', '--nometa', action='store_true',
                        help="Don't copy or check metadata")
    parser.add_argument("path", action="store", nargs="+",
                        help='Wildcard paths to copy')
    args = parser.parse_args(argv)
@@ -56,6 +58,8 @@ def main(argv = None):
        new_argv.extend(["--end", "@" + repr(args.end)])
    if args.dry_run:
        new_argv.extend(["--dry-run"])
    if args.nometa:
        new_argv.extend(["--nometa"])
    if args.force_metadata:
        new_argv.extend(["--force-metadata"])
    new_argv.extend([stream[0], stream[0]])

View File

@@ -21,9 +21,9 @@ def main(argv = None):
    parser.add_argument("-u", "--url", action="store",
                        default="http://localhost/nilmdb/",
                        help="NilmDB server URL (default: %(default)s)")
    parser.add_argument('-f', '--factor', action='store', default=4, type=int,
    parser.add_argument("-f", "--factor", action="store", default=4, type=int,
                        help='Decimation factor (default: %(default)s)')
    parser.add_argument("--force-metadata", action="store_true",
    parser.add_argument("-F", "--force-metadata", action="store_true",
                        default = False,
                        help="Force metadata changes if the dest "
                        "doesn't match")

View File

@@ -133,6 +133,34 @@ def process_numpy_interval(interval, extractor, inserter, warn_rows,
        # we'll not miss any data when we run again later.
        insert_ctx.update_end(old_array[processed][0])

def example_callback_function(data, interval, args, insert_func, final):
    """Example of the signature for the function that gets passed
    to process_numpy_interval.

    'data': array of data to process -- may be empty

    'interval': overall interval we're processing (but not necessarily
    the interval of this particular chunk of data)

    'args': opaque arguments passed to process_numpy

    'insert_func': function to call in order to insert array of data.
    Should be passed a 2-dimensional array of data to insert.
    Data timestamps must be within the provided interval.

    'final': True if this is the last bit of data for this
    contiguous interval, False otherwise.

    Return value of 'function' is the number of data rows processed.
    Unprocessed data will be provided again in a subsequent call
    (unless 'final' is True).

    If unprocessed data remains after 'final' is True, the interval
    being inserted will be ended at the timestamp of the first
    unprocessed data point.
    """
    raise NotImplementedError("example_callback_function does nothing")

class Filter(object):
    def __init__(self, parser_description = None):
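For concreteness, here is what a minimal callback satisfying that contract could look like: a pass-through that inserts each chunk unchanged and claims every row as processed. This is a sketch written against the docstring above, not code from the repository:

    def passthrough_callback(data, interval, args, insert_func, final):
        # Insert the chunk as-is; its timestamps already lie within
        # the interval being processed.
        if len(data):
            insert_func(data)
        # Report every row as consumed so none are re-presented.
        return len(data)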
@@ -144,8 +172,8 @@ class Filter(object):
        self.dest = None
        self.start = None
        self.end = None
        self.interhost = False
        self.force_metadata = False
        self._interhost = False
        self._force_metadata = False
        if parser_description is not None:
            self.setup_parser(parser_description)
            self.parse_args()
@@ -178,7 +206,7 @@ class Filter(object):
                           default = False,
                           help="Just print intervals that would be "
                           "processed")
        group.add_argument("--force-metadata", action="store_true",
        group.add_argument("-F", "--force-metadata", action="store_true",
                           default = False,
                           help="Force metadata changes if the dest "
                           "doesn't match")
@@ -208,12 +236,12 @@ class Filter(object):
        if dest_url is None:
            dest_url = url
        if url != dest_url:
            self.interhost = True
            self._interhost = True
        self._client_src = Client(url)
        self._client_dest = Client(dest_url)
        if (not self.interhost) and (srcpath == destpath):
        if (not self._interhost) and (srcpath == destpath):
            raise ArgumentError("source and destination path must be different")
        # Open the streams
@@ -231,8 +259,8 @@ class Filter(object):
        # Print info
        if not quiet:
            print "Source:", self.src.string(self.interhost)
            print "  Dest:", self.dest.string(self.interhost)
            print "Source:", self.src.string(self._interhost)
            print "  Dest:", self.dest.string(self._interhost)

    def parse_args(self, argv = None):
        """Parse arguments from a command line"""
@@ -241,7 +269,7 @@ class Filter(object):
        self.set_args(args.url, args.dest_url, args.srcpath, args.destpath,
                      args.start, args.end, quiet = False, parsed_args = args)
        self.force_metadata = args.force_metadata
        self._force_metadata = args.force_metadata
        if args.dry_run:
            for interval in self.intervals():
                print interval.human_string()
@@ -252,7 +280,7 @@ class Filter(object):
        """Generate all the intervals that this filter should process"""
        self._using_client = True
        if self.interhost:
        if self._interhost:
            # Do the difference ourselves
            s_intervals = ( Interval(start, end)
                            for (start, end) in
@@ -289,10 +317,11 @@ class Filter(object):
                    str(e), toparse))

    def check_dest_metadata(self, data):
        """See if the metadata jives, and complain if it doesn't.  If
        there's no conflict, update the metadata to match 'data'."""
        """See if the metadata jives, and complain if it doesn't.  For
        each key in data, if the stream contains the key, it must match
        values.  If the stream does not contain the key, it is created."""
        metadata = self._client_dest.stream_get_metadata(self.dest.path)
        if not self.force_metadata:
        if not self._force_metadata:
            for key in data:
                wanted = data[key]
                if not isinstance(wanted, basestring):
@@ -329,30 +358,10 @@ class Filter(object):
        If 'intervals' is not None, process those intervals instead of
        the default list.

        'function' should be defined as:
        # def function(data, interval, args, insert_func, final)

        'data': array of data to process -- may be empty

        'interval': overall interval we're processing (but not necessarily
        the interval of this particular chunk of data)

        'args': opaque arguments passed to process_numpy

        'insert_func': function to call in order to insert array of data.
        Should be passed a 2-dimensional array of data to insert.
        Data timestamps must be within the provided interval.

        'final': True if this is the last bit of data for this
        contiguous interval, False otherwise.

        Return value of 'function' is the number of data rows processed.
        Unprocessed data will be provided again in a subsequent call
        (unless 'final' is True).

        If unprocessed data remains after 'final' is True, the interval
        being inserted will be ended at the timestamp of the first
        unprocessed data point.

        'function' should be defined with the same interface as
        nilmtools.filter.example_callback_function.  See the
        documentation of that for details.  'args' are passed to
        'function'.
        """
        extractor = NumpyClient(self.src.url).stream_extract_numpy
        inserter = NumpyClient(self.dest.url).stream_insert_numpy_context
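Tying this together, a complete filter script can stay very small. A minimal sketch, assuming the pass-through callback shown earlier and the constructor behavior documented in this file (Filter(description) sets up the parser and parses argv itself); stream checks and error handling are left to the framework:

    import nilmtools.filter

    def main(argv = None):
        # Filter(...) sets up the argument parser and parses argv.
        f = nilmtools.filter.Filter("Pass-through filter example")
        # process_numpy extracts data, calls the callback once per
        # chunk, and inserts whatever the callback hands back.
        f.process_numpy(passthrough_callback, args = None)

    if __name__ == "__main__":
        main()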

View File

@@ -53,7 +53,7 @@ def parse_args(argv = None):
          is stepped forward to match 'clock'.

        - If 'data' is running ahead, there is overlap in the data, and an
          error is raised.  If '--ignore' is specified, the current file
          error is raised.  If '--skip' is specified, the current file
          is skipped instead of raising an error.
        """))
    parser.add_argument("-u", "--url", action="store",

View File

@@ -37,17 +37,21 @@ def sfit4(data, fs):
    i = argmax(F[0:int(N/2)] + F[1:int(N/2+1)])

    # Interpolate FFT to get a better result (from Markus [B37])
    U1 = real(Fc[i])
    U2 = real(Fc[i+1])
    V1 = imag(Fc[i])
    V2 = imag(Fc[i+1])
    n = 2 * pi / N
    ni1 = n * i
    ni2 = n * (i+1)
    K = ((V2-V1)*sin(ni1) + (U2-U1)*cos(ni1)) / (U2-U1)
    Z1 = V1 * (K - cos(ni1)) / sin(ni1) + U1
    Z2 = V2 * (K - cos(ni2)) / sin(ni2) + U2
    i = arccos((Z2*cos(ni2) - Z1*cos(ni1)) / (Z2-Z1)) / n
    try:
        U1 = real(Fc[i])
        U2 = real(Fc[i+1])
        V1 = imag(Fc[i])
        V2 = imag(Fc[i+1])
        n = 2 * pi / N
        ni1 = n * i
        ni2 = n * (i+1)
        K = ((V2-V1)*sin(ni1) + (U2-U1)*cos(ni1)) / (U2-U1)
        Z1 = V1 * (K - cos(ni1)) / sin(ni1) + U1
        Z2 = V2 * (K - cos(ni2)) / sin(ni2) + U2
        i = arccos((Z2*cos(ni2) - Z1*cos(ni1)) / (Z2-Z1)) / n
    except Exception:
        # Just go with the biggest FFT peak
        i = argmax(F[0:int(N/2)])

    # Convert to Hz
    f0 = i * float(fs) / N
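As a quick sanity check of that final bin-to-Hz conversion (the numbers here are assumed, not taken from the repository): with fs = 8000 samples/s and N = 1024 points, each FFT bin spans fs/N = 7.8125 Hz, so a fractional peak index of 7.68 maps to exactly 60 Hz, the line frequency sinefit is typically tracking:

    fs, N, i = 8000.0, 1024, 7.68
    f0 = i * fs / N                 # 7.68 * 7.8125 = 60.0 Hz
    assert abs(f0 - 60.0) < 1e-9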

View File

@@ -84,13 +84,16 @@ def pipewatch(args):
                                 bufsize = -1, close_fds = True,
                                 stdin = devnull,
                                 stdout = subprocess.PIPE,
                                 stderr = None)
                                 stderr = None,
                                 preexec_fn = os.setpgrp)
    consumer = subprocess.Popen(args.consumer, shell = True,
                                bufsize = -1, close_fds = True,
                                stdin = subprocess.PIPE,
                                stdout = None, stderr = None)
                                stdout = None,
                                stderr = None,
                                preexec_fn = os.setpgrp)

    queue = Queue.Queue(maxsize = 32)
    queue = Queue.Queue(maxsize = 4)
    reader = threading.Thread(target = reader_thread,
                              args = (queue, generator.stdout.fileno()))
    reader.start()
@@ -125,16 +128,21 @@ def pipewatch(args):
            return proc.poll()
        try:
            if poll_timeout(proc, 0.5) is None:
                proc.terminate()
                os.killpg(proc.pid, signal.SIGTERM)
                if poll_timeout(proc, 0.5) is None:
                    proc.kill()
                    os.killpg(proc.pid, signal.SIGKILL)
        except OSError:
            pass
        return poll_timeout(proc, 0.5)

    # Wait for them to die, or kill them
    gret = kill(generator)
    cret = kill(consumer)
    gret = kill(generator)

    # Consume all remaining data in the queue until the reader
    # and watcher threads are done
    while reader.is_alive() or watcher.is_alive():
        queue.get(True, 0.1)

    fprintf(sys.stderr, "pipewatch: generator returned %d, " +
            "consumer returned %d\n", gret, cret)

View File

@@ -81,7 +81,8 @@ def main(argv = None):
    f.check_dest_metadata({ "prep_raw_source": f.src.path,
                            "prep_sinefit_source": sinefit.path,
                            "prep_column": args.column,
                            "prep_rotation": repr(rotation) })
                            "prep_rotation": repr(rotation),
                            "prep_nshift": args.nshift })

    # Find the intersection of the usual set of intervals we'd filter,
    # and the intervals actually present in sinefit data.  This is

View File

@@ -96,8 +96,11 @@ def process(data, interval, args, insert_function, final):
    rows = data.shape[0]

    # Estimate sampling frequency from timestamps
    fs = (rows-1) / (timestamp_to_seconds(data[-1][0]) -
                     timestamp_to_seconds(data[0][0]))
    ts_min = timestamp_to_seconds(data[0][0])
    ts_max = timestamp_to_seconds(data[-1][0])
    if ts_min >= ts_max:
        return 0
    fs = (rows-1) / (ts_max - ts_min)

    # Pull out about 3.5 periods of data at once;
    # we'll expect to match 3 zero crossings in each window
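The new guard matters because the rate estimate divides by the timestamp span: a chunk with a single row, or with duplicate or non-increasing timestamps, would otherwise divide by zero or yield a negative fs. Returning 0 tells the framework that no rows were consumed, matching the callback contract documented in filter.py. A tiny illustration with made-up microsecond timestamps:

    # Four made-up timestamps in NilmDB's microsecond units, 120 Hz apart.
    timestamps_us = [0, 8333, 16667, 25000]
    rows = len(timestamps_us)
    ts_min = timestamps_us[0] / 1e6
    ts_max = timestamps_us[-1] / 1e6
    if ts_min >= ts_max:
        fs = None                             # degenerate chunk: bail out
    else:
        fs = (rows - 1) / (ts_max - ts_min)   # = 120.0 Hz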

View File

@@ -287,8 +287,21 @@ def main(argv = None):
    if argv is None:
        argv = sys.argv[1:]
    if len(argv) != 1:
        raise DataError("need one argument, either a dictionary or JSON string")
    if len(argv) != 1 or argv[0] == '-h' or argv[0] == '--help':
        printf("usage: %s [-h] [-v] <json-config-dictionary>\n\n", sys.argv[0])
        printf("  Where <json-config-dictionary> is a JSON-encoded " +
               "dictionary string\n")
        printf("  with exemplar and stream data.\n\n")
        printf("  See extras/trainola-test-param*.js in the nilmtools " +
               "repository\n")
        printf("  for examples.\n")
        if len(argv) != 1:
            raise SystemExit(1)
        raise SystemExit(0)
    if argv[0] == '-v' or argv[0] == '--version':
        printf("%s\n", nilmtools.__version__)
        raise SystemExit(0)

    try:
        # Passed in a JSON string (e.g. on the command line)