Spaces:
Runtime error
Runtime error
| __all__ = 'create_subprocess_exec', 'create_subprocess_shell' | |
| import subprocess | |
| from . import events | |
| from . import protocols | |
| from . import streams | |
| from . import tasks | |
| from .log import logger | |
| PIPE = subprocess.PIPE | |
| STDOUT = subprocess.STDOUT | |
| DEVNULL = subprocess.DEVNULL | |
| class SubprocessStreamProtocol(streams.FlowControlMixin, | |
| protocols.SubprocessProtocol): | |
| """Like StreamReaderProtocol, but for a subprocess.""" | |
| def __init__(self, limit, loop): | |
| super().__init__(loop=loop) | |
| self._limit = limit | |
| self.stdin = self.stdout = self.stderr = None | |
| self._transport = None | |
| self._process_exited = False | |
| self._pipe_fds = [] | |
| self._stdin_closed = self._loop.create_future() | |
| def __repr__(self): | |
| info = [self.__class__.__name__] | |
| if self.stdin is not None: | |
| info.append(f'stdin={self.stdin!r}') | |
| if self.stdout is not None: | |
| info.append(f'stdout={self.stdout!r}') | |
| if self.stderr is not None: | |
| info.append(f'stderr={self.stderr!r}') | |
| return '<{}>'.format(' '.join(info)) | |
| def connection_made(self, transport): | |
| self._transport = transport | |
| stdout_transport = transport.get_pipe_transport(1) | |
| if stdout_transport is not None: | |
| self.stdout = streams.StreamReader(limit=self._limit, | |
| loop=self._loop) | |
| self.stdout.set_transport(stdout_transport) | |
| self._pipe_fds.append(1) | |
| stderr_transport = transport.get_pipe_transport(2) | |
| if stderr_transport is not None: | |
| self.stderr = streams.StreamReader(limit=self._limit, | |
| loop=self._loop) | |
| self.stderr.set_transport(stderr_transport) | |
| self._pipe_fds.append(2) | |
| stdin_transport = transport.get_pipe_transport(0) | |
| if stdin_transport is not None: | |
| self.stdin = streams.StreamWriter(stdin_transport, | |
| protocol=self, | |
| reader=None, | |
| loop=self._loop) | |
| def pipe_data_received(self, fd, data): | |
| if fd == 1: | |
| reader = self.stdout | |
| elif fd == 2: | |
| reader = self.stderr | |
| else: | |
| reader = None | |
| if reader is not None: | |
| reader.feed_data(data) | |
| def pipe_connection_lost(self, fd, exc): | |
| if fd == 0: | |
| pipe = self.stdin | |
| if pipe is not None: | |
| pipe.close() | |
| self.connection_lost(exc) | |
| if exc is None: | |
| self._stdin_closed.set_result(None) | |
| else: | |
| self._stdin_closed.set_exception(exc) | |
| return | |
| if fd == 1: | |
| reader = self.stdout | |
| elif fd == 2: | |
| reader = self.stderr | |
| else: | |
| reader = None | |
| if reader is not None: | |
| if exc is None: | |
| reader.feed_eof() | |
| else: | |
| reader.set_exception(exc) | |
| if fd in self._pipe_fds: | |
| self._pipe_fds.remove(fd) | |
| self._maybe_close_transport() | |
| def process_exited(self): | |
| self._process_exited = True | |
| self._maybe_close_transport() | |
| def _maybe_close_transport(self): | |
| if len(self._pipe_fds) == 0 and self._process_exited: | |
| self._transport.close() | |
| self._transport = None | |
| def _get_close_waiter(self, stream): | |
| if stream is self.stdin: | |
| return self._stdin_closed | |
| class Process: | |
| def __init__(self, transport, protocol, loop): | |
| self._transport = transport | |
| self._protocol = protocol | |
| self._loop = loop | |
| self.stdin = protocol.stdin | |
| self.stdout = protocol.stdout | |
| self.stderr = protocol.stderr | |
| self.pid = transport.get_pid() | |
| def __repr__(self): | |
| return f'<{self.__class__.__name__} {self.pid}>' | |
| def returncode(self): | |
| return self._transport.get_returncode() | |
| async def wait(self): | |
| """Wait until the process exit and return the process return code.""" | |
| return await self._transport._wait() | |
| def send_signal(self, signal): | |
| self._transport.send_signal(signal) | |
| def terminate(self): | |
| self._transport.terminate() | |
| def kill(self): | |
| self._transport.kill() | |
| async def _feed_stdin(self, input): | |
| debug = self._loop.get_debug() | |
| self.stdin.write(input) | |
| if debug: | |
| logger.debug( | |
| '%r communicate: feed stdin (%s bytes)', self, len(input)) | |
| try: | |
| await self.stdin.drain() | |
| except (BrokenPipeError, ConnectionResetError) as exc: | |
| # communicate() ignores BrokenPipeError and ConnectionResetError | |
| if debug: | |
| logger.debug('%r communicate: stdin got %r', self, exc) | |
| if debug: | |
| logger.debug('%r communicate: close stdin', self) | |
| self.stdin.close() | |
| async def _noop(self): | |
| return None | |
| async def _read_stream(self, fd): | |
| transport = self._transport.get_pipe_transport(fd) | |
| if fd == 2: | |
| stream = self.stderr | |
| else: | |
| assert fd == 1 | |
| stream = self.stdout | |
| if self._loop.get_debug(): | |
| name = 'stdout' if fd == 1 else 'stderr' | |
| logger.debug('%r communicate: read %s', self, name) | |
| output = await stream.read() | |
| if self._loop.get_debug(): | |
| name = 'stdout' if fd == 1 else 'stderr' | |
| logger.debug('%r communicate: close %s', self, name) | |
| transport.close() | |
| return output | |
| async def communicate(self, input=None): | |
| if input is not None: | |
| stdin = self._feed_stdin(input) | |
| else: | |
| stdin = self._noop() | |
| if self.stdout is not None: | |
| stdout = self._read_stream(1) | |
| else: | |
| stdout = self._noop() | |
| if self.stderr is not None: | |
| stderr = self._read_stream(2) | |
| else: | |
| stderr = self._noop() | |
| stdin, stdout, stderr = await tasks.gather(stdin, stdout, stderr) | |
| await self.wait() | |
| return (stdout, stderr) | |
| async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None, | |
| limit=streams._DEFAULT_LIMIT, **kwds): | |
| loop = events.get_running_loop() | |
| protocol_factory = lambda: SubprocessStreamProtocol(limit=limit, | |
| loop=loop) | |
| transport, protocol = await loop.subprocess_shell( | |
| protocol_factory, | |
| cmd, stdin=stdin, stdout=stdout, | |
| stderr=stderr, **kwds) | |
| return Process(transport, protocol, loop) | |
| async def create_subprocess_exec(program, *args, stdin=None, stdout=None, | |
| stderr=None, limit=streams._DEFAULT_LIMIT, | |
| **kwds): | |
| loop = events.get_running_loop() | |
| protocol_factory = lambda: SubprocessStreamProtocol(limit=limit, | |
| loop=loop) | |
| transport, protocol = await loop.subprocess_exec( | |
| protocol_factory, | |
| program, *args, | |
| stdin=stdin, stdout=stdout, | |
| stderr=stderr, **kwds) | |
| return Process(transport, protocol, loop) | |