You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

141 lines
5.1 KiB

  1. #!/usr/bin/python
  2. import nilmdb.client
  3. from nilmdb.utils.printf import *
  4. import nilmdb.utils.lock
  5. import nilmtools
  6. import time
  7. import sys
  8. import os
  9. import argparse
  10. import subprocess
  11. import tempfile
  12. import threading
  13. import select
  14. import signal
  15. import Queue
  16. def parse_args(argv = None):
  17. parser = argparse.ArgumentParser(
  18. formatter_class = argparse.ArgumentDefaultsHelpFormatter,
  19. version = nilmtools.__version__,
  20. description = """\
  21. Pipe data from 'generator' to 'consumer'. This is intended to be
  22. executed frequently from cron, and will exit if another copy is
  23. already running. If 'generator' or 'consumer' returns an error,
  24. or if 'generator' stops sending data for a while, it will exit.
  25. Intended for use with ethstream (generator) and nilm-insert
  26. (consumer). Commands are executed through the shell.
  27. """)
  28. parser.add_argument("-l", "--lock", metavar="FILENAME", action="store",
  29. default=tempfile.gettempdir() +
  30. "/nilm-pipewatch.lock",
  31. help="Lock file for detecting running instance")
  32. parser.add_argument("-t", "--timeout", metavar="SECONDS", action="store",
  33. type=float, default=30,
  34. help="Restart if no output from " +
  35. "generator for this long")
  36. group = parser.add_argument_group("commands to execute")
  37. group.add_argument("generator", action="store",
  38. help="Data generator (e.g. \"ethstream -r 8000\")")
  39. group.add_argument("consumer", action="store",
  40. help="Data consumer (e.g. \"nilm-insert /foo/bar\")")
  41. args = parser.parse_args(argv)
  42. return args
  43. def reader_thread(queue, fd):
  44. # Read from a file descriptor, write to queue.
  45. try:
  46. while True:
  47. (r, w, x) = select.select([fd], [], [fd], 0.25)
  48. if x:
  49. raise Exception
  50. if not r:
  51. # short timeout -- just try again. This is to catch the
  52. # fd being closed elsewhere, which is only detected
  53. # when select restarts.
  54. continue
  55. data = os.read(fd, 65536)
  56. if data == "":
  57. raise Exception
  58. queue.put(data)
  59. except Exception:
  60. queue.put(None)
  61. def main(argv = None):
  62. args = parse_args(argv)
  63. with open(args.lock, "w") as lockfile:
  64. if not nilmdb.utils.lock.exclusive_lock(lockfile):
  65. printf("pipewatch process already running (according to %s)\n",
  66. args.lock)
  67. sys.exit(0)
  68. with open(os.devnull, "r") as devnull:
  69. generator = subprocess.Popen(args.generator, shell = True,
  70. bufsize = -1, close_fds = True,
  71. stdin = devnull,
  72. stdout = subprocess.PIPE,
  73. stderr = None)
  74. consumer = subprocess.Popen(args.consumer, shell = True,
  75. bufsize = -11, close_fds = True,
  76. stdin = subprocess.PIPE,
  77. stdout = None, stderr = None)
  78. queue = Queue.Queue(maxsize = 32)
  79. reader = threading.Thread(target = reader_thread,
  80. args = (queue, generator.stdout.fileno()))
  81. reader.start()
  82. try:
  83. while True:
  84. try:
  85. data = queue.get(True, args.timeout)
  86. if data is None:
  87. break
  88. consumer.stdin.write(data)
  89. except Queue.Empty:
  90. # Timeout: kill the generator
  91. fprintf(sys.stderr, "pipewatch: timeout\n")
  92. generator.terminate()
  93. break
  94. generator.stdout.close()
  95. consumer.stdin.close()
  96. except IOError:
  97. fprintf(sys.stderr, "pipewatch: I/O error\n")
  98. def kill(proc):
  99. # Wait for a process to end, or kill it
  100. def poll_timeout(proc, timeout):
  101. for x in range(1+int(timeout / 0.1)):
  102. if proc.poll() is not None:
  103. break
  104. time.sleep(0.1)
  105. return proc.poll()
  106. if poll_timeout(proc, 0.5) is None:
  107. proc.terminate()
  108. if poll_timeout(proc, 0.5) is None:
  109. proc.kill()
  110. return poll_timeout(proc, 0.5)
  111. # Wait for them to die, or kill them
  112. gret = kill(generator)
  113. cret = kill(consumer)
  114. fprintf(sys.stderr, "pipewatch: generator returned %d, " +
  115. "consumer returned %d\n", gret, cret)
  116. if gret == 0 and cret == 0:
  117. sys.exit(0)
  118. sys.exit(1)
  119. try:
  120. os.unlink(args.lock)
  121. except OSError:
  122. pass
# Run main() only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()