#!/usr/bin/python

import nilmdb.client
from nilmdb.utils.printf import *
import nilmdb.utils.lock
import nilmtools

import time
import sys
import os
import argparse
import subprocess
import tempfile
import threading
import select
import signal
import Queue
import daemon
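
# Note: this is Python 2 code -- "Queue" is the Python 2 name of the stdlib
# queue module, and "daemon" is the third-party python-daemon package
# (providing daemon.DaemonContext, used in main() below).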

def parse_args(argv = None):
    parser = argparse.ArgumentParser(
        formatter_class = argparse.ArgumentDefaultsHelpFormatter,
        version = nilmtools.__version__,
        description = """\
Pipe data from 'generator' to 'consumer'.  This is intended to be
executed frequently from cron, and will exit if another copy is
already running.  If 'generator' or 'consumer' returns an error,
or if 'generator' stops sending data for a while, it will exit.

Intended for use with ethstream (generator) and nilm-insert
(consumer).  Commands are executed through the shell.
""")
    parser.add_argument("-d", "--daemon", action="store_true",
                        help="Run in background")
    parser.add_argument("-l", "--lock", metavar="FILENAME", action="store",
                        default=tempfile.gettempdir() +
                        "/nilm-pipewatch.lock",
                        help="Lock file for detecting running instance")
    parser.add_argument("-t", "--timeout", metavar="SECONDS", action="store",
                        type=float, default=30,
                        help="Restart if no output from " +
                        "generator for this long")
    group = parser.add_argument_group("commands to execute")
    group.add_argument("generator", action="store",
                       help="Data generator (e.g. \"ethstream -r 8000\")")
    group.add_argument("consumer", action="store",
                       help="Data consumer (e.g. \"nilm-insert /foo/bar\")")
    args = parser.parse_args(argv)
    return args
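
# Example invocation, e.g. from a cron job (the "nilm-pipewatch" entry-point
# name is an assumption here, inferred from the default lock-file name):
#   nilm-pipewatch --daemon "ethstream -r 8000" "nilm-insert /foo/bar"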

def reader_thread(queue, fd):
    # Read from a file descriptor, write to queue
    try:
        while True:
            (r, w, x) = select.select([fd], [], [fd], 0.25)
            if x:
                raise Exception # generator died?
            if not r:
                # short timeout -- just try again.  This is to catch the
                # fd being closed elsewhere, which is only detected
                # when select restarts.
                continue
            data = os.read(fd, 65536)
            if data == "": # generator EOF
                raise Exception
            queue.put(data)
    except Exception:
        queue.put(None)
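
# Both helper threads push None onto the queue as a sentinel that tells the
# main loop in pipewatch() to shut down.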
def watcher_thread(queue, procs):
    # Put None in the queue if either process dies
    while True:
        for p in procs:
            if p.poll() is not None:
                queue.put(None)
                return
        time.sleep(0.25)
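
# pipewatch() wires the generator's stdout through the queue to the
# consumer's stdin, and shuts everything down when either process dies or
# the generator produces no output for args.timeout seconds.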
def pipewatch(args):
    # Run the processes, etc
    with open(os.devnull, "r") as devnull:
        generator = subprocess.Popen(args.generator, shell = True,
                                     bufsize = -1, close_fds = True,
                                     stdin = devnull,
                                     stdout = subprocess.PIPE,
                                     stderr = None,
                                     preexec_fn = os.setpgrp)
        consumer = subprocess.Popen(args.consumer, shell = True,
                                    bufsize = -1, close_fds = True,
                                    stdin = subprocess.PIPE,
                                    stdout = None,
                                    stderr = None,
                                    preexec_fn = os.setpgrp)
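
        # preexec_fn = os.setpgrp above puts each child (and anything its
        # shell spawns) into its own process group, so kill() below can
        # signal the whole group with os.killpg.
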
        queue = Queue.Queue(maxsize = 4)
        reader = threading.Thread(target = reader_thread,
                                  args = (queue, generator.stdout.fileno()))
        reader.start()
        watcher = threading.Thread(target = watcher_thread,
                                   args = (queue, [generator, consumer]))
        watcher.start()
        try:
            while True:
                try:
                    data = queue.get(True, args.timeout)
                    if data is None:
                        break
                    consumer.stdin.write(data)
                except Queue.Empty:
                    # Timeout: kill the generator
                    fprintf(sys.stderr, "pipewatch: timeout\n")
                    generator.terminate()
                    break
            generator.stdout.close()
            consumer.stdin.close()
        except IOError:
            fprintf(sys.stderr, "pipewatch: I/O error\n")
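
        # kill() escalates gently: give the process 0.5 s to exit on its
        # own, then SIGTERM its process group, then SIGKILL.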
        def kill(proc):
            # Wait for a process to end, or kill it
            def poll_timeout(proc, timeout):
                for x in range(1 + int(timeout / 0.1)):
                    if proc.poll() is not None:
                        break
                    time.sleep(0.1)
                return proc.poll()
            try:
                if poll_timeout(proc, 0.5) is None:
                    os.killpg(proc.pid, signal.SIGTERM)
                    if poll_timeout(proc, 0.5) is None:
                        os.killpg(proc.pid, signal.SIGKILL)
            except OSError:
                pass
            return poll_timeout(proc, 0.5)

        # Wait for them to die, or kill them
        cret = kill(consumer)
        gret = kill(generator)

        # Consume all remaining data in the queue until the reader
        # and watcher threads are done
        while reader.is_alive() or watcher.is_alive():
            try:
                queue.get(True, 0.1)
            except Queue.Empty:
                pass

        fprintf(sys.stderr, "pipewatch: generator returned %d, " +
                "consumer returned %d\n", gret, cret)
        if gret == 0 and cret == 0:
            sys.exit(0)
        sys.exit(1)
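
# main() takes an exclusive lock on the lock file so that only one pipewatch
# instance runs at a time; nilmdb.utils.lock.exclusive_lock returns false
# when another process already holds the lock.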
def main(argv = None):
    args = parse_args(argv)

    lockfile = open(args.lock, "w")
    if not nilmdb.utils.lock.exclusive_lock(lockfile):
        printf("pipewatch process already running (according to %s)\n",
               args.lock)
        sys.exit(0)
    try:
        # Run as a daemon if requested; otherwise run directly.
        if args.daemon:
            with daemon.DaemonContext(files_preserve = [ lockfile ]):
                pipewatch(args)
        else:
            pipewatch(args)
    finally:
        # Clean up lockfile
        try:
            os.unlink(args.lock)
        except OSError:
            pass

if __name__ == "__main__":
    main()