|
- #!/usr/bin/python
-
- from nilmdb.utils.printf import *
-
- import threading
- import subprocess
- import io
- import sys
- import os
- import signal
- import time
- import uuid
- import psutil
- import tempfile
- import atexit
- import shutil
-
class ProcessError(Exception):
    """Raised by Process when its subprocess cannot be spawned."""
-
class LogReceiver(object):
    """Spawn a thread that listens to a pipe for log messages,
    and stores them locally.

    A background thread reads the given file descriptor until EOF and
    appends everything to an in-memory bytes buffer, which callers can
    read with getvalue() or reset with clear().
    """

    def __init__(self, pipe):
        """Start reading from `pipe`, a readable OS file descriptor
        (e.g. the read end of os.pipe()).  The reader thread closes the
        descriptor when it finishes."""
        self.pipe = pipe
        self.log = io.BytesIO()
        # Serialises buffer writes against getvalue()/clear(), so data
        # arriving while clear() runs cannot land in a discarded buffer.
        self._lock = threading.Lock()
        self.thread = threading.Thread(target = self.run)
        # Daemon: CPython joins non-daemon threads before running atexit
        # callbacks, so a non-daemon reader would block shutdown (and any
        # atexit cleanup that kills the child) for as long as some process
        # still holds the write end of the pipe.
        self.thread.daemon = True
        self.thread.start()

    def run(self):
        """Thread body: copy pipe data into the buffer until EOF, then
        close the pipe (also if os.read raises)."""
        try:
            while True:
                data = os.read(self.pipe, 65536)
                if not data:
                    return
                with self._lock:
                    self.log.write(data)
        finally:
            os.close(self.pipe)

    def getvalue(self):
        """Return all bytes received since creation or the last clear()."""
        with self._lock:
            return self.log.getvalue()

    def clear(self):
        """Discard all bytes received so far."""
        with self._lock:
            self.log = io.BytesIO()
