# Copyright 2006-2025 Mark Diekhans
"""
pipettor interfaces to files and pipes, as well as some other IPC stuff.
"""
import os
import errno
import threading
from pipettor.exceptions import PipettorException
class Dev:
    """Base class for objects that specify a process's input or output.

    Subclasses hide the details of setting up interprocess communication.
    The hooks defined here are no-ops unless overridden; the two
    ``get_child_*_fd`` accessors must be overridden by subclasses that
    support the corresponding direction.
    """

    def get_child_write_fd(self, process):
        """Return the fileno that `process` writes to for this device.

        Raises NotImplementedError unless overridden.
        """
        raise NotImplementedError('get_child_write_fd')

    def get_child_read_fd(self, process):
        """Return the fileno that `process` reads from for this device.

        Raises NotImplementedError unless overridden.
        """
        raise NotImplementedError('get_child_read_fd')

    def _bind_read_to_process(self, process):
        """Hook: associate the read side with a child process (default no-op)."""

    def _bind_write_to_process(self, process):
        """Hook: associate the write side with a child process (default no-op)."""

    def _bind_to_process(self, process, mode):
        """Associate with a child process, choosing the side from `mode`.

        A mode starting with "r" binds the read side; any other mode binds
        the write side.
        """
        if mode.startswith("r"):
            binder = self._bind_read_to_process
        else:
            binder = self._bind_write_to_process
        binder(process)

    def _post_start_parent(self):
        """Hook: do any post-exec handling in the parent (default no-op)."""

    def close(self):
        """Close the device (default no-op)."""
class _ReaderThread:
    """Pipe plus reader thread for one process feeding a DataReader.

    Kept as a separate class so that several processes can each have their
    own pipe and thread (for example, multiple processes writing stderr).
    """

    def __init__(self, process, readfn, binary, buffering, encoding, errors, newline):
        """Create the pipe and wrap its read end in a file object.

        `readfn` is called with everything read from the pipe.  `binary`
        selects "rb" versus "r"; the remaining arguments are passed to
        open() unchanged.
        """
        self.process = process
        self._readfn = readfn
        self._thread = None
        # Define both attributes before any call that can fail.
        self.read_fh = None
        self.write_fd = None
        pipe_read_fd, self.write_fd = os.pipe()
        open_mode = "rb" if binary else "r"
        self.read_fh = open(pipe_read_fd, open_mode, buffering=buffering,
                            encoding=encoding, errors=errors, newline=newline)

    def post_start_parent(self):
        """Close the write end held here and start the reader thread."""
        os.close(self.write_fd)
        self.write_fd = None
        reader = threading.Thread(target=self._reader, daemon=True)
        self._thread = reader
        reader.start()

    def close(self):
        """Wait for the reader thread, then close whatever is still open."""
        reader = self._thread
        if reader is not None:
            reader.join()
            self._thread = None
        read_fh = self.read_fh
        if read_fh is not None:
            read_fh.close()
            self.read_fh = None
        write_fd = self.write_fd
        if write_fd is not None:
            os.close(write_fd)
            self.write_fd = None

    def _reader(self):
        """Thread body: read the pipe to EOF and pass the result to readfn."""
        assert self.write_fd is None
        contents = self.read_fh.read()
        self._readfn(contents)
# [docs]
class DataReader(Dev):
    """Asynchronously read data from one or more processes into memory.

    Each bound process gets its own pipe and reader thread; a thread is used
    to prevent deadlock when both reading from and writing to a child
    pipeline.

    With ``binary=True`` the collected data is of type bytes, otherwise str.
    The buffering, encoding, errors and newline arguments are as used in the
    open() function.

    A single reader may read from multiple processes.
    """

    def __init__(self, *, binary=False, buffering=-1, encoding=None, errors=None, newline=None):
        super().__init__()
        self._threads = []
        self._buffer = []
        self._lock = threading.Lock()
        # open() settings, reused for every pipe created by this reader
        self.binary = binary
        self.buffering = buffering
        self.encoding = encoding
        self.errors = errors
        self.newline = newline

    def __str__(self):
        return "[DataReader]"

    def _bind_read_to_process(self, process):
        """Not supported: a DataReader cannot be a process's input."""
        raise NotImplementedError('DataReader._bind_read_to_process')

    def _bind_write_to_process(self, process):
        """Create the pipe and reader for a process that writes to this device."""
        reader = _ReaderThread(process, self._readfn, self.binary, self.buffering,
                               self.encoding, self.errors, self.newline)
        self._threads.append(reader)

    def _post_start_parent(self):
        """Run post-start handling on every reader, in binding order."""
        for reader in self._threads:
            reader.post_start_parent()

    def close(self):
        """Close every reader (pipes and threads), in binding order."""
        for reader in self._threads:
            reader.close()

    def _readfn(self, data):
        """Reader-thread callback: append `data` to the buffer under the lock."""
        with self._lock:
            self._buffer.append(data)

    def get_child_write_fd(self, process):
        """Return the write fileno of the pipe bound to `process`.

        Raises ValueError if `process` was never bound to this device.
        """
        bound = next((reader for reader in self._threads
                      if reader.process is process), None)
        if bound is None:
            raise ValueError("process not associated with this device")
        return bound.write_fd

    @property
    def data(self):
        """Buffered data joined into one bytes (binary) or str object."""
        separator = b"" if self.binary else ""
        return separator.join(self._buffer)
