Compare commits

...

9 Commits

Author SHA1 Message Date
Sharon NILM
8b9c5d4898 Fix daemon dependency 2013-07-29 17:40:51 -04:00
cf2c28b0fb Add --daemon flag 2013-07-29 17:16:18 -04:00
87a26c907b Watch for process termination too 2013-07-29 15:08:49 -04:00
def465b57c Improve pipewatch; add nilm-pipewatch script 2013-07-29 14:58:15 -04:00
0589b8d316 start of pipewatch util 2013-07-29 14:10:56 -04:00
9c5f07106d Don't need python-pip 2013-07-20 16:15:29 -04:00
62e11a11c0 Fix issue with column ordering in the exemplars
If the max scale in the exemplar was a column we weren't using, it
would bail out when looking for that correlation later.  Change things
around so exemplars in RAM only keep around the columns we care about.
2013-07-18 22:51:27 -04:00
2bdcee2c36 More helpful error if exemplar stream doesn't exist 2013-07-15 15:19:52 -04:00
6dce8c5296 More output 2013-07-11 18:56:53 -04:00
6 changed files with 245 additions and 14 deletions

View File

@@ -8,13 +8,20 @@ else
@echo "Try 'make install'"
endif
test: test_trainola
test: test_pipewatch
test_pipewatch:
nilmtools/pipewatch.py -t 3 "seq 10 20" "seq 20 30"
test_trainola:
-nilmtool -u http://bucket/nilmdb remove -s min -e max \
/sharon/prep-a-matches
nilmtools/trainola.py "$$(cat extras/trainola-test-param-2.js)"
-nilmtool -u http://bucket/nilmdb remove -s min -e max \
/sharon/prep-a-matches
nilmtools/trainola.py "$$(cat extras/trainola-test-param.js)"
test_cleanup:
nilmtools/cleanup.py -e extras/cleanup.cfg
nilmtools/cleanup.py extras/cleanup.cfg

View File

@@ -5,8 +5,8 @@ by Jim Paris <jim@jtan.com>
Prerequisites:
# Runtime and build environments
sudo apt-get install python2.7 python2.7-dev python-setuptools python-pip
sudo apt-get install python-numpy python-scipy
sudo apt-get install python2.7 python2.7-dev python-setuptools
sudo apt-get install python-numpy python-scipy python-daemon
nilmdb (1.8.1+)

View File

@@ -0,0 +1,29 @@
{ "columns" : [ { "index" : 0, "name" : "P1" },
{ "index" : 1, "name" : "Q1" },
{ "index" : 2, "name" : "P3" } ],
"stream" : "/sharon/prep-a",
"url" : "http://bucket.mit.edu/nilmdb",
"dest_stream" : "/sharon/prep-a-matches",
"start" : 1365153062643133.5,
"end" : 1365168814443575.5,
"exemplars" : [ { "columns" : [ { "index" : 0,
"name" : "P1"
} ],
"dest_column" : 0,
"end" : 1365073657682000,
"name" : "Turn ON",
"start" : 1365073654321000,
"stream" : "/sharon/prep-a",
"url" : "http://bucket.mit.edu/nilmdb"
},
{ "columns" : [ { "index" : 2, "name" : "P3" },
{ "index" : 0, "name" : "P1" } ],
"dest_column" : 1,
"end" : 1365176528818000,
"name" : "Type 2 turn ON",
"start" : 1365176520030000,
"stream" : "/sharon/prep-a",
"url" : "http://bucket.mit.edu/nilmdb"
}
]
}

168
nilmtools/pipewatch.py Executable file
View File

@@ -0,0 +1,168 @@
#!/usr/bin/python
import nilmdb.client
from nilmdb.utils.printf import *
import nilmdb.utils.lock
import nilmtools
import time
import sys
import os
import argparse
import subprocess
import tempfile
import threading
import select
import signal
import Queue
import daemon
def parse_args(argv = None):
    """Build the pipewatch argument parser and apply it to 'argv'.

    Returns the parsed argparse namespace (attributes: daemon, lock,
    timeout, generator, consumer).  When argv is None, argparse reads
    sys.argv as usual.
    """
    ap = argparse.ArgumentParser(
        formatter_class = argparse.ArgumentDefaultsHelpFormatter,
        version = nilmtools.__version__,
        description = """\
Pipe data from 'generator' to 'consumer'. This is intended to be
executed frequently from cron, and will exit if another copy is
already running. If 'generator' or 'consumer' returns an error,
or if 'generator' stops sending data for a while, it will exit.
Intended for use with ethstream (generator) and nilm-insert
(consumer). Commands are executed through the shell.
""")
    # Default lockfile lives in the system temp directory.
    default_lock = tempfile.gettempdir() + "/nilm-pipewatch.lock"
    ap.add_argument("-d", "--daemon", action="store_true",
                    help="Run in background")
    ap.add_argument("-l", "--lock", metavar="FILENAME", action="store",
                    default=default_lock,
                    help="Lock file for detecting running instance")
    ap.add_argument("-t", "--timeout", metavar="SECONDS", action="store",
                    type=float, default=30,
                    help="Restart if no output from " +
                    "generator for this long")
    cmds = ap.add_argument_group("commands to execute")
    cmds.add_argument("generator", action="store",
                      help="Data generator (e.g. \"ethstream -r 8000\")")
    cmds.add_argument("consumer", action="store",
                      help="Data consumer (e.g. \"nilm-insert /foo/bar\")")
    return ap.parse_args(argv)
