Spaces:
Sleeping
Sleeping
File size: 4,435 Bytes
e227a15 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
import functools
import sys
from typing import Callable, Generator, Iterable, Iterator, Optional, Tuple, TypeVar
from pip._vendor.rich.progress import (
BarColumn,
DownloadColumn,
FileSizeColumn,
MofNCompleteColumn,
Progress,
ProgressColumn,
SpinnerColumn,
TextColumn,
TimeElapsedColumn,
TimeRemainingColumn,
TransferSpeedColumn,
)
from pip._internal.cli.spinners import RateLimiter
from pip._internal.req.req_install import InstallRequirement
from pip._internal.utils.logging import get_console, get_indentation
# Item type that a progress renderer passes through unchanged.
T = TypeVar("T")
# A progress renderer is a callable that takes an iterable of items and
# returns an iterator over items of the same type. It is the return type of
# the get_*_progress_renderer() factories in this module.
ProgressRenderer = Callable[[Iterable[T]], Iterator[T]]
def _rich_download_progress_bar(
iterable: Iterable[bytes],
*,
bar_type: str,
size: Optional[int],
initial_progress: Optional[int] = None,
) -> Generator[bytes, None, None]:
assert bar_type == "on", "This should only be used in the default mode."
if not size:
total = float("inf")
columns: Tuple[ProgressColumn, ...] = (
TextColumn("[progress.description]{task.description}"),
SpinnerColumn("line", speed=1.5),
FileSizeColumn(),
TransferSpeedColumn(),
TimeElapsedColumn(),
)
else:
total = size
columns = (
TextColumn("[progress.description]{task.description}"),
BarColumn(),
DownloadColumn(),
TransferSpeedColumn(),
TextColumn("eta"),
TimeRemainingColumn(),
)
progress = Progress(*columns, refresh_per_second=5)
task_id = progress.add_task(" " * (get_indentation() + 2), total=total)
if initial_progress is not None:
progress.update(task_id, advance=initial_progress)
with progress:
for chunk in iterable:
yield chunk
progress.update(task_id, advance=len(chunk))
def _rich_install_progress_bar(
    iterable: Iterable[InstallRequirement], *, total: int
) -> Iterator[InstallRequirement]:
    """Re-yield requirements from ``iterable`` while rendering an install bar.

    :param iterable: Requirements to pass through; each is yielded unchanged.
    :param total: Total passed to the progress task.

    Before each requirement is yielded the task description is set to the
    requirement's name in (escaped) square brackets; after the consumer hands
    control back, the task is advanced by one step.
    """
    layout = (
        TextColumn("{task.fields[indent]}"),
        BarColumn(),
        MofNCompleteColumn(),
        TextColumn("{task.description}"),
    )
    progress = Progress(
        *layout, refresh_per_second=6, console=get_console(), transient=True
    )
    # The task starts out invisible so that a refresh cycle has to happen
    # before the bar shows up, which avoids very short flashes.
    task_id = progress.add_task(
        "", total=total, indent=" " * get_indentation(), visible=False
    )

    with progress:
        for requirement in iterable:
            label = rf"\[{requirement.name}]"
            progress.update(task_id, description=label, visible=True)
            yield requirement
            progress.advance(task_id)
def _raw_progress_bar(
    iterable: Iterable[bytes],
    *,
    size: Optional[int],
    initial_progress: Optional[int] = None,
) -> Generator[bytes, None, None]:
    """Re-yield the chunks of ``iterable`` while printing plain progress lines.

    Lines of the form ``Progress <current> of <total>`` are written to
    ``sys.stdout`` and flushed immediately: once before iteration starts, and
    then after a chunk is counted whenever the rate limiter is ready or the
    running count equals the total.

    :param iterable: Source of byte chunks; each chunk is yielded unchanged.
    :param size: Expected total number of bytes; a falsy value is reported as 0.
    :param initial_progress: Starting byte count; a falsy value is treated as 0.
    """
    received = initial_progress if initial_progress else 0
    expected = size if size else 0
    # Presumably 0.25 is the minimum interval between reports, in seconds;
    # confirm against RateLimiter.
    limiter = RateLimiter(0.25)

    def report(done: int) -> None:
        sys.stdout.write(f"Progress {done} of {expected}\n")
        sys.stdout.flush()

    report(received)
    for chunk in iterable:
        received += len(chunk)
        if limiter.ready() or received == expected:
            report(received)
            limiter.reset()
        yield chunk
def get_download_progress_renderer(
    *, bar_type: str, size: Optional[int] = None, initial_progress: Optional[int] = None
) -> ProgressRenderer[bytes]:
    """Pick the renderer used to display download progress.

    The result is a callable that "wraps" an iterable of byte chunks:

    * ``bar_type == "on"``: the rich progress bar, bound to ``bar_type``,
      ``size`` and ``initial_progress``.
    * ``bar_type == "raw"``: the plain-text reporter, bound to ``size`` and
      ``initial_progress``.
    * anything else: the builtin ``iter``.
    """
    shared = {"size": size, "initial_progress": initial_progress}

    if bar_type == "on":
        return functools.partial(
            _rich_download_progress_bar, bar_type=bar_type, **shared
        )
    if bar_type == "raw":
        return functools.partial(_raw_progress_bar, **shared)
    return iter  # no-op, when passed an iterator
def get_install_progress_renderer(
    *, bar_type: str, total: int
) -> ProgressRenderer[InstallRequirement]:
    """Pick the renderer used to display install progress.

    The result is a callable that "wraps" an iterable of requirements: the
    rich install bar bound to ``total`` when ``bar_type`` is ``"on"``, and
    the builtin ``iter`` for every other value.
    """
    if bar_type != "on":
        return iter
    return functools.partial(_rich_install_progress_bar, total=total)
|