Browse Source

Add --daemon flag

tags/nilmtools-1.4.1
Jim Paris 8 years ago
parent
commit
cf2c28b0fb
3 changed files with 87 additions and 73 deletions
  1. +1
    -1
      README.txt
  2. +85
    -72
      nilmtools/pipewatch.py
  3. +1
    -0
      setup.py

+ 1
- 1
README.txt View File

@@ -6,7 +6,7 @@ Prerequisites:

# Runtime and build environments
sudo apt-get install python2.7 python2.7-dev python-setuptools
sudo apt-get install python-numpy python-scipy
sudo apt-get install python-numpy python-scipy python-daemon

nilmdb (1.8.1+)



+ 85
- 72
nilmtools/pipewatch.py View File

@@ -15,6 +15,7 @@ import threading
import select
import signal
import Queue
import daemon

def parse_args(argv = None):
parser = argparse.ArgumentParser(
@@ -29,6 +30,8 @@ def parse_args(argv = None):
Intended for use with ethstream (generator) and nilm-insert
(consumer). Commands are executed through the shell.
""")
parser.add_argument("-d", "--daemon", action="store_true",
help="Run in background")
parser.add_argument("-l", "--lock", metavar="FILENAME", action="store",
default=tempfile.gettempdir() +
"/nilm-pipewatch.lock",
@@ -74,82 +77,92 @@ def watcher_thread(queue, procs):
return
time.sleep(0.25)

def main(argv = None):
args = parse_args(argv)

with open(args.lock, "w") as lockfile:
if not nilmdb.utils.lock.exclusive_lock(lockfile):
printf("pipewatch process already running (according to %s)\n",
args.lock)
def pipewatch(args):
# Run the processes, etc
with open(os.devnull, "r") as devnull:
generator = subprocess.Popen(args.generator, shell = True,
bufsize = -1, close_fds = True,
stdin = devnull,
stdout = subprocess.PIPE,
stderr = None)
consumer = subprocess.Popen(args.consumer, shell = True,
bufsize = -11, close_fds = True,
stdin = subprocess.PIPE,
stdout = None, stderr = None)

queue = Queue.Queue(maxsize = 32)
reader = threading.Thread(target = reader_thread,
args = (queue, generator.stdout.fileno()))
reader.start()
watcher = threading.Thread(target = watcher_thread,
args = (queue, [generator, consumer]))
watcher.start()
try:
while True:
try:
data = queue.get(True, args.timeout)
if data is None:
break
consumer.stdin.write(data)
except Queue.Empty:
# Timeout: kill the generator
fprintf(sys.stderr, "pipewatch: timeout\n")
generator.terminate()
break

generator.stdout.close()
consumer.stdin.close()
except IOError:
fprintf(sys.stderr, "pipewatch: I/O error\n")

def kill(proc):
# Wait for a process to end, or kill it
def poll_timeout(proc, timeout):
for x in range(1+int(timeout / 0.1)):
if proc.poll() is not None:
break
time.sleep(0.1)
return proc.poll()
try:
if poll_timeout(proc, 0.5) is None:
proc.terminate()
if poll_timeout(proc, 0.5) is None:
proc.kill()
except OSError:
pass
return poll_timeout(proc, 0.5)

# Wait for them to die, or kill them
gret = kill(generator)
cret = kill(consumer)

fprintf(sys.stderr, "pipewatch: generator returned %d, " +
"consumer returned %d\n", gret, cret)
if gret == 0 and cret == 0:
sys.exit(0)
sys.exit(1)

def main(argv = None):
args = parse_args(argv)

with open(os.devnull, "r") as devnull:
generator = subprocess.Popen(args.generator, shell = True,
bufsize = -1, close_fds = True,
stdin = devnull,
stdout = subprocess.PIPE,
stderr = None)
consumer = subprocess.Popen(args.consumer, shell = True,
bufsize = -11, close_fds = True,
stdin = subprocess.PIPE,
stdout = None, stderr = None)

queue = Queue.Queue(maxsize = 32)
reader = threading.Thread(target = reader_thread,
args = (queue, generator.stdout.fileno()))
reader.start()
watcher = threading.Thread(target = watcher_thread,
args = (queue, [generator, consumer]))
watcher.start()
try:
while True:
try:
data = queue.get(True, args.timeout)
if data is None:
break
consumer.stdin.write(data)
except Queue.Empty:
# Timeout: kill the generator
fprintf(sys.stderr, "pipewatch: timeout\n")
generator.terminate()
break

generator.stdout.close()
consumer.stdin.close()
except IOError:
fprintf(sys.stderr, "pipewatch: I/O error\n")

def kill(proc):
# Wait for a process to end, or kill it
def poll_timeout(proc, timeout):
for x in range(1+int(timeout / 0.1)):
if proc.poll() is not None:
break
time.sleep(0.1)
return proc.poll()
try:
if poll_timeout(proc, 0.5) is None:
proc.terminate()
if poll_timeout(proc, 0.5) is None:
proc.kill()
except OSError:
pass
return poll_timeout(proc, 0.5)

# Wait for them to die, or kill them
gret = kill(generator)
cret = kill(consumer)

fprintf(sys.stderr, "pipewatch: generator returned %d, " +
"consumer returned %d\n", gret, cret)
if gret == 0 and cret == 0:
sys.exit(0)
sys.exit(1)
lockfile = open(args.lock, "w")
if not nilmdb.utils.lock.exclusive_lock(lockfile):
printf("pipewatch process already running (according to %s)\n",
args.lock)
sys.exit(0)
try:
os.unlink(args.lock)
except OSError:
pass
# Run as a daemon if requested, otherwise run directly.
if args.daemon:
with daemon.DaemonContext(files_preserve = [ lockfile ]):
pipewatch(args)
else:
pipewatch(args)
finally:
# Clean up lockfile
try:
os.unlink(args.lock)
except OSError:
pass

if __name__ == "__main__":
main()

+ 1
- 0
setup.py View File

@@ -64,6 +64,7 @@ setup(name='nilmtools',
install_requires = [ 'nilmdb >= 1.8.1',
'numpy',
'scipy',
'daemon',
#'matplotlib',
],
packages = [ 'nilmtools',


Loading…
Cancel
Save