def reader_thread(queue, fd):
    """Pump data from file descriptor 'fd' into 'queue'.

    Reads chunks from 'fd' and queue.put()s each one.  On EOF, select
    error, or any exception, puts a single None and returns; the main
    loop treats None as "generator died".
    """
    try:
        while True:
            (readable, _, errored) = select.select([fd], [], [fd], 0.25)
            if errored:
                raise Exception     # generator died?
            if not readable:
                # Short timeout -- just try again.  This is to catch the
                # fd being closed elsewhere, which is only detected
                # when select restarts.
                continue
            chunk = os.read(fd, 65536)
            if chunk == "":         # generator EOF
                raise Exception
            queue.put(chunk)
    except Exception:
        queue.put(None)
def watcher_thread(queue, procs):
    """Watch the subprocesses in 'procs'; signal the main loop on exit.

    Polls each process every 0.25 s.  As soon as any one has
    terminated (poll() returns a returncode), puts None on 'queue'
    and returns.
    """
    while not any(p.poll() is not None for p in procs):
        time.sleep(0.25)
    queue.put(None)
def pipewatch(args):
    """Run args.generator piped into args.consumer, supervising both.

    Spawns both commands through the shell, copies generator stdout to
    consumer stdin, and exits when: either process dies, the generator
    goes silent for args.timeout seconds, or an I/O error occurs.
    Exits the interpreter with status 0 if both children returned 0,
    else status 1 (does not return normally).
    """
    # Give the generator /dev/null as stdin so it can't read our terminal.
    with open(os.devnull, "r") as devnull:
        generator = subprocess.Popen(args.generator, shell = True,
                                     bufsize = -1, close_fds = True,
                                     stdin = devnull,
                                     stdout = subprocess.PIPE,
                                     stderr = None)
        # bufsize was -11 here; that was a typo for -1 (system default
        # buffering), matching the generator above.
        consumer = subprocess.Popen(args.consumer, shell = True,
                                    bufsize = -1, close_fds = True,
                                    stdin = subprocess.PIPE,
                                    stdout = None, stderr = None)

    # Reader thread feeds generator output into the queue; watcher
    # thread puts None into the queue if either child dies.
    queue = Queue.Queue(maxsize = 32)
    reader = threading.Thread(target = reader_thread,
                              args = (queue, generator.stdout.fileno()))
    reader.start()
    watcher = threading.Thread(target = watcher_thread,
                               args = (queue, [generator, consumer]))
    watcher.start()

    # Main loop: shuttle data until EOF/death (None) or silence (timeout).
    try:
        while True:
            try:
                data = queue.get(True, args.timeout)
                if data is None:
                    break
                consumer.stdin.write(data)
            except Queue.Empty:
                # Timeout: kill the generator
                fprintf(sys.stderr, "pipewatch: timeout\n")
                generator.terminate()
                break
        generator.stdout.close()
        consumer.stdin.close()
    except IOError:
        fprintf(sys.stderr, "pipewatch: I/O error\n")

    def kill(proc):
        # Wait for a process to end, or kill it with escalating force
        # (terminate, then kill), polling in 0.1 s steps.
        def poll_timeout(proc, timeout):
            for x in range(1 + int(timeout / 0.1)):
                if proc.poll() is not None:
                    break
                time.sleep(0.1)
            return proc.poll()
        try:
            if poll_timeout(proc, 0.5) is None:
                proc.terminate()
                if poll_timeout(proc, 0.5) is None:
                    proc.kill()
        except OSError:
            # Process may have disappeared between poll and signal.
            pass
        return poll_timeout(proc, 0.5)

    # Wait for them to die, or kill them
    gret = kill(generator)
    cret = kill(consumer)

    fprintf(sys.stderr, "pipewatch: generator returned %d, " +
            "consumer returned %d\n", gret, cret)
    if gret == 0 and cret == 0:
        sys.exit(0)
    sys.exit(1)
def main(argv = None):
    """Command-line entry point: enforce a single instance, run pipewatch."""
    args = parse_args(argv)

    # Take an exclusive lock on the lockfile; if another copy already
    # holds it, report and exit quietly (this runs from cron).
    lockfile = open(args.lock, "w")
    got_lock = nilmdb.utils.lock.exclusive_lock(lockfile)
    if not got_lock:
        printf("pipewatch process already running (according to %s)\n",
               args.lock)
        sys.exit(0)

    try:
        if not args.daemon:
            pipewatch(args)
        else:
            # Keep the lock fd open across daemonization so the lock
            # stays held by the background process.
            with daemon.DaemonContext(files_preserve = [ lockfile ]):
                pipewatch(args)
    finally:
        # Remove the lockfile on the way out; ignore if already gone.
        try:
            os.unlink(args.lock)
        except OSError:
            pass

