Browse Source

start of pipewatch util

tags/nilmtools-1.4.0
Jim Paris 8 years ago
parent
commit
0589b8d316
1 changed files with 115 additions and 0 deletions
  1. +115
    -0
      nilmtools/pipewatch.py

+ 115
- 0
nilmtools/pipewatch.py View File

@@ -0,0 +1,115 @@
#!/usr/bin/python

import nilmdb.client
from nilmdb.utils.printf import *
import nilmdb.utils.lock
import nilmtools

import time
import sys
import os
import argparse
import subprocess
import tempfile
import threading
import select
import signal

def parse_args(argv = None):
parser = argparse.ArgumentParser(
formatter_class = argparse.ArgumentDefaultsHelpFormatter,
version = nilmtools.__version__,
description = """\
Pipe data from 'generator' to 'consumer'. This is intended to be
executed frequently from cron, and will exit if another copy is
already running. If 'generator' or 'consumer' returns an error,
or if 'generator' stops sending data for a while, it will exit.

Intended for use with ethstream (generator) and nilm-insert
(consumer). Commands are executed through the shell.
""")
parser.add_argument("-l", "--lock", metavar="FILENAME", action="store",
default=tempfile.gettempdir() +
"/nilm-pipewatch.lock",
help="Lock file for detecting running instance")
parser.add_argument("-t", "--timeout", metavar="SECONDS", action="store",
type=float, default=30,
help="Restart if no output from " +
"generator for this long")
group = parser.add_argument_group("commands to execute")
group.add_argument("generator", action="store",
help="Data generator (e.g. \"ethstream -r 8000\")")
group.add_argument("consumer", action="store",
help="Data consumer (e.g. \"nilm-insert /foo/bar\")")
args = parser.parse_args(argv)

return args

def sigpipe_fixup():
signal.signal(signal.SIGPIPE, signal.SIG_DFL)

def main(argv = None):
args = parse_args(argv)

with open(args.lock, "w") as lockfile:
if not nilmdb.utils.lock.exclusive_lock(lockfile):
printf("pipewatch process already running (according to %s)\n",
args.lock)
sys.exit(0)


with open(os.devnull, "r") as devnull:
sigpipe_fixup()
generator = subprocess.Popen(args.generator, shell = True,
bufsize = -1, close_fds = True,
stdin = devnull,
stdout = subprocess.PIPE,
stderr = None,
preexec_fn = sigpipe_fixup)
consumer = subprocess.Popen(args.consumer, shell = True,
bufsize = -1, close_fds = True,
stdin = subprocess.PIPE,
stdout = None, stderr = None,
preexec_fn = sigpipe_fixup)

stdout = generator.stdout.fileno()
stdin = consumer.stdin.fileno()
while True:
( r, w, x ) = select.select([stdout], [], [stdout,stdin],
args.timeout)
if not r and not x:
fprintf(sys.stderr, "pipewatch: generator timeout\n")
break
if x:
fprintf(sys.stderr, "pipewatch: I/O error\n")
break
print r, w, x
print "reading"
data = os.read(stdout, 65536)
if data == "":
fprintf(sys.stderr, "generator EOF\n")
break
os.write(stdin, data)

generator.stdout.close()
consumer.stdin.close()

def wait_kill(proc):
start = time.time()

gret = generator.wait()
cret = consumer.wait()

fprintf(sys.stderr, "generator returned %d, " +
"consumer returned %d\n", gret, cret)
if gret == 0 and cret == 0:
sys.exit(0)
sys.exit(1)
try:
os.unlink(args.lock)
except OSError:
pass

if __name__ == "__main__":
main()

Loading…
Cancel
Save