Compare commits

...

5 Commits

Author        SHA1         Message                                                       Date
              a4d4bc22fc   Add --skip option to nilm-insert                              2013-07-30 18:25:47 -04:00
              6090dd6112   prep: only process intervals present in both raw & sinefit   2013-07-30 14:55:06 -04:00
Sharon NILM   9c0d9ad324   Sample scripts from Sharon                                    2013-07-29 18:37:55 -04:00
Sharon NILM   8b9c5d4898   Fix daemon dependency                                         2013-07-29 17:40:51 -04:00
              cf2c28b0fb   Add --daemon flag                                             2013-07-29 17:16:18 -04:00
11 changed files with 176 additions and 83 deletions

View File

@@ -8,7 +8,7 @@ else
 	@echo "Try 'make install'"
 endif
-test: test_pipewatch
+test: test_insert
 test_pipewatch:
 	nilmtools/pipewatch.py -t 3 "seq 10 20" "seq 20 30"
@@ -27,7 +27,7 @@ test_cleanup:
 	nilmtools/cleanup.py extras/cleanup.cfg
 test_insert:
-	nilmtools/insert.py --file --dry-run /test/foo </dev/null
+	nilmtools/insert.py --skip --file --dry-run /foo/bar ~/data/20130311T2100.prep1.gz ~/data/20130311T2100.prep1.gz ~/data/20130311T2200.prep1.gz
 test_copy:
 	nilmtools/copy_wildcard.py -U "http://nilmdb.com/bucket/" -D /lees*
@@ -46,7 +46,8 @@ test_prep: /tmp/raw.dat
 	nilmtool create /test/sinefit float32_3
 	nilmtool create /test/prep float32_8
 	nilmtool insert -s '@0' -t -r 8000 /test/raw /tmp/raw.dat
-	nilmtools/sinefit.py -a 0.5 -c 1 /test/raw /test/sinefit
+	nilmtools/sinefit.py -a 0.5 -c 1 -s '@0' -e '@5000000' /test/raw /test/sinefit
+	nilmtools/prep.py -c 2 /test/raw /test/sinefit /test/prep
 	nilmtools/prep.py -c 2 /test/raw /test/sinefit /test/prep
 	nilmtool extract -s min -e max /test/prep | head -20

View File

@@ -6,9 +6,9 @@ Prerequisites:
 	# Runtime and build environments
 	sudo apt-get install python2.7 python2.7-dev python-setuptools
-	sudo apt-get install python-numpy python-scipy
+	sudo apt-get install python-numpy python-scipy python-daemon
-	nilmdb (1.8.1+)
+	nilmdb (1.8.5+)
 Install:

View File

@@ -0,0 +1,10 @@
#!/bin/bash
# Start the ethstream capture using nilm-pipewatch
# Bail out on errors
set -e
nilm-pipewatch --daemon --lock "/tmp/nilmdb-capture.lock" --timeout 30 \
"ethstream -a 192.168.1.209 -n 9 -r 8000 -N" \
"nilm-insert -m 10 -r 8000 --live /sharon/raw"

View File

@@ -0,0 +1,8 @@
[/sharon/prep-*]
keep = 1y
[/sharon/raw]
keep = 2w
[/sharon/sinefit]
keep = 1y

View File

@@ -0,0 +1,9 @@
# Install this by running "crontab crontab" (will replace existing crontab)
# m h dom mon dow cmd
# Run NilmDB processing every 5 minutes
*/5 * * * * chronic /home/nilm/data/process.sh
# Check the capture process every minute
*/1 * * * * chronic /home/nilm/data/capture.sh

View File

@@ -0,0 +1,28 @@
#!/bin/bash
# Run all necessary processing on NilmDB data.
# Bail out on errors
set -e
# Ensure only one copy of this code runs at a time:
LOCKFILE="/tmp/nilmdb-process.lock"
exec 99>"$LOCKFILE"
if ! flock -n -x 99 ; then
echo "NilmDB processing already running, giving up..."
exit 0
fi
trap 'rm -f "$LOCKFILE"' 0
# sinefit on phase A voltage
nilm-sinefit -c 5 /sharon/raw /sharon/sinefit
# prep on A, B, C with appropriate rotations
nilm-prep -c 1 -r 0 /sharon/raw /sharon/sinefit /sharon/prep-a
nilm-prep -c 2 -r 120 /sharon/raw /sharon/sinefit /sharon/prep-b
nilm-prep -c 3 -r 240 /sharon/raw /sharon/sinefit /sharon/prep-c
# decimate raw and prep data
nilm-decimate-auto /sharon/raw /sharon/prep*
# run cleanup
nilm-cleanup --yes /home/nilm/data/cleanup.cfg

View File

@@ -316,7 +316,8 @@ class Filter(object):
         self._client_dest.stream_update_metadata(self.dest.path, data)
     # The main filter processing method.
-    def process_numpy(self, function, args = None, rows = 100000):
+    def process_numpy(self, function, args = None, rows = 100000,
+                      intervals = None):
         """Calls process_numpy_interval for each interval that currently
         exists in self.src, but doesn't exist in self.dest. It will
         process the data in chunks as follows:
@@ -325,6 +326,9 @@ class Filter(object):
         corresponding to the data. The data is converted to a Numpy
         array in chunks of 'rows' rows at a time.
+        If 'intervals' is not None, process those intervals instead of
+        the default list.
         'function' should be defined as:
         # def function(data, interval, args, insert_func, final)
@@ -358,7 +362,7 @@ class Filter(object):
                                             maxrows = rows)
         inserter_func = functools.partial(inserter, self.dest.path)
-        for interval in self.intervals():
+        for interval in (intervals or self.intervals()):
             print "Processing", interval.human_string()
             process_numpy_interval(interval, extractor_func, inserter_func,
                                    rows * 3, function, args)
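
The new 'intervals' argument lets a caller hand process_numpy an explicit list of intervals to work on, instead of the default "exists in source but not in destination" set. As a rough illustration only (the filter object f and the copy_func callback below are hypothetical placeholders, and timestamps are assumed to be in NilmDB's microsecond units):

# Hypothetical sketch: restrict a filter run to one explicit interval.
# 'f' and 'copy_func' are placeholders, not part of this changeset.
from nilmdb.utils.interval import Interval

hour = 60 * 60 * 1000000            # one hour, assuming microsecond timestamps
only = [ Interval(0, hour) ]        # just the first hour of data
f.process_numpy(copy_func, args = None, rows = 100000, intervals = only)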

View File

@@ -53,7 +53,8 @@ def parse_args(argv = None):
       is stepped forward to match 'clock'.
     - If 'data' is running ahead, there is overlap in the data, and an
-      error is raised.
+      error is raised. If '--ignore' is specified, the current file
+      is skipped instead of raising an error.
     """))
     parser.add_argument("-u", "--url", action="store",
                         default="http://localhost/nilmdb/",
@@ -61,6 +62,8 @@ def parse_args(argv = None):
     group = parser.add_argument_group("Misc options")
     group.add_argument("-D", "--dry-run", action="store_true",
                        help="Parse files, but don't insert any data")
+    group.add_argument("-s", "--skip", action="store_true",
+                       help="Skip files if the data would overlap")
     group.add_argument("-m", "--max-gap", action="store", default=10.0,
                        metavar="SEC", type=float,
                        help="Max discrepency between clock and data "
@@ -235,6 +238,10 @@ def main(argv = None):
                          "is %s but clock time is only %s",
                          timestamp_to_human(data_ts),
                          timestamp_to_human(clock_ts))
+            if args.skip:
+                printf("%s\n", err)
+                printf("Skipping the remainder of this file\n")
+                break
             raise ParseError(filename, err)
         if (data_ts + max_gap) < clock_ts:
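
The new --skip flag matches the Makefile test target above: if a file's data would overlap what is already inserted, the rest of that file is skipped instead of aborting with a ParseError. A rough sketch of driving it programmatically (the destination path and file names are invented, and --dry-run keeps it harmless):

# Hypothetical sketch: insert two files, skipping any whose data would overlap.
import nilmtools.insert

nilmtools.insert.main([ "--skip", "--file", "--dry-run",
                        "/test/foo", "day1.prep1.gz", "day2.prep1.gz" ])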

View File

@@ -15,6 +15,7 @@ import threading
 import select
 import signal
 import Queue
+import daemon
 def parse_args(argv = None):
     parser = argparse.ArgumentParser(
@@ -29,6 +30,8 @@ def parse_args(argv = None):
         Intended for use with ethstream (generator) and nilm-insert
         (consumer). Commands are executed through the shell.
         """)
+    parser.add_argument("-d", "--daemon", action="store_true",
+                        help="Run in background")
     parser.add_argument("-l", "--lock", metavar="FILENAME", action="store",
                         default=tempfile.gettempdir() +
                         "/nilm-pipewatch.lock",
@@ -74,82 +77,92 @@ def watcher_thread(queue, procs):
                 return
         time.sleep(0.25)
+def pipewatch(args):
+    # Run the processes, etc
+    with open(os.devnull, "r") as devnull:
+        generator = subprocess.Popen(args.generator, shell = True,
+                                     bufsize = -1, close_fds = True,
+                                     stdin = devnull,
+                                     stdout = subprocess.PIPE,
+                                     stderr = None)
+        consumer = subprocess.Popen(args.consumer, shell = True,
+                                    bufsize = -11, close_fds = True,
+                                    stdin = subprocess.PIPE,
+                                    stdout = None, stderr = None)
+    queue = Queue.Queue(maxsize = 32)
+    reader = threading.Thread(target = reader_thread,
+                              args = (queue, generator.stdout.fileno()))
+    reader.start()
+    watcher = threading.Thread(target = watcher_thread,
+                               args = (queue, [generator, consumer]))
+    watcher.start()
+    try:
+        while True:
+            try:
+                data = queue.get(True, args.timeout)
+                if data is None:
+                    break
+                consumer.stdin.write(data)
+            except Queue.Empty:
+                # Timeout: kill the generator
+                fprintf(sys.stderr, "pipewatch: timeout\n")
+                generator.terminate()
+                break
+        generator.stdout.close()
+        consumer.stdin.close()
+    except IOError:
+        fprintf(sys.stderr, "pipewatch: I/O error\n")
+    def kill(proc):
+        # Wait for a process to end, or kill it
+        def poll_timeout(proc, timeout):
+            for x in range(1+int(timeout / 0.1)):
+                if proc.poll() is not None:
+                    break
+                time.sleep(0.1)
+            return proc.poll()
+        try:
+            if poll_timeout(proc, 0.5) is None:
+                proc.terminate()
+                if poll_timeout(proc, 0.5) is None:
+                    proc.kill()
+        except OSError:
+            pass
+        return poll_timeout(proc, 0.5)
+    # Wait for them to die, or kill them
+    gret = kill(generator)
+    cret = kill(consumer)
+    fprintf(sys.stderr, "pipewatch: generator returned %d, " +
+            "consumer returned %d\n", gret, cret)
+    if gret == 0 and cret == 0:
+        sys.exit(0)
+    sys.exit(1)
 def main(argv = None):
     args = parse_args(argv)
-    with open(args.lock, "w") as lockfile:
-        if not nilmdb.utils.lock.exclusive_lock(lockfile):
-            printf("pipewatch process already running (according to %s)\n",
-                   args.lock)
-            sys.exit(0)
-        with open(os.devnull, "r") as devnull:
-            generator = subprocess.Popen(args.generator, shell = True,
-                                         bufsize = -1, close_fds = True,
-                                         stdin = devnull,
-                                         stdout = subprocess.PIPE,
-                                         stderr = None)
-            consumer = subprocess.Popen(args.consumer, shell = True,
-                                        bufsize = -11, close_fds = True,
-                                        stdin = subprocess.PIPE,
-                                        stdout = None, stderr = None)
-        queue = Queue.Queue(maxsize = 32)
-        reader = threading.Thread(target = reader_thread,
-                                  args = (queue, generator.stdout.fileno()))
-        reader.start()
-        watcher = threading.Thread(target = watcher_thread,
-                                   args = (queue, [generator, consumer]))
-        watcher.start()
-        try:
-            while True:
-                try:
-                    data = queue.get(True, args.timeout)
-                    if data is None:
-                        break
-                    consumer.stdin.write(data)
-                except Queue.Empty:
-                    # Timeout: kill the generator
-                    fprintf(sys.stderr, "pipewatch: timeout\n")
-                    generator.terminate()
-                    break
-            generator.stdout.close()
-            consumer.stdin.close()
-        except IOError:
-            fprintf(sys.stderr, "pipewatch: I/O error\n")
-        def kill(proc):
-            # Wait for a process to end, or kill it
-            def poll_timeout(proc, timeout):
-                for x in range(1+int(timeout / 0.1)):
-                    if proc.poll() is not None:
-                        break
-                    time.sleep(0.1)
-                return proc.poll()
-            try:
-                if poll_timeout(proc, 0.5) is None:
-                    proc.terminate()
-                    if poll_timeout(proc, 0.5) is None:
-                        proc.kill()
-            except OSError:
-                pass
-            return poll_timeout(proc, 0.5)
-        # Wait for them to die, or kill them
-        gret = kill(generator)
-        cret = kill(consumer)
-        fprintf(sys.stderr, "pipewatch: generator returned %d, " +
-                "consumer returned %d\n", gret, cret)
-        if gret == 0 and cret == 0:
-            sys.exit(0)
-        sys.exit(1)
+    lockfile = open(args.lock, "w")
+    if not nilmdb.utils.lock.exclusive_lock(lockfile):
+        printf("pipewatch process already running (according to %s)\n",
+               args.lock)
+        sys.exit(0)
     try:
-        os.unlink(args.lock)
-    except OSError:
-        pass
+        # Run as a daemon if requested, otherwise run directly.
+        if args.daemon:
+            with daemon.DaemonContext(files_preserve = [ lockfile ]):
+                pipewatch(args)
+        else:
+            pipewatch(args)
+    finally:
+        # Clean up lockfile
+        try:
+            os.unlink(args.lock)
+        except OSError:
+            pass
 if __name__ == "__main__":
     main()
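
One detail worth noting in the daemonized path: python-daemon closes open file descriptors when it forks into the background, which would drop the exclusive lock, so the lock file is kept alive by listing it in files_preserve. A stripped-down sketch of that pattern (run_forever is a placeholder for the real work):

# Hedged sketch: keep the lock fd alive across the daemon fork via files_preserve.
import daemon
import nilmdb.utils.lock

lockfile = open("/tmp/nilm-pipewatch.lock", "w")
if nilmdb.utils.lock.exclusive_lock(lockfile):
    with daemon.DaemonContext(files_preserve = [ lockfile ]):
        run_forever()          # placeholder for the real work, e.g. pipewatch(args)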

View File

@@ -12,6 +12,7 @@ import scipy.fftpack
 import scipy.signal
 #from matplotlib import pyplot as p
 import bisect
+from nilmdb.utils.interval import Interval
 def main(argv = None):
     # Set up argument parser
@@ -82,9 +83,20 @@ def main(argv = None):
                            "prep_column": args.column,
                            "prep_rotation": repr(rotation) })
-    # Run the processing function on all data
+    # Find the intersection of the usual set of intervals we'd filter,
+    # and the intervals actually present in sinefit data. This is
+    # what we will process.
+    filter_int = f.intervals()
+    sinefit_int = ( Interval(start, end) for (start, end) in
+                    client_sinefit.stream_intervals(
+                        args.sinepath, start = f.start, end = f.end) )
+    intervals = nilmdb.utils.interval.intersection(filter_int, sinefit_int)
+    # Run the process (using the helper in the filter module)
     f.process_numpy(process, args = (client_sinefit, sinefit.path, args.column,
-                                     args.nharm, rotation, args.nshift))
+                                     args.nharm, rotation, args.nshift),
+                    intervals = intervals)
 def process(data, interval, args, insert_function, final):
     (client, sinefit_path, column, nharm, rotation, nshift) = args
View File

@@ -61,9 +61,10 @@ setup(name='nilmtools',
       long_description = "NILM Database Tools",
       license = "Proprietary",
       author_email = 'jim@jtan.com',
-      install_requires = [ 'nilmdb >= 1.8.1',
+      install_requires = [ 'nilmdb >= 1.8.5',
                            'numpy',
                            'scipy',
+                           'python-daemon >= 1.5',
                            #'matplotlib',
                            ],
       packages = [ 'nilmtools',