# This file is part of the AutoAnime distribution. # Copyright (c) 2025 Kaif_00z # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, version 3. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # General Public License for more details. # # License can be found in < # https://github.com/kaif-00z/AutoAnimeBot/blob/main/LICENSE > . # if you are using this following code then don't forgot to give proper # credit to t.me/kAiF_00z (github.com/kaif-00z) import asyncio import json import math import os import re import shutil import subprocess import time from traceback import format_exc import aiofiles import aiohttp import requests from html_telegraph_poster import TelegraphPoster from telethon.errors.rpcerrorlist import MessageNotModifiedError from functions.config import Var from libs.logger import LOGS class Tools: def __init__(self): if Var.DEV_MODE: self.ffmpeg_threads = int(os.cpu_count() or 0) + 2 else: self.ffmpeg_threads = 2 async def async_searcher( self, url: str, post: bool = None, headers: dict = None, params: dict = None, json: dict = None, data: dict = None, ssl=None, re_json: bool = False, re_content: bool = False, real: bool = False, *args, **kwargs, ): async with aiohttp.ClientSession(headers=headers) as client: if post: data = await client.post( url, json=json, data=data, ssl=ssl, *args, **kwargs ) else: data = await client.get(url, params=params, ssl=ssl, *args, **kwargs) if re_json: return await data.json() if re_content: return await data.read() if real: return data return await data.text() async def cover_dl(self, link): try: if not link: return None image = await self.async_searcher(link, re_content=True) fn = f"thumbs/{link.split('/')[-1]}" if not fn.endswith((".jpg" or ".png")): fn += ".jpg" async with aiofiles.open(fn, "wb") as file: await file.write(image) return fn except Exception as error: LOGS.exception(format_exc()) LOGS.error(str(error)) async def mediainfo(self, file, bot): try: process = await asyncio.create_subprocess_shell( f"mediainfo '''{file}''' --Output=HTML", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await process.communicate() out = stdout.decode() client = TelegraphPoster(use_api=True) client.create_api_token("Mediainfo") page = client.post( title="Mediainfo", author=((await bot.get_me()).first_name), author_url=f"https://t.me/{((await bot.get_me()).username)}", text=out, ) return page.get("url") except Exception as error: LOGS.exception(format_exc()) LOGS.error(str(error)) async def _poster(self, bot, anime_info, channel_id=None): thumb = await self.cover_dl((await anime_info.get_cover())) caption = await anime_info.get_caption() return await bot.upload_poster( thumb or "assest/poster_not_found.jpg", caption, channel_id if channel_id else None, ) async def get_chat_info(self, bot, anime_info, dB): try: chat_info = await dB.get_anime_channel_info(anime_info.proper_name) if not chat_info: chat_id = await bot.create_channel( (await anime_info.get_english()), (await self.cover_dl((await anime_info.get_poster()))), ) invite_link = await bot.generate_invite_link(chat_id) chat_info = {"chat_id": chat_id, "invite_link": invite_link} await dB.add_anime_channel_info(anime_info.proper_name, chat_info) return chat_info except BaseException: LOGS.error(str(format_exc())) def init_dir(self): if not os.path.exists("thumb.jpg"): content = requests.get(Var.THUMB).content with open("thumb.jpg", "wb") as f: f.write(content) if not os.path.isdir("encode/"): os.mkdir("encode/") if not os.path.isdir("thumbs/"): os.mkdir("thumbs/") if not os.path.isdir("downloads/"): os.mkdir("downloads/") def hbs(self, size): if not size: return "" power = 2**10 raised_to_pow = 0 dict_power_n = {0: "B", 1: "K", 2: "M", 3: "G", 4: "T", 5: "P"} while size > power: size /= power raised_to_pow += 1 return str(round(size, 2)) + " " + dict_power_n[raised_to_pow] + "B" def ts(self, milliseconds: int) -> str: seconds, milliseconds = divmod(int(milliseconds), 1000) minutes, seconds = divmod(seconds, 60) hours, minutes = divmod(minutes, 60) days, hours = divmod(hours, 24) tmp = ( ((str(days) + "d:") if days else "") + ((str(hours) + "h:") if hours else "") + ((str(minutes) + "m:") if minutes else "") + ((str(seconds) + "s:") if seconds else "") + ((str(milliseconds) + "ms:") if milliseconds else "") ) return tmp[:-1] async def rename_file(self, dl, out): try: os.rename(dl, out) except BaseException: return False, format_exc() return True, out async def bash_(self, cmd, run_code=0): process = await asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await process.communicate() err = stderr.decode().strip() or None out = stdout.decode().strip() if not run_code and err: if match := re.match("\\/bin\\/sh: (.*): ?(\\w+): not found", err): return out, f"{match.group(2).upper()}_NOT_FOUND" return out, err async def frame_counts(self, dl): _x, _y = await self.bash_( f'mediainfo --fullscan """{dl}""" | grep "Frame count"' ) if _y and _y.endswith("NOT_FOUND"): LOGS.error(f"ERROR: `{_y}`") return False return _x.split(":")[1].split("\n")[0] async def compress(self, dl, out, log_msg): total_frames = await self.frame_counts(dl) if not total_frames: return False, "Unable to Count The Frames!" _progress = f"progress-{time.time()}.txt" cmd = f'''{Var.FFMPEG} -hide_banner -loglevel quiet -progress """{_progress}""" -i """{dl}""" -metadata "Encoded By"="https://github.com/kaif-00z/AutoAnimeBot/" -preset ultrafast -c:v libx265 -crf {Var.CRF} -map 0:v -c:a aac -map 0:a -c:s copy -map 0:s? -threads {self.ffmpeg_threads} """{out}""" -y''' process = await asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) d_time = time.time() while process.returncode != 0: await asyncio.sleep(5) with open(_progress, "r+") as fil: text = fil.read() frames = re.findall("frame=(\\d+)", text) size = re.findall("total_size=(\\d+)", text) speed = 0 if not os.path.exists(out) or os.path.getsize(out) == 0: return False, "Unable To Encode This Video!" if len(frames): elapse = int(frames[-1]) if len(size): size = int(size[-1]) per = elapse * 100 / int(total_frames) time_diff = time.time() - int(d_time) speed = round(elapse / time_diff, 2) if int(speed) != 0: some_eta = ((int(total_frames) - elapse) / speed) * 1000 text = f"**Successfully Downloaded The Anime**\n\n **File Name:** ```{dl.split('/')[-1]}```\n\n**STATUS:** \n" progress_str = "`[{0}{1}] {2}%\n\n`".format( "".join("●" for _ in range(math.floor(per / 5))), "".join("" for _ in range(20 - math.floor(per / 5))), round(per, 2), ) e_size = f"{self.hbs(size)} of ~{self.hbs((size / per) * 100)}" eta = f"~{self.ts(some_eta)}" try: _new_log_msg = await log_msg.edit( text + progress_str + "`" + e_size + "`" + "\n\n`" + eta + "`" ) except MessageNotModifiedError: pass try: os.remove(_progress) except BaseException: pass return True, _new_log_msg async def genss(self, file): process = subprocess.Popen( # just for better codefactor rating :) [shutil.which("mediainfo"), file, "--Output=JSON"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) stdout, stderr = process.communicate() out = stdout.decode().strip() z = json.loads(out) p = z["media"]["track"][0]["Duration"] return int(p.split(".")[-2]) def stdr(self, seconds: int) -> str: minutes, seconds = divmod(seconds, 60) hours, minutes = divmod(minutes, 60) if len(str(minutes)) == 1: minutes = "0" + str(minutes) if len(str(hours)) == 1: hours = "0" + str(hours) if len(str(seconds)) == 1: seconds = "0" + str(seconds) dur = ( ((str(hours) + ":") if hours else "00:") + ((str(minutes) + ":") if minutes else "00:") + ((str(seconds)) if seconds else "") ) return dur async def duration_s(self, file): tsec = await self.genss(file) x = round(tsec / 5) y = round(tsec / 5 + 30) pin = self.stdr(x) if y < tsec: pon = self.stdr(y) else: pon = self.stdr(tsec) return pin, pon async def gen_ss_sam(self, _hash, filename): try: ss_path, sp_path = None, None os.mkdir(_hash) tsec = await self.genss(filename) fps = 10 / tsec ncmd = f"ffmpeg -i '{filename}' -vf fps={fps} -vframes 10 '{_hash}/pic%01d.png'" process = await asyncio.create_subprocess_shell( ncmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) await process.communicate() ss, dd = await self.duration_s(filename) __ = filename.split(".mkv")[-2] out = __ + "_sample.mkv" _ncmd = f'ffmpeg -i """{filename}""" -preset ultrafast -ss {ss} -to {dd} -c:v libx265 -crf 27 -map 0:v -c:a aac -map 0:a -c:s copy -map 0:s? """{out}""" -y' process = await asyncio.create_subprocess_shell( _ncmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() er = stderr.decode().strip() try: if er: if not os.path.exists(out) or os.path.getsize(out) == 0: LOGS.error(str(er)) return (ss_path, sp_path) except BaseException: pass return _hash, out except Exception as error: LOGS.error(str(error)) LOGS.exception(format_exc())