# Uploaded by: taslim19
# Commit message: "uploading: by drag"
# Commit: de1ee14
# This file is part of the AutoAnime distribution.
# Copyright (c) 2025 Kaif_00z
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# License can be found in <
# https://github.com/kaif-00z/AutoAnimeBot/blob/main/LICENSE > .
# If you are using the following code, then don't forget to give proper
# credit to t.me/kAiF_00z (github.com/kaif-00z)
import asyncio
import json
import math
import os
import re
import shlex
import shutil
import subprocess
import time
from traceback import format_exc

import aiofiles
import aiohttp
import requests
from html_telegraph_poster import TelegraphPoster
from telethon.errors.rpcerrorlist import MessageNotModifiedError

from functions.config import Var
from libs.logger import LOGS


class Tools:
    """Helpers for fetching covers, probing media files and driving ffmpeg."""

    def __init__(self):
        """Pick the ffmpeg thread count from ``Var.DEV_MODE``."""
        if Var.DEV_MODE:
            # cpu_count() may return None, hence the ``or 0`` fallback.
            self.ffmpeg_threads = int(os.cpu_count() or 0) + 2
        else:
            self.ffmpeg_threads = 2

    async def async_searcher(
        self,
        url: str,
        post: bool = None,
        headers: dict = None,
        params: dict = None,
        json: dict = None,
        data: dict = None,
        ssl=None,
        re_json: bool = False,
        re_content: bool = False,
        real: bool = False,
        *args,
        **kwargs,
    ):
        """Perform a single HTTP request with a throw-away aiohttp session.

        Args:
            url: Target URL.
            post: When truthy a POST is sent (with ``json``/``data``),
                otherwise a GET (with ``params``).
            headers: Headers applied to the session.
            params: Query parameters for GET requests.
            json: JSON payload for POST requests.
            data: Form/body payload for POST requests.
            ssl: Forwarded to aiohttp unchanged.
            re_json: Return the decoded JSON body.
            re_content: Return the raw body bytes.
            real: Return the response object itself.

        Returns:
            The JSON body, raw bytes, response object or body text, checked
            in that order of precedence (text is the default).
        """
        async with aiohttp.ClientSession(headers=headers) as client:
            if post:
                response = await client.post(
                    url, json=json, data=data, ssl=ssl, *args, **kwargs
                )
            else:
                response = await client.get(
                    url, params=params, ssl=ssl, *args, **kwargs
                )
            if re_json:
                return await response.json()
            if re_content:
                return await response.read()
            if real:
                # NOTE(review): the session is closed as soon as we return, so
                # callers should presumably not read the body afterwards.
                return response
            return await response.text()

    async def cover_dl(self, link):
        """Download ``link`` into ``thumbs/`` and return the local path.

        Returns ``None`` when ``link`` is empty or when anything fails (the
        failure is logged).
        """
        try:
            if not link:
                return None
            image = await self.async_searcher(link, re_content=True)
            fn = f"thumbs/{link.split('/')[-1]}"
            # endswith() needs a tuple to test several suffixes; the former
            # ``(".jpg" or ".png")`` only ever tested ".jpg".
            if not fn.endswith((".jpg", ".png")):
                fn += ".jpg"
            async with aiofiles.open(fn, "wb") as file:
                await file.write(image)
            return fn
        except Exception as error:
            LOGS.exception(format_exc())
            LOGS.error(str(error))

    async def mediainfo(self, file, bot):
        """Publish the HTML ``mediainfo`` report of ``file`` on Telegraph.

        Returns the page URL, or ``None`` when anything fails (logged).
        """
        try:
            # exec (no shell) so quotes or spaces in the file name are harmless.
            process = await asyncio.create_subprocess_exec(
                "mediainfo",
                str(file),
                "--Output=HTML",
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            stdout, _ = await process.communicate()
            out = stdout.decode()
            me = await bot.get_me()  # fetched once, used for name and URL
            client = TelegraphPoster(use_api=True)
            client.create_api_token("Mediainfo")
            page = client.post(
                title="Mediainfo",
                author=me.first_name,
                author_url=f"https://t.me/{me.username}",
                text=out,
            )
            return page.get("url")
        except Exception as error:
            LOGS.exception(format_exc())
            LOGS.error(str(error))

    async def _poster(self, bot, anime_info, channel_id=None):
        """Upload the anime cover (or the fallback image) with its caption."""
        thumb = await self.cover_dl((await anime_info.get_cover()))
        caption = await anime_info.get_caption()
        return await bot.upload_poster(
            thumb or "assest/poster_not_found.jpg",
            caption,
            channel_id if channel_id else None,
        )

    async def get_chat_info(self, bot, anime_info, dB):
        """Return the stored channel info for an anime, creating it if absent.

        Returns a dict with ``chat_id`` and ``invite_link`` when a channel had
        to be created, whatever the database returned otherwise, or ``None``
        on failure (logged).
        """
        try:
            chat_info = await dB.get_anime_channel_info(anime_info.proper_name)
            if not chat_info:
                chat_id = await bot.create_channel(
                    (await anime_info.get_english()),
                    (await self.cover_dl((await anime_info.get_poster()))),
                )
                invite_link = await bot.generate_invite_link(chat_id)
                chat_info = {"chat_id": chat_id, "invite_link": invite_link}
                await dB.add_anime_channel_info(anime_info.proper_name, chat_info)
            return chat_info
        except Exception:
            # Exception (not BaseException) so task cancellation still propagates.
            LOGS.error(str(format_exc()))

    def init_dir(self):
        """Fetch the default thumbnail if missing and create working folders."""
        if not os.path.exists("thumb.jpg"):
            content = requests.get(Var.THUMB).content
            with open("thumb.jpg", "wb") as f:
                f.write(content)
        for folder in ("encode/", "thumbs/", "downloads/"):
            os.makedirs(folder, exist_ok=True)

    def hbs(self, size):
        """Format a byte count as a human readable string (1024-based).

        Returns ``""`` for a falsy size. Values beyond the largest known unit
        are expressed in that unit instead of raising ``KeyError``.
        """
        if not size:
            return ""
        power = 2**10
        # Index 0 is empty because "B" is appended below (was "BB" for bytes).
        prefixes = ("", "K", "M", "G", "T", "P")
        index = 0
        while size > power and index < len(prefixes) - 1:
            size /= power
            index += 1
        return str(round(size, 2)) + " " + prefixes[index] + "B"

    def ts(self, milliseconds: int) -> str:
        """Render milliseconds as ``1d:2h:3m:4s:5ms``, omitting zero parts."""
        seconds, milliseconds = divmod(int(milliseconds), 1000)
        minutes, seconds = divmod(seconds, 60)
        hours, minutes = divmod(minutes, 60)
        days, hours = divmod(hours, 24)
        parts = (
            (days, "d"),
            (hours, "h"),
            (minutes, "m"),
            (seconds, "s"),
            (milliseconds, "ms"),
        )
        return ":".join(f"{value}{unit}" for value, unit in parts if value)

    async def rename_file(self, dl, out):
        """Rename ``dl`` to ``out``.

        Returns ``(True, out)`` on success or ``(False, traceback_text)``.
        """
        try:
            os.rename(dl, out)
        except Exception:
            return False, format_exc()
        return True, out

    async def bash_(self, cmd, run_code=0):
        """Run ``cmd`` through the shell and return ``(stdout, stderr)``.

        ``stderr`` is ``None`` when empty. Unless ``run_code`` is truthy, a
        "/bin/sh: ...: not found" error is replaced by ``<NAME>_NOT_FOUND``.
        """
        process = await asyncio.create_subprocess_shell(
            cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()
        err = stderr.decode().strip() or None
        out = stdout.decode().strip()
        if not run_code and err:
            if match := re.match(r"\/bin\/sh: (.*): ?(\w+): not found", err):
                return out, f"{match.group(2).upper()}_NOT_FOUND"
        return out, err

    async def frame_counts(self, dl):
        """Return the first "Frame count" value reported by mediainfo.

        The value is returned as the raw text after the colon. ``False`` is
        returned when mediainfo is missing or reports no frame count.
        """
        out, err = await self.bash_(
            f'mediainfo --fullscan {shlex.quote(str(dl))} | grep "Frame count"'
        )
        if err and err.endswith("NOT_FOUND"):
            LOGS.error(f"ERROR: `{err}`")
            return False
        if not out or ":" not in out:
            # grep matched nothing; indexing the split would raise IndexError.
            return False
        return out.split(":")[1].split("\n")[0]

    async def compress(self, dl, out, log_msg):
        """Encode ``dl`` to ``out`` with ffmpeg while updating ``log_msg``.

        Progress is read every five seconds from a temporary ffmpeg
        ``-progress`` file, which is removed on every exit path.

        Returns:
            ``(True, message)`` on success, where ``message`` is the result of
            the last successful edit (or ``log_msg`` if none succeeded), or
            ``(False, reason)`` on failure.
        """
        total_frames = await self.frame_counts(dl)
        if not total_frames:
            return False, "Unable to Count The Frames!"
        try:
            total = int(total_frames)
        except (TypeError, ValueError):
            total = 0
        if total <= 0:
            # A zero/garbled count would make the percentage math blow up.
            return False, "Unable to Count The Frames!"

        _progress = f"progress-{time.time()}.txt"
        cmd = (
            f"{Var.FFMPEG} -hide_banner -loglevel quiet "
            f"-progress {shlex.quote(_progress)} -i {shlex.quote(str(dl))} "
            '-metadata "Encoded By"="https://github.com/kaif-00z/AutoAnimeBot/" '
            f"-preset ultrafast -c:v libx265 -crf {Var.CRF} -map 0:v "
            "-c:a aac -map 0:a -c:s copy -map 0:s? "
            f"-threads {self.ffmpeg_threads} {shlex.quote(str(out))} -y"
        )
        # Output is never read here, so discard it instead of filling pipes.
        process = await asyncio.create_subprocess_shell(
            cmd,
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.DEVNULL,
        )
        d_time = time.time()
        _new_log_msg = log_msg
        try:
            # ``is None`` (still running) rather than ``!= 0``: a failing
            # ffmpeg would otherwise keep this loop alive forever.
            while process.returncode is None:
                await asyncio.sleep(5)
                try:
                    with open(_progress, "r") as fil:
                        text = fil.read()
                except FileNotFoundError:
                    text = ""
                if not os.path.exists(out) or os.path.getsize(out) == 0:
                    if process.returncode is None:
                        try:
                            process.kill()  # best effort; do not leak the encoder
                        except ProcessLookupError:
                            pass
                    return False, "Unable To Encode This Video!"
                frames = re.findall(r"frame=(\d+)", text)
                sizes = re.findall(r"total_size=(\d+)", text)
                if not frames:
                    continue  # nothing reported yet
                elapse = int(frames[-1])
                size = int(sizes[-1]) if sizes else 0
                per = elapse * 100 / total
                time_diff = time.time() - int(d_time)
                speed = round(elapse / time_diff, 2)
                some_eta = ((total - elapse) / speed) * 1000 if speed > 0 else 0
                estimated_total = (size / per) * 100 if per else 0
                text = f"**Successfully Downloaded The Anime**\n\n **File Name:** ```{dl.split('/')[-1]}```\n\n**STATUS:** \n"
                progress_str = "`[{0}{1}] {2}%\n\n`".format(
                    "●" * math.floor(per / 5),
                    "",  # the unfilled part of the bar is rendered as nothing
                    round(per, 2),
                )
                e_size = f"{self.hbs(size)} of ~{self.hbs(estimated_total)}"
                eta = f"~{self.ts(some_eta)}"
                try:
                    _new_log_msg = await log_msg.edit(
                        text
                        + progress_str
                        + "`"
                        + e_size
                        + "`"
                        + "\n\n`"
                        + eta
                        + "`"
                    )
                except MessageNotModifiedError:
                    pass
            if process.returncode != 0:
                return False, "Unable To Encode This Video!"
            return True, _new_log_msg
        finally:
            try:
                os.remove(_progress)
            except OSError:
                pass

    async def genss(self, file):
        """Return the duration of ``file`` in whole seconds via mediainfo JSON.

        Reads ``media.track[0].Duration`` and truncates it to an int.
        """
        process = await asyncio.create_subprocess_exec(
            shutil.which("mediainfo") or "mediainfo",
            str(file),
            "--Output=JSON",
            stdout=asyncio.subprocess.PIPE,
            # Keep stderr out of stdout so warnings cannot corrupt the JSON.
            stderr=asyncio.subprocess.DEVNULL,
        )
        stdout, _ = await process.communicate()
        info = json.loads(stdout.decode().strip())
        duration = info["media"]["track"][0]["Duration"]
        # int(float()) also copes with a duration that has no fractional part.
        return int(float(duration))

    def stdr(self, seconds: int) -> str:
        """Format a number of seconds as zero-padded ``HH:MM:SS``."""
        minutes, seconds = divmod(int(seconds), 60)
        hours, minutes = divmod(minutes, 60)
        return f"{hours:02d}:{minutes:02d}:{seconds:02d}"

    async def duration_s(self, file):
        """Return ``(start, end)`` timestamps for a 30 second sample.

        The sample starts at one fifth of the duration and is clamped to the
        end of the file.
        """
        tsec = await self.genss(file)
        start = round(tsec / 5)
        end = round(tsec / 5 + 30)
        pin = self.stdr(start)
        pon = self.stdr(end if end < tsec else tsec)
        return pin, pon

    async def gen_ss_sam(self, _hash, filename):
        """Generate screenshots into ``_hash/`` and a sample clip of ``filename``.

        Returns:
            ``(_hash, sample_path)`` on success, ``(None, None)`` when the
            sample was not produced, and ``None`` when an exception occurred
            (logged).
        """
        try:
            ss_path, sp_path = None, None
            os.makedirs(_hash, exist_ok=True)
            tsec = await self.genss(filename)
            fps = 10 / max(tsec, 1)  # avoid dividing by a zero duration
            pictures = shlex.quote(f"{_hash}/pic%01d.png")
            ncmd = (
                f"ffmpeg -i {shlex.quote(str(filename))} "
                f"-vf fps={fps} -vframes 10 {pictures}"
            )
            process = await asyncio.create_subprocess_shell(
                ncmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
            )
            await process.communicate()
            ss, dd = await self.duration_s(filename)
            # splitext works for any extension and for names containing ".mkv".
            out = os.path.splitext(str(filename))[0] + "_sample.mkv"
            _ncmd = (
                f"ffmpeg -i {shlex.quote(str(filename))} -preset ultrafast "
                f"-ss {ss} -to {dd} -c:v libx265 -crf 27 -map 0:v "
                f"-c:a aac -map 0:a -c:s copy -map 0:s? {shlex.quote(out)} -y"
            )
            process = await asyncio.create_subprocess_shell(
                _ncmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
            )
            _, stderr = await process.communicate()
            er = stderr.decode(errors="replace").strip()
            if not os.path.exists(out) or os.path.getsize(out) == 0:
                LOGS.error(str(er))
                return (ss_path, sp_path)
            return _hash, out
        except Exception as error:
            LOGS.error(str(error))
            LOGS.exception(format_exc())