You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

295 lines
8.8 KiB

  1. #!/usr/bin/env python3
  2. import threading
  3. import subprocess
  4. import io
  5. import sys
  6. import os
  7. import signal
  8. import time
  9. import uuid
  10. import psutil
  11. import tempfile
  12. import atexit
  13. import shutil
  14. class ProcessError(Exception):
  15. pass
  16. class LogReceiver(object):
  17. """Spawn a thread that listens to a pipe for log messages,
  18. and stores them locally."""
  19. def __init__(self, pipe):
  20. self.pipe = pipe
  21. self.log = io.BytesIO()
  22. self.thread = threading.Thread(target=self.run)
  23. self.thread.start()
  24. def run(self):
  25. while True:
  26. data = os.read(self.pipe, 65536)
  27. if not data:
  28. os.close(self.pipe)
  29. return
  30. self.log.write(data)
  31. def getvalue(self):
  32. return self.log.getvalue()
  33. def clear(self):
  34. self.log = io.BytesIO()
  35. class Process(object):
  36. """Spawn and manage a subprocess, and capture its output."""
  37. def __init__(self, argv, tempfile=None):
  38. self.start_time = None
  39. # Use a pipe for communicating log data
  40. (rpipe, wpipe) = os.pipe()
  41. self._log = LogReceiver(rpipe)
  42. # Stdin is null
  43. nullfd = os.open(os.devnull, os.O_RDONLY)
  44. # Spawn the new process
  45. try:
  46. self._process = subprocess.Popen(args=argv, stdin=nullfd,
  47. stdout=wpipe, stderr=wpipe,
  48. close_fds=True, cwd="/tmp")
  49. except (OSError, TypeError) as e:
  50. raise ProcessError(str(e))
  51. finally:
  52. # Close the FDs we don't need
  53. os.close(wpipe)
  54. os.close(nullfd)
  55. # Get process info
  56. self.start_time = time.time()
  57. self.pid = str(uuid.uuid1(self._process.pid or 0))
  58. def _join(self, timeout=1.0):
  59. start = time.time()
  60. while True:
  61. if self._process.poll() is not None:
  62. return True
  63. if (time.time() - start) >= timeout:
  64. return False
  65. time.sleep(0.1)
  66. def terminate(self, timeout=1.0):
  67. """Terminate a process, and all of its children that are in the same
  68. process group."""
  69. try:
  70. # First give it some time to die on its own
  71. if self._join(timeout):
  72. return True
  73. def getpgid(pid):
  74. try:
  75. return os.getpgid(pid)
  76. except OSError: # pragma: no cover
  77. return None
  78. def kill(pid, sig):
  79. try:
  80. return os.kill(pid, sig)
  81. except OSError: # pragma: no cover
  82. return
  83. # Find all children
  84. group = getpgid(self._process.pid)
  85. main = psutil.Process(self._process.pid)
  86. allproc = [main] + main.children(recursive=True)
  87. # Kill with SIGTERM, if they're still in this process group
  88. for proc in allproc:
  89. if getpgid(proc.pid) == group:
  90. kill(proc.pid, signal.SIGTERM)
  91. # Wait for it to die again
  92. if self._join(timeout):
  93. return True
  94. # One more try with SIGKILL
  95. for proc in allproc:
  96. if getpgid(proc.pid) == group:
  97. kill(proc.pid, signal.SIGKILL)
  98. # See if it worked
  99. return self._join(timeout)
  100. except psutil.Error: # pragma: no cover (race condition)
  101. return True
  102. def clear_log(self):
  103. self._log.clear()
  104. @property
  105. def log(self):
  106. return self._log.getvalue()
  107. @property
  108. def alive(self):
  109. return self._process.poll() is None
  110. @property
  111. def exitcode(self):
  112. return self._process.returncode
  113. def get_info_prepare(self):
  114. """Prepare the process list and measurement for .get_info.
  115. Call .get_info() about a second later."""
  116. try:
  117. main = psutil.Process(self._process.pid)
  118. self._process_list = [main] + main.children(recursive=True)
  119. for proc in self._process_list:
  120. proc.cpu_percent(0)
  121. except psutil.Error: # pragma: no cover (race condition)
  122. self._process_list = []
  123. @staticmethod
  124. def get_empty_info():
  125. return {"cpu_percent": 0,
  126. "cpu_user": 0,
  127. "cpu_sys": 0,
  128. "mem_phys": 0,
  129. "mem_virt": 0,
  130. "io_read": 0,
  131. "io_write": 0,
  132. "procs": 0}
  133. def get_info(self):
  134. """Return a dictionary with info about the process CPU and memory
  135. usage. Call .get_info_prepare() about a second before this."""
  136. d = self.get_empty_info()
  137. for proc in self._process_list:
  138. try:
  139. d["cpu_percent"] += proc.cpu_percent(0)
  140. cpuinfo = proc.cpu_times()
  141. d["cpu_user"] += cpuinfo.user
  142. d["cpu_sys"] += cpuinfo.system
  143. meminfo = proc.memory_info()
  144. d["mem_phys"] += meminfo.rss
  145. d["mem_virt"] += meminfo.vms
  146. ioinfo = proc.io_counters()
  147. d["io_read"] += ioinfo.read_bytes
  148. d["io_write"] += ioinfo.write_bytes
  149. d["procs"] += 1
  150. except psutil.Error:
  151. pass
  152. return d
  153. class ProcessManager(object):
  154. """Track and manage a collection of Process objects"""
  155. def __init__(self):
  156. self.processes = {}
  157. self.tmpdirs = {}
  158. atexit.register(self._atexit)
  159. def _cleanup_tmpdir(self, pid):
  160. if pid in self.tmpdirs:
  161. try:
  162. shutil.rmtree(self.tmpdirs[pid])
  163. except OSError: # pragma: no cover
  164. pass
  165. del self.tmpdirs[pid]
  166. def _atexit(self):
  167. # Kill remaining processes, remove their dirs
  168. for pid in list(self.processes.keys()):
  169. try:
  170. self.processes[pid].terminate()
  171. del self.processes[pid]
  172. shutil.rmtree(self.tmpdirs[pid])
  173. del self.tmpdirs[pid]
  174. except Exception: # pragma: no cover
  175. pass
  176. def __iter__(self):
  177. return iter(list(self.processes.keys()))
  178. def __getitem__(self, key):
  179. return self.processes[key]
  180. def run_code(self, code, args):
  181. """Evaluate 'code' as if it were placed into a Python file and
  182. executed. The arguments, which must be strings, will be
  183. accessible in the code as sys.argv[1:]."""
  184. # The easiest way to do this, by far, is to just write the
  185. # code to a file. Make a directory to put it in.
  186. tmpdir = tempfile.mkdtemp(prefix="nilmrun-usercode-")
  187. try:
  188. # Write the code
  189. codepath = os.path.join(tmpdir, "usercode.py")
  190. with open(codepath, "w") as f:
  191. f.write(code)
  192. # Save the args too, for debugging purposes
  193. with open(os.path.join(tmpdir, "args.txt"), "w") as f:
  194. f.write(repr(args))
  195. # Run the code
  196. argv = [sys.executable, "-B", "-s", "-u", codepath] + args
  197. pid = self.run_command(argv)
  198. # Save the temp dir
  199. self.tmpdirs[pid] = tmpdir
  200. tmpdir = None # Don't need to remove it anymore
  201. return pid
  202. finally:
  203. # Clean up tempdir if we didn't finish
  204. if tmpdir is not None:
  205. try:
  206. shutil.rmtree(tmpdir)
  207. except OSError: # pragma: no cover
  208. pass
  209. def run_command(self, argv):
  210. """Execute a command line program"""
  211. new = Process(argv)
  212. self.processes[new.pid] = new
  213. return new.pid
  214. def terminate(self, pid):
  215. return self.processes[pid].terminate()
  216. def remove(self, pid):
  217. self._cleanup_tmpdir(pid)
  218. del self.processes[pid]
  219. def get_info(self):
  220. """Get info about all running PIDs"""
  221. info = {
  222. "total": Process.get_empty_info(),
  223. "pids": {},
  224. "system": {}
  225. }
  226. # Trigger CPU usage collection
  227. for pid in self:
  228. self[pid].get_info_prepare()
  229. psutil.cpu_percent(0, percpu=True)
  230. # Give it some time
  231. time.sleep(1)
  232. # Retrieve info for system
  233. info["system"]["cpu_percent"] = sum(psutil.cpu_percent(0, percpu=True))
  234. info["system"]["cpu_max"] = 100.0 * psutil.cpu_count()
  235. info["system"]["procs"] = len(psutil.pids())
  236. meminfo = psutil.virtual_memory()
  237. info["system"]["mem_total"] = meminfo.total
  238. info["system"]["mem_used"] = meminfo.used
  239. # Retrieve info for each PID
  240. for pid in self:
  241. info["pids"][pid] = self[pid].get_info()
  242. # Update totals
  243. for key in info["total"]:
  244. info["total"][key] += info["pids"][pid][key]
  245. return info