nilmrun/nilmrun/processmanager.py

295 lines
8.8 KiB
Python

#!/usr/bin/env python3
import threading
import subprocess
import io
import sys
import os
import signal
import time
import uuid
import psutil
import tempfile
import atexit
import shutil
class ProcessError(Exception):
    """Raised by Process when its subprocess could not be spawned."""
class LogReceiver(object):
    """Spawn a thread that listens to a pipe for log messages,
    and stores them locally.

    The reader thread is a daemon thread.  CPython waits for all
    non-daemon threads to finish *before* it runs atexit handlers, so a
    non-daemon reader blocked on a pipe whose writer is still alive
    would hang interpreter shutdown, and any atexit hook meant to kill
    that writer would never get a chance to run."""

    def __init__(self, pipe):
        """Start draining `pipe` in the background.

        pipe: readable OS-level file descriptor.  This object takes
              ownership of it and closes it once EOF is reached."""
        self.pipe = pipe
        self.log = io.BytesIO()
        self.thread = threading.Thread(target=self.run, daemon=True)
        self.thread.start()

    def run(self):
        """Thread body: append everything read from the pipe to the
        buffer until EOF, then close the descriptor."""
        while True:
            data = os.read(self.pipe, 65536)
            if not data:
                os.close(self.pipe)
                return
            self.log.write(data)

    def getvalue(self):
        """Return all bytes captured since creation or the last clear()."""
        return self.log.getvalue()

    def clear(self):
        """Discard captured output; subsequent data goes to a new buffer."""
        self.log = io.BytesIO()