-
class Process(object):
    """Spawn and manage a subprocess, and capture its output.

    The child's stdout and stderr both go to a pipe drained by a
    LogReceiver (exposed via .log); its stdin is os.devnull and its
    working directory is /tmp.
    """

    def __init__(self, argv, tempfile = None):
        """Spawn `argv`, a program-and-arguments list for subprocess.Popen.

        `tempfile` is not used; it is accepted for backward compatibility.
        It also shadows the tempfile module inside this method.

        Raises ProcessError if Popen fails with OSError or TypeError.
        """
        self.start_time = None
        # Filled in by get_info_prepare().  Starting empty keeps get_info()
        # from raising AttributeError if it is called first.
        self._process_list = []

        # Use a pipe for communicating log data
        (rpipe, wpipe) = os.pipe()
        self._log = LogReceiver(rpipe)

        try:
            # Stdin is null
            nullfd = os.open(os.devnull, os.O_RDONLY)
            try:
                self._process = subprocess.Popen(args = argv, stdin = nullfd,
                                                 stdout = wpipe,
                                                 stderr = wpipe,
                                                 close_fds = True,
                                                 cwd = "/tmp")
            except (OSError, TypeError) as e:
                raise ProcessError(str(e))
            finally:
                os.close(nullfd)
        finally:
            # Close our copy of the write end even if opening os.devnull
            # or spawning failed.  The LogReceiver then sees EOF once no
            # other process holds it, instead of waiting forever.
            os.close(wpipe)

        # Get process info
        self.start_time = time.time()
        # Identifier handed to callers: a uuid1 string whose node field is
        # the OS pid (0 if Popen reported none).
        self.pid = str(uuid.uuid1(self._process.pid or 0))

    def _join(self, timeout = 1.0):
        """Poll every 0.1 s until the process exits or `timeout` seconds
        pass.  Returns True if it has exited, False on timeout."""
        start = time.time()
        while True:
            if self._process.poll() is not None:
                return True
            if (time.time() - start) >= timeout:
                return False
            time.sleep(0.1)

    @staticmethod
    def _getpgid(pid):
        """Return the process group of `pid`, or None if it can't be read."""
        try:
            return os.getpgid(pid)
        except OSError: # pragma: no cover
            return None

    @staticmethod
    def _signal_group(procs, group, sig):
        """Send `sig` to every process in `procs` that is still in
        process group `group`.  Failures to signal are ignored."""
        for proc in procs:
            if Process._getpgid(proc.pid) == group:
                try:
                    os.kill(proc.pid, sig)
                except OSError: # pragma: no cover
                    pass

    def terminate(self, timeout = 1.0):
        """Terminate a process, and all of its children that are in the same
        process group.

        Waits up to `timeout` seconds for the process to exit on its own.
        Then it sends SIGTERM, and finally SIGKILL, to the process and its
        descendants that share its process group, waiting `timeout` after
        each.  Returns True if the process has exited (or vanished while
        psutil was inspecting it), otherwise False.
        """
        try:
            # First give it some time to die on its own
            if self._join(timeout):
                return True

            # Snapshot the process tree before signalling anything
            group = self._getpgid(self._process.pid)
            main = psutil.Process(self._process.pid)
            allproc = [ main ] + main.children(recursive = True)

            # Kill with SIGTERM, if they're still in this process group
            self._signal_group(allproc, group, signal.SIGTERM)
            if self._join(timeout):
                return True

            # One more try with SIGKILL
            self._signal_group(allproc, group, signal.SIGKILL)
            return self._join(timeout)
        except psutil.Error: # pragma: no cover (race condition)
            return True

    def clear_log(self):
        """Discard the output captured so far."""
        self._log.clear()

    @property
    def log(self):
        """Bytes written to stdout/stderr by the process so far."""
        return self._log.getvalue()

    @property
    def alive(self):
        """True while the process has not exited."""
        return self._process.poll() is None

    @property
    def exitcode(self):
        """Exit status, or None while the process is still running.

        Polls first, so the value is current even if nothing else has
        checked on the process since it exited."""
        return self._process.poll()

    def get_info_prepare(self):
        """Prepare the process list and measurement for .get_info.
        Call .get_info() about a second later.

        Snapshots the process and its descendants and primes psutil's CPU
        percentage measurement for each of them."""
        try:
            main = psutil.Process(self._process.pid)
            self._process_list = [ main ] + main.children(recursive = True)
            for proc in self._process_list:
                proc.cpu_percent(0)
        except psutil.Error: # pragma: no cover (race condition)
            self._process_list = [ ]

    @staticmethod
    def get_empty_info():
        """Return a zeroed statistics dict with the keys get_info() fills."""
        return { "cpu_percent": 0,
                 "cpu_user": 0,
                 "cpu_sys": 0,
                 "mem_phys": 0,
                 "mem_virt": 0,
                 "io_read": 0,
                 "io_write": 0,
                 "procs": 0 }

    def get_info(self):
        """Return a dictionary with info about the process CPU and memory
        usage. Call .get_info_prepare() about a second before this.

        Sums the statistics over the processes captured by the last
        get_info_prepare().  A process that fails a psutil query is
        skipped entirely, so the totals never include partial data.
        """
        d = self.get_empty_info()
        for proc in self._process_list:
            try:
                cpu_percent = proc.cpu_percent(0)
                cpuinfo = proc.cpu_times()
                meminfo = proc.memory_info()
                # io_counters() is not provided on every platform
                # (e.g. macOS), so treat I/O statistics as optional.
                io_counters = getattr(proc, "io_counters", None)
                ioinfo = io_counters() if io_counters is not None else None
            except psutil.Error:
                continue
            d["cpu_percent"] += cpu_percent
            d["cpu_user"] += cpuinfo.user
            d["cpu_sys"] += cpuinfo.system
            d["mem_phys"] += meminfo.rss
            d["mem_virt"] += meminfo.vms
            if ioinfo is not None:
                d["io_read"] += ioinfo.read_bytes
                d["io_write"] += ioinfo.write_bytes
            d["procs"] += 1
        return d
