nilmtools/nilmtools/pipewatch.py
Jim Paris cfc66b6847 Fix flake8 errors throughout code
This found a small number of real bugs too, for example,
this one that looked weird because of a 2to3 conversion,
but was wrong both before and after:
-        except IndexError as TypeError:
+        except (IndexError, TypeError):
2020-08-06 17:58:41 -04:00

187 lines
6.5 KiB
Python
Executable File

#!/usr/bin/env python3
import nilmdb.client
from nilmdb.utils.printf import printf, fprintf
import nilmdb.utils.lock
import nilmtools
import time
import sys
import os
import argparse
import subprocess
import tempfile
import threading
import select
import signal
import queue
import daemon
def parse_args(argv=None):
    """Parse the pipewatch command line.

    argv is the list of argument strings to parse; None makes argparse
    fall back to sys.argv[1:].  Returns the argparse namespace with the
    attributes daemon, lock, timeout, generator and consumer.
    """
    description = """\
Pipe data from 'generator' to 'consumer'. This is intended to be
executed frequently from cron, and will exit if another copy is
already running. If 'generator' or 'consumer' returns an error,
or if 'generator' stops sending data for a while, it will exit.
Intended for use with ethstream (generator) and nilm-insert
(consumer). Commands are executed through the shell.
"""
    # Default lock file lives in the system temporary directory.
    default_lock = tempfile.gettempdir() + "/nilm-pipewatch.lock"

    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description=description)

    # General options
    parser.add_argument(
        "-v", "--version", action="version",
        version=nilmtools.__version__)
    parser.add_argument(
        "-d", "--daemon", action="store_true",
        help="Run in background")
    parser.add_argument(
        "-l", "--lock", metavar="FILENAME", action="store",
        default=default_lock,
        help="Lock file for detecting running instance")
    parser.add_argument(
        "-t", "--timeout", metavar="SECONDS", action="store",
        type=float, default=30,
        help="Exit if no output from generator for this long")

    # The two shell commands that get connected together
    commands = parser.add_argument_group("commands to execute")
    commands.add_argument(
        "generator", action="store",
        help="Data generator (e.g. \"ethstream -r 8000\")")
    commands.add_argument(
        "consumer", action="store",
        help="Data consumer (e.g. \"nilm-insert /foo/bar\")")

    return parser.parse_args(argv)
def reader_thread(q, fd):
    """Copy data from file descriptor fd into queue q until it ends.

    Each chunk read (up to 65536 bytes) is put on q as a bytes object.
    When the descriptor reaches EOF, reports an exceptional condition,
    or any exception occurs (for example because the descriptor was
    closed elsewhere), a single None is put on q and the function
    returns.
    """
    try:
        while True:
            # The short timeout makes select restart regularly, which
            # is the only way a descriptor closed elsewhere is noticed.
            readable, _, exceptional = select.select([fd], [], [fd], 0.25)
            if exceptional:  # pragma: no cover -- never expect this to happen
                # Very few things are "exceptional conditions";
                # just TCP OOB data, some TTY state changes, etc.
                break
            if not readable:
                # Timed out with nothing to read; go around again.
                continue
            chunk = os.read(fd, 65536)
            if not chunk:
                # generator EOF
                break
            q.put(chunk)
    except Exception:
        # Any failure is treated the same as EOF: fall through and
        # post the end-of-stream marker below.
        pass
    q.put(None)
def watcher_thread(q, procs):
    """Post None on queue q as soon as any process in procs has exited.

    procs is a sequence of objects with a poll() method that returns
    None while the process is still running.  The processes are polled
    in order every 0.25 seconds; once one reports an exit status, a
    single None is put on q and the function returns.
    """
    while not any(p.poll() is not None for p in procs):
        time.sleep(0.25)
    q.put(None)
