|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- #!/usr/bin/env python3
-
- import nilmdb.client
- from nilmdb.utils.printf import printf, fprintf
- import nilmdb.utils.lock
- import nilmtools
-
- import time
- import sys
- import os
- import argparse
- import subprocess
- import tempfile
- import threading
- import select
- import signal
- import queue
- import daemon
-
-
def parse_args(argv=None):
    """Parse the pipewatch command line.

    argv is the list of arguments to parse; it is handed straight to
    argparse, which falls back to the process arguments when it is None.
    Returns the argparse namespace holding 'daemon', 'lock', 'timeout',
    'generator' and 'consumer'.
    """
    # Default lock file lives in the system temporary directory.
    default_lock = tempfile.gettempdir() + "/nilm-pipewatch.lock"

    summary = """\
Pipe data from 'generator' to 'consumer'. This is intended to be
executed frequently from cron, and will exit if another copy is
already running. If 'generator' or 'consumer' returns an error,
or if 'generator' stops sending data for a while, it will exit.

Intended for use with ethstream (generator) and nilm-insert
(consumer). Commands are executed through the shell.
"""

    parser = argparse.ArgumentParser(
        description=summary,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    # Optional switches
    parser.add_argument(
        "-v", "--version",
        action="version",
        version=nilmtools.__version__)
    parser.add_argument(
        "-d", "--daemon",
        action="store_true",
        help="Run in background")
    parser.add_argument(
        "-l", "--lock",
        metavar="FILENAME",
        action="store",
        default=default_lock,
        help="Lock file for detecting running instance")
    parser.add_argument(
        "-t", "--timeout",
        metavar="SECONDS",
        action="store",
        type=float,
        default=30,
        help="Exit if no output from generator for this long")

    # The two shell commands that get connected by the pipe
    commands = parser.add_argument_group("commands to execute")
    commands.add_argument(
        "generator",
        action="store",
        help='Data generator (e.g. "ethstream -r 8000")')
    commands.add_argument(
        "consumer",
        action="store",
        help='Data consumer (e.g. "nilm-insert /foo/bar")')

    return parser.parse_args(argv)
-
-
def reader_thread(q, fd):
    """Copy raw data from file descriptor fd into queue q.

    Each chunk read (up to 65536 bytes) is put on q as a bytes object.
    When the descriptor hits EOF, reports an exceptional condition, or
    any Exception is raised while waiting/reading, a single None is put
    on q as the end marker and the function returns.
    """
    try:
        while True:
            # The short timeout makes select restart regularly, which
            # is how an fd closed elsewhere gets noticed.
            readable, _, exceptional = select.select([fd], [], [fd], 0.25)
            if exceptional:  # pragma: no cover -- never expect this to happen
                # Very few things are "exceptional conditions";
                # just TCP OOB data, some TTY state changes, etc.
                break
            if readable:
                chunk = os.read(fd, 65536)
                if not chunk:
                    # Empty read: generator EOF
                    break
                q.put(chunk)
    except Exception:
        # Any failure ends the stream exactly like EOF does; the
        # sentinel below tells the consumer of the queue to stop.
        pass
    q.put(None)
-
-
def watcher_thread(q, procs):
    """Signal on q as soon as any process in procs has exited.

    procs is a sequence of objects with a poll() method that returns
    None while the process is still running.  They are polled in order
    every 0.25 seconds; once one reports an exit status, a single None
    is put on q and the function returns.
    """
    while not any(proc.poll() is not None for proc in procs):
        time.sleep(0.25)
    q.put(None)
-
-
def pipewatch(args):
    """Run generator and consumer, forwarding generator output to consumer.

    args is the namespace from parse_args(); this reads args.generator
    and args.consumer (shell command strings) and args.timeout (seconds
    to wait for generator output before giving up).

    Both commands are started through the shell, each in its own process
    group.  A reader thread feeds generator output into a bounded queue
    and a watcher thread reports when either process exits; the main
    loop writes queued data to the consumer's stdin until it sees the
    end marker (None), a timeout, or an I/O error.  Both processes are
    then waited for (and killed if necessary), the helper threads are
    drained, and the process exits via sys.exit(): status 0 if both
    commands returned 0, otherwise 1.
    """
    # Run the processes, etc
    with open(os.devnull, "r") as devnull:
        generator = subprocess.Popen(args.generator, shell=True,
                                     bufsize=-1, close_fds=True,
                                     stdin=devnull,
                                     stdout=subprocess.PIPE,
                                     stderr=None,
                                     preexec_fn=os.setpgrp)
        consumer = subprocess.Popen(args.consumer, shell=True,
                                    bufsize=-1, close_fds=True,
                                    stdin=subprocess.PIPE,
                                    stdout=None,
                                    stderr=None,
                                    preexec_fn=os.setpgrp)

    # Small bounded queue: the reader blocks instead of buffering
    # without limit if the consumer falls behind.
    q = queue.Queue(maxsize=4)
    reader = threading.Thread(target=reader_thread,
                              args=(q, generator.stdout.fileno()))
    reader.start()
    watcher = threading.Thread(target=watcher_thread,
                               args=(q, [generator, consumer]))
    watcher.start()
    try:
        while True:
            try:
                data = q.get(True, args.timeout)
                if data is None:
                    # End marker from the reader or the watcher
                    break
                consumer.stdin.write(data)
            except queue.Empty:
                # Timeout: kill the generator
                fprintf(sys.stderr, "pipewatch: timeout\n")
                generator.terminate()
                break

        generator.stdout.close()
        consumer.stdin.close()
    except IOError:
        fprintf(sys.stderr, "pipewatch: I/O error\n")

    def kill(proc):
        # Wait for a process to end, or kill it.  Returns the exit
        # status, or None if it is still alive after SIGKILL.
        def poll_timeout(proc, timeout):
            for _ in range(1 + int(timeout / 0.1)):
                if proc.poll() is not None:
                    break
                time.sleep(0.1)
            return proc.poll()
        try:
            # Each child was started with os.setpgrp, so its pid is
            # also its process group id; signal the whole group.
            if poll_timeout(proc, 0.5) is None:
                os.killpg(proc.pid, signal.SIGTERM)
                if poll_timeout(proc, 0.5) is None:
                    os.killpg(proc.pid, signal.SIGKILL)
        except OSError:  # pragma: no cover
            # (hard to trigger race condition in os.killpg)
            pass
        return poll_timeout(proc, 0.5)

    # Wait for them to die, or kill them
    cret = kill(consumer)
    gret = kill(generator)

    # Consume all remaining data in the queue until the reader
    # and watcher threads are done.  The queue may legitimately be
    # empty while a thread is still winding down (the watcher only
    # polls every 0.25 s), so an empty queue just means "check again".
    while reader.is_alive() or watcher.is_alive():
        try:
            q.get(True, 0.1)
        except queue.Empty:
            pass

    # %s rather than %d: a status is None if the process could not be
    # reaped, and %d would raise TypeError on None.
    fprintf(sys.stderr, "pipewatch: generator returned %s, " +
            "consumer returned %s\n", gret, cret)
    if gret == 0 and cret == 0:
        sys.exit(0)
    sys.exit(1)
-
-
def main(argv=None):
    """Entry point: take the instance lock, then run pipewatch.

    argv is passed to parse_args().  If the lock file named by --lock
    cannot be locked exclusively, a message is printed and the process
    exits with status 0.  Otherwise pipewatch() is run, inside a
    DaemonContext when --daemon was given, and the lock file is removed
    afterwards on a best-effort basis.
    """
    args = parse_args(argv)
    lock_path = args.lock

    lockfile = open(lock_path, "w")
    have_lock = nilmdb.utils.lock.exclusive_lock(lockfile)
    if not have_lock:
        printf("pipewatch process already running (according to %s)\n",
               lock_path)
        sys.exit(0)

    try:
        # Run directly unless a daemon was requested.
        if not args.daemon:
            pipewatch(args)
        else:  # pragma: no cover (hard to do from inside test suite)
            # Keep the lock file open across daemonization
            with daemon.DaemonContext(files_preserve=[lockfile]):
                pipewatch(args)
    finally:
        # Clean up lockfile; it is fine if it is already gone
        try:
            os.unlink(lock_path)
        except OSError:
            pass
-
-
# Run main() only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|