class Process(object):
    """Spawn and manage a subprocess, and capture its output.

    The child's stdout and stderr share one pipe that a LogReceiver
    drains in the background; stdin is os.devnull and the working
    directory is /tmp."""

    # psutil handles for the process and its descendants, filled in by
    # get_info_prepare().  The class-level empty default lets get_info()
    # return empty numbers, instead of raising AttributeError, if it is
    # called before get_info_prepare().
    _process_list = ()

    def __init__(self, argv, tempfile=None):
        """Start `argv` (program plus arguments, as a list) as a subprocess.

        tempfile: unused; kept only for interface compatibility.

        Raises ProcessError if the process could not be started."""
        self.start_time = None

        # Use a pipe for communicating log data
        (rpipe, wpipe) = os.pipe()
        self._log = LogReceiver(rpipe)

        # Stdin is null
        nullfd = os.open(os.devnull, os.O_RDONLY)

        # Spawn the new process
        try:
            self._process = subprocess.Popen(args=argv, stdin=nullfd,
                                             stdout=wpipe, stderr=wpipe,
                                             close_fds=True, cwd="/tmp")
        except (OSError, TypeError, ValueError) as e:
            # ValueError is raised e.g. for an argument containing a NUL
            # byte; report it the same way as other spawn failures.
            raise ProcessError(str(e)) from e
        finally:
            # Close the FDs we don't need.  Dropping our copy of the write
            # end means the LogReceiver sees EOF once every process that
            # holds it has closed it (immediately, if the spawn failed).
            os.close(wpipe)
            os.close(nullfd)

        # Get process info
        self.start_time = time.time()
        # String ID: a time-based UUID whose node field is the OS pid.
        self.pid = str(uuid.uuid1(self._process.pid or 0))

    def _join(self, timeout=1.0):
        """Poll every 0.1 s for up to `timeout` seconds.  Return True as
        soon as the process has exited, False if it is still running."""
        start = time.time()
        while True:
            if self._process.poll() is not None:
                return True
            if (time.time() - start) >= timeout:
                return False
            time.sleep(0.1)

    def terminate(self, timeout=1.0):
        """Terminate a process, and all of its children that are in the same
        process group.

        Escalates in three steps, waiting up to `timeout` seconds after
        each one: let it exit by itself, send SIGTERM, send SIGKILL.
        Returns True once the main process has exited, False if it is
        still alive after the final wait.  psutil errors (the process
        vanishing during the scan) are treated as success."""
        try:
            # First give it some time to die on its own
            if self._join(timeout):
                return True

            def getpgid(pid):
                try:
                    return os.getpgid(pid)
                except OSError:  # pragma: no cover
                    return None

            def kill(pid, sig):
                try:
                    return os.kill(pid, sig)
                except OSError:  # pragma: no cover
                    return

            # Find all children
            group = getpgid(self._process.pid)
            main = psutil.Process(self._process.pid)
            allproc = [main] + main.children(recursive=True)

            # Kill with SIGTERM, if they're still in this process group
            for proc in allproc:
                if getpgid(proc.pid) == group:
                    kill(proc.pid, signal.SIGTERM)

            # Wait for it to die again
            if self._join(timeout):
                return True

            # One more try with SIGKILL
            for proc in allproc:
                if getpgid(proc.pid) == group:
                    kill(proc.pid, signal.SIGKILL)

            # See if it worked
            return self._join(timeout)
        except psutil.Error:  # pragma: no cover (race condition)
            return True

    def clear_log(self):
        """Discard the output captured so far."""
        self._log.clear()

    @property
    def log(self):
        """Bytes written by the process to stdout/stderr so far."""
        return self._log.getvalue()

    @property
    def alive(self):
        """True while the process has not yet exited."""
        return self._process.poll() is None

    @property
    def exitcode(self):
        """Return code from the last poll; None while still running."""
        return self._process.returncode

    def get_info_prepare(self):
        """Prepare the process list and measurement for .get_info.
        Call .get_info() about a second later.

        cpu_percent(0) reports usage since the previous call on the same
        psutil handle, so this primes every handle that get_info() reads."""
        try:
            main = psutil.Process(self._process.pid)
            self._process_list = [main] + main.children(recursive=True)
            for proc in self._process_list:
                proc.cpu_percent(0)
        except psutil.Error:  # pragma: no cover (race condition)
            self._process_list = []

    @staticmethod
    def get_empty_info():
        """Return an all-zero info dictionary (the schema get_info fills)."""
        return {"cpu_percent": 0,
                "cpu_user": 0,
                "cpu_sys": 0,
                "mem_phys": 0,
                "mem_virt": 0,
                "io_read": 0,
                "io_write": 0,
                "procs": 0}

    def get_info(self):
        """Return a dictionary with info about the process CPU and memory
        usage. Call .get_info_prepare() about a second before this.

        Values are summed over the processes captured by the last
        get_info_prepare().  A process that psutil can no longer query is
        skipped entirely, so the sums always match the "procs" count."""
        d = self.get_empty_info()
        for proc in self._process_list:
            try:
                cpu_percent = proc.cpu_percent(0)
                cputimes = proc.cpu_times()
                meminfo = proc.memory_info()
                ioinfo = proc.io_counters()
            except psutil.Error:
                # Gone or inaccessible: contribute nothing, not half a record.
                continue
            d["cpu_percent"] += cpu_percent
            d["cpu_user"] += cputimes.user
            d["cpu_sys"] += cputimes.system
            d["mem_phys"] += meminfo.rss
            d["mem_virt"] += meminfo.vms
            d["io_read"] += ioinfo.read_bytes
            d["io_write"] += ioinfo.write_bytes
            d["procs"] += 1
        return d
