Work in progress on the process manager. Very finicky.

The multiprocessing environment is really finicky.
I'm seeing deadlocks in the process at exit, that are probably related
to tracebacks being printed and still getting redirected to the
logfile or something.
This commit is contained in:
Jim Paris 2013-07-05 19:35:09 -04:00
parent 65e48caf5f
commit f73de35ee6
6 changed files with 177 additions and 65 deletions

View File

@ -1,4 +1,4 @@
import nilmrun.threadmanager import nilmrun.processmanager
from ._version import get_versions from ._version import get_versions
__version__ = get_versions()['version'] __version__ = get_versions()['version']

143
nilmrun/processmanager.py Normal file
View File

@ -0,0 +1,143 @@
#!/usr/bin/python
from nilmdb.utils.printf import *
import threading
import multiprocessing
import cStringIO
import sys
import os
import time
import uuid
import traceback
class LogReceiver(object):
"""Spawn a thread that listens to a pipe for log messages,
and stores them locally."""
def __init__(self, pipe):
self.pipe = pipe
self.log = cStringIO.StringIO()
self.thread = threading.Thread(target = self.run)
self.thread.start()
def run(self):
try:
while True:
data = self.pipe.recv_bytes()
self.log.write(data)
except EOFError:
print "thread pipe died"
self.pipe.close()
print "thread closed"
time.sleep(1)
return
def getvalue(self):
return self.log.getvalue()
def clear(self):
self.log.truncate()
self.log.seek(0)
class LogSender(object):
"""File-like object that writes output to a pipe"""
def __init__(self, pipe):
self.pipe = pipe
def close(self):
if self.pipe:
self.pipe.close()
self.pipe = None
def write(self, data):
if self.pipe:
self.pipe.send_bytes(data)
def flush(self, data):
pass
def isatty(self):
return False
class Process(object):
"""Spawn and manage a running process"""
def __init__(self, name, function, parameters):
self.parameters = parameters
self.start_time = None
self.name = name
pipes = multiprocessing.Pipe(duplex = False)
self._log = LogReceiver(pipes[0])
self._process = multiprocessing.Process(target = self._tramp,
args = (function, pipes),
name = name)
self._process.daemon = True
self._process.start()
pipes[1].close()
self.start_time = time.time()
self.pid = str(uuid.uuid1(self._process.pid or 0))
def _tramp(self, function, pipes):
# Remap stdio in the child before calling function
print "pid:", os.getpid()
print "pipes:", pipes
print "pipes[0].fileno():", pipes[0].fileno()
print "pipes[1].fileno():", pipes[1].fileno()
pipes[0].close()
logfile = LogSender(pipes[1])
saved_stdin = sys.stdin
saved_stdout = sys.stdout
saved_stderr = sys.stderr
sys.stdin = open(os.devnull, 'r')
sys.stdout = logfile
sys.stderr = logfile
exitcode = 0
try:
function(self.parameters)
raise Exception("oh no")
except:
traceback.print_exc()
exitcode = 1
sys.stdin = saved_stdin
sys.stdout = saved_stdout
sys.stderr = saved_stderr
logfile.close()
sys.exit(exitcode)
def terminate(self, timeout = 3):
self._process.join(timeout)
if self.alive:
self._process.terminate()
def clear_log(self):
self._log.clear()
@property
def log(self):
return self._log.getvalue()
@property
def alive(self):
return self._process.is_alive()
@property
def error(self):
if self._process.exitcode:
return True
return False
class ProcessManager(object):
def __init__(self):
self.processes = {}
def __iter__(self):
return iter(self.processes.keys())
def __getitem__(self, key):
return self.processes[key]
def run(self, name, function, parameters):
new = Process(name, function, parameters)
self.processes[new.pid] = new
return new.pid

View File