if __name__ == "__main__":
    main()

View File

@@ -6,6 +6,7 @@ import nilmtools.filter
from nilmdb.utils.time import (timestamp_to_human,
timestamp_to_seconds,
seconds_to_timestamp)
from nilmdb.utils import datetime_tz
from nilmdb.utils.interval import Interval
import numpy as np
@@ -15,6 +16,7 @@ from numpy.core.umath_tests import inner1d
import nilmrun
from collections import OrderedDict
import sys
import time
import functools
import collections
@@ -26,12 +28,12 @@ def build_column_mapping(colinfo, streaminfo):
pull out a dictionary mapping for the column names/numbers."""
columns = OrderedDict()
for c in colinfo:
if (c['name'] in columns.keys() or
c['index'] in columns.values()):
col_num = c['index'] + 1 # skip timestamp
if (c['name'] in columns.keys() or col_num in columns.values()):
raise DataError("duplicated columns")
if (c['index'] < 0 or c['index'] >= streaminfo.layout_count):
raise DataError("bad column number")
columns[c['name']] = c['index']
columns[c['name']] = col_num
if not len(columns):
raise DataError("no columns")
return columns
@@ -52,6 +54,9 @@ class Exemplar(object):
# Get stream info
self.client = nilmdb.client.numpyclient.NumpyClient(self.url)
self.info = nilmtools.filter.get_stream_info(self.client, self.stream)
if not self.info:
raise DataError(sprintf("exemplar stream '%s' does not exist " +
"on server '%s'", self.stream, self.url))
# Build up name => index mapping for the columns
self.columns = build_column_mapping(exinfo['columns'], self.info)
@@ -74,10 +79,17 @@ class Exemplar(object):
maxrows = self.count)
self.data = list(datagen)[0]
# Discard timestamp
self.data = self.data[:,1:]
# Extract just the columns that were specified in self.columns,
# skipping the timestamp.
extract_columns = [ value for (key, value) in self.columns.items() ]
self.data = self.data[:,extract_columns]
# Subtract the mean from each column
# Fix the column indices in e.columns, since we removed/reordered
# columns in self.data
for n, k in enumerate(self.columns):
self.columns[k] = n
# Subtract the means from each column
self.data = self.data - self.data.mean(axis=0)
# Get scale factors for each column by computing dot product
@@ -117,6 +129,10 @@ def peak_detect(data, delta):
lookformax = True
return (mins, maxs)
def timestamp_to_short_human(timestamp):
dt = datetime_tz.datetime_tz.fromtimestamp(timestamp_to_seconds(timestamp))
return dt.strftime("%H:%M:%S")
def trainola_matcher(data, interval, args, insert_func, final_chunk):
"""Perform cross-correlation match"""
( src_columns, dest_count, exemplars ) = args
@@ -138,7 +154,7 @@ def trainola_matcher(data, interval, args, insert_func, final_chunk):
# Compute cross-correlation for each column
for col_name in e.columns:
a = data[:, src_columns[col_name] + 1]
a = data[:, src_columns[col_name]]
b = e.data[:, e.columns[col_name]]
corr = scipy.signal.fftconvolve(a, np.flipud(b), 'valid')[0:valid]
@@ -183,7 +199,10 @@ def trainola_matcher(data, interval, args, insert_func, final_chunk):
insert_func(out)
# Return how many rows we processed
return max(valid, 0)
valid = max(valid, 0)
printf(" [%s] matched %d exemplars in %d rows\n",
timestamp_to_short_human(data[0][0]), np.sum(out[:,1:]), valid)
return valid
def trainola(conf):
print "Trainola", nilmtools.__version__
@@ -247,14 +266,20 @@ def trainola(conf):
src.path, layout = src.layout, maxrows = rows)
inserter = functools.partial(dest_client.stream_insert_numpy_context,
dest.path)
start = time.time()
processed_time = 0
printf("Processing intervals:\n")
for interval in intervals:
printf("Processing interval:\n")
printf(" %s\n", interval.human_string())
printf("%s\n", interval.human_string())
nilmtools.filter.process_numpy_interval(
interval, extractor, inserter, rows * 3,
trainola_matcher, (src_columns, dest.layout_count, exemplars))
processed_time += (timestamp_to_seconds(interval.end) -
timestamp_to_seconds(interval.start))
elapsed = max(time.time() - start, 1e-3)
return "done"
printf("Done. Processed %.2f seconds per second.\n",
processed_time / elapsed)
def main(argv = None):
import simplejson as json

View File

@@ -64,6 +64,7 @@ setup(name='nilmtools',
install_requires = [ 'nilmdb >= 1.8.1',
'numpy',
'scipy',
'python-daemon >= 1.5',
#'matplotlib',
],
packages = [ 'nilmtools',
@@ -80,6 +81,7 @@ setup(name='nilmtools',
'nilm-cleanup = nilmtools.cleanup:main',
'nilm-median = nilmtools.median:main',
'nilm-trainola = nilmtools.trainola:main',
'nilm-pipewatch = nilmtools.pipewatch:main',
],
},
zip_safe = False,