def pipewatch(args):
    """Run the generator and consumer commands and pipe data between them.

    args is the namespace from parse_args(); this function reads
    args.generator and args.consumer (shell command strings) and
    args.timeout (seconds to wait for generator output).

    Data read from the generator's stdout is written to the consumer's
    stdin until the generator reaches EOF, either process exits, no
    data arrives for args.timeout seconds, or an I/O error occurs.
    Both processes are then waited for (and killed if necessary).

    This function does not return: it calls sys.exit(0) if both
    commands returned 0, and sys.exit(1) otherwise.
    """
    # Run the processes.  Each child is started in its own process
    # group (os.setpgrp) so that kill() below can signal the shell and
    # everything it spawned with os.killpg.
    with open(os.devnull, "r") as devnull:
        generator = subprocess.Popen(args.generator, shell=True,
                                     bufsize=-1, close_fds=True,
                                     stdin=devnull,
                                     stdout=subprocess.PIPE,
                                     stderr=None,
                                     preexec_fn=os.setpgrp)
    consumer = subprocess.Popen(args.consumer, shell=True,
                                bufsize=-1, close_fds=True,
                                stdin=subprocess.PIPE,
                                stdout=None,
                                stderr=None,
                                preexec_fn=os.setpgrp)

    # Bounded queue: the reader blocks in q.put() when the consumer
    # falls behind, instead of buffering without limit.
    q = queue.Queue(maxsize=4)
    reader = threading.Thread(target=reader_thread,
                              args=(q, generator.stdout.fileno()))
    reader.start()
    watcher = threading.Thread(target=watcher_thread,
                               args=(q, [generator, consumer]))
    watcher.start()

    try:
        while True:
            try:
                data = q.get(True, args.timeout)
                if data is None:
                    # EOF from the reader, or a process died
                    break
                consumer.stdin.write(data)
            except queue.Empty:
                # Timeout: kill the generator
                fprintf(sys.stderr, "pipewatch: timeout\n")
                generator.terminate()
                break

        # Closing the generator's stdout makes the reader thread's
        # select() fail, which ends that thread.
        generator.stdout.close()
        consumer.stdin.close()
    except IOError:
        fprintf(sys.stderr, "pipewatch: I/O error\n")

    def kill(proc):
        """Wait for proc to end, escalating to SIGTERM then SIGKILL.

        Returns the process return code, or None if it is still
        running after all attempts.
        """
        def poll_timeout(proc, timeout):
            # Poll every 0.1 s for up to roughly `timeout` seconds.
            for _ in range(1 + int(timeout / 0.1)):
                if proc.poll() is not None:
                    break
                time.sleep(0.1)
            return proc.poll()
        try:
            # proc.pid is also the process group ID, because the child
            # was started with preexec_fn=os.setpgrp.
            if poll_timeout(proc, 0.5) is None:
                os.killpg(proc.pid, signal.SIGTERM)
                if poll_timeout(proc, 0.5) is None:
                    os.killpg(proc.pid, signal.SIGKILL)
        except OSError:  # pragma: no cover
            # (hard to trigger race condition in os.killpg)
            pass
        return poll_timeout(proc, 0.5)

    # Wait for them to die, or kill them
    cret = kill(consumer)
    gret = kill(generator)

    # Consume all remaining data in the queue until the reader
    # and watcher threads are done.  Draining is needed because the
    # reader may be blocked in q.put() on the full queue.
    while reader.is_alive() or watcher.is_alive():
        try:
            q.get(True, 0.1)
        except queue.Empty:
            # Nothing queued within 0.1 s; a thread may simply be
            # between its 0.25 s polls.  Keep waiting rather than
            # letting queue.Empty escape and abort the shutdown.
            pass

    # %s rather than %d: kill() returns None for a process that could
    # not be reaped, and %d would raise TypeError on None.
    fprintf(sys.stderr, "pipewatch: generator returned %s, " +
            "consumer returned %s\n", gret, cret)
    if gret == 0 and cret == 0:
        sys.exit(0)
    sys.exit(1)
def main(argv=None):
    """Entry point: take the lock file, then run pipewatch.

    argv is passed to parse_args(); None means use the process
    command line.  If the lock named by --lock cannot be acquired,
    a message is printed and the process exits with status 0.
    Otherwise pipewatch() is run (inside a daemon context when
    --daemon was given) and the lock file is removed afterwards.
    """
    args = parse_args(argv)

    lockfile = open(args.lock, "w")
    if not nilmdb.utils.lock.exclusive_lock(lockfile):
        printf("pipewatch process already running (according to %s)\n",
               args.lock)
        # Not ours to clean up: leave the file in place for the
        # running instance, but don't leak our handle to it.
        lockfile.close()
        sys.exit(0)
    try:
        # Run as a daemon if requested, otherwise run directly.
        if args.daemon:  # pragma: no cover (hard to do from inside test suite)
            with daemon.DaemonContext(files_preserve=[lockfile]):
                pipewatch(args)
        else:
            pipewatch(args)
    finally:
        # Clean up lockfile.  Unlink first and close afterwards:
        # presumably the lock is tied to the open file, so the path
        # is removed while the lock is still held.
        try:
            os.unlink(args.lock)
        except OSError:
            pass
        lockfile.close()


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()