# [docs]
class DataWriter(Dev):
    """Object to asynchronously write data to a process from memory via a pipe.

    A thread is used to prevent deadlock when both reading from and writing
    to a child pipeline.  Text or binary output is determined by the type of
    `data`: str is written in text mode, anything else in binary mode.

    The buffering, encoding, errors and newline arguments are as used in the
    open() function.
    """

    def __init__(self, data, *, buffering=-1, encoding=None, errors=None, newline=None):
        super().__init__()
        binary = not isinstance(data, str)
        self._data = data
        self._thread = None
        self._process = None
        # Define both ends before any call that can fail.
        self._read_fd = self._write_fh = None
        self._read_fd, write_fd = os.pipe()
        mode = "wb" if binary else "w"
        self._write_fh = open(write_fd, mode, buffering=buffering,
                              encoding=encoding, errors=errors, newline=newline)

    def __str__(self):
        return "[DataWriter]"

    def _bind_read_to_process(self, process):
        """Associate the read side with a child process.

        Raises PipettorException if this writer is already bound.
        """
        if self._process is not None:
            raise PipettorException("DataWriter already bound to a process")
        self._process = process

    def _bind_write_to_process(self, process):
        """Not supported: a DataWriter cannot be a process's output."""
        raise NotImplementedError('_bind_write_to_process')

    def _post_start_parent(self):
        """Close the read end held here and start the writer thread."""
        os.close(self._read_fd)
        self._read_fd = None
        self._thread = threading.Thread(target=self._writer, daemon=True)
        self._thread.start()

    def get_child_read_fd(self, process):
        """Return the read fileno of the pipe for the bound `process`."""
        assert process is self._process
        return self._read_fd

    def close(self):
        """Wait for the writer thread, then close whatever is still open."""
        if self._thread is not None:
            self._thread.join()
            self._thread = None
        if self._read_fd is not None:
            os.close(self._read_fd)
            self._read_fd = None
        # Closing flushes buffered data, so a broken pipe can surface here;
        # it must be ignored exactly as in the writer thread.
        self._close_write_fh()

    def _close_write_fh(self):
        """Close the write handle if still open, ignoring a broken pipe."""
        write_fh, self._write_fh = self._write_fh, None
        if write_fh is None:
            return
        try:
            write_fh.close()
        except OSError as ex:
            # don't raise error on broken pipe
            if ex.errno != errno.EPIPE:
                raise

    def _writer(self):
        """Thread body: write all the data and close the pipe.

        The handle is closed even if the write fails, so no buffered data is
        left to be flushed (and to fail again) by a later close().
        """
        assert self._read_fd is None
        try:
            self._write_fh.write(self._data)
        except OSError as ex:
            # don't raise error on broken pipe
            if ex.errno != errno.EPIPE:
                raise
        finally:
            self._close_write_fh()
# [docs]
class File(Dev):
    """A file path for input or output, used for specifying stdio associated
    with files.  Mode is one of `r`, `w`, or `a`.

    The file is opened in the constructor.  Raises PipettorException if
    `mode` contains none of the supported letters.
    """

    def __init__(self, path, mode="r"):
        super().__init__()
        self.path = path
        self.mode = mode
        # only one of the file descriptors is ever opened
        self._read_fd = self._write_fd = None
        if 'r' in mode:
            self._read_fd = os.open(self.path, os.O_RDONLY)
        elif 'w' in mode:
            self._write_fd = os.open(self.path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o666)
        elif 'a' in mode:
            self._write_fd = os.open(self.path, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o666)
        else:
            raise PipettorException("invalid or unsupported mode '{}' opening {}".format(mode, path))

    def __str__(self):
        # __str__ must return a str; path may be a path-like object, which
        # os.open() accepts, so convert explicitly.
        return str(self.path)

    def get_child_write_fd(self, process):
        """Return the write fileno; the file must have been opened for writing."""
        assert self._write_fd is not None
        return self._write_fd

    def get_child_read_fd(self, process):
        """Return the read fileno; the file must have been opened for reading."""
        assert self._read_fd is not None
        return self._read_fd

    def _post_start_parent(self):
        """Post-start handling in the parent: close the descriptor held here."""
        self.close()

    def close(self):
        """Close the file if open."""
        if self._read_fd is not None:
            os.close(self._read_fd)
            self._read_fd = None
        if self._write_fd is not None:
            os.close(self._write_fd)
            self._write_fd = None
class _SiblingPipe(Dev):
    """Anonymous pipe used for communication between two child processes."""

    def __init__(self):
        super().__init__()
        read_end, write_end = os.pipe()
        self._read_fd = read_end
        self._write_fd = write_end

    def __str__(self):
        return "[Pipe]"

    def get_child_write_fd(self, process):
        """Return the pipe's write fileno."""
        return self._write_fd

    def get_child_read_fd(self, process):
        """Return the pipe's read fileno."""
        return self._read_fd

    def _post_start_parent(self):
        """Post-exec handling in the parent: close both ends of the pipe."""
        # Unconditional: read end first, then write end.
        for attr in ("_read_fd", "_write_fd"):
            os.close(getattr(self, attr))
            setattr(self, attr, None)

    def close(self):
        """Close whichever ends of the pipe are still open."""
        for attr in ("_read_fd", "_write_fd"):
            fd = getattr(self, attr)
            if fd is not None:
                os.close(fd)
                setattr(self, attr, None)