Spaces:
Running
Running
#    This file is part of the AutoAnime distribution.
#    Copyright (c) 2025 Kaif_00z
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, version 3.
#
#    This program is distributed in the hope that it will be useful, but
#    WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
#    General Public License for more details.
#
#    License can be found in <
#    https://github.com/kaif-00z/AutoAnimeBot/blob/main/LICENSE > .
# If you are using the following code, don't forget to give proper
# credit to t.me/kAiF_00z (github.com/kaif-00z)
import asyncio
import json
import math
import os
import re
import shlex
import shutil
import subprocess
import time
from traceback import format_exc

import aiofiles
import aiohttp
import requests
from html_telegraph_poster import TelegraphPoster
from telethon.errors.rpcerrorlist import MessageNotModifiedError

from functions.config import Var
from libs.logger import LOGS


class Tools: | |
def __init__(self): | |
if Var.DEV_MODE: | |
self.ffmpeg_threads = int(os.cpu_count() or 0) + 2 | |
else: | |
self.ffmpeg_threads = 2 | |
async def async_searcher( | |
self, | |
url: str, | |
post: bool = None, | |
headers: dict = None, | |
params: dict = None, | |
json: dict = None, | |
data: dict = None, | |
ssl=None, | |
re_json: bool = False, | |
re_content: bool = False, | |
real: bool = False, | |
*args, | |
**kwargs, | |
): | |
async with aiohttp.ClientSession(headers=headers) as client: | |
if post: | |
data = await client.post( | |
url, json=json, data=data, ssl=ssl, *args, **kwargs | |
) | |
else: | |
data = await client.get(url, params=params, ssl=ssl, *args, **kwargs) | |
if re_json: | |
return await data.json() | |
if re_content: | |
return await data.read() | |
if real: | |
return data | |
return await data.text() | |
async def cover_dl(self, link): | |
try: | |
if not link: | |
return None | |
image = await self.async_searcher(link, re_content=True) | |
fn = f"thumbs/{link.split('/')[-1]}" | |
if not fn.endswith((".jpg" or ".png")): | |
fn += ".jpg" | |
async with aiofiles.open(fn, "wb") as file: | |
await file.write(image) | |
return fn | |
except Exception as error: | |
LOGS.exception(format_exc()) | |
LOGS.error(str(error)) | |
async def mediainfo(self, file, bot): | |
try: | |
process = await asyncio.create_subprocess_shell( | |
f"mediainfo '''{file}''' --Output=HTML", | |
stdout=asyncio.subprocess.PIPE, | |
stderr=asyncio.subprocess.PIPE, | |
) | |
stdout, stderr = await process.communicate() | |
out = stdout.decode() | |
client = TelegraphPoster(use_api=True) | |
client.create_api_token("Mediainfo") | |
page = client.post( | |
title="Mediainfo", | |
author=((await bot.get_me()).first_name), | |
author_url=f"https://t.me/{((await bot.get_me()).username)}", | |
text=out, | |
) | |
return page.get("url") | |
except Exception as error: | |
LOGS.exception(format_exc()) | |
LOGS.error(str(error)) | |
async def _poster(self, bot, anime_info, channel_id=None): | |
thumb = await self.cover_dl((await anime_info.get_cover())) | |
caption = await anime_info.get_caption() | |
return await bot.upload_poster( | |
thumb or "assest/poster_not_found.jpg", | |
caption, | |
channel_id if channel_id else None, | |
) | |
async def get_chat_info(self, bot, anime_info, dB): | |
try: | |
chat_info = await dB.get_anime_channel_info(anime_info.proper_name) | |
if not chat_info: | |
chat_id = await bot.create_channel( | |
(await anime_info.get_english()), | |
(await self.cover_dl((await anime_info.get_poster()))), | |
) | |
invite_link = await bot.generate_invite_link(chat_id) | |
chat_info = {"chat_id": chat_id, "invite_link": invite_link} | |
await dB.add_anime_channel_info(anime_info.proper_name, chat_info) | |
return chat_info | |
except BaseException: | |
LOGS.error(str(format_exc())) | |
def init_dir(self): | |
if not os.path.exists("thumb.jpg"): | |
content = requests.get(Var.THUMB).content | |
with open("thumb.jpg", "wb") as f: | |
f.write(content) | |
if not os.path.isdir("encode/"): | |
os.mkdir("encode/") | |
if not os.path.isdir("thumbs/"): | |
os.mkdir("thumbs/") | |
if not os.path.isdir("downloads/"): | |
os.mkdir("downloads/") | |
def hbs(self, size): | |
if not size: | |
return "" | |
power = 2**10 | |
raised_to_pow = 0 | |
dict_power_n = {0: "B", 1: "K", 2: "M", 3: "G", 4: "T", 5: "P"} | |
while size > power: | |
size /= power | |
raised_to_pow += 1 | |
return str(round(size, 2)) + " " + dict_power_n[raised_to_pow] + "B" | |
def ts(self, milliseconds: int) -> str: | |
seconds, milliseconds = divmod(int(milliseconds), 1000) | |
minutes, seconds = divmod(seconds, 60) | |
hours, minutes = divmod(minutes, 60) | |
days, hours = divmod(hours, 24) | |
tmp = ( | |
((str(days) + "d:") if days else "") | |
+ ((str(hours) + "h:") if hours else "") | |
+ ((str(minutes) + "m:") if minutes else "") | |
+ ((str(seconds) + "s:") if seconds else "") | |
+ ((str(milliseconds) + "ms:") if milliseconds else "") | |
) | |
return tmp[:-1] | |
async def rename_file(self, dl, out): | |
try: | |
os.rename(dl, out) | |
except BaseException: | |
return False, format_exc() | |
return True, out | |
async def bash_(self, cmd, run_code=0): | |
process = await asyncio.create_subprocess_shell( | |
cmd, | |
stdout=asyncio.subprocess.PIPE, | |
stderr=asyncio.subprocess.PIPE, | |
) | |
stdout, stderr = await process.communicate() | |
err = stderr.decode().strip() or None | |
out = stdout.decode().strip() | |
if not run_code and err: | |
if match := re.match("\\/bin\\/sh: (.*): ?(\\w+): not found", err): | |
return out, f"{match.group(2).upper()}_NOT_FOUND" | |
return out, err | |
async def frame_counts(self, dl): | |
_x, _y = await self.bash_( | |
f'mediainfo --fullscan """{dl}""" | grep "Frame count"' | |
) | |
if _y and _y.endswith("NOT_FOUND"): | |
LOGS.error(f"ERROR: `{_y}`") | |
return False | |
return _x.split(":")[1].split("\n")[0] | |
async def compress(self, dl, out, log_msg): | |
total_frames = await self.frame_counts(dl) | |
if not total_frames: | |
return False, "Unable to Count The Frames!" | |
_progress = f"progress-{time.time()}.txt" | |
cmd = f'''{Var.FFMPEG} -hide_banner -loglevel quiet -progress """{_progress}""" -i """{dl}""" -metadata "Encoded By"="https://github.com/kaif-00z/AutoAnimeBot/" -preset ultrafast -c:v libx265 -crf {Var.CRF} -map 0:v -c:a aac -map 0:a -c:s copy -map 0:s? -threads {self.ffmpeg_threads} """{out}""" -y''' | |
process = await asyncio.create_subprocess_shell( | |
cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE | |
) | |
d_time = time.time() | |
while process.returncode != 0: | |
await asyncio.sleep(5) | |
with open(_progress, "r+") as fil: | |
text = fil.read() | |
frames = re.findall("frame=(\\d+)", text) | |
size = re.findall("total_size=(\\d+)", text) | |
speed = 0 | |
if not os.path.exists(out) or os.path.getsize(out) == 0: | |
return False, "Unable To Encode This Video!" | |
if len(frames): | |
elapse = int(frames[-1]) | |
if len(size): | |
size = int(size[-1]) | |
per = elapse * 100 / int(total_frames) | |
time_diff = time.time() - int(d_time) | |
speed = round(elapse / time_diff, 2) | |
if int(speed) != 0: | |
some_eta = ((int(total_frames) - elapse) / speed) * 1000 | |
text = f"**Successfully Downloaded The Anime**\n\n **File Name:** ```{dl.split('/')[-1]}```\n\n**STATUS:** \n" | |
progress_str = "`[{0}{1}] {2}%\n\n`".format( | |
"".join("●" for _ in range(math.floor(per / 5))), | |
"".join("" for _ in range(20 - math.floor(per / 5))), | |
round(per, 2), | |
) | |
e_size = f"{self.hbs(size)} of ~{self.hbs((size / per) * 100)}" | |
eta = f"~{self.ts(some_eta)}" | |
try: | |
_new_log_msg = await log_msg.edit( | |
text | |
+ progress_str | |
+ "`" | |
+ e_size | |
+ "`" | |
+ "\n\n`" | |
+ eta | |
+ "`" | |
) | |
except MessageNotModifiedError: | |
pass | |
try: | |
os.remove(_progress) | |
except BaseException: | |
pass | |
return True, _new_log_msg | |
async def genss(self, file): | |
process = subprocess.Popen( | |
# just for better codefactor rating :) | |
[shutil.which("mediainfo"), file, "--Output=JSON"], | |
stdout=subprocess.PIPE, | |
stderr=subprocess.STDOUT, | |
) | |
stdout, stderr = process.communicate() | |
out = stdout.decode().strip() | |
z = json.loads(out) | |
p = z["media"]["track"][0]["Duration"] | |
return int(p.split(".")[-2]) | |
def stdr(self, seconds: int) -> str: | |
minutes, seconds = divmod(seconds, 60) | |
hours, minutes = divmod(minutes, 60) | |
if len(str(minutes)) == 1: | |
minutes = "0" + str(minutes) | |
if len(str(hours)) == 1: | |
hours = "0" + str(hours) | |
if len(str(seconds)) == 1: | |
seconds = "0" + str(seconds) | |
dur = ( | |
((str(hours) + ":") if hours else "00:") | |
+ ((str(minutes) + ":") if minutes else "00:") | |
+ ((str(seconds)) if seconds else "") | |
) | |
return dur | |
async def duration_s(self, file): | |
tsec = await self.genss(file) | |
x = round(tsec / 5) | |
y = round(tsec / 5 + 30) | |
pin = self.stdr(x) | |
if y < tsec: | |
pon = self.stdr(y) | |
else: | |
pon = self.stdr(tsec) | |
return pin, pon | |
async def gen_ss_sam(self, _hash, filename): | |
try: | |
ss_path, sp_path = None, None | |
os.mkdir(_hash) | |
tsec = await self.genss(filename) | |
fps = 10 / tsec | |
ncmd = f"ffmpeg -i '{filename}' -vf fps={fps} -vframes 10 '{_hash}/pic%01d.png'" | |
process = await asyncio.create_subprocess_shell( | |
ncmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE | |
) | |
await process.communicate() | |
ss, dd = await self.duration_s(filename) | |
__ = filename.split(".mkv")[-2] | |
out = __ + "_sample.mkv" | |
_ncmd = f'ffmpeg -i """{filename}""" -preset ultrafast -ss {ss} -to {dd} -c:v libx265 -crf 27 -map 0:v -c:a aac -map 0:a -c:s copy -map 0:s? """{out}""" -y' | |
process = await asyncio.create_subprocess_shell( | |
_ncmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE | |
) | |
stdout, stderr = await process.communicate() | |
er = stderr.decode().strip() | |
try: | |
if er: | |
if not os.path.exists(out) or os.path.getsize(out) == 0: | |
LOGS.error(str(er)) | |
return (ss_path, sp_path) | |
except BaseException: | |
pass | |
return _hash, out | |
except Exception as error: | |
LOGS.error(str(error)) | |
LOGS.exception(format_exc()) | |