-
class ProcessManager(object):
    """Track and manage a collection of Process objects.

    Processes are keyed by their Process.pid string.  Processes started
    through run_code() also own a temporary directory, which is removed
    when the process is removed or at interpreter exit.
    """

    def __init__(self):
        self.processes = {}
        self.tmpdirs = {}
        # Kill leftover processes and remove their dirs when Python exits.
        atexit.register(self._atexit)

    def _cleanup_tmpdir(self, pid):
        """Remove and forget the temp directory for `pid`, if it has one."""
        tmpdir = self.tmpdirs.pop(pid, None)
        if tmpdir is not None:
            try:
                shutil.rmtree(tmpdir)
            except OSError: # pragma: no cover
                pass

    def _atexit(self):
        """Best-effort cleanup: terminate every remaining process and
        remove its temp directory.  The directory is removed even if
        terminating the process fails."""
        for pid in list(self.processes):
            try:
                self.processes[pid].terminate()
            except Exception: # pragma: no cover
                pass
            self.processes.pop(pid, None)
            self._cleanup_tmpdir(pid)

    def __iter__(self):
        """Iterate over a snapshot of the managed pids."""
        return iter(list(self.processes.keys()))

    def __getitem__(self, key):
        return self.processes[key]

    def run_code(self, code, args):
        """Evaluate 'code' as if it were placed into a Python file and
        executed. The arguments, which must be strings, will be
        accessible in the code as sys.argv[1:].

        `args` may be any sequence of strings.  Returns the new process's
        pid; the temp directory is removed if starting it fails.
        """
        # The easiest way to do this, by far, is to just write the
        # code to a file. Make a directory to put it in.
        tmpdir = tempfile.mkdtemp(prefix = "nilmrun-usercode-")
        try:
            # Write the code.  Python decodes source files as UTF-8 by
            # default, so don't depend on the locale's preferred encoding.
            codepath = os.path.join(tmpdir, "usercode.py")
            with open(codepath, "w", encoding = "utf-8") as f:
                f.write(code)
            # Save the args too, for debugging purposes
            with open(os.path.join(tmpdir, "args.txt"), "w",
                      encoding = "utf-8") as f:
                f.write(repr(args))

            # Run the code
            argv = [ sys.executable, "-B", "-s", "-u", codepath ] + list(args)
            pid = self.run_command(argv)

            # Save the temp dir
            self.tmpdirs[pid] = tmpdir
            tmpdir = None # Don't need to remove it anymore

            return pid
        finally:
            # Clean up tempdir if we didn't finish
            if tmpdir is not None:
                try:
                    shutil.rmtree(tmpdir)
                except OSError: # pragma: no cover
                    pass

    def run_command(self, argv):
        """Execute a command line program and return its pid string."""
        new = Process(argv)
        self.processes[new.pid] = new
        return new.pid

    def terminate(self, pid):
        """Terminate the process `pid`; see Process.terminate()."""
        return self.processes[pid].terminate()

    def remove(self, pid):
        """Forget process `pid` and delete its temp directory, if any."""
        self._cleanup_tmpdir(pid)
        del self.processes[pid]

    def get_info(self, interval = 1.0):
        """Get info about all running PIDs.

        Returns {"total": ..., "pids": {pid: ...}, "system": {...}}.
        The per-pid and total dicts have the keys of
        Process.get_empty_info().  CPU usage is sampled over `interval`
        seconds, and this call blocks for that long.
        """
        info = { "total" : Process.get_empty_info(),
                 "pids" : {},
                 "system" : {}
                 }

        # Snapshot the managed processes so both passes see the same set.
        procs = [ (pid, self[pid]) for pid in self ]

        # Trigger CPU usage collection
        for (pid, proc) in procs:
            proc.get_info_prepare()
        psutil.cpu_percent(0, percpu = True)

        # Give it some time
        time.sleep(interval)

        # Retrieve info for system
        info["system"]["cpu_percent"] = sum(psutil.cpu_percent(0,
                                                               percpu = True))
        # psutil.cpu_count() returns None when it cannot be determined.
        info["system"]["cpu_max"] = 100.0 * (psutil.cpu_count() or 1)
        info["system"]["procs"] = len(psutil.pids())
        meminfo = psutil.virtual_memory()
        info["system"]["mem_total"] = meminfo.total
        info["system"]["mem_used"] = meminfo.used

        # Retrieve info for each PID, and update totals
        for (pid, proc) in procs:
            pinfo = proc.get_info()
            info["pids"][pid] = pinfo
            for key in info["total"]:
                info["total"][key] += pinfo[key]

        return info
|