Spaces:
Build error
Build error
# MIT License | |
# Copyright (c) 2022 Hash Minner | |
# Permission is hereby granted, free of charge, to any person obtaining a copy | |
# of this software and associated documentation files (the "Software"), to deal | |
# in the Software without restriction, including without limitation the rights | |
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
# copies of the Software, and to permit persons to whom the Software is | |
# furnished to do so, subject to the following conditions: | |
# The above copyright notice and this permission notice shall be included in all | |
# copies or substantial portions of the Software. | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
# SOFTWARE | |
import math | |
import time | |
import logging | |
logging.basicConfig(level=logging.DEBUG, | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
logger = logging.getLogger(__name__) | |
logging.getLogger("pyrogram").setLevel(logging.WARNING) | |
async def progress_for_pyrogram( | |
current, | |
total, | |
ud_type, | |
message, | |
start | |
): | |
now = time.time() | |
diff = now - start | |
if round(diff % 10.00) == 0 or current == total: | |
# if round(current / total * 100, 0) % 5 == 0: | |
percentage = current * 100 / total | |
speed = current / diff | |
elapsed_time = round(diff) * 1000 | |
time_to_completion = round((total - current) / speed) * 1000 | |
estimated_total_time = elapsed_time + time_to_completion | |
elapsed_time = TimeFormatter(milliseconds=elapsed_time) | |
estimated_total_time = TimeFormatter(milliseconds=estimated_total_time) | |
progress = "[{0}{1}] \nP: {2}%\n".format( | |
''.join(["◾" for _ in range(math.floor(percentage / 5))]), | |
''.join(["◽" for _ in range(20 - math.floor(percentage / 5))]), | |
round(percentage, 2), | |
) | |
tmp = progress + "{0} of {1}\n\nSpeed: {2}/s\n\nETA: {3}\n\n".format( | |
humanbytes(current), | |
humanbytes(total), | |
humanbytes(speed), | |
# elapsed_time if elapsed_time != '' else "0 s", | |
estimated_total_time if estimated_total_time != '' else "0 s" | |
) | |
try: | |
await message.edit(text=f"{ud_type}\n {tmp}") | |
except Exception as e: | |
logger.info(f"Error {e}") | |
return | |
SIZE_UNITS = ['B', 'KB', 'MB', 'GB', 'TB', 'PB'] | |
def huanbytes(size_in_bytes) -> str: | |
if size_in_bytes is None: | |
return '0B' | |
index = 0 | |
while size_in_bytes >= 1024: | |
size_in_bytes /= 1024 | |
index += 1 | |
try: | |
return f'{round(size_in_bytes, 2)}{SIZE_UNITS[index]}' | |
except IndexError: | |
return 'File too large' | |
def humanbytes(size): | |
# https://stackoverflow.com/a/49361727/4723940 | |
# 2**10 = 1024 | |
if not size: | |
return "" | |
power = 2**10 | |
n = 0 | |
Dic_powerN = {0: ' ', 1: 'K', 2: 'M', 3: 'G', 4: 'T'} | |
while size > power: | |
size /= power | |
n += 1 | |
return f"{str(round(size, 2))} {Dic_powerN[n]}B" | |
def TimeFormatter(milliseconds: int) -> str: | |
seconds, milliseconds = divmod(milliseconds, 1000) | |
minutes, seconds = divmod(seconds, 60) | |
hours, minutes = divmod(minutes, 60) | |
days, hours = divmod(hours, 24) | |
tmp = ( | |
(f"{str(days)}d, " if days else "") | |
+ (f"{str(hours)}h, " if hours else "") | |
+ (f"{str(minutes)}m, " if minutes else "") | |
+ (f"{str(seconds)}s, " if seconds else "") | |
+ (f"{str(milliseconds)}ms, " if milliseconds else "") | |
) | |
return tmp[:-2] | |