Add /process/command; fix killing of forked processes

Now an entire process tree is killed in /process/remove, as long as
each child hasn't changed its process group.
This commit is contained in:
Jim Paris 2013-07-07 12:13:13 -04:00
parent 30a3559253
commit 734e1d9b52
3 changed files with 177 additions and 93 deletions

View File

@ -10,6 +10,8 @@ import os
import signal
import time
import uuid
import subprocess
import psutil
class LogReceiver(object):
"""Spawn a thread that listens to a pipe for log messages,
@ -21,13 +23,12 @@ class LogReceiver(object):
self.thread.start()
def run(self):
try:
while True:
data = self.pipe.recv_bytes()
self.log.write(data)
except EOFError:
self.pipe.close()
data = os.read(self.pipe, 65536)
if not data:
os.close(self.pipe)
return
self.log.write(data)
def getvalue(self):
return self.log.getvalue()
@ -35,64 +36,107 @@ class LogReceiver(object):
def clear(self):
self.log = cStringIO.StringIO()
class LogSender(object): # pragma: no cover (runs in a different process)
"""File-like object that writes output to a pipe"""
def __init__(self, pipe):
self.pipe = pipe
def close(self):
if self.pipe:
self.pipe.close()
self.pipe = None
def write(self, data):
if self.pipe:
self.pipe.send_bytes(data)
def flush(self):
pass
def isatty(self):
return False
class Process(object):
"""Spawn and manage a running process"""
"""Spawn and manage a process that calls a Python function"""
def __init__(self, name, function, parameters):
self.parameters = parameters
self.start_time = None
self.name = name
pipes = multiprocessing.Pipe(duplex = False)
self._log = LogReceiver(pipes[0])
# Use a pipe for communicating log data
(rpipe, wpipe) = os.pipe()
self._log = LogReceiver(rpipe)
# Start the function in a new process
self._process = multiprocessing.Process(
target = self._tramp, name = name,
args = (function, pipes, parameters))
target = self._trampoline, name = name,
args = (function, rpipe, wpipe, parameters))
self._process.daemon = True
self._process.start()
pipes[1].close()
# Close the writer end of the pipe, get process info
os.close(wpipe)
self.start_time = time.time()
self.pid = str(uuid.uuid1(self._process.pid or 0))
def _tramp(self, function, pipes, parameters): # pragma: no cover
# Remap stdio in the child before calling function
pipes[0].close()
logfile = LogSender(pipes[1])
sys.stdin = open(os.devnull, 'r')
sys.stdout = logfile
sys.stderr = logfile
function(parameters)
def _trampoline(self, func, rpipe, wpipe, param): # pragma: no cover
# No coverage report for this, because it's executed in a subprocess
"""Trampoline function to set up stdio and call the real function."""
# Close the reader end of the pipe
os.close(rpipe)
def terminate(self, force = False, timeout = 1.0):
# Like os.close() but ignores errors
def tryclose(fd):
try:
os.close(fd)
except OSError:
pass
# Remap stdio to go to the pipe. We do this at the OS level,
# replacing FDs, so that future spawned processes do the right thing.
# stdin
sys.stdin.close()
tryclose(0)
fd = os.open(os.devnull, os.O_RDONLY) # 0
sys.stdin = os.fdopen(fd, 'r', 0)
# stdout
sys.stdout.close()
tryclose(1)
fd = os.dup(wpipe) # 1
sys.stdout = os.fdopen(fd, 'w', 0)
# stdout
sys.stderr.close()
tryclose(2)
fd = os.dup(wpipe) # 2
sys.stderr = os.fdopen(fd, 'w', 0)
# Don't need this extra fd
os.close(wpipe)
# Ready to go -- call the function
func(param)
def terminate(self, timeout = 1.0):
"""Terminate a process, and all of its children that are in the same
process group."""
# First give it some time to die on its own
self._process.join(timeout)
if self.alive:
self._process.terminate()
self._process.join(timeout)
if self.alive and force:
os.kill(self._process.pid, signal.SIGKILL)
self._process.join(timeout)
if self.alive:
return False
if not self.alive:
return True
def getpgid(pid):
try:
return os.getpgid(pid)
except OSError: # pragma: no cover
return None
# Find all children
group = getpgid(self._process.pid)
main = psutil.Process(self._process.pid)
allproc = [ main ] + main.get_children(recursive = True)
# Kill with SIGTERM, if they're still in this process group
for proc in allproc:
if getpgid(proc.pid) == group:
os.kill(proc.pid, signal.SIGTERM)
# Wait for it to die again
self._process.join(timeout)
if not self.alive:
return True
# One more try with SIGKILL
for proc in allproc:
if getpgid(proc.pid) == group:
os.kill(proc.pid, signal.SIGKILL)
# See if it worked
self._process.join(timeout)
return not self.alive
def clear_log(self):
self._log.clear()
@ -119,13 +163,19 @@ class ProcessManager(object):
def __getitem__(self, key):
return self.processes[key]
def run(self, name, function, parameters):
def run_python(self, name, function, parameters):
new = Process(name, function, parameters)
self.processes[new.pid] = new
return new.pid
def terminate(self, pid, force = False):
return self.processes[pid].terminate(force = force)
def run_command(self, name, args):
def spwan_user_command(args): # pragma: no cover (runs in subprocess)
p = subprocess.Popen(args, close_fds = True, cwd = "/tmp")
sys.exit(p.wait())
return self.run_python(name, spwan_user_command, args)
def terminate(self, pid):
return self.processes[pid].terminate()
def remove(self, pid):
del self.processes[pid]

View File

@ -98,14 +98,11 @@ class AppProcess(object):
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
@cherrypy.tools.CORS_allow(methods = ["POST"])
def remove(self, pid, force = False):
"""Remove a process from the manager, killing it if necessary.
If 'force' is 1, try harder to kill it."""
if force == "0":
force = False
def remove(self, pid):
"""Remove a process from the manager, killing it if necessary."""
if pid not in self.manager:
raise cherrypy.HTTPError("404 Not Found", "No such PID")
if not self.manager.terminate(pid, force):
if not self.manager.terminate(pid): # pragma: no cover
raise cherrypy.HTTPError("503 Service Unavailable",
"Failed to stop process")
status = self.process_status(pid)
@ -113,20 +110,15 @@ class AppProcess(object):
return status
# /process/command
# @cherrypy.expose
# @cherrypy.tools.json_in()
# @cherrypy.tools.json_out()
# @cherrypy.tools.CORS_allow(methods = ["POST"])
# def remove(self, ):
# """Run an arbitrary shell command"""
# if pid not in self.manager:
# raise cherrypy.HTTPError("404 Not Found", "No such PID")
# if not self.manager.terminate(pid):
# raise cherrypy.HTTPError("503 Service Unavailable",
# "Failed to stop process")
# status = self.process_status(pid)
# self.manager.remove(pid)
# return status
@cherrypy.expose
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
@cherrypy.tools.CORS_allow(methods = ["POST"])
def command(self, args):
"""Execute an arbitrary program on the server. 'args' is the
argument list, with 'args[0]' being the program and 'args[1]',
'args[2]', etc as arguments."""
return self.manager.run_command("command", args)
class AppFilter(object):
@ -140,7 +132,7 @@ class AppFilter(object):
@exception_to_httperror(KeyError, ValueError)
@cherrypy.tools.CORS_allow(methods = ["POST"])
def trainola(self, data):
return self.manager.run(
return self.manager.run_python(
"trainola", nilmrun.filters.trainola.filterfunc, data)
# /filter/dummy
@ -150,7 +142,7 @@ class AppFilter(object):
@exception_to_httperror(KeyError, ValueError)
@cherrypy.tools.CORS_allow(methods = ["POST"])
def dummy(self, data):
return self.manager.run(
return self.manager.run_python(
"dummy", nilmrun.filters.dummy.filterfunc, data)
class Server(object):

View File

@ -46,6 +46,16 @@ def teardown_module():
class TestClient(object):
def wait_end(self, client, pid, timeout = 5):
start = time.time()
status = None
while (time.time() - start) < timeout:
status = client.get("/process/status", { "pid": pid })
if status["alive"] == False:
return status
raise AssertionError("process " + str(pid) + " didn't die in " +
str(timeout) + " seconds: " + repr(status))
def test_client_01_basic(self):
client = HTTPClient(baseurl = testurl)
version = client.get("/version")
@ -91,11 +101,8 @@ class TestClient(object):
nin_("dummy 0\ndummy 1\ndummy 2\ndummy 3\n", status["log"])
# See that it ended properly
start = time.time()
while status["alive"] == True and (time.time() - start) < 5:
status = client.get("/process/status", { "pid": pid })
status = self.wait_end(client, pid)
in_("dummy 27\ndummy 28\ndummy 29\n", status["log"])
eq_(status["alive"], False)
eq_(status["exitcode"], 0)
# Remove it
@ -122,7 +129,11 @@ class TestClient(object):
newpid = client.post("/filter/dummy", { "data": 50 })
ne_(newpid, pid)
time.sleep(0.5)
start = time.time()
status = client.post("/process/remove", { "pid": newpid })
elapsed = time.time() - start
# Should have died in slightly over 1 second
assert(0.5 < elapsed < 2)
eq_(status["alive"], False)
ne_(status["exitcode"], 0)
@ -130,28 +141,21 @@ class TestClient(object):
eq_(client.get("/process/list"), [])
# Try to remove a running filter that ignored SIGTERM
# (can't be killed, at least on POSIX platforms).
pid = client.post("/filter/dummy", { "data": 0 })
with assert_raises(ServerError) as e:
status = client.post("/process/remove", { "pid": pid, "force":"0" })
in_("503 Service Unavailable", str(e.exception))
in_("Failed to stop process", str(e.exception))
# Kill it harder
status = client.post("/process/remove", { "pid": pid, "force": True })
start = time.time()
status = client.post("/process/remove", { "pid": pid })
elapsed = time.time() - start
# Should have died in slightly over 2 seconds
assert(1.5 < elapsed < 3)
eq_(status["alive"], False)
ne_(status["exitcode"], 0)
def test_client_05_trainola_simple(self):
client = HTTPClient(baseurl = testurl, post_json = True)
pid = client.post("/filter/trainola", { "data": {} })
start = time.time()
while (time.time() - start) < 5:
status = client.get("/process/status", { "pid": pid })
if status["alive"] == False:
break
eq_(status["alive"], False)
status = self.wait_end(client, pid)
ne_(status["exitcode"], 0)
status = client.post("/process/remove", { "pid": pid })
@unittest.skip("needs a running nilmdb")
def test_client_06_trainola(self):
@ -208,3 +212,41 @@ class TestClient(object):
raise AssertionError("took too long")
if i < 3:
raise AssertionError("too fast?")
def test_client_07_process_command(self):
client = HTTPClient(baseurl = testurl, post_json = True)
eq_(client.get("/process/list"), [])
def do(args, kill):
pid = client.post("/process/command", { "args": args } )
eq_(client.get("/process/list"), [pid])
if kill:
time.sleep(1)
status = client.get("/process/status", { "pid": pid })
if not status["alive"]:
raise AssertionError("died before we could kill it")
status = client.post("/process/remove", { "pid": pid })
if status["alive"]:
raise AssertionError("didn't get killed")
else:
self.wait_end(client, pid)
status = client.post("/process/remove", { "pid": pid })
return status
# Simple command
status = do(["pwd"], False)
eq_(status["exitcode"], 0)
eq_("/tmp\n", status["log"])
# Command with args
status = do(["expr", "1", "+", "2"], False)
eq_(status["exitcode"], 0)
eq_("3\n", status["log"])
# Missing command
status = do(["/no-such-command-blah-blah"], False)
ne_(status["exitcode"], 0)
# Kill a slow command
status = do(["sleep", "60"], True)
ne_(status["exitcode"], 0)