Source code for pipettor.processes

# Copyright 2006-2015 Mark Diekhans
"""
Robust, easy to use Unix process pipelines.
"""
import os
import signal
import shlex
import logging
import subprocess
import enum
from io import UnsupportedOperation
from threading import RLock
from pipettor.devices import Dev
from pipettor.devices import DataReader
from pipettor.devices import _SiblingPipe
from pipettor.devices import File
from pipettor.exceptions import PipettorException
from pipettor.exceptions import ProcessException
from pipettor.exceptions import _warn_error_during_error_handling

_defaultLogger = None
_defaultLogLevel = logging.DEBUG

def setDefaultLogger(logger):
    """Set the default pipettor logger used in logging commands and errors.
    If None, there is no default logging.  The logger can be the name of a
    logger or the logger itself.  Standard value is None."""
    global _defaultLogger
    _defaultLogger = logging.getLogger(logger) if isinstance(logger, str) else logger

def getDefaultLogger():
    """Return the current value of the pipettor default logger."""
    return _defaultLogger
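
# Illustrative sketch (not part of the library; the logger name "myprog" is
# hypothetical): one way to enable default pipettor logging so that pipeline
# commands and failures are logged.
def _example_default_logging():
    logging.basicConfig(level=logging.DEBUG)
    setDefaultLogger("myprog")  # accepts a logger name or a Logger object
    assert getDefaultLogger() is logging.getLogger("myprog")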

def setDefaultLogLevel(level):
    """Set the default pipettor log level to use in logging commands and
    errors.  Standard value is logging.DEBUG."""
    global _defaultLogLevel
    _defaultLogLevel = level

def getDefaultLogLevel():
    """Get the default pipettor log level to use in logging commands and errors."""
    return _defaultLogLevel

def setDefaultLogging(logger, level):
    """Set both the default logger and level.  Either can be None to leave
    that value unchanged."""
    if logger is not None:
        setDefaultLogger(logger)
    if level is not None:
        setDefaultLogLevel(level)

def _getLoggerToUse(logger):
    """if logger is None, get the default; if it is a string, look it up;
    otherwise it is a logger object"""
    if logger is None:
        return _defaultLogger
    elif isinstance(logger, str):
        return logging.getLogger(logger)
    else:
        return logger

def _getLogLevelToUse(logLevel):
    "get the log level to use, either what is specified or the default"
    return logLevel if logLevel is not None else getDefaultLogLevel()

class State(enum.IntEnum):
    """Current state of a process"""
    PREINIT = 0
    STARTUP = 1
    RUNNING = 2
    FINISHED = 4

class Process(object):
    """A process, represented as a node of a pipeline, connected by Dev objects.

    Process arguments can be any object, with str() being called on each
    object before exec.

    The stdin/out/err arguments can have the following values:
       - None - the stdio file descriptor is inherited
       - str - opened as a file
       - int - a file number
       - file-like object - fileno() is dupped
       - a Dev derived object

    If stderr is an instance of DataReader, then stderr is included in
    ProcessException on process error.  If the class DataReader is passed
    in as stderr, a DataReader object is created.

    start() must be called to run the process.
    """
    def __init__(self, cmd, stdin=None, stdout=None, stderr=None):
        self.lock = RLock()
        self.cmd = tuple(cmd)
        # stdio and argument Dev association
        self.stdin = self._stdio_assoc(stdin, "r")
        self.stdout = self._stdio_assoc(stdout, "w")
        if stderr == DataReader:
            stderr = DataReader(errors='backslashreplace')
        self.stderr = self._stdio_assoc(stderr, "w")
        self.popen = None
        self.pid = None
        self.returncode = None  # exit code, or -signal
        # FIXME: should this just be exception for the users??
        self.procExcept = None  # exception because of failed process
        self.state = State.PREINIT
        self.forced = False  # force termination during error cleanup

    def __str__(self):
        "get a simple description of the process"
        return " ".join([shlex.quote(str(arg)) for arg in self.cmd])

    def _stdio_assoc(self, spec, mode):
        """pre-fork check of a stdio spec's validity; associate a Dev or file
        number.  mode is the mode in the child"""
        if (spec is None) or isinstance(spec, int):
            return spec  # passed unchanged
        elif isinstance(spec, Dev):
            spec._bind_to_process(self, mode)
            return spec  # passed unchanged
        elif callable(getattr(spec, "fileno", None)):
            return spec.fileno()  # is file-like
        elif isinstance(spec, str):
            return File(spec, mode)
        else:
            raise PipettorException("invalid stdio specification object type: {} {}".format(type(spec), spec))

    def _get_child_stdio(self, spec, stdfd):
        """get the fd to pass to the child as one of the stdio handles"""
        if spec is None:
            return stdfd
        elif isinstance(spec, int):
            return spec
        elif isinstance(spec, Dev):
            return spec.read_fd if stdfd == 0 else spec.write_fd
        else:
            # this should have been detected earlier
            raise PipettorException("_get_child_stdio logic error: {} {}".format(type(spec), stdfd))

    def _start_process(self):
        """do the work of starting the process"""
        self.state = State.STARTUP  # do first to prevent restarts on error
        try:
            self.popen = subprocess.Popen(self.cmd,
                                          stdin=self._get_child_stdio(self.stdin, 0),
                                          stdout=self._get_child_stdio(self.stdout, 1),
                                          stderr=self._get_child_stdio(self.stderr, 2))
        except Exception as ex:
            raise ProcessException(str(self)) from ex
        self.pid = self.popen.pid
        self.state = State.RUNNING

    def _start(self):
        """start the process"""
        try:
            self._start_process()
        except BaseException as ex:
            self.procExcept = ex
        if self.procExcept is not None:
            raise self.procExcept

    @property
    def running(self):
        "determine if this process is running"
        return self.state is State.RUNNING

    @property
    def finished(self):
        "determine if this process has been detected as finished (waited on)"
        return self.state is State.FINISHED

    def _parent_stdio_exit_close(self):
        "close devices on exit"
        # MUST do before reading stderr in _handle_error_exit
        for std in (self.stdin, self.stdout, self.stderr):
            if isinstance(std, Dev):
                std.close()

    def _handle_error_exit(self):
        # get saved stderr, if possible
        stderr = None
        if isinstance(self.stderr, DataReader):
            stderr = self.stderr.data
        # don't save the exception if we forced the process to be killed
        if not self.forced:
            self.procExcept = ProcessException(str(self), self.returncode, stderr)

    def _handle_exit(self, waitStat):
        """handle the process exiting, saving its status"""
        self.state = State.FINISHED
        assert os.WIFEXITED(waitStat) or os.WIFSIGNALED(waitStat)
        self.returncode = os.WEXITSTATUS(waitStat) if os.WIFEXITED(waitStat) else -os.WTERMSIG(waitStat)
        # must tell subprocess.Popen about this
        self.popen.returncode = self.returncode
        self._parent_stdio_exit_close()  # MUST DO BEFORE _handle_error_exit
        if not ((self.returncode == 0) or (self.returncode == -signal.SIGPIPE)):
            self._handle_error_exit()

    def _waitpid(self, flag=0):
        "do waitpid and handle the exit if finished, returning True if finished"
        if self.pid is None:
            raise PipettorException("process has not been started")
        w = os.waitpid(self.pid, flag)
        if w[0] != 0:
            self._handle_exit(w[1])
        return (w[0] != 0)

    def poll(self):
        """Check if the process has completed.  Return True if it has,
        False if it hasn't."""
        with self.lock:
            if self.state is State.FINISHED:
                return True
            return self._waitpid(os.WNOHANG)

    def _force_finish(self):
        """Force termination of the process.  The forced flag is set, as an
        indication that this was not a primary failure in the pipeline."""
        with self.lock:
            # check if finished before killing
            if (self.state is State.RUNNING) and (not self.poll()):
                self.forced = True
                os.kill(self.pid, signal.SIGKILL)
                self._waitpid()

    def failed(self):
        "check if the process failed; call after poll() or wait()"
        return self.procExcept is not None
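
# Hedged illustrative sketch (not part of the library): _handle_exit above
# follows the subprocess.Popen returncode convention, storing the exit
# status for a normal exit and the negated signal number for a death by
# signal.  This demonstrates that convention using subprocess directly.
def _example_returncode_convention():
    p = subprocess.Popen(["sleep", "10"])
    p.send_signal(signal.SIGTERM)  # kill the child with SIGTERM
    p.wait()
    assert p.returncode == -signal.SIGTERM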

class Pipeline(object):
    """A process pipeline.  Once constructed, the pipeline is started with
    the start(), poll(), or wait() functions.

    The cmds argument is either a list of arguments for a single process,
    or a list of such lists for a pipeline.  If the stdin/out/err arguments
    are None, the open files are inherited.  Otherwise they can be string
    file names, file-like objects, file numbers, or Dev objects.  Stdin is
    input to the first process, stdout is output from the last process, and
    stderr is attached to all processes.

    DataReader and DataWriter objects can be specified for stdin/out/err to
    do asynchronous I/O with the pipeline without the danger of deadlock.

    If stderr is the class DataReader, a new instance is created for each
    process in the pipeline.  The contents of stderr will be included in an
    exception if one occurs in that process.  If an instance of DataReader
    is provided, the contents of stderr from all processes will be included
    in the exception.

    Command arguments will be converted to strings.

    The logger argument can be the name of a logger or a logger object.  If
    None, the default logger is used.
    """
    def __init__(self, cmds, *, stdin=None, stdout=None, stderr=DataReader,
                 logger=None, logLevel=None):
        self.lock = RLock()
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr
        self.procs = []
        self.devs = set()
        self.bypid = dict()  # indexed by pid
        self.state = State.PREINIT
        self.logger = _getLoggerToUse(logger)
        self.logLevel = _getLogLevelToUse(logLevel)
        if isinstance(cmds[0], str):
            cmds = [cmds]  # one-process pipeline
        cmds = self._stringify(cmds)
        try:
            self._setup_processes(cmds)
        except BaseException:
            self._error_cleanup()
            raise

    @staticmethod
    def _stringify(cmds):
        ncmds = []
        for cmd in cmds:
            ncmds.append([str(a) for a in cmd])
        return ncmds

    @property
    def running(self):
        "determine if this pipeline is running"
        return self.state is State.RUNNING

    @property
    def finished(self):
        "determine if this pipeline has been detected as finished (waited on)"
        return self.state is State.FINISHED

    def _log(self, level, message, ex=None):
        """if logging is available and enabled, log the message and optional
        exception"""
        if (self.logger is not None) and (self.logger.isEnabledFor(level)):
            kwargs = {}
            if ex is not None:
                kwargs["exc_info"] = ex
            self.logger.log(level, "{}: {}".format(message, str(self)), **kwargs)

    def _setup_processes(self, cmds):
        prevPipe = None
        lastCmdIdx = len(cmds) - 1
        for i in range(len(cmds)):
            prevPipe = self._add_process(cmds[i], prevPipe, (i == lastCmdIdx),
                                         self.stdin, self.stdout, self.stderr)

    def _add_process(self, cmd, prevPipe, isLastCmd, stdinFirst, stdoutLast, stderr):
        """add one process to the pipeline; return the output pipe if this is
        not the last process"""
        if prevPipe is None:
            stdin = stdinFirst  # first process in pipeline
        else:
            stdin = prevPipe
        if isLastCmd:
            outPipe = None
            stdout = stdoutLast  # last process in pipeline
        else:
            outPipe = stdout = _SiblingPipe()
        try:
            self._create_process(cmd, stdin, stdout, stderr)
        except BaseException:
            if outPipe is not None:
                outPipe.close()
            raise
        return outPipe

    def _create_process(self, cmd, stdin, stdout, stderr):
        """create a process and track Dev objects"""
        proc = Process(cmd, stdin, stdout, stderr)
        self.procs.append(proc)
        # Proc may have wrapped a Dev
        for std in (proc.stdin, proc.stdout, proc.stderr):
            if isinstance(std, Dev):
                self.devs.add(std)

    def __str__(self):
        """get a string describing the pipeline"""
        desc = str(self.procs[0])
        if self.stdin not in (None, 0):
            desc += " <" + str(self.stdin)
        if len(self.procs) > 1:
            desc += " | " + " | ".join([str(proc) for proc in self.procs[1:]])
        if self.stdout not in (None, 1):
            desc += " >" + str(self.stdout)
        if self.stderr == DataReader:
            desc += " 2>[DataReader]"  # instance made in Process
        elif self.stderr not in (None, 2):
            desc += " 2>" + str(self.stderr)
        return desc

    def _start_process(self, proc):
        proc._start()
        self.bypid[proc.pid] = proc

    def _start_processes(self):
        for proc in self.procs:
            self._start_process(proc)

    def _post_start_parent(self):
        for d in self.devs:
            d._post_start_parent()

    def _finish(self):
        "finish up when no errors have occurred"
        assert not self.failed()
        self.state = State.FINISHED
        for d in self.devs:
            d.close()
        self._log(self.logLevel, "success")

    def _log_failure(self, ex):
        self._log(logging.ERROR, "failure", ex)

    def _error_cleanup_dev(self, dev):
        try:
            dev.close()
        except Exception as ex:
            _warn_error_during_error_handling("error during device cleanup on error", ex)

    def _error_cleanup_process(self, proc):
        try:
            if not proc.finished:
                proc._force_finish()
        except Exception as ex:
            _warn_error_during_error_handling("error during process cleanup on error", ex)

    def _error_cleanup(self):
        """forced cleanup of child processes after a failure"""
        self.state = State.FINISHED
        for d in self.devs:
            self._error_cleanup_dev(d)
        for p in self.procs:
            self._error_cleanup_process(p)

    def _start_guts(self):
        self._log(self.logLevel, "start")
        self.state = State.RUNNING
        # clean up devices and processes if there is a failure
        try:
            self._start_processes()
            self._post_start_parent()
        except Exception as ex:
            self._log_failure(ex)
            self._error_cleanup()
            raise

    def start(self):
        """start the processes"""
        if self.state >= State.STARTUP:
            raise PipettorException("Pipeline has already been started")
        with self.lock:
            self._start_guts()

    def _raise_if_failed(self):
        """raise an exception if any process has one, otherwise do nothing"""
        try:
            for p in self.procs:
                if p.procExcept is not None:
                    raise p.procExcept
        except Exception as ex:
            self._log_failure(ex)
            raise ex

    def _poll_guts(self):
        for p in self.procs:
            if not p.poll():
                return False
        self._finish()
        return True

    def poll(self):
        """Check if all of the processes have completed.  Return True if they
        have, False if they haven't."""
        with self.lock:
            if self.state is State.PREINIT:
                self.start()
            try:
                return self._poll_guts()
            except BaseException:
                self._error_cleanup()
                raise

    def _wait_on_one(self, proc):
        "wait on the next process in the group to complete"
        w = os.waitpid(proc.pid, 0)
        self.bypid[w[0]]._handle_exit(w[1])

    def _wait_guts(self):
        if self.state < State.RUNNING:
            self.start()
        try:
            for p in self.procs:
                if not p.finished:
                    self._wait_on_one(p)
        except BaseException as ex:
            self._log_failure(ex)
            self._error_cleanup()
            raise ex
        self._raise_if_failed()

    def wait(self):
        """Wait for all of the processes to complete.  Generate an exception
        if any exits non-zero or dies from a signal.  Starts the pipeline if
        it is not already running."""
        with self.lock:
            self._wait_guts()
            self._finish()

    def _shutdown(self):
        "guts of shutdown"
        self.kill(sig=signal.SIGKILL)
        try:
            self.wait()
        except PipettorException:
            pass  # ignore errors we report

    def shutdown(self):
        """Close down the pipeline prematurely.  If the pipeline is running,
        it is killed.  This does not report errors from child processes; it
        differs from wait() in that it doesn't start the pipeline if it has
        not been started, it just frees up open pipes.  Primarily intended
        for error recovery."""
        with self.lock:
            if self.logger is not None:
                self.logger.log(self.logLevel, "Shutting down pipeline: {}".format(str(self)))
            if self.state is State.RUNNING:
                self._shutdown()
            elif self.state is not State.FINISHED:
                self._finish()  # just clean up pipes

    def failed(self):
        "check if any process failed; call after poll() or wait()"
        with self.lock:
            for p in self.procs:
                if p.failed():
                    return True
            return False

    def kill(self, sig=signal.SIGTERM):
        "send a signal to all of the processes in the pipeline"
        for p in self.procs:
            os.kill(p.pid, sig)
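
# Illustrative usage sketch (hypothetical file names): run the equivalent of
# "sort input.txt | uniq -c > counts.txt".  wait() starts the pipeline if
# needed and raises a ProcessException, including any captured stderr, if a
# stage exits non-zero.
def _example_pipeline_usage():
    pl = Pipeline([["sort", "input.txt"], ["uniq", "-c"]], stdout="counts.txt")
    pl.wait()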

class Popen(Pipeline):
    """File-like object of processes to read from or write to a Pipeline.

    The cmds argument is either a list of arguments for a single process,
    or a list of such lists for a pipeline.  Mode is 'r' for a pipeline
    whose output will be read, or 'w' for a pipeline that is to have data
    written to it.  If stdin or stdout is specified and is a string, it is
    a file to open at the other end of the pipeline.  If it is not a
    string, it is assumed to be a file object to use for input or output.
    For a read pipe, only stdin can be specified; for a write pipe, only
    stdout can be used.

    read pipeline ('r'):
        stdin --> cmd[0] --> ... --> cmd[n] --> Popen

    write pipeline ('w'):
        Popen --> cmd[0] --> ... --> cmd[n] --> stdout

    Command arguments will be converted to strings.

    The logger argument can be the name of a logger or a logger object.  If
    None, the default logger is used.

    Specifying binary access results in data of type bytes, otherwise str
    type is returned.  The buffering, encoding, and errors arguments are as
    used in the open() function.
    """
    # note: this follows the _pyio.py I/O structure, but doesn't extend those
    # classes due to them doing both binary and text I/O.  Probably could do
    # this with some kind of dynamic base class setting.

    def __init__(self, cmds, mode='r', *, stdin=None, stdout=None,
                 logger=None, logLevel=None, buffering=-1, encoding=None, errors=None):
        self.mode = mode
        self._pipeline_fh = None
        self._child_fd = None
        if mode.find('a') >= 0:
            raise PipettorException("can not specify append mode")
        if mode.find('r') >= 0:
            if stdout is not None:
                raise PipettorException("can not specify stdout with read mode")
        else:
            if stdin is not None:
                raise PipettorException("can not specify stdin with write mode")
        pipe_read_fd, pipe_write_fd = os.pipe()
        if mode.find('r') >= 0:
            firstIn = stdin
            lastOut = pipe_write_fd
            self._child_fd = pipe_write_fd
            self._pipeline_fh = open(pipe_read_fd, mode, buffering=buffering,
                                     encoding=encoding, errors=errors)
        else:
            firstIn = pipe_read_fd
            lastOut = stdout
            self._child_fd = pipe_read_fd
            self._pipeline_fh = open(pipe_write_fd, mode, buffering=buffering,
                                     encoding=encoding, errors=errors)
        super(Popen, self).__init__(cmds, stdin=firstIn, stdout=lastOut,
                                    logger=logger, logLevel=logLevel)
        self.start()
        os.close(self._child_fd)
        self._child_fd = None

    ### Internal ###
    def _close(self):
        if self._pipeline_fh is not None:
            self._pipeline_fh.close()
            self._pipeline_fh = None
        if self._child_fd is not None:
            os.close(self._child_fd)
            self._child_fd = None

    def _unsupported(self, name):
        """from _pyio.py: raise an OSError exception for unsupported operations"""
        raise UnsupportedOperation("%s.%s() not supported" % (self.__class__.__name__, name))

    def _checkClosed(self, msg=None):
        """internal: raise a ValueError if the file is closed"""
        if self.closed:
            raise ValueError("I/O operation on closed file." if msg is None else msg)

    ### Positioning ###
    def seek(self, pos, whence=0):
        """changing the stream position is not supported"""
        self._unsupported("seek")

    def tell(self):
        """Return an int indicating the current stream position."""
        return self._pipeline_fh.tell()

    def truncate(self, pos=None):
        """truncating the file is not supported"""
        self._unsupported("truncate")

    ### Flush and close ###
    def flush(self):
        "Flush the internal I/O buffer."
        self._pipeline_fh.flush()

    def close(self):
        "wait for the processes to complete, with an error if one exited non-zero"
        with self.lock:
            self._close()
            if self.state is not State.FINISHED:
                self.wait()

    def __del__(self):
        """Destructor.  Calls close()."""
        if self._pipeline_fh is not None:
            self.close()

    ### Inquiries ###
    def seekable(self):
        """not seekable"""
        return False

    def readable(self):
        """Return a bool indicating whether the object was opened for reading."""
        return self._pipeline_fh.readable()

    def writable(self):
        """Return a bool indicating whether the object was opened for writing."""
        return self._pipeline_fh.writable()

    @property
    def closed(self):
        """closed: bool.  True if the file has been closed."""
        if self._pipeline_fh is None:
            return True
        else:
            return self._pipeline_fh.closed

    ### Context manager ###
    def __enter__(self):
        "support for the with statement"
        self._checkClosed()
        return self

    def __exit__(self, type, value, traceback):
        "support for the with statement"
        self.close()

    ### Lower-level APIs ###
    def fileno(self):
        "get the integer OS-dependent file handle"
        return self._pipeline_fh.fileno()

    def isatty(self):
        """Return a bool indicating whether this is an 'interactive' stream."""
        self._checkClosed()
        return False

    ### read, write, readline[s] and writelines ###
    def read(self, size=-1):
        return self._pipeline_fh.read(size)

    def readline(self, size=-1):
        return self._pipeline_fh.readline(size)

    def readlines(self, size=-1):
        return self._pipeline_fh.readlines(size)

    def __iter__(self):
        "iterate over the contents of the file"
        return self._pipeline_fh.__iter__()

    def __next__(self):
        return next(self._pipeline_fh)

    def write(self, str):
        "Write string str to the file."
        self._pipeline_fh.write(str)

    def writelines(self, lines):
        """Write a list of lines to the stream.  Line separators are not
        added, so it is usual for each of the lines provided to have a line
        separator at the end."""
        self._pipeline_fh.writelines(lines)

    ### not part of file-like ###
    def wait(self):
        """wait for the processes to complete, generating an exception if one
        exits non-zero"""
        with self.lock:
            self._close()
            super(Popen, self).wait()

    def poll(self):
        "poll is not allowed for Popen objects"
        # FIXME: don't know what to do about our open pipe keeping a process
        # from exiting so we can get a status, so disallow it.  Not sure how
        # to address this.  Can probably address this with select on the pipe.
        self._unsupported("poll")
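
# Illustrative usage sketch (hypothetical file name): read decompressed text
# through a pipeline as a file-like object.  Leaving the with block calls
# close(), which waits for the pipeline and raises if a command exited
# non-zero.
def _example_popen_usage():
    with Popen(["zcat", "data.txt.gz"], "r") as fh:
        for line in fh:
            print(line, end="")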