You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

187 lines
6.5 KiB

  1. #!/usr/bin/env python3
  2. import nilmdb.client
  3. from nilmdb.utils.printf import printf, fprintf
  4. import nilmdb.utils.lock
  5. import nilmtools
  6. import time
  7. import sys
  8. import os
  9. import argparse
  10. import subprocess
  11. import tempfile
  12. import threading
  13. import select
  14. import signal
  15. import queue
  16. import daemon
  17. def parse_args(argv=None):
  18. parser = argparse.ArgumentParser(
  19. formatter_class=argparse.ArgumentDefaultsHelpFormatter,
  20. description="""\
  21. Pipe data from 'generator' to 'consumer'. This is intended to be
  22. executed frequently from cron, and will exit if another copy is
  23. already running. If 'generator' or 'consumer' returns an error,
  24. or if 'generator' stops sending data for a while, it will exit.
  25. Intended for use with ethstream (generator) and nilm-insert
  26. (consumer). Commands are executed through the shell.
  27. """)
  28. parser.add_argument("-v", "--version", action="version",
  29. version=nilmtools.__version__)
  30. parser.add_argument("-d", "--daemon", action="store_true",
  31. help="Run in background")
  32. parser.add_argument("-l", "--lock", metavar="FILENAME", action="store",
  33. default=tempfile.gettempdir() +
  34. "/nilm-pipewatch.lock",
  35. help="Lock file for detecting running instance")
  36. parser.add_argument("-t", "--timeout", metavar="SECONDS", action="store",
  37. type=float, default=30,
  38. help="Exit if no output from " +
  39. "generator for this long")
  40. group = parser.add_argument_group("commands to execute")
  41. group.add_argument("generator", action="store",
  42. help="Data generator (e.g. \"ethstream -r 8000\")")
  43. group.add_argument("consumer", action="store",
  44. help="Data consumer (e.g. \"nilm-insert /foo/bar\")")
  45. args = parser.parse_args(argv)
  46. return args
  47. def reader_thread(q, fd):
  48. # Read from a file descriptor, write to queue.
  49. try:
  50. while True:
  51. (r, w, x) = select.select([fd], [], [fd], 0.25)
  52. if x: # pragma: no cover -- never expect this to happen
  53. # Very few things are "exceptional conditions";
  54. # just TCP OOB data, some TTY state changes, etc.
  55. raise Exception
  56. if not r:
  57. # short timeout -- just try again. This is to catch the
  58. # fd being closed elsewhere, which is only detected
  59. # when select restarts.
  60. continue
  61. data = os.read(fd, 65536)
  62. if data == b"": # generator EOF
  63. raise Exception
  64. q.put(data)
  65. except Exception:
  66. q.put(None)
  67. def watcher_thread(q, procs):
  68. # Put None in the queue if either process dies
  69. while True:
  70. for p in procs:
  71. if p.poll() is not None:
  72. q.put(None)
  73. return
  74. time.sleep(0.25)
  75. def pipewatch(args):
  76. # Run the processes, etc
  77. with open(os.devnull, "r") as devnull:
  78. generator = subprocess.Popen(args.generator, shell=True,
  79. bufsize=-1, close_fds=True,
  80. stdin=devnull,
  81. stdout=subprocess.PIPE,
  82. stderr=None,
  83. preexec_fn=os.setpgrp)
  84. consumer = subprocess.Popen(args.consumer, shell=True,
  85. bufsize=-11, close_fds=True,
  86. stdin=subprocess.PIPE,
  87. stdout=None,
  88. stderr=None,
  89. preexec_fn=os.setpgrp)
  90. q = queue.Queue(maxsize=4)
  91. reader = threading.Thread(target=reader_thread,
  92. args=(q, generator.stdout.fileno()))
  93. reader.start()
  94. watcher = threading.Thread(target=watcher_thread,
  95. args=(q, [generator, consumer]))
  96. watcher.start()
  97. try:
  98. while True:
  99. try:
  100. data = q.get(True, args.timeout)
  101. if data is None:
  102. break
  103. consumer.stdin.write(data)
  104. except queue.Empty:
  105. # Timeout: kill the generator
  106. fprintf(sys.stderr, "pipewatch: timeout\n")
  107. generator.terminate()
  108. break
  109. generator.stdout.close()
  110. consumer.stdin.close()
  111. except IOError:
  112. fprintf(sys.stderr, "pipewatch: I/O error\n")
  113. def kill(proc):
  114. # Wait for a process to end, or kill it
  115. def poll_timeout(proc, timeout):
  116. for x in range(1+int(timeout / 0.1)):
  117. if proc.poll() is not None:
  118. break
  119. time.sleep(0.1)
  120. return proc.poll()
  121. try:
  122. if poll_timeout(proc, 0.5) is None:
  123. os.killpg(proc.pid, signal.SIGTERM)
  124. if poll_timeout(proc, 0.5) is None:
  125. os.killpg(proc.pid, signal.SIGKILL)
  126. except OSError: # pragma: no cover
  127. # (hard to trigger race condition in os.killpg)
  128. pass
  129. return poll_timeout(proc, 0.5)
  130. # Wait for them to die, or kill them
  131. cret = kill(consumer)
  132. gret = kill(generator)
  133. # Consume all remaining data in the queue until the reader
  134. # and watcher threads are done
  135. while reader.is_alive() or watcher.is_alive():
  136. q.get(True, 0.1)
  137. fprintf(sys.stderr, "pipewatch: generator returned %d, " +
  138. "consumer returned %d\n", gret, cret)
  139. if gret == 0 and cret == 0:
  140. sys.exit(0)
  141. sys.exit(1)
  142. def main(argv=None):
  143. args = parse_args(argv)
  144. lockfile = open(args.lock, "w")
  145. if not nilmdb.utils.lock.exclusive_lock(lockfile):
  146. printf("pipewatch process already running (according to %s)\n",
  147. args.lock)
  148. sys.exit(0)
  149. try:
  150. # Run as a daemon if requested, otherwise run directly.
  151. if args.daemon: # pragma: no cover (hard to do from inside test suite)
  152. with daemon.DaemonContext(files_preserve=[lockfile]):
  153. pipewatch(args)
  154. else:
  155. pipewatch(args)
  156. finally:
  157. # Clean up lockfile
  158. try:
  159. os.unlink(args.lock)
  160. except OSError:
  161. pass
# Script entry point when executed directly (not imported).
if __name__ == "__main__":
    main()