Source code for pipettor.devices

# Copyright 2006-2015 Mark Diekhans
"""
pipettor interfaces to files and pipes, as well as some other IPC stuff.
"""
import os
import errno
import threading
from pipettor.exceptions import PipettorException


# note:
# A problem with Python threads and signal handling is that SIGINT (C-c) will
# not raise an exception if it's blocked in thread join.  If the thread never
# terminates, the process hangs, not responding to SIGINT.  This can happen if
# the forked process is hung.  To work around this, the I/O threads are set to
# daemon, which solves the problem.  It also causes the process to not wait on
# those threads if the main process exits and close hasn't been called.
#
#  http://bugs.python.org/issue21822
#  http://code.activestate.com/recipes/496735-workaround-for-missed-sigint-in-multithreaded-prog/


class Dev(object):
    """Base class for objects that specify the input or output of a process.

    A device hides the details of setting up interprocess communication.

    Derived classes provide whichever of these attributes apply:
       read_fd - integer file descriptor for reading
       read_fh - file object for reading
       write_fd - integer file descriptor for writing
       write_fh - file object for writing
    """

    def _bind_read_to_process(self, process):
        """Associate the read side with a child process (no-op here)."""

    def _bind_write_to_process(self, process):
        """Associate the write side with a child process (no-op here)."""

    def _bind_to_process(self, process, mode):
        """Associate with a child process according to mode.

        A mode starting with "r" binds the read side; any other mode binds
        the write side.
        """
        binder = (self._bind_read_to_process if mode.startswith("r")
                  else self._bind_write_to_process)
        binder(process)

    def _post_start_parent(self):
        """Hook called in the parent for post-exec handling (no-op here)."""

    def close(self):
        """Close the device (no-op here)."""


class DataReader(Dev):
    """Object to asynchronously read data from a process into memory via a
    pipe.  A thread is used to prevent deadlock when both reading and writing
    to a child pipeline.

    Specifying binary access results in data of type bytes, otherwise str
    type is returned.  The buffering, encoding, and errors arguments are as
    used in the open() function.
    """

    def __init__(self, *, binary=False, buffering=-1, encoding=None, errors=None):
        super(DataReader, self).__init__()
        self.binary = binary
        self._process = None
        self._buffer = []
        self._thread = None
        self.read_fh = self.write_fd = None
        read_fd, write_fd = os.pipe()
        mode = "rb" if binary else "r"
        try:
            self.read_fh = open(read_fd, mode, buffering=buffering,
                                encoding=encoding, errors=errors)
        except BaseException:
            # open() rejected the arguments (e.g. binary with an encoding);
            # release the pipe rather than leaking both descriptors.
            os.close(write_fd)
            try:
                os.close(read_fd)
            except OSError:
                # open() may already have closed the descriptor itself when
                # it failed after taking ownership of it.
                pass
            raise
        self.write_fd = write_fd

    def __str__(self):
        return "[DataReader]"

    def _bind_write_to_process(self, process):
        """Associate the write side with a child process.

        Raises PipettorException if already bound to a process.
        """
        if self._process is not None:
            raise PipettorException("DataReader already bound to a process")
        self._process = process

    def _post_start_parent(self):
        """Called to do any post-start handling in the parent: close the
        parent's copy of the write end and start the reader thread."""
        os.close(self.write_fd)
        self.write_fd = None
        self._thread = threading.Thread(target=self._reader)
        self._thread.daemon = True  # see note at top of this file
        self._thread.start()

    def close(self):
        """Wait for the reader thread, if started, then close any pipe ends
        that are still open."""
        if self._thread is not None:
            self._thread.join()
            self._thread = None
        if self.read_fh is not None:
            self.read_fh.close()
            self.read_fh = None
        if self.write_fd is not None:
            os.close(self.write_fd)
            self.write_fd = None

    def _reader(self):
        """Reader thread function; reads the pipe to EOF into the buffer."""
        assert self.write_fd is None
        self._buffer.append(self.read_fh.read())

    @property
    def data(self):
        """Return buffered data as a string or bytes."""
        if self.binary:
            return b"".join(self._buffer)
        else:
            return "".join(self._buffer)