class ProcessManager(object):
    """Track and manage a collection of Process objects, keyed by the
    string ID each Process exposes as its .pid attribute."""

    def __init__(self):
        self.processes = {}
        # Temporary directories created by run_code(), keyed by process ID.
        self.tmpdirs = {}
        atexit.register(self._atexit)

    def _cleanup_tmpdir(self, pid):
        """Remove and forget the temporary directory of `pid`, if any."""
        if pid in self.tmpdirs:
            try:
                shutil.rmtree(self.tmpdirs[pid])
            except OSError:  # pragma: no cover
                pass
            del self.tmpdirs[pid]

    def _atexit(self):
        """Interpreter-exit hook: kill remaining processes and remove
        their temporary directories.  Best effort; errors are ignored."""
        for pid in list(self.processes):
            try:
                self.processes[pid].terminate()
            except Exception:  # pragma: no cover
                pass
            # Clean up even if terminate() failed, so temp dirs don't
            # leak; processes from run_command() simply have no temp dir.
            self.processes.pop(pid, None)
            self._cleanup_tmpdir(pid)

    def __iter__(self):
        return iter(list(self.processes.keys()))

    def __getitem__(self, key):
        return self.processes[key]

    def run_code(self, code, args):
        """Evaluate 'code' as if it were placed into a Python file and
        executed. The arguments, which must be strings, will be
        accessible in the code as sys.argv[1:].

        code: Python source text.
        args: sequence (list, tuple, ...) of string arguments.
        Returns the new process ID.  The temporary directory holding the
        code is removed by remove() or at exit, or right away if the
        process could not be started."""
        # The easiest way to do this, by far, is to just write the
        # code to a file. Make a directory to put it in.
        tmpdir = tempfile.mkdtemp(prefix="nilmrun-usercode-")
        try:
            # Write the code.  Python 3 decodes source files as UTF-8 by
            # default, so write UTF-8 regardless of the locale encoding.
            codepath = os.path.join(tmpdir, "usercode.py")
            with open(codepath, "w", encoding="utf-8") as f:
                f.write(code)
            # Save the args too, for debugging purposes
            with open(os.path.join(tmpdir, "args.txt"), "w",
                      encoding="utf-8") as f:
                f.write(repr(args))
            # Run the code
            argv = [sys.executable, "-B", "-s", "-u", codepath] + list(args)
            pid = self.run_command(argv)
            # Save the temp dir
            self.tmpdirs[pid] = tmpdir
            tmpdir = None  # Don't need to remove it anymore
            return pid
        finally:
            # Clean up tempdir if we didn't finish
            if tmpdir is not None:
                try:
                    shutil.rmtree(tmpdir)
                except OSError:  # pragma: no cover
                    pass

    def run_command(self, argv):
        """Execute a command line program; return its process ID."""
        new = Process(argv)
        self.processes[new.pid] = new
        return new.pid

    def terminate(self, pid):
        """Terminate process `pid`; see Process.terminate."""
        return self.processes[pid].terminate()

    def remove(self, pid):
        """Forget process `pid` and delete its temporary directory."""
        self._cleanup_tmpdir(pid)
        del self.processes[pid]

    def get_info(self):
        """Get info about all tracked processes, plus system-wide figures.

        Blocks for about one second to measure CPU usage.  Returns a dict
        with "total" (per-process values summed), "pids" (per-process info
        keyed by ID) and "system" (host CPU, process count, memory)."""
        info = {
            "total": Process.get_empty_info(),
            "pids": {},
            "system": {}
        }

        # Snapshot the table once so both passes see the same processes,
        # in case it changes during the sleep (e.g. from another thread).
        procs = list(self.processes.items())

        # Trigger CPU usage collection
        for pid, proc in procs:
            proc.get_info_prepare()
        psutil.cpu_percent(0, percpu=True)

        # Give it some time
        time.sleep(1)

        # Retrieve info for system
        info["system"]["cpu_percent"] = sum(psutil.cpu_percent(0, percpu=True))
        # psutil.cpu_count() returns None when the count is undetermined.
        info["system"]["cpu_max"] = 100.0 * (psutil.cpu_count() or 1)
        info["system"]["procs"] = len(psutil.pids())
        meminfo = psutil.virtual_memory()
        info["system"]["mem_total"] = meminfo.total
        info["system"]["mem_used"] = meminfo.used

        # Retrieve info for each PID, updating totals as we go
        for pid, proc in procs:
            pinfo = proc.get_info()
            info["pids"][pid] = pinfo
            for key in info["total"]:
                info["total"][key] += pinfo[key]

        return info