#!/usr/bin/env python3
"""Pipe data from a 'generator' shell command to a 'consumer' shell command.

Both commands are started through the shell.  Data read from the
generator's stdout is forwarded to the consumer's stdin until either
process exits, the generator reaches EOF, or the generator produces no
output for the configured timeout.  A lock file prevents two copies
from running at once.
"""

import nilmdb.client
from nilmdb.utils.printf import *
import nilmdb.utils.lock
import nilmtools

import time
import sys
import os
import argparse
import subprocess
import tempfile
import threading
import select
import signal
import queue
import daemon


def parse_args(argv=None):
    """Parse command-line arguments.

    argv: list of argument strings, or None to let argparse use
    sys.argv[1:].

    Returns the argparse namespace with attributes daemon, lock,
    timeout, generator and consumer.
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description="""\
    Pipe data from 'generator' to 'consumer'.  This is intended to be
    executed frequently from cron, and will exit if another copy is
    already running.  If 'generator' or 'consumer' returns an error,
    or if 'generator' stops sending data for a while, it will exit.

    Intended for use with ethstream (generator) and nilm-insert
    (consumer).  Commands are executed through the shell.
    """)
    parser.add_argument("-v", "--version", action="version",
                        version=nilmtools.__version__)
    parser.add_argument("-d", "--daemon", action="store_true",
                        help="Run in background")
    parser.add_argument("-l", "--lock", metavar="FILENAME", action="store",
                        default=os.path.join(tempfile.gettempdir(),
                                             "nilm-pipewatch.lock"),
                        help="Lock file for detecting running instance")
    parser.add_argument("-t", "--timeout", metavar="SECONDS", action="store",
                        type=float, default=30,
                        help="Exit if no output from " +
                        "generator for this long")
    group = parser.add_argument_group("commands to execute")
    group.add_argument("generator", action="store",
                       help="Data generator (e.g. \"ethstream -r 8000\")")
    group.add_argument("consumer", action="store",
                       help="Data consumer (e.g. \"nilm-insert /foo/bar\")")
    args = parser.parse_args(argv)

    return args


def reader_thread(q, fd):
    """Read from file descriptor 'fd' and put each chunk into queue 'q'.

    A single None is put into the queue when reading stops, whether
    because of EOF, an exceptional condition reported by select, or any
    exception (for example the descriptor being closed elsewhere).
    """
    try:
        while True:
            (r, w, x) = select.select([fd], [], [fd], 0.25)
            if x:  # pragma: no cover -- never expect this to happen
                # Very few things are "exceptional conditions";
                # just TCP OOB data, some TTY state changes, etc.
                raise Exception
            if not r:
                # short timeout -- just try again.  This is to catch the
                # fd being closed elsewhere, which is only detected
                # when select restarts.
                continue
            data = os.read(fd, 65536)
            if data == b"":  # generator EOF
                raise Exception
            q.put(data)
    except Exception:
        # Every way of leaving the loop ends here; None tells the main
        # loop that no more data will arrive.
        q.put(None)


def watcher_thread(q, procs):
    """Put None into queue 'q' once any process in 'procs' has exited.

    The processes are polled every 0.25 seconds; the thread returns
    after queueing the single None.
    """
    while True:
        for p in procs:
            if p.poll() is not None:
                q.put(None)
                return
        time.sleep(0.25)


def _poll_timeout(proc, timeout):
    """Poll 'proc' every 0.1 s for about 'timeout' seconds.

    Returns the result of proc.poll(): the exit status, or None if the
    process is still running.
    """
    for _ in range(1 + int(timeout / 0.1)):
        if proc.poll() is not None:
            break
        time.sleep(0.1)
    return proc.poll()


def _kill(proc):
    """Wait for 'proc' to end, escalating to SIGTERM and then SIGKILL.

    Signals are sent to the process group whose id is proc.pid.
    Returns the result of a final poll (exit status, or None).
    """
    try:
        if _poll_timeout(proc, 0.5) is None:
            os.killpg(proc.pid, signal.SIGTERM)
            if _poll_timeout(proc, 0.5) is None:
                os.killpg(proc.pid, signal.SIGKILL)
    except OSError:  # pragma: no cover
        # (hard to trigger race condition in os.killpg)
        pass
    return _poll_timeout(proc, 0.5)


def pipewatch(args):
    """Run the generator and consumer and forward data between them.

    args: namespace providing 'generator' and 'consumer' (shell command
    strings) and 'timeout' (seconds to wait for generator output).

    Always terminates via sys.exit(): status 0 if both commands
    returned 0, otherwise status 1.
    """
    # Run the processes.  Each one calls os.setpgrp before exec, so it
    # leads its own process group and _kill() can signal the whole group.
    with open(os.devnull, "r") as devnull:
        generator = subprocess.Popen(args.generator, shell=True,
                                     bufsize=-1, close_fds=True,
                                     stdin=devnull,
                                     stdout=subprocess.PIPE,
                                     stderr=None,
                                     preexec_fn=os.setpgrp)
        consumer = subprocess.Popen(args.consumer, shell=True,
                                    bufsize=-1, close_fds=True,
                                    stdin=subprocess.PIPE,
                                    stdout=None,
                                    stderr=None,
                                    preexec_fn=os.setpgrp)

    # Small bounded queue: the reader blocks instead of buffering
    # without limit when the consumer is slow.
    q = queue.Queue(maxsize=4)
    reader = threading.Thread(target=reader_thread,
                              args=(q, generator.stdout.fileno()))
    reader.start()
    watcher = threading.Thread(target=watcher_thread,
                               args=(q, [generator, consumer]))
    watcher.start()
    try:
        while True:
            try:
                data = q.get(True, args.timeout)
                if data is None:
                    # EOF, read error, or one of the processes died
                    break
                consumer.stdin.write(data)
            except queue.Empty:
                # Timeout: kill the generator
                fprintf(sys.stderr, "pipewatch: timeout\n")
                generator.terminate()
                break

        generator.stdout.close()
        consumer.stdin.close()
    except IOError:
        fprintf(sys.stderr, "pipewatch: I/O error\n")

    # Wait for them to die, or kill them
    cret = _kill(consumer)
    gret = _kill(generator)

    # Consume all remaining data in the queue until the reader
    # and watcher threads are done.  Either thread may be blocked in
    # q.put() on a full queue, so keep draining.  The queue can also be
    # empty while a thread is still inside its 0.25 s select/sleep, so
    # an empty poll just means "check again".
    while reader.is_alive() or watcher.is_alive():
        try:
            q.get(True, 0.1)
        except queue.Empty:
            pass

    fprintf(sys.stderr, "pipewatch: generator returned %d, " +
            "consumer returned %d\n", gret, cret)
    if gret == 0 and cret == 0:
        sys.exit(0)
    sys.exit(1)


def main(argv=None):
    """Entry point: take the lock file, then run pipewatch.

    argv: list of argument strings, or None to use sys.argv[1:].

    Exits with status 0 if the lock could not be taken (another
    instance is assumed to be running).  Otherwise exits through
    pipewatch(), removing the lock file on the way out.
    """
    args = parse_args(argv)

    lockfile = open(args.lock, "w")
    try:
        if not nilmdb.utils.lock.exclusive_lock(lockfile):
            printf("pipewatch process already running (according to %s)\n",
                   args.lock)
            sys.exit(0)

        try:
            # Run as a daemon if requested, otherwise run directly.
            if args.daemon:  # pragma: no cover (hard to do from inside test suite)
                with daemon.DaemonContext(files_preserve=[lockfile]):
                    pipewatch(args)
            else:
                pipewatch(args)
        finally:
            # Clean up lockfile
            try:
                os.unlink(args.lock)
            except OSError:
                pass
    finally:
        # Close the handle only after the unlink above, so the file is
        # still open while its path is being removed.
        lockfile.close()


if __name__ == "__main__":
    main()