Spaces:
Running
Running
from __future__ import annotations | |
import collections.abc as cabc | |
import contextlib | |
import io | |
import os | |
import shlex | |
import shutil | |
import sys | |
import tempfile | |
import typing as t | |
from types import TracebackType | |
from . import _compat | |
from . import formatting | |
from . import termui | |
from . import utils | |
from ._compat import _find_binary_reader | |
if t.TYPE_CHECKING: | |
from _typeshed import ReadableBuffer | |
from .core import Command | |
class EchoingStdin: | |
def __init__(self, input: t.BinaryIO, output: t.BinaryIO) -> None: | |
self._input = input | |
self._output = output | |
self._paused = False | |
def __getattr__(self, x: str) -> t.Any: | |
return getattr(self._input, x) | |
def _echo(self, rv: bytes) -> bytes: | |
if not self._paused: | |
self._output.write(rv) | |
return rv | |
def read(self, n: int = -1) -> bytes: | |
return self._echo(self._input.read(n)) | |
def read1(self, n: int = -1) -> bytes: | |
return self._echo(self._input.read1(n)) # type: ignore | |
def readline(self, n: int = -1) -> bytes: | |
return self._echo(self._input.readline(n)) | |
def readlines(self) -> list[bytes]: | |
return [self._echo(x) for x in self._input.readlines()] | |
def __iter__(self) -> cabc.Iterator[bytes]: | |
return iter(self._echo(x) for x in self._input) | |
def __repr__(self) -> str: | |
return repr(self._input) | |
def _pause_echo(stream: EchoingStdin | None) -> cabc.Iterator[None]: | |
if stream is None: | |
yield | |
else: | |
stream._paused = True | |
yield | |
stream._paused = False | |
class BytesIOCopy(io.BytesIO): | |
"""Patch ``io.BytesIO`` to let the written stream be copied to another. | |
.. versionadded:: 8.2 | |
""" | |
def __init__(self, copy_to: io.BytesIO) -> None: | |
super().__init__() | |
self.copy_to = copy_to | |
def flush(self) -> None: | |
super().flush() | |
self.copy_to.flush() | |
def write(self, b: ReadableBuffer) -> int: | |
self.copy_to.write(b) | |
return super().write(b) | |
class StreamMixer: | |
"""Mixes `<stdout>` and `<stderr>` streams. | |
The result is available in the ``output`` attribute. | |
.. versionadded:: 8.2 | |
""" | |
def __init__(self) -> None: | |
self.output: io.BytesIO = io.BytesIO() | |
self.stdout: io.BytesIO = BytesIOCopy(copy_to=self.output) | |
self.stderr: io.BytesIO = BytesIOCopy(copy_to=self.output) | |
class _NamedTextIOWrapper(io.TextIOWrapper): | |
def __init__( | |
self, buffer: t.BinaryIO, name: str, mode: str, **kwargs: t.Any | |
) -> None: | |
super().__init__(buffer, **kwargs) | |
self._name = name | |
self._mode = mode | |
def name(self) -> str: | |
return self._name | |
def mode(self) -> str: | |
return self._mode | |
def __next__(self) -> str: # type: ignore | |
try: | |
line = super().__next__() | |
except StopIteration as e: | |
raise EOFError() from e | |
return line | |
def make_input_stream( | |
input: str | bytes | t.IO[t.Any] | None, charset: str | |
) -> t.BinaryIO: | |
# Is already an input stream. | |
if hasattr(input, "read"): | |
rv = _find_binary_reader(t.cast("t.IO[t.Any]", input)) | |
if rv is not None: | |
return rv | |
raise TypeError("Could not find binary reader for input stream.") | |
if input is None: | |
input = b"" | |
elif isinstance(input, str): | |
input = input.encode(charset) | |
return io.BytesIO(input) | |
class Result: | |
"""Holds the captured result of an invoked CLI script. | |
:param runner: The runner that created the result | |
:param stdout_bytes: The standard output as bytes. | |
:param stderr_bytes: The standard error as bytes. | |
:param output_bytes: A mix of ``stdout_bytes`` and ``stderr_bytes``, as the | |
user would see it in its terminal. | |
:param return_value: The value returned from the invoked command. | |
:param exit_code: The exit code as integer. | |
:param exception: The exception that happened if one did. | |
:param exc_info: Exception information (exception type, exception instance, | |
traceback type). | |
.. versionchanged:: 8.2 | |
``stderr_bytes`` no longer optional, ``output_bytes`` introduced and | |
``mix_stderr`` has been removed. | |
.. versionadded:: 8.0 | |
Added ``return_value``. | |
""" | |
def __init__( | |
self, | |
runner: CliRunner, | |
stdout_bytes: bytes, | |
stderr_bytes: bytes, | |
output_bytes: bytes, | |
return_value: t.Any, | |
exit_code: int, | |
exception: BaseException | None, | |
exc_info: tuple[type[BaseException], BaseException, TracebackType] | |
| None = None, | |
): | |
self.runner = runner | |
self.stdout_bytes = stdout_bytes | |
self.stderr_bytes = stderr_bytes | |
self.output_bytes = output_bytes | |
self.return_value = return_value | |
self.exit_code = exit_code | |
self.exception = exception | |
self.exc_info = exc_info | |
def output(self) -> str: | |
"""The terminal output as unicode string, as the user would see it. | |
.. versionchanged:: 8.2 | |
No longer a proxy for ``self.stdout``. Now has its own independent stream | |
that is mixing `<stdout>` and `<stderr>`, in the order they were written. | |
""" | |
return self.output_bytes.decode(self.runner.charset, "replace").replace( | |
"\r\n", "\n" | |
) | |
def stdout(self) -> str: | |
"""The standard output as unicode string.""" | |
return self.stdout_bytes.decode(self.runner.charset, "replace").replace( | |
"\r\n", "\n" | |
) | |
def stderr(self) -> str: | |
"""The standard error as unicode string. | |
.. versionchanged:: 8.2 | |
No longer raise an exception, always returns the `<stderr>` string. | |
""" | |
return self.stderr_bytes.decode(self.runner.charset, "replace").replace( | |
"\r\n", "\n" | |
) | |
def __repr__(self) -> str: | |
exc_str = repr(self.exception) if self.exception else "okay" | |
return f"<{type(self).__name__} {exc_str}>" | |
class CliRunner: | |
"""The CLI runner provides functionality to invoke a Click command line | |
script for unittesting purposes in a isolated environment. This only | |
works in single-threaded systems without any concurrency as it changes the | |
global interpreter state. | |
:param charset: the character set for the input and output data. | |
:param env: a dictionary with environment variables for overriding. | |
:param echo_stdin: if this is set to `True`, then reading from `<stdin>` writes | |
to `<stdout>`. This is useful for showing examples in | |
some circumstances. Note that regular prompts | |
will automatically echo the input. | |
:param catch_exceptions: Whether to catch any exceptions other than | |
``SystemExit`` when running :meth:`~CliRunner.invoke`. | |
.. versionchanged:: 8.2 | |
Added the ``catch_exceptions`` parameter. | |
.. versionchanged:: 8.2 | |
``mix_stderr`` parameter has been removed. | |
""" | |
def __init__( | |
self, | |
charset: str = "utf-8", | |
env: cabc.Mapping[str, str | None] | None = None, | |
echo_stdin: bool = False, | |
catch_exceptions: bool = True, | |
) -> None: | |
self.charset = charset | |
self.env: cabc.Mapping[str, str | None] = env or {} | |
self.echo_stdin = echo_stdin | |
self.catch_exceptions = catch_exceptions | |
def get_default_prog_name(self, cli: Command) -> str: | |
"""Given a command object it will return the default program name | |
for it. The default is the `name` attribute or ``"root"`` if not | |
set. | |
""" | |
return cli.name or "root" | |
def make_env( | |
self, overrides: cabc.Mapping[str, str | None] | None = None | |
) -> cabc.Mapping[str, str | None]: | |
"""Returns the environment overrides for invoking a script.""" | |
rv = dict(self.env) | |
if overrides: | |
rv.update(overrides) | |
return rv | |
def isolation( | |
self, | |
input: str | bytes | t.IO[t.Any] | None = None, | |
env: cabc.Mapping[str, str | None] | None = None, | |
color: bool = False, | |
) -> cabc.Iterator[tuple[io.BytesIO, io.BytesIO, io.BytesIO]]: | |
"""A context manager that sets up the isolation for invoking of a | |
command line tool. This sets up `<stdin>` with the given input data | |
and `os.environ` with the overrides from the given dictionary. | |
This also rebinds some internals in Click to be mocked (like the | |
prompt functionality). | |
This is automatically done in the :meth:`invoke` method. | |
:param input: the input stream to put into `sys.stdin`. | |
:param env: the environment overrides as dictionary. | |
:param color: whether the output should contain color codes. The | |
application can still override this explicitly. | |
.. versionadded:: 8.2 | |
An additional output stream is returned, which is a mix of | |
`<stdout>` and `<stderr>` streams. | |
.. versionchanged:: 8.2 | |
Always returns the `<stderr>` stream. | |
.. versionchanged:: 8.0 | |
`<stderr>` is opened with ``errors="backslashreplace"`` | |
instead of the default ``"strict"``. | |
.. versionchanged:: 4.0 | |
Added the ``color`` parameter. | |
""" | |
bytes_input = make_input_stream(input, self.charset) | |
echo_input = None | |
old_stdin = sys.stdin | |
old_stdout = sys.stdout | |
old_stderr = sys.stderr | |
old_forced_width = formatting.FORCED_WIDTH | |
formatting.FORCED_WIDTH = 80 | |
env = self.make_env(env) | |
stream_mixer = StreamMixer() | |
if self.echo_stdin: | |
bytes_input = echo_input = t.cast( | |
t.BinaryIO, EchoingStdin(bytes_input, stream_mixer.stdout) | |
) | |
sys.stdin = text_input = _NamedTextIOWrapper( | |
bytes_input, encoding=self.charset, name="<stdin>", mode="r" | |
) | |
if self.echo_stdin: | |
# Force unbuffered reads, otherwise TextIOWrapper reads a | |
# large chunk which is echoed early. | |
text_input._CHUNK_SIZE = 1 # type: ignore | |
sys.stdout = _NamedTextIOWrapper( | |
stream_mixer.stdout, encoding=self.charset, name="<stdout>", mode="w" | |
) | |
sys.stderr = _NamedTextIOWrapper( | |
stream_mixer.stderr, | |
encoding=self.charset, | |
name="<stderr>", | |
mode="w", | |
errors="backslashreplace", | |
) | |
# type: ignore | |
def visible_input(prompt: str | None = None) -> str: | |
sys.stdout.write(prompt or "") | |
val = next(text_input).rstrip("\r\n") | |
sys.stdout.write(f"{val}\n") | |
sys.stdout.flush() | |
return val | |
# type: ignore | |
def hidden_input(prompt: str | None = None) -> str: | |
sys.stdout.write(f"{prompt or ''}\n") | |
sys.stdout.flush() | |
return next(text_input).rstrip("\r\n") | |
# type: ignore | |
def _getchar(echo: bool) -> str: | |
char = sys.stdin.read(1) | |
if echo: | |
sys.stdout.write(char) | |
sys.stdout.flush() | |
return char | |
default_color = color | |
def should_strip_ansi( | |
stream: t.IO[t.Any] | None = None, color: bool | None = None | |
) -> bool: | |
if color is None: | |
return not default_color | |
return not color | |
old_visible_prompt_func = termui.visible_prompt_func | |
old_hidden_prompt_func = termui.hidden_prompt_func | |
old__getchar_func = termui._getchar | |
old_should_strip_ansi = utils.should_strip_ansi # type: ignore | |
old__compat_should_strip_ansi = _compat.should_strip_ansi | |
termui.visible_prompt_func = visible_input | |
termui.hidden_prompt_func = hidden_input | |
termui._getchar = _getchar | |
utils.should_strip_ansi = should_strip_ansi # type: ignore | |
_compat.should_strip_ansi = should_strip_ansi | |
old_env = {} | |
try: | |
for key, value in env.items(): | |
old_env[key] = os.environ.get(key) | |
if value is None: | |
try: | |
del os.environ[key] | |
except Exception: | |
pass | |
else: | |
os.environ[key] = value | |
yield (stream_mixer.stdout, stream_mixer.stderr, stream_mixer.output) | |
finally: | |
for key, value in old_env.items(): | |
if value is None: | |
try: | |
del os.environ[key] | |
except Exception: | |
pass | |
else: | |
os.environ[key] = value | |
sys.stdout = old_stdout | |
sys.stderr = old_stderr | |
sys.stdin = old_stdin | |
termui.visible_prompt_func = old_visible_prompt_func | |
termui.hidden_prompt_func = old_hidden_prompt_func | |
termui._getchar = old__getchar_func | |
utils.should_strip_ansi = old_should_strip_ansi # type: ignore | |
_compat.should_strip_ansi = old__compat_should_strip_ansi | |
formatting.FORCED_WIDTH = old_forced_width | |
def invoke( | |
self, | |
cli: Command, | |
args: str | cabc.Sequence[str] | None = None, | |
input: str | bytes | t.IO[t.Any] | None = None, | |
env: cabc.Mapping[str, str | None] | None = None, | |
catch_exceptions: bool | None = None, | |
color: bool = False, | |
**extra: t.Any, | |
) -> Result: | |
"""Invokes a command in an isolated environment. The arguments are | |
forwarded directly to the command line script, the `extra` keyword | |
arguments are passed to the :meth:`~clickpkg.Command.main` function of | |
the command. | |
This returns a :class:`Result` object. | |
:param cli: the command to invoke | |
:param args: the arguments to invoke. It may be given as an iterable | |
or a string. When given as string it will be interpreted | |
as a Unix shell command. More details at | |
:func:`shlex.split`. | |
:param input: the input data for `sys.stdin`. | |
:param env: the environment overrides. | |
:param catch_exceptions: Whether to catch any other exceptions than | |
``SystemExit``. If :data:`None`, the value | |
from :class:`CliRunner` is used. | |
:param extra: the keyword arguments to pass to :meth:`main`. | |
:param color: whether the output should contain color codes. The | |
application can still override this explicitly. | |
.. versionadded:: 8.2 | |
The result object has the ``output_bytes`` attribute with | |
the mix of ``stdout_bytes`` and ``stderr_bytes``, as the user would | |
see it in its terminal. | |
.. versionchanged:: 8.2 | |
The result object always returns the ``stderr_bytes`` stream. | |
.. versionchanged:: 8.0 | |
The result object has the ``return_value`` attribute with | |
the value returned from the invoked command. | |
.. versionchanged:: 4.0 | |
Added the ``color`` parameter. | |
.. versionchanged:: 3.0 | |
Added the ``catch_exceptions`` parameter. | |
.. versionchanged:: 3.0 | |
The result object has the ``exc_info`` attribute with the | |
traceback if available. | |
""" | |
exc_info = None | |
if catch_exceptions is None: | |
catch_exceptions = self.catch_exceptions | |
with self.isolation(input=input, env=env, color=color) as outstreams: | |
return_value = None | |
exception: BaseException | None = None | |
exit_code = 0 | |
if isinstance(args, str): | |
args = shlex.split(args) | |
try: | |
prog_name = extra.pop("prog_name") | |
except KeyError: | |
prog_name = self.get_default_prog_name(cli) | |
try: | |
return_value = cli.main(args=args or (), prog_name=prog_name, **extra) | |
except SystemExit as e: | |
exc_info = sys.exc_info() | |
e_code = t.cast("int | t.Any | None", e.code) | |
if e_code is None: | |
e_code = 0 | |
if e_code != 0: | |
exception = e | |
if not isinstance(e_code, int): | |
sys.stdout.write(str(e_code)) | |
sys.stdout.write("\n") | |
e_code = 1 | |
exit_code = e_code | |
except Exception as e: | |
if not catch_exceptions: | |
raise | |
exception = e | |
exit_code = 1 | |
exc_info = sys.exc_info() | |
finally: | |
sys.stdout.flush() | |
sys.stderr.flush() | |
stdout = outstreams[0].getvalue() | |
stderr = outstreams[1].getvalue() | |
output = outstreams[2].getvalue() | |
return Result( | |
runner=self, | |
stdout_bytes=stdout, | |
stderr_bytes=stderr, | |
output_bytes=output, | |
return_value=return_value, | |
exit_code=exit_code, | |
exception=exception, | |
exc_info=exc_info, # type: ignore | |
) | |
def isolated_filesystem( | |
self, temp_dir: str | os.PathLike[str] | None = None | |
) -> cabc.Iterator[str]: | |
"""A context manager that creates a temporary directory and | |
changes the current working directory to it. This isolates tests | |
that affect the contents of the CWD to prevent them from | |
interfering with each other. | |
:param temp_dir: Create the temporary directory under this | |
directory. If given, the created directory is not removed | |
when exiting. | |
.. versionchanged:: 8.0 | |
Added the ``temp_dir`` parameter. | |
""" | |
cwd = os.getcwd() | |
dt = tempfile.mkdtemp(dir=temp_dir) | |
os.chdir(dt) | |
try: | |
yield dt | |
finally: | |
os.chdir(cwd) | |
if temp_dir is None: | |
try: | |
shutil.rmtree(dt) | |
except OSError: | |
pass | |