class DataWriter(Dev):
    """Object to asynchronously write data to a process from memory via a
    pipe.  A thread is used to prevent deadlock when both reading and writing
    to a child pipeline.

    Text or binary output is determined by the type of data: str is written
    in text mode, anything else in binary mode.  The buffering, encoding, and
    errors arguments are as used in the open() function.
    """

    def __init__(self, data, *, buffering=-1, encoding=None, errors=None):
        super(DataWriter, self).__init__()
        binary = not isinstance(data, str)
        self._data = data
        self._thread = None
        self._process = None
        self.read_fd = self.write_fh = None
        read_fd, write_fd = os.pipe()
        mode = "wb" if binary else "w"
        try:
            self.write_fh = open(write_fd, mode, buffering=buffering,
                                 encoding=encoding, errors=errors)
        except BaseException:
            # open() rejected the arguments (e.g. bytes data with an
            # encoding); release the pipe rather than leaking both
            # descriptors.
            os.close(read_fd)
            try:
                os.close(write_fd)
            except OSError:
                # open() may already have closed the descriptor itself when
                # it failed after taking ownership of it.
                pass
            raise
        self.read_fd = read_fd

    def __str__(self):
        return "[DataWriter]"

    def _bind_read_to_process(self, process):
        """Associate the read side with a child process.

        Raises PipettorException if already bound to a process.
        """
        if self._process is not None:
            raise PipettorException("DataWriter already bound to a process")
        self._process = process

    def _post_start_parent(self):
        """Called to do any post-start handling in the parent: close the
        parent's copy of the read end and start the writer thread."""
        os.close(self.read_fd)
        self.read_fd = None
        self._thread = threading.Thread(target=self._writer)
        self._thread.daemon = True  # see note at top of this file
        self._thread.start()

    def close(self):
        """Wait for the writer thread, if started, then close any pipe ends
        that are still open."""
        if self._thread is not None:
            self._thread.join()
            self._thread = None
        if self.read_fd is not None:
            os.close(self.read_fd)
            self.read_fd = None
        if self.write_fh is not None:
            self.write_fh.close()
            self.write_fh = None

    def _writer(self):
        """Writer thread function; writes all of the data and then closes
        the write end of the pipe."""
        assert self.read_fd is None
        try:
            self.write_fh.write(self._data)
            self.write_fh.close()
            self.write_fh = None
        except IOError as ex:
            # don't raise error on broken pipe
            if ex.errno != errno.EPIPE:
                raise
class File(Dev):
    """A file path for input or output, used for specifying stdio associated
    with files.  Mode is one of the standard r, w, or a."""

    def __init__(self, path, mode="r"):
        super(File, self).__init__()
        self.path = path
        self.mode = mode
        # only one of the file descriptors is ever opened
        self.read_fd = None
        self.write_fd = None

        # 'r' takes precedence over 'w', which takes precedence over 'a'
        if "r" in mode:
            self.read_fd = os.open(self.path, os.O_RDONLY)
            return
        if "w" in mode:
            disposition = os.O_TRUNC
        elif "a" in mode:
            disposition = os.O_APPEND
        else:
            raise PipettorException("invalid or unsupported mode '{}' opening {}".format(mode, path))
        self.write_fd = os.open(self.path,
                                os.O_WRONLY | os.O_CREAT | disposition,
                                0o666)

    def __str__(self):
        return self.path

    def close(self):
        """Close whichever file descriptor is open, if any."""
        for attr in ("read_fd", "write_fd"):
            fd = getattr(self, attr)
            if fd is not None:
                os.close(fd)
                setattr(self, attr, None)

    def _post_start_parent(self):
        """Called in the parent after the child is started; closes the
        file."""
        self.close()
class _SiblingPipe(Dev):
    """Interprocess communication between two child processes by an
    anonymous pipe."""

    def __init__(self):
        super(_SiblingPipe, self).__init__()
        pipe_ends = os.pipe()
        self.read_fd = pipe_ends[0]
        self.write_fd = pipe_ends[1]

    def __str__(self):
        return "[Pipe]"

    def _post_start_parent(self):
        """Called to do any post-exec handling in the parent: close both
        ends of the pipe unconditionally, read end first."""
        for attr in ("read_fd", "write_fd"):
            os.close(getattr(self, attr))
            setattr(self, attr, None)

    def close(self):
        """Close whichever ends of the pipe are still open."""
        for attr in ("read_fd", "write_fd"):
            fd = getattr(self, attr)
            if fd is not None:
                os.close(fd)
                setattr(self, attr, None)