Compare commits
2 Commits
nilmtools-
...
nilmtools-
Author | SHA1 | Date | |
---|---|---|---|
![]() |
8b9c5d4898 | ||
cf2c28b0fb |
@@ -6,7 +6,7 @@ Prerequisites:
|
|||||||
|
|
||||||
# Runtime and build environments
|
# Runtime and build environments
|
||||||
sudo apt-get install python2.7 python2.7-dev python-setuptools
|
sudo apt-get install python2.7 python2.7-dev python-setuptools
|
||||||
sudo apt-get install python-numpy python-scipy
|
sudo apt-get install python-numpy python-scipy python-daemon
|
||||||
|
|
||||||
nilmdb (1.8.1+)
|
nilmdb (1.8.1+)
|
||||||
|
|
||||||
|
@@ -15,6 +15,7 @@ import threading
|
|||||||
import select
|
import select
|
||||||
import signal
|
import signal
|
||||||
import Queue
|
import Queue
|
||||||
|
import daemon
|
||||||
|
|
||||||
def parse_args(argv = None):
|
def parse_args(argv = None):
|
||||||
parser = argparse.ArgumentParser(
|
parser = argparse.ArgumentParser(
|
||||||
@@ -29,6 +30,8 @@ def parse_args(argv = None):
|
|||||||
Intended for use with ethstream (generator) and nilm-insert
|
Intended for use with ethstream (generator) and nilm-insert
|
||||||
(consumer). Commands are executed through the shell.
|
(consumer). Commands are executed through the shell.
|
||||||
""")
|
""")
|
||||||
|
parser.add_argument("-d", "--daemon", action="store_true",
|
||||||
|
help="Run in background")
|
||||||
parser.add_argument("-l", "--lock", metavar="FILENAME", action="store",
|
parser.add_argument("-l", "--lock", metavar="FILENAME", action="store",
|
||||||
default=tempfile.gettempdir() +
|
default=tempfile.gettempdir() +
|
||||||
"/nilm-pipewatch.lock",
|
"/nilm-pipewatch.lock",
|
||||||
@@ -74,82 +77,92 @@ def watcher_thread(queue, procs):
|
|||||||
return
|
return
|
||||||
time.sleep(0.25)
|
time.sleep(0.25)
|
||||||
|
|
||||||
|
def pipewatch(args):
|
||||||
|
# Run the processes, etc
|
||||||
|
with open(os.devnull, "r") as devnull:
|
||||||
|
generator = subprocess.Popen(args.generator, shell = True,
|
||||||
|
bufsize = -1, close_fds = True,
|
||||||
|
stdin = devnull,
|
||||||
|
stdout = subprocess.PIPE,
|
||||||
|
stderr = None)
|
||||||
|
consumer = subprocess.Popen(args.consumer, shell = True,
|
||||||
|
bufsize = -11, close_fds = True,
|
||||||
|
stdin = subprocess.PIPE,
|
||||||
|
stdout = None, stderr = None)
|
||||||
|
|
||||||
|
queue = Queue.Queue(maxsize = 32)
|
||||||
|
reader = threading.Thread(target = reader_thread,
|
||||||
|
args = (queue, generator.stdout.fileno()))
|
||||||
|
reader.start()
|
||||||
|
watcher = threading.Thread(target = watcher_thread,
|
||||||
|
args = (queue, [generator, consumer]))
|
||||||
|
watcher.start()
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
data = queue.get(True, args.timeout)
|
||||||
|
if data is None:
|
||||||
|
break
|
||||||
|
consumer.stdin.write(data)
|
||||||
|
except Queue.Empty:
|
||||||
|
# Timeout: kill the generator
|
||||||
|
fprintf(sys.stderr, "pipewatch: timeout\n")
|
||||||
|
generator.terminate()
|
||||||
|
break
|
||||||
|
|
||||||
|
generator.stdout.close()
|
||||||
|
consumer.stdin.close()
|
||||||
|
except IOError:
|
||||||
|
fprintf(sys.stderr, "pipewatch: I/O error\n")
|
||||||
|
|
||||||
|
def kill(proc):
|
||||||
|
# Wait for a process to end, or kill it
|
||||||
|
def poll_timeout(proc, timeout):
|
||||||
|
for x in range(1+int(timeout / 0.1)):
|
||||||
|
if proc.poll() is not None:
|
||||||
|
break
|
||||||
|
time.sleep(0.1)
|
||||||
|
return proc.poll()
|
||||||
|
try:
|
||||||
|
if poll_timeout(proc, 0.5) is None:
|
||||||
|
proc.terminate()
|
||||||
|
if poll_timeout(proc, 0.5) is None:
|
||||||
|
proc.kill()
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
return poll_timeout(proc, 0.5)
|
||||||
|
|
||||||
|
# Wait for them to die, or kill them
|
||||||
|
gret = kill(generator)
|
||||||
|
cret = kill(consumer)
|
||||||
|
|
||||||
|
fprintf(sys.stderr, "pipewatch: generator returned %d, " +
|
||||||
|
"consumer returned %d\n", gret, cret)
|
||||||
|
if gret == 0 and cret == 0:
|
||||||
|
sys.exit(0)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
def main(argv = None):
|
def main(argv = None):
|
||||||
args = parse_args(argv)
|
args = parse_args(argv)
|
||||||
|
|
||||||
with open(args.lock, "w") as lockfile:
|
lockfile = open(args.lock, "w")
|
||||||
if not nilmdb.utils.lock.exclusive_lock(lockfile):
|
if not nilmdb.utils.lock.exclusive_lock(lockfile):
|
||||||
printf("pipewatch process already running (according to %s)\n",
|
printf("pipewatch process already running (according to %s)\n",
|
||||||
args.lock)
|
args.lock)
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
|
|
||||||
with open(os.devnull, "r") as devnull:
|
|
||||||
generator = subprocess.Popen(args.generator, shell = True,
|
|
||||||
bufsize = -1, close_fds = True,
|
|
||||||
stdin = devnull,
|
|
||||||
stdout = subprocess.PIPE,
|
|
||||||
stderr = None)
|
|
||||||
consumer = subprocess.Popen(args.consumer, shell = True,
|
|
||||||
bufsize = -11, close_fds = True,
|
|
||||||
stdin = subprocess.PIPE,
|
|
||||||
stdout = None, stderr = None)
|
|
||||||
|
|
||||||
queue = Queue.Queue(maxsize = 32)
|
|
||||||
reader = threading.Thread(target = reader_thread,
|
|
||||||
args = (queue, generator.stdout.fileno()))
|
|
||||||
reader.start()
|
|
||||||
watcher = threading.Thread(target = watcher_thread,
|
|
||||||
args = (queue, [generator, consumer]))
|
|
||||||
watcher.start()
|
|
||||||
try:
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
data = queue.get(True, args.timeout)
|
|
||||||
if data is None:
|
|
||||||
break
|
|
||||||
consumer.stdin.write(data)
|
|
||||||
except Queue.Empty:
|
|
||||||
# Timeout: kill the generator
|
|
||||||
fprintf(sys.stderr, "pipewatch: timeout\n")
|
|
||||||
generator.terminate()
|
|
||||||
break
|
|
||||||
|
|
||||||
generator.stdout.close()
|
|
||||||
consumer.stdin.close()
|
|
||||||
except IOError:
|
|
||||||
fprintf(sys.stderr, "pipewatch: I/O error\n")
|
|
||||||
|
|
||||||
def kill(proc):
|
|
||||||
# Wait for a process to end, or kill it
|
|
||||||
def poll_timeout(proc, timeout):
|
|
||||||
for x in range(1+int(timeout / 0.1)):
|
|
||||||
if proc.poll() is not None:
|
|
||||||
break
|
|
||||||
time.sleep(0.1)
|
|
||||||
return proc.poll()
|
|
||||||
try:
|
|
||||||
if poll_timeout(proc, 0.5) is None:
|
|
||||||
proc.terminate()
|
|
||||||
if poll_timeout(proc, 0.5) is None:
|
|
||||||
proc.kill()
|
|
||||||
except OSError:
|
|
||||||
pass
|
|
||||||
return poll_timeout(proc, 0.5)
|
|
||||||
|
|
||||||
# Wait for them to die, or kill them
|
|
||||||
gret = kill(generator)
|
|
||||||
cret = kill(consumer)
|
|
||||||
|
|
||||||
fprintf(sys.stderr, "pipewatch: generator returned %d, " +
|
|
||||||
"consumer returned %d\n", gret, cret)
|
|
||||||
if gret == 0 and cret == 0:
|
|
||||||
sys.exit(0)
|
|
||||||
sys.exit(1)
|
|
||||||
try:
|
try:
|
||||||
os.unlink(args.lock)
|
# Run as a daemon if requested, otherwise run directly.
|
||||||
except OSError:
|
if args.daemon:
|
||||||
pass
|
with daemon.DaemonContext(files_preserve = [ lockfile ]):
|
||||||
|
pipewatch(args)
|
||||||
|
else:
|
||||||
|
pipewatch(args)
|
||||||
|
finally:
|
||||||
|
# Clean up lockfile
|
||||||
|
try:
|
||||||
|
os.unlink(args.lock)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
|
Reference in New Issue
Block a user