Source code for process_pool

import yaml
import traceback
import time
import tempfile
import subprocess
import signal
import psutil
import os
import misc
from logging import getLogger
import hashlib
import fcntl
import errno
import datetime
import copy
from uaperrors import UAPError
'''
This module can be used to launch child processes and wait for them.
Processes may either run on their own or pipelines can be built with them.
'''

import sys
sys.path.append('./include/steps')

logger = getLogger("uap_logger")


[docs]class TimeoutException(Exception): pass
def timeout_handler(signum, frame): time.sleep(3600) raise TimeoutException() def restore_sigpipe_handler(): # http://www.chiark.greenend.org.uk/ucgi/~cjwatson/blosxom/2009-07-02-python-sigpipe.html signal.signal(signal.SIGPIPE, signal.SIG_DFL) os.setsid()
[docs]class ProcessPool(object): ''' The process pool provides an environment for launching and monitoring processes. You can launch any number of unrelated processes plus any number of pipelines in which several processes are chained together. Use it like this:: with process_pool.ProcessPool(self) as pool: # launch processes or create pipelines here When the scope opened by the *with* statement is left, all processes are launched and being watched. The process pool then waits until all processes have finished. You cannot launch a process pool within another process pool, but you can launch multiple pipeline and independent processes within a single process pool. Also, you can launch several process pools sequentially. ''' TAIL_LENGTH = 1024 ''' Size of the tail which gets recorded from both *stdout* and *stderr* streams of every process launched with this class, in bytes. ''' COPY_BLOCK_SIZE = 4194304 ''' When *stdout* or *stderr* streams should be written to output files, this is the buffer size which is used for writing. ''' SIGTERM_TIMEOUT = 10 ''' After a SIGTERM signal is issued, wait this many seconds before going postal. ''' process_watcher_pid = None current_instance = None process_pool_is_dead = False # signal names for numbers... kudos to # http://stackoverflow.com/questions/2549939/get-signal-names-from-numbers-in-python SIGNAL_NAMES = dict((getattr(signal, n), n) for n in dir( signal) if n.startswith('SIG') and '_' not in n)
[docs] class Pipeline(object): ''' This class can be used to chain multiple processes together. Use it like this:: with pool.Pipeline(pool) as pipeline: # append processes to the pipeline here ''' def __init__(self, pool): pool.launch_calls.append(self) self.append_calls = [] def __enter__(self): return self def __exit__(self, type, value, traceback): pass
[docs] def append(self, args, stdout_path=None, stderr_path=None, hints={}): ''' Append a process to the pipeline. Parameters get stored and are passed to *ProcessPool.launch()* later, so the same behaviour applies. ''' call = { 'args': copy.deepcopy(args), 'stdout_path': copy.copy(stdout_path), 'stderr_path': copy.copy(stderr_path), 'hints': copy.deepcopy(hints) } self.append_calls.append(call)
def __init__(self, run): if ProcessPool.process_pool_is_dead: raise UAPError("We have encountered an error, stopping now...") # the run for which this ProcessPool computes stuff # (for temporary paths etc.) self._run = run # log entries self.log_entries = [] # dict of PID -> path to copy process report self.copy_process_reports = {} # set of currently running PIDs # whenever a child process exits, its PID will get removed from this self.running_procs = set() # list of PIDs in the order that processes were launched # whenever a child process exits, its PID will remain in here self.proc_order = [] # we must keep Popen objects around, or their destructor will be called self.popen_procs = {} # dict of PID -> process info # whenever a child process exits, its PID will remain in here self.proc_details = {} self.process_watcher_report = dict() # list of temp paths to clean up self.temp_paths = [] # list of commands to be launched self.launch_calls = [] # List of processes we killed deliberately. Look: every time a # within a pipeline exits, we SIGTERM its predecessor. This is # necessary because otherwise, stuff is hanging forever. self.ok_to_fail = set() self.copy_processes_for_pid = dict() self.clean_up = False def clean_up_temp_paths(self): self.clean_up = True def get_run(self): return self._run def check_subprocess_command(self, command): for argument in command: if not isinstance(argument, str): raise UAPError( "The command to be launched '%s' " % command + "contains non-string argument '%s'. " % argument + "Therefore the command will fail. Please " + "fix this type issue.") return def load_unload_module(self, module_cmd): if module_cmd.__class__ == str: module_cmd = [module_cmd] for command in module_cmd: if isinstance(command, str): command = command.split() self.check_subprocess_command(command) try: proc = subprocess.Popen( command, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True) except OSError as e: raise UAPError("Error while executing '%s' " "Error no.: %s Error message: %s" % (" ".join(command), e.errno, e.strerror)) (output, error) = proc.communicate() exec(output) sys.stderr.write(error.decode('utf-8')) sys.stderr.flush() return def launch_pre_post_command(self, commands): if commands.__class__ == str: commands = [commands] for command in commands: if isinstance(command, str): command = command.split() self.launch(command) def __enter__(self): if ProcessPool.current_instance is not None: raise UAPError( "Sorry, only one instance of ProcessPool allowed at " "a time.") ProcessPool.current_instance = self # First we have to add the pre_command commands for execution pre_commands = self.get_run().get_step().get_pre_commands().values() if len(pre_commands) > 0: self.launch_pre_post_command(pre_commands) return self def __exit__(self, type, value, traceback): # Lastly we have to add the post_command commands for execution post_commands = self.get_run().get_step().get_post_commands().values() if len(post_commands) > 0: self.launch_pre_post_command(post_commands) # before everything is launched load the necessary modules module_loads = set( self.get_run().get_step().get_module_loads().values()) if len(module_loads) > 0: for module_load in module_loads: self.load_unload_module(module_load) # now launch all processes... self._launch_all_processes() # ...and wait until all child processes have exited try: self._wait() except BaseException: # pass log to step even if there was a problem self.get_run().get_step().append_pipeline_log(self.get_log()) raise # if there was no exception, still pass log to step self.get_run().get_step().append_pipeline_log(self.get_log()) # after finishing gracefully unload modules module_unloads = set( self.get_run().get_step().get_module_unloads().values()) if len(module_unloads) > 0: for module_unload in module_unloads: self.load_unload_module(module_unload) # remove all temporary files or directories we know of if self.clean_up: self.get_run().remove_temporary_paths() ProcessPool.current_instance = None
[docs] def launch(self, args, stdout_path=None, stderr_path=None, hints={}): ''' Launch a process. Arguments, including the program itself, are passed in *args*. If the program is not a binary but a script which cannot be invoked directly from the command line, the first element of *args* must be a list like this: *['python', 'script.py']*. Use *stdout_path* and *stderr_path* to redirect *stdout* and *stderr* streams to files. In any case, the output of both streams gets watched, the process pool calculates SHA256 checksums automatically and also keeps the last 1024 bytes of every stream. This may be useful if a process crashes and writes error messages to *stderr* in which case you can see them even if you didn't redirect *stderr* to a log file. Hints can be specified but are not essential. They help to determine the direction of arrows for the run annotation graphs rendered by GraphViz (sometimes, it's not clear from the command line whether a certain file is an input or output file to a given process). ''' call = { 'args': copy.deepcopy(args), 'stdout_path': copy.copy(stdout_path), 'stderr_path': copy.copy(stderr_path), 'hints': copy.deepcopy(hints) } self.launch_calls.append(call)
[docs] def log(self, message): ''' Append a message to the pipeline log. ''' formatted_message = "[%s] %s" % ( datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), message) logger.info(formatted_message) self.log_entries.append(formatted_message)
[docs] def get_log(self): ''' Return the log as a dictionary. ''' log = dict() log['processes'] = [] proc_details_copy = dict() for pid in self.proc_order: if 'listener_for' in self.proc_details[pid]: attach_info = self.proc_details[pid]['listener_for'] proc_details_copy[attach_info[0]][attach_info[1] + \ '_copy'] = copy.deepcopy(self.proc_details[pid]) del proc_details_copy[attach_info[0] ][attach_info[1] + '_copy']['listener_for'] else: proc_details_copy[pid] = copy.deepcopy(self.proc_details[pid]) for pid in self.proc_order: if pid in proc_details_copy: log['processes'].append(proc_details_copy[pid]) log['log'] = copy.deepcopy(self.log_entries) log['process_watcher'] = copy.deepcopy(self.process_watcher_report) log['ok_to_fail'] = copy.deepcopy(self.ok_to_fail) return log
def _launch_all_processes(self): for info in self.launch_calls: if info.__class__ == ProcessPool.Pipeline: pipeline = info use_stdin = None last_pid = None for index, info in enumerate(pipeline.append_calls): use_stdin, pid = self._do_launch( info, index < len(pipeline.append_calls) - 1, use_stdin) if last_pid is not None: self.proc_details[pid]['use_stdin_of'] = last_pid last_pid = pid else: self._do_launch(info) def _do_launch(self, info, keep_stdout_open=False, use_stdin=None): ''' Launch a process and after that, launch a copy process for *stdout* and *stderr* each. ''' args = copy.deepcopy(info['args']) stdout_path = copy.copy(info['stdout_path']) stderr_path = copy.copy(info['stderr_path']) hints = copy.deepcopy(info['hints']) program_name = copy.deepcopy(args[0]) if program_name.__class__ == list: new_args = args[0] program_name = new_args[-1] new_args.extend(args[1:]) args = new_args self.check_subprocess_command(args) # launch the process and always pipe stdout and stderr because we # want to watch both streams, regardless of whether stdout should # be passed on to another process proc = subprocess.Popen( args, stdin=use_stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE, preexec_fn=restore_sigpipe_handler, close_fds=True ) pid = proc.pid self.popen_procs[pid] = proc self.running_procs.add(pid) self.proc_order.append(pid) name = os.path.basename(program_name) self.proc_details[pid] = { 'name': name, 'start_time': datetime.datetime.now(), 'args': args, 'pid': pid, 'hints': hints, 'stdout_path': stdout_path, 'stderr_path': stderr_path } message = "Launched %s in %s as PID %d (%s)." % \ (' '.join(args), os.getcwd(), pid, name) self.log(message) pipe = None if keep_stdout_open: pipe = os.pipe() self.copy_processes_for_pid[pid] = list() for which in ['stdout', 'stderr']: report_path = self.get_run().add_temporary_file("%s-report" % which, '.txt') sink_path = stdout_path if which == 'stdout' else stderr_path listener_pid = self._do_launch_copy_process( proc.stdout if which == 'stdout' else proc.stderr, sink_path, report_path, pid, which, pipe if which == 'stdout' else None) self.copy_processes_for_pid[pid].append(listener_pid) self.copy_process_reports[listener_pid] = report_path if sink_path is not None: self.proc_details[listener_pid]['sink'] = os.path.basename( sink_path) self.proc_details[listener_pid]['sink_full_path'] = os.path.abspath( sink_path) if keep_stdout_open: os.close(pipe[1]) return pipe[0], pid else: return None, pid def _do_launch_copy_process( self, fin, fout_path, report_path, parent_pid, which, pipe): pid = os.fork() if pid == 0: def write_report_and_exit(signum=None, frame=None): try: # write report with open(report_path, 'w') as freport: report = dict() report['sha256'] = checksum.hexdigest() report['tail'] = tail.decode('utf-8', errors='ignore') report['length'] = length report['lines'] = newline_count freport.write(yaml.dump(report)) except (IOError, LookupError) as e: logger.error("Eror while writing %s (%s): %s" % (report_path, type(e).__name__, e)) logger.debug(traceback.format_exc()) finally: os._exit(0) signal.signal(signal.SIGTERM, write_report_and_exit) signal.signal(signal.SIGINT, write_report_and_exit) signal.signal(signal.SIGPIPE, write_report_and_exit) os.setsid() if pipe is not None: os.close(pipe[0]) fdout = None if fout_path is not None: fdout = os.open(fout_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC) checksum = hashlib.sha256() tail = b'' length = 0 newline_count = 0 while True: block = fin.read(ProcessPool.COPY_BLOCK_SIZE) if len(block) == 0: # fin reports EOF, let's call it a day break # update checksum checksum.update(block) # update tail if len(block) >= ProcessPool.TAIL_LENGTH: tail = block[-ProcessPool.TAIL_LENGTH:] else: keep_length = ProcessPool.TAIL_LENGTH - len(block) tail = tail[-keep_length:] + block # update length length += len(block) # update newline_count newline_count += block.count(b'\n') # write block to output file if fdout is not None: tries = 0 bytes_written = None while not bytes_written: try: bytes_written = os.write(fdout, block) except OSError: tries += 1 if tries == 5: raise time.sleep(.1) if bytes_written != len(block): os._exit(1) # write block to pipe if pipe is not None: tries = 0 bytes_written = None while not bytes_written: try: bytes_written = os.write(pipe[1], block) except OSError: tries += 1 if tries == 5: raise time.sleep(.1) if bytes_written != len(block): os._exit(2) # we're finished, close everything fin.close() if fdout is not None: os.close(fdout) if pipe is not None: os.close(pipe[1]) write_report_and_exit() else: self.running_procs.add(pid) self.proc_order.append(pid) self.proc_details[pid] = { 'name': '[stream listener for %s of PID %d]' % (which, parent_pid), 'start_time': datetime.datetime.now(), 'pid': pid, 'listener_for': [parent_pid, which], 'report_path': report_path } self.log( "Launched a copy process with PID %d to capture %s of PID %d." % (pid, which, parent_pid)) if fout_path is not None: self.log("...which gets also redirected to %s" % fout_path) return pid def _wait(self): ''' Wait for all processes to exit. ''' self.log("Now launching process watcher and waiting for all child " "processes to exit.") watcher_report_path = \ self.get_run().add_temporary_file('watcher-report', suffix='.yaml') watcher_pid = self._launch_process_watcher(watcher_report_path) ProcessPool.process_watcher_pid = watcher_pid pid = None first_failed_pid = None was_reporter = None failed_pids = set() while True: if len(self.running_procs) == 0: break try: # wait for the next child process to exit pid, exit_code_with_signal = os.wait() signal_number = exit_code_with_signal & 255 exit_code = exit_code_with_signal >> 8 name = 'unkown name' if pid in self.proc_details.keys(): name = self.proc_details[pid]['name'] logger.info("PID %s (%s), Signal: %s, Exit code: %s" % (pid, name, signal_number, exit_code)) if pid == watcher_pid: ProcessPool.process_watcher_pid = None try: with open(watcher_report_path, 'r') as f: self.process_watcher_report = yaml.load( f, Loader=yaml.FullLoader) except IOError as e: logger.warning( "Couldn't load watcher report from %s." % watcher_report_path) logger.debug("Reading the watcher failed with: %s" % e) raise # the process watcher has terminated, which is cool, I guess # (if it's the last child process, anyway) continue try: # remove pid from self.running_procs self.running_procs.remove(pid) except KeyError as e: if pid != os.getpid(): logger.debug("Caught a process which we " "didn't know: %d.\n" % pid) if pid in self.proc_details: self.proc_details[pid]['end_time'] = datetime.datetime.now() what_happened = "has exited with exit code %d" % exit_code if signal_number > 0: what_happened = "has received signal %d" % signal_number if signal_number in ProcessPool.SIGNAL_NAMES: what_happened = ( "has received %s (signal number %d)" % (ProcessPool.SIGNAL_NAMES[signal_number], signal_number)) if pid in self.proc_details: self.log( "%s (PID %d) %s." % (self.proc_details[pid]['name'], pid, what_happened)) else: self.log("PID %d %s." % (pid, what_happened)) if pid in self.proc_details: if signal_number == 0: self.proc_details[pid]['exit_code'] = exit_code else: self.proc_details[pid]['signal'] = signal_number if signal_number in ProcessPool.SIGNAL_NAMES: self.proc_details[pid]['signal_name'] = ProcessPool.SIGNAL_NAMES[signal_number] # now kill it's predecessor if 'use_stdin_of' in self.proc_details[pid]: kpid = self.proc_details[pid]['use_stdin_of'] self.log( "Now killing %d, the predecessor of %d (%s)." % (kpid, pid, self.proc_details[pid]['name'])) if kpid in self.proc_details.keys(): logger.debug( 'PID %s is "%s".' % (kpid, self.proc_details[kpid]['name'])) self.ok_to_fail.add(kpid) try: os.kill(kpid, signal.SIGPIPE) except OSError as e: if e.errno == errno.ESRCH: self.log("Couldn't kill %d: no such " "process." % kpid) pass else: raise if pid in self.copy_process_reports: if first_failed_pid is None: was_reporter = True report_path = self.copy_process_reports[pid] report = None if os.path.exists(report_path): with open(report_path, 'r') as f: report = yaml.load(f, Loader=yaml.FullLoader) if report is not None: self.proc_details[pid].update(report) elif first_failed_pid is None: was_reporter = False except TimeoutException as e: error = traceback.format_exception(*sys.exc_info())[-1] logger.error(error) self.log("Timeout, killing all child processes now.") ProcessPool.kill_all_child_processes() except OSError as e: if e.errno == errno.ECHILD: # no more children running, we are done logger.debug("ProcessPool: There are no child " "processes left, exiting.\n") signal.alarm(0) self.log("Cancelling timeout (if there was one), all " "child processes have exited.") break elif e.errno == errno.EINTR: # a system call was interrupted, pfft. pass else: raise else: if exit_code_with_signal != 0: if pid not in self.ok_to_fail: # Oops, something went wrong. See what happens and # terminate all child processes in a few seconds. if first_failed_pid is None: first_failed_pid = pid failed_pids.add(pid) signal.signal(signal.SIGALRM, timeout_handler) name = 'unkown' if pid in self.proc_details.keys(): name = self.proc_details[pid]['name'] self.log( 'Terminating all children of "%s" in %d seconds...' % (name, ProcessPool.SIGTERM_TIMEOUT)) signal.alarm(ProcessPool.SIGTERM_TIMEOUT) else: name = 'unkown name' if pid in self.proc_details.keys(): name = self.proc_details[pid]['name'] logger.debug('PID %s (%s) was expected to fail ' 'because the kill signal was send. ' 'Now killing its copy processes.' % (pid, name)) for kpid in self.copy_processes_for_pid[pid]: try: os.kill(kpid, signal.SIGTERM) except OSError as e: if e.errno == errno.ESRCH: # the copy process already exited pass else: raise else: logger.debug("Killed copy process of %d (%s): %d" % (pid, name, kpid)) # now wait for the watcher process, if it still exists try: os.waitpid(watcher_pid, 0) try: with open(watcher_report_path, 'r') as f: self.process_watcher_report = yaml.load( f, Loader=yaml.FullLoader) except IOError as e: logger.warning("Couldn't load watcher report from %s." % watcher_report_path) logger.debug("Reading the watcher failed with: %s" % e) except OSError as e: if e.errno == errno.ESRCH: pass elif e.errno == errno.ECHILD: pass else: raise logger.debug('Watcher report:\n%s' % yaml.dump(self.process_watcher_report)) if first_failed_pid: if was_reporter: log = 'Reporter crashed %s exit with code %s' % \ (first_failed_pid, exit_code_with_signal) if report is not None: log += ' while writing into "%s".' % \ self.proc_details[first_failed_pid]['report_path'] else: for pid in failed_pids: name = 'unkown name' if pid in self.proc_details.keys(): name = self.proc_details[pid]['name'] stderr_listener = self.copy_processes_for_pid[pid][1] report = self.proc_details[stderr_listener]['tail'] if report and report != '': logger.error('stderr tail of %s (%s):\n%s' % (pid, name, report)) log = "Pipeline crashed while working in %s" % \ self.get_run().get_temp_output_directory() self.log(log) raise UAPError(log) def _launch_process_watcher(self, watcher_report_path): ''' Launch the process watcher via fork. The process watcher repeatedly determines all child processes of the main process and determines their current and maximum CPU and RAM usage. Initially, this is done in short intervals (0.1 seconds), so that very short-lived processes can be watched but the frequency drops quickly so that after a while, child processes are only examined every 10 seconds. ''' super_pid = os.getpid() def human_readable_size(size, decimal_places=1): for unit in ['B', 'KB', 'MB', 'GB', 'TB']: if size < 1024.0: break size /= 1024.0 return "%%.%df %%s" % decimal_places % (size, unit) watcher_pid = os.fork() if watcher_pid == 0: os.nice(10) try: signal.signal(signal.SIGTERM, signal.SIG_DFL) signal.signal(signal.SIGINT, signal.SIG_IGN) called_cpu_stat_for_childpid = set() procs = {} names = {} for pid in self.proc_details.keys(): if 'name' in self.proc_details[pid]: name = self.proc_details[pid]['name'] names[pid] = '%d (%s)' % (pid, name) procs[super_pid] = psutil.Process(super_pid) names[super_pid] = '%d (uap)' % super_pid procs[os.getpid()] = psutil.Process(os.getpid()) names[os.getpid()] = '%d (watcher)' % os.getpid() for pid in self.running_procs: try: procs[pid] = psutil.Process(pid) except psutil.NoSuchProcess: pass pid_list = copy.deepcopy(list(procs.keys())) for pid in pid_list: proc = procs[pid] try: cpu_percent = proc.cpu_percent(interval=None) # add values for all children if pid != super_pid: for p in proc.children(recursive=True): try: cpu_percent = p.cpu_percent(interval=None) called_cpu_stat_for_childpid.add(p.pid) except psutil.NoSuchProcess: pass except psutil.NoSuchProcess: del procs[pid] time.sleep(0.1) iterations = 0 delay = 0.5 max_data = dict() first_call = None while True: pid_list = copy.deepcopy(list(procs.keys())) sum_data = dict() if first_call is None: first_call = True first_io = psutil.disk_io_counters()._asdict() first_net = psutil.net_io_counters()._asdict() elif first_call is True: first_call = False for pid in pid_list: proc = procs[pid] if pid in names.keys(): name = names[pid] else: name = pid try: data = dict() with proc.oneshot(): data['cpu_percent'] = proc.cpu_percent( interval=None) data['threads'] = len(proc.threads()) data['memory_percent'] = proc.memory_percent() memory_info = proc.memory_info() data['rss'] = memory_info.rss data['vms'] = memory_info.vms # add values for all children if pid != super_pid: for p in proc.children(recursive=True): try: with p.oneshot(): v = p.cpu_percent(interval=None) if p.pid in called_cpu_stat_for_childpid: data['cpu_percent'] += v called_cpu_stat_for_childpid.add( p.pid) data['memory_percent'] += p.memory_percent() memory_info = p.memory_info() data['rss'] += memory_info.rss data['vms'] += memory_info.vms except psutil.NoSuchProcess: pass if name not in max_data: max_data[name] = copy.deepcopy(data) for k, v in data.items(): max_data[name][k] = max(max_data[name][k], v) if len(sum_data) == 0: for k, v in data.items(): sum_data[k] = 0.0 for k, v in data.items(): sum_data[k] += v except psutil.NoSuchProcess: del procs[pid] if 'sum' not in max_data: max_data['sum'] = copy.deepcopy(sum_data) for k, v in sum_data.items(): max_data['sum'][k] = max(max_data['sum'][k], v) cpu_data = psutil.cpu_times()._asdict() total = sum(cpu_data.values()) / 100 if 'host cpu percentages' not in max_data: max_data['host cpu percentages'] = \ {k: v / total for k, v in cpu_data.items()} else: for k, v in cpu_data.items(): max_data['host cpu percentages'][k] = max( max_data['host cpu percentages'][k], v / total) if len(procs) <= 2 and not first_call: # there's nothing more to watch, write report and exit # (now there's only the controlling python process and # the process watcher itself report = dict() io_data = psutil.disk_io_counters()._asdict() report['host io stats'] = { k: io_data[k] - first_io[k] for k in io_data.keys()} net_data = psutil.net_io_counters()._asdict() report['host net stats'] = { k: net_data[k] - first_net[k] for k in net_data.keys()} new_field = dict() for field in report.values(): for key in field.keys(): if 'bytes' in key: new_field[key.replace('bytes', 'data')] = \ human_readable_size(field[key]) for field in max_data.values(): for key in field.keys(): if key in ['rss', 'vms']: new_field[key + ' unit'] = human_readable_size(field[key]) field.update(new_field) report['max'] = max_data with open(watcher_report_path, 'w') as f: f.write( yaml.dump( report, default_flow_style=False)) os._exit(0) iterations += 1 if iterations == 10: delay = 1 if iterations == 20: delay = 2 if iterations == 30: delay = 10 time.sleep(delay) except BaseException: error = traceback.format_exception(*sys.exc_info())[-1] name = 'unkown name' if pid in self.proc_details.keys(): name = self.proc_details[pid]['name'] logger.error("PID %s (%s) Process Watcher Exception: %s" % (pid, name, error)) finally: os._exit(0) else: return watcher_pid
[docs] @classmethod def kill(cls): ''' Kills all user-launched processes. After that, the remaining process will end and a report will be written. ''' ProcessPool.process_pool_is_dead = True signal.signal(signal.SIGALRM, timeout_handler) signal.alarm(ProcessPool.SIGTERM_TIMEOUT) if ProcessPool.current_instance is not None: self = ProcessPool.current_instance for pid in self.copy_processes_for_pid.keys(): logger.debug("Killing %s.") try: os.kill(pid, signal.SIGTERM) except Exception as e: if isinstance(e, OSError) and e.errno == errno.ESRCH: logger.debug('Trying to kill already dead process %s.' % pid) return name = 'unkown name' if pid in self.proc_details.keys(): name = self.proc_details[pid]['name'] logger.error( "While trying to kill PID %s (%s) there was %s: %s" % (pid, name, type(e).__name__, e)) logger.debug(traceback.format_exc())
[docs] @classmethod def kill_all_child_processes(cls): ''' Kill all child processes of this process by sending a SIGTERM to each of them. This includes all children which were not launched by this module, and their children etc. ''' proc = psutil.Process(os.getpid()) for p in proc.children(recursive=True): try: p.terminate() except psutil.NoSuchProcess: pass