Compare commits


7 Commits

Author: Sharon NILM

SHA1         Message                                           Date
8b9c5d4898   Fix daemon dependency                             2013-07-29 17:40:51 -04:00
cf2c28b0fb   Add --daemon flag                                 2013-07-29 17:16:18 -04:00
87a26c907b   Watch for process termination too                 2013-07-29 15:08:49 -04:00
def465b57c   Improve pipewatch; add nilm-pipewatch script      2013-07-29 14:58:15 -04:00
0589b8d316   start of pipewatch util                           2013-07-29 14:10:56 -04:00
9c5f07106d   Don't need python-pip                             2013-07-20 16:15:29 -04:00
62e11a11c0   Fix issue with column ordering in the exemplars   2013-07-18 22:51:27 -04:00

             If the max scale in the exemplar was a column we weren't using, it
             would bail out when looking for that correlation later.  Change things
             around so exemplars in RAM only keep around the columns we care about.
6 changed files with 223 additions and 10 deletions

Makefile

@@ -8,13 +8,20 @@ else
 	@echo "Try 'make install'"
 endif
 
-test: test_trainola
+test: test_pipewatch
+
+test_pipewatch:
+	nilmtools/pipewatch.py -t 3 "seq 10 20" "seq 20 30"
 
 test_trainola:
+	-nilmtool -u http://bucket/nilmdb remove -s min -e max \
+		/sharon/prep-a-matches
+	nilmtools/trainola.py "$$(cat extras/trainola-test-param-2.js)"
 	-nilmtool -u http://bucket/nilmdb remove -s min -e max \
 		/sharon/prep-a-matches
 	nilmtools/trainola.py "$$(cat extras/trainola-test-param.js)"
 
 test_cleanup:
 	nilmtools/cleanup.py -e extras/cleanup.cfg
 	nilmtools/cleanup.py extras/cleanup.cfg

README

@@ -5,8 +5,8 @@ by Jim Paris <jim@jtan.com>
 Prerequisites:
 
   # Runtime and build environments
-  sudo apt-get install python2.7 python2.7-dev python-setuptools python-pip
-  sudo apt-get install python-numpy python-scipy
+  sudo apt-get install python2.7 python2.7-dev python-setuptools
+  sudo apt-get install python-numpy python-scipy python-daemon
 
   nilmdb (1.8.1+)

extras/trainola-test-param-2.js    New file

@@ -0,0 +1,29 @@
{ "columns" : [ { "index" : 0, "name" : "P1" },
                { "index" : 1, "name" : "Q1" },
                { "index" : 2, "name" : "P3" } ],
  "stream" : "/sharon/prep-a",
  "url" : "http://bucket.mit.edu/nilmdb",
  "dest_stream" : "/sharon/prep-a-matches",
  "start" : 1365153062643133.5,
  "end" : 1365168814443575.5,
  "exemplars" : [ { "columns" : [ { "index" : 0,
                                    "name" : "P1"
                                  } ],
                    "dest_column" : 0,
                    "end" : 1365073657682000,
                    "name" : "Turn ON",
                    "start" : 1365073654321000,
                    "stream" : "/sharon/prep-a",
                    "url" : "http://bucket.mit.edu/nilmdb"
                  },
                  { "columns" : [ { "index" : 2, "name" : "P3" },
                                  { "index" : 0, "name" : "P1" } ],
                    "dest_column" : 1,
                    "end" : 1365176528818000,
                    "name" : "Type 2 turn ON",
                    "start" : 1365176520030000,
                    "stream" : "/sharon/prep-a",
                    "url" : "http://bucket.mit.edu/nilmdb"
                  }
                ]
}
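
Trainola receives this whole parameter set as a single JSON argument (the new
Makefile target passes it with cat). A minimal Python 2 sketch of inspecting
the file, matching the rest of this change; the print format here is made up:

import json

# Print which exemplar columns get matched into which destination column.
with open("extras/trainola-test-param-2.js") as f:
    params = json.load(f)

for ex in params["exemplars"]:
    names = ", ".join(c["name"] for c in ex["columns"])
    print "%s: %s -> dest_column %d" % (ex["name"], names, ex["dest_column"])

For this file it would report "Turn ON" matching P1 into destination column 0
and "Type 2 turn ON" matching P3 and P1 into destination column 1.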

nilmtools/pipewatch.py    Executable file (168 lines added)

@@ -0,0 +1,168 @@
#!/usr/bin/python

import nilmdb.client
from nilmdb.utils.printf import *
import nilmdb.utils.lock
import nilmtools

import time
import sys
import os
import argparse
import subprocess
import tempfile
import threading
import select
import signal
import Queue
import daemon

def parse_args(argv = None):
    parser = argparse.ArgumentParser(
        formatter_class = argparse.ArgumentDefaultsHelpFormatter,
        version = nilmtools.__version__,
        description = """\
Pipe data from 'generator' to 'consumer'.  This is intended to be
executed frequently from cron, and will exit if another copy is
already running.  If 'generator' or 'consumer' returns an error,
or if 'generator' stops sending data for a while, it will exit.

Intended for use with ethstream (generator) and nilm-insert
(consumer).  Commands are executed through the shell.
""")
    parser.add_argument("-d", "--daemon", action="store_true",
                        help="Run in background")
    parser.add_argument("-l", "--lock", metavar="FILENAME", action="store",
                        default=tempfile.gettempdir() +
                        "/nilm-pipewatch.lock",
                        help="Lock file for detecting running instance")
    parser.add_argument("-t", "--timeout", metavar="SECONDS", action="store",
                        type=float, default=30,
                        help="Restart if no output from " +
                        "generator for this long")
    group = parser.add_argument_group("commands to execute")
    group.add_argument("generator", action="store",
                       help="Data generator (e.g. \"ethstream -r 8000\")")
    group.add_argument("consumer", action="store",
                       help="Data consumer (e.g. \"nilm-insert /foo/bar\")")
    args = parser.parse_args(argv)
    return args

def reader_thread(queue, fd):
    # Read from a file descriptor, write to queue.
    try:
        while True:
            (r, w, x) = select.select([fd], [], [fd], 0.25)
            if x:
                raise Exception # generator died?
            if not r:
                # short timeout -- just try again.  This is to catch the
                # fd being closed elsewhere, which is only detected
                # when select restarts.
                continue
            data = os.read(fd, 65536)
            if data == "": # generator EOF
                raise Exception
            queue.put(data)
    except Exception:
        queue.put(None)

def watcher_thread(queue, procs):
    # Put None in the queue if either process dies
    while True:
        for p in procs:
            if p.poll() is not None:
                queue.put(None)
                return
        time.sleep(0.25)

def pipewatch(args):
    # Run the processes, etc
    with open(os.devnull, "r") as devnull:
        generator = subprocess.Popen(args.generator, shell = True,
                                     bufsize = -1, close_fds = True,
                                     stdin = devnull,
                                     stdout = subprocess.PIPE,
                                     stderr = None)
        consumer = subprocess.Popen(args.consumer, shell = True,
                                    bufsize = -1, close_fds = True,
                                    stdin = subprocess.PIPE,
                                    stdout = None, stderr = None)

        queue = Queue.Queue(maxsize = 32)
        reader = threading.Thread(target = reader_thread,
                                  args = (queue, generator.stdout.fileno()))
        reader.start()
        watcher = threading.Thread(target = watcher_thread,
                                   args = (queue, [generator, consumer]))
        watcher.start()

        try:
            while True:
                try:
                    data = queue.get(True, args.timeout)
                    if data is None:
                        break
                    consumer.stdin.write(data)
                except Queue.Empty:
                    # Timeout: kill the generator
                    fprintf(sys.stderr, "pipewatch: timeout\n")
                    generator.terminate()
                    break
            generator.stdout.close()
            consumer.stdin.close()
        except IOError:
            fprintf(sys.stderr, "pipewatch: I/O error\n")

        def kill(proc):
            # Wait for a process to end, or kill it
            def poll_timeout(proc, timeout):
                for x in range(1+int(timeout / 0.1)):
                    if proc.poll() is not None:
                        break
                    time.sleep(0.1)
                return proc.poll()
            try:
                if poll_timeout(proc, 0.5) is None:
                    proc.terminate()
                    if poll_timeout(proc, 0.5) is None:
                        proc.kill()
            except OSError:
                pass
            return poll_timeout(proc, 0.5)

        # Wait for them to die, or kill them
        gret = kill(generator)
        cret = kill(consumer)

    fprintf(sys.stderr, "pipewatch: generator returned %d, " +
            "consumer returned %d\n", gret, cret)
    if gret == 0 and cret == 0:
        sys.exit(0)
    sys.exit(1)

def main(argv = None):
    args = parse_args(argv)

    lockfile = open(args.lock, "w")
    if not nilmdb.utils.lock.exclusive_lock(lockfile):
        printf("pipewatch process already running (according to %s)\n",
               args.lock)
        sys.exit(0)
    try:
        # Run as a daemon if requested, otherwise run directly.
        if args.daemon:
            with daemon.DaemonContext(files_preserve = [ lockfile ]):
                pipewatch(args)
        else:
            pipewatch(args)
    finally:
        # Clean up lockfile
        try:
            os.unlink(args.lock)
        except OSError:
            pass

if __name__ == "__main__":
    main()
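
The single-instance guard above relies on nilmdb.utils.lock.exclusive_lock(lockfile)
returning a false value when another pipewatch already holds the lock. A rough
sketch of that behavior using fcntl, as an assumption about the semantics rather
than the actual nilmdb implementation:

import fcntl

def exclusive_lock_sketch(lockfile):
    # Take a non-blocking exclusive flock on the already-open file;
    # return False instead of blocking if another instance holds it.
    try:
        fcntl.flock(lockfile.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
    except IOError:
        return False
    return True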

nilmtools/trainola.py

@@ -28,12 +28,12 @@ def build_column_mapping(colinfo, streaminfo):
        pull out a dictionary mapping for the column names/numbers."""
     columns = OrderedDict()
     for c in colinfo:
-        if (c['name'] in columns.keys() or
-            c['index'] in columns.values()):
+        col_num = c['index'] + 1 # skip timestamp
+        if (c['name'] in columns.keys() or col_num in columns.values()):
             raise DataError("duplicated columns")
         if (c['index'] < 0 or c['index'] >= streaminfo.layout_count):
             raise DataError("bad column number")
-        columns[c['name']] = c['index']
+        columns[c['name']] = col_num
     if not len(columns):
         raise DataError("no columns")
     return columns
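
The net effect of the col_num change: the mapping values now index rows that
still include the timestamp in column 0. A worked example using the columns
from the test parameter file above:

from collections import OrderedDict

# colinfo as in extras/trainola-test-param-2.js
colinfo = [{"index": 0, "name": "P1"},
           {"index": 1, "name": "Q1"},
           {"index": 2, "name": "P3"}]

columns = OrderedDict()
for c in colinfo:
    col_num = c["index"] + 1   # skip timestamp in column 0
    columns[c["name"]] = col_num

print columns  # OrderedDict([('P1', 1), ('Q1', 2), ('P3', 3)])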
@@ -79,10 +79,17 @@ class Exemplar(object):
                                maxrows = self.count)
         self.data = list(datagen)[0]
 
-        # Discard timestamp
-        self.data = self.data[:,1:]
+        # Extract just the columns that were specified in self.columns,
+        # skipping the timestamp.
+        extract_columns = [ value for (key, value) in self.columns.items() ]
+        self.data = self.data[:,extract_columns]
 
-        # Subtract the mean from each column
+        # Fix the column indices in e.columns, since we removed/reordered
+        # columns in self.data
+        for n, k in enumerate(self.columns):
+            self.columns[k] = n
+
+        # Subtract the means from each column
         self.data = self.data - self.data.mean(axis=0)
 
         # Get scale factors for each column by computing dot product
@@ -147,7 +154,7 @@ def trainola_matcher(data, interval, args, insert_func, final_chunk):
         # Compute cross-correlation for each column
         for col_name in e.columns:
-            a = data[:, src_columns[col_name] + 1]
+            a = data[:, src_columns[col_name]]
             b = e.data[:, e.columns[col_name]]
             corr = scipy.signal.fftconvolve(a, np.flipud(b), 'valid')[0:valid]
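
The two Exemplar changes work together: numpy fancy indexing keeps only the
exemplar's columns, in the exemplar's own order, and the renumbering makes
e.columns index into that reduced array, so the matcher's src_columns lookup
no longer needs the +1. A small sketch with toy data:

import numpy as np
from collections import OrderedDict

# Toy rows: timestamp plus three source columns (P1, Q1, P3).
data = np.array([[1000., 5., 6., 7.],
                 [1001., 8., 9., 10.]])

# Second exemplar above: P3 first, then P1; values are source column
# numbers that already include the timestamp offset.
columns = OrderedDict([("P3", 3), ("P1", 1)])

extract_columns = [ value for (key, value) in columns.items() ]
sub = data[:, extract_columns]   # rows of [P3, P1]: [[7, 5], [10, 8]]
sub = sub - sub.mean(axis=0)     # per-column means removed

# Renumber so the mapping indexes the reduced array instead.
for n, k in enumerate(columns):
    columns[k] = n               # P3 -> 0, P1 -> 1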

setup.py

@@ -64,6 +64,7 @@ setup(name='nilmtools',
       install_requires = [ 'nilmdb >= 1.8.1',
                            'numpy',
                            'scipy',
+                           'python-daemon >= 1.5',
                            #'matplotlib',
                          ],
       packages = [ 'nilmtools',
@@ -80,6 +81,7 @@ setup(name='nilmtools',
           'nilm-cleanup = nilmtools.cleanup:main',
           'nilm-median = nilmtools.median:main',
           'nilm-trainola = nilmtools.trainola:main',
+          'nilm-pipewatch = nilmtools.pipewatch:main',
         ],
       },
       zip_safe = False,
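
For reference, the new console_scripts entry binds an installed nilm-pipewatch
command to nilmtools.pipewatch:main. Since main() accepts an optional argv list,
the same entry point can be exercised directly; a hedged example reusing the
placeholder commands from the script's own help text:

from nilmtools.pipewatch import main

# Roughly what "nilm-pipewatch 'ethstream -r 8000' 'nilm-insert /foo/bar'"
# does once installed (note: this really spawns both shell commands):
main(["ethstream -r 8000", "nilm-insert /foo/bar"])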