@ -55,50 +55,51 @@ class App(object):
def version(self): def version(self):
return nilmrun.__version__ return nilmrun.__version__
class AppThread(object): class AppProcess(object):
def __init__(self, manager): def __init__(self, manager):
self.manager = manager self.manager = manager
def thread_status(self, pid): def process_status(self, pid):
return { return {
"pid": pid, "pid": pid,
"alive": self.manager[pid].alive, "alive": self.manager[pid].alive,
"error": self.manager[pid].error,
"name": self.manager[pid].name, "name": self.manager[pid].name,
"start_time": self.manager[pid].start_time, "start_time": self.manager[pid].start_time,
"parameters": self.manager[pid].parameters, "parameters": self.manager[pid].parameters,
"log": self.manager[pid].log, "log": self.manager[pid].log,
} }
# /thread/status # /process/status
@cherrypy.expose @cherrypy.expose
@cherrypy.tools.json_out() @cherrypy.tools.json_out()
def status(self, pid, clear = False): def status(self, pid, clear = False):
if pid not in self.manager: if pid not in self.manager:
raise cherrypy.NotFound() raise cherrypy.HTTPError("404 Not Found", "No such PID")
status = thread_status(pid) status = self.process_status(pid)
if clear: if clear:
self.manager[pid].clear_log() self.manager[pid].clear_log()
return status return status
# /thread/list # /process/list
@cherrypy.expose @cherrypy.expose
@cherrypy.tools.json_out() @cherrypy.tools.json_out()
def list(self): def list(self):
return list(self.manager) return list(self.manager)
# /thread/kill # /process/kill
@cherrypy.expose @cherrypy.expose
@cherrypy.tools.json_in() @cherrypy.tools.json_in()
@cherrypy.tools.json_out() @cherrypy.tools.json_out()
@cherrypy.tools.CORS_allow(methods = ["POST"]) @cherrypy.tools.CORS_allow(methods = ["POST"])
def kill(self, pid): def kill(self, pid):
if pid not in self.manager: if pid not in self.manager:
raise CherryPy.NotFound() raise cherrypy.HTTPError("404 Not Found", "No such PID")
if not self.manager.terminate(pid): if not self.manager.terminate(pid):
raise cherrypy.HTTPError("503 Service Unavailable", raise cherrypy.HTTPError("503 Service Unavailable",
"Failed to stop thread") "Failed to stop process")
status = thread_status(pid) status = self.process_status(pid)
manager.remove(pid) manager.remove(pid)
class AppFilter(object): class AppFilter(object):
@ -115,6 +116,7 @@ class AppFilter(object):
def trainola(self, data): def trainola(self, data):
return self.manager.run("trainola", nilmrun.trainola.trainola, data) return self.manager.run("trainola", nilmrun.trainola.trainola, data)
class Server(object): class Server(object):
def __init__(self, host = '127.0.0.1', port = 8080, def __init__(self, host = '127.0.0.1', port = 8080,
embedded = True, # hide diagnostics and output, etc embedded = True, # hide diagnostics and output, etc
@ -165,9 +167,9 @@ class Server(object):
cherrypy._cperror._ie_friendly_error_sizes = {} cherrypy._cperror._ie_friendly_error_sizes = {}
# Build up the application and mount it # Build up the application and mount it
manager = nilmrun.threadmanager.ThreadManager() manager = nilmrun.processmanager.ProcessManager()
root = App() root = App()
root.thread = AppThread(manager) root.process = AppProcess(manager)
root.filter = AppFilter(manager) root.filter = AppFilter(manager)
cherrypy.tree.apps = {} cherrypy.tree.apps = {}
cherrypy.tree.mount(root, basepath, config = { "/" : app_config }) cherrypy.tree.mount(root, basepath, config = { "/" : app_config })

View File

@ -1,46 +0,0 @@
#!/usr/bin/python
from nilmdb.utils.printf import *
import threading
class Thread(object):
def __init__(self, name, function, parameters):
self.parameters = parameters
self.start_time = None
self.log = ''
self._terminate = threading.Event()
self._thread = threading.Thread(target = function,
name = name,
args = ( parameters,
self._terminate ))
self._thread.daemon = True
self._thread.start()
def terminate(self, timeout = 60):
self._terminate.set()
self._thread.join(timeout = 60)
@property
def pid(self):
return self._thread.ident
@property
def name(self):
return self._thread.name
@property
def alive(self):
return self._thread.is_alive()
class ThreadManager(object):
def __init__(self):
self.threads = {}
def __iter__(self):
return iter(self.threads.keys())
def run(self, name, function, parameters):
new = Thread(name, function, parameters)
self.threads[new.pid] = new
return new.pid

View File

@ -7,7 +7,7 @@ test = nosetests
nocapture=1 nocapture=1
# Comment this out to see CherryPy logs on failure: # Comment this out to see CherryPy logs on failure:
nologcapture=1 nologcapture=1
with-coverage=1 #with-coverage=1
cover-inclusive=1 cover-inclusive=1
cover-package=nilmrun cover-package=nilmrun
cover-erase=1 cover-erase=1

View File

@ -24,6 +24,7 @@ import re
import urllib2 import urllib2
from urllib2 import urlopen, HTTPError from urllib2 import urlopen, HTTPError
import requests import requests
import pprint
from testutil.helpers import * from testutil.helpers import *
@ -61,15 +62,17 @@ class TestClient(object):
def test_client_02_manager(self): def test_client_02_manager(self):
client = HTTPClient(baseurl = testurl) client = HTTPClient(baseurl = testurl)
eq_(client.get("/thread/list"), []) eq_(client.get("/process/list"), [])
with assert_raises(ClientError) as e:
client.get("/process/status", { "pid": 12345 })
in_("No such PID", str(e.exception))
with assert_raises(ClientError): with assert_raises(ClientError):
client.get("/thread/status", { "pid": 12345 }) client.get("/process/kill", { "pid": 12345 })
with assert_raises(ClientError): in_("No such PID", str(e.exception))
client.get("/thread/kill", { "pid": 12345 })
def test_client_03_trainola(self): def test_client_03_trainola(self):
client = HTTPClient(baseurl = testurl) client = HTTPClient(baseurl = testurl, post_json = True)
data = { "url": "http://bucket.mit.edu/nilmdb", data = { "url": "http://bucket.mit.edu/nilmdb",
"stream": "/sharon/prep-a", "stream": "/sharon/prep-a",
@ -100,4 +103,14 @@ class TestClient(object):
] ]
} }
client.post("/filter/trainola", { "data": data }) client.post("/filter/trainola", { "data": data })
pid = client.get("/process/list")[0]
eq_(client.get("/process/list"), [pid])
while True:
time.sleep(1)
status = client.get("/process/status", { "pid": pid })
pprint.pprint(status)
print status["alive"]
if status["alive"] == False:
break
pprint.pprint(status)