# Testapi / bot.py
# jkdwhdvkjdbhvjfvh's picture
# Update bot.py
# 2332137 verified
import asyncio
import logging
import os
import re
import subprocess
from urllib.parse import quote

import edge_tts
import requests
from telegram import Update, InputFile
from telegram.ext import ApplicationBuilder, CommandHandler, MessageHandler, filters, ContextTypes
# ========== SCRIPT GENERATION ==========
def get_script(prompt, word_count, timeout=60):
    """Fetch a generated script for *prompt* from the Pollinations text endpoint.

    Args:
        prompt: Free-form topic or instruction text.
        word_count: Requested length; sent as the suffix "(<word_count> words)".
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        The response body with leading/trailing whitespace removed.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            non-2xx HTTP status (previously an error page was returned as
            if it were the script).
    """
    # The prompt is sent as part of the URL path, so characters such as
    # "/", "?", "#" and "%" must be percent-encoded or they alter the request.
    encoded_prompt = quote(f"{prompt} ({word_count} words)", safe="")
    url = f"https://text.pollinations.ai/{encoded_prompt}"
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    return response.text.strip()
# ========== TEXT SPLITTING ==========
def split_sentences(text):
    """Split *text* into sentences at whitespace that follows ., ?, ! or the danda (।).

    The terminating punctuation stays attached to its sentence.

    Args:
        text: The text to split.

    Returns:
        A list of non-empty sentence strings; an empty list when *text* is
        empty or contains only whitespace.
    """
    pieces = re.split(r'(?<=[.?!।])\s+', text.strip())
    # re.split returns [''] for empty input; drop such fragments so callers
    # never receive a phantom empty sentence.
    return [piece for piece in pieces if piece]
# ========== IMAGE GENERATION ==========
def download_images(sentences, output_dir="images", timeout=120):
    """Download one generated image per sentence from the Pollinations image endpoint.

    Args:
        sentences: Sequence of sentence strings; each becomes part of an
            image prompt.
        output_dir: Directory the images are written to (created if missing).
        timeout: Seconds to wait for each HTTP response.

    Returns:
        A list of image file paths ("<output_dir>/img_NNN.jpg"), in the same
        order as *sentences*.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            non-2xx HTTP status, so an error body is never saved as a .jpg.
    """
    os.makedirs(output_dir, exist_ok=True)
    image_paths = []
    for idx, sentence in enumerate(sentences):
        # Sentences usually end in "?", "!" or "." and may contain "#", "&"
        # or "/"; unencoded, a "?" would start the query string and cut the
        # prompt short.
        encoded_prompt = quote(f"stunning 3d render styled images {sentence}", safe="")
        url = f"https://image.pollinations.ai/prompt/{encoded_prompt}?nologo=True"
        img_path = f"{output_dir}/img_{idx:03}.jpg"
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        with open(img_path, "wb") as f:
            f.write(response.content)
        image_paths.append(img_path)
    return image_paths
# ========== AUDIO DURATION ==========
def get_audio_duration(audio_path):
    """Return the duration of *audio_path* in seconds, as reported by ffprobe.

    Args:
        audio_path: Path of the media file to inspect.

    Returns:
        The duration as a float number of seconds.

    Raises:
        ValueError: If ffprobe exits with an error, prints nothing, or prints
            something that is not a number.
        FileNotFoundError: If the ffprobe executable cannot be found.
    """
    # stderr is captured separately: merged into stdout, any ffprobe warning
    # would end up in the text handed to float().
    result = subprocess.run(
        ["ffprobe", "-v", "error", "-show_entries", "format=duration",
         "-of", "default=noprint_wrappers=1:nokey=1", audio_path],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        errors="replace",
    )
    output = result.stdout.strip()
    if result.returncode != 0 or not output:
        detail = result.stderr.strip() or "no output"
        raise ValueError(f"ffprobe could not read the duration of {audio_path!r}: {detail}")
    try:
        return float(output)
    except ValueError as err:
        raise ValueError(
            f"unexpected ffprobe output for {audio_path!r}: {output!r}"
        ) from err
# ========== TTS + SUBTITLES ==========
def _format_srt_time(seconds):
    """Render *seconds* as an SRT timestamp of the form HH:MM:SS,mmm."""
    # Work in whole milliseconds so float error cannot drop a millisecond
    # (e.g. 1.001 s truncating to ",000").
    total_ms = max(0, round(seconds * 1000))
    total_seconds, ms = divmod(total_ms, 1000)
    total_minutes, s = divmod(total_seconds, 60)
    h, m = divmod(total_minutes, 60)
    return f"{h:02}:{m:02}:{s:02},{ms:03}"


async def generate_audio_and_sentence_subs(sentences, voice="hi-IN-SwaraNeural"):
    """Synthesize speech for *sentences* and write an evenly timed SRT file.

    The sentences are joined with spaces and spoken as one edge_tts stream
    written to "output.mp3". The measured audio duration is then divided
    equally among the sentences to produce the cues in "subtitles.srt".

    Args:
        sentences: Non-empty sequence of sentence strings.
        voice: edge_tts voice name.

    Returns:
        A tuple ``(audio_path, srt_path, per_sentence)`` where
        ``per_sentence`` is the number of seconds allotted to each sentence.

    Raises:
        ValueError: If *sentences* is empty (checked before any audio is
            generated, instead of failing later with ZeroDivisionError).
    """
    if not sentences:
        raise ValueError("sentences must not be empty")

    audio_path = "output.mp3"
    srt_path = "subtitles.srt"
    full_text = " ".join(sentences)
    communicate = edge_tts.Communicate(full_text, voice)
    with open(audio_path, "wb") as f:
        async for chunk in communicate.stream():
            # Only "audio" chunks carry sound data; other chunk types are skipped.
            if chunk["type"] == "audio":
                f.write(chunk["data"])

    duration = get_audio_duration(audio_path)
    per_sentence = duration / len(sentences)

    with open(srt_path, "w", encoding="utf-8") as f_srt:
        for i, sentence in enumerate(sentences):
            start = i * per_sentence
            end = (i + 1) * per_sentence
            # Collapse internal whitespace: a blank line inside a cue would
            # terminate it and corrupt the remainder of the SRT file.
            cue_text = " ".join(sentence.split())
            f_srt.write(f"{i+1}\n")
            f_srt.write(f"{_format_srt_time(start)} --> {_format_srt_time(end)}\n")
            f_srt.write(f"{cue_text}\n\n")
    return audio_path, srt_path, per_sentence
# ========== VIDEO CREATION ==========
def make_video(image_paths, audio_path, srt_path, duration_per_image, font_size, font_family, output="final_video.mp4"):
    """Build a 720x1280 slideshow video with audio and burned-in subtitles using ffmpeg.

    Two ffmpeg passes are run: the first concatenates the images into
    "temp_video.mp4", the second adds *audio_path* and renders *srt_path*
    onto the frames, writing *output*.

    Args:
        image_paths: Non-empty sequence of image file paths, in display order.
        audio_path: Path of the audio track.
        srt_path: Path of the subtitle file.
        duration_per_image: Seconds each image is shown.
        font_size: Subtitle font size passed to the subtitles filter.
        font_family: Subtitle font name passed to the subtitles filter.
        output: Path of the resulting video file.

    Returns:
        *output*.

    Raises:
        ValueError: If *image_paths* is empty.
        subprocess.CalledProcessError: If either ffmpeg pass fails; previously
            the failure was ignored and a missing or stale file was returned.
    """
    if not image_paths:
        raise ValueError("image_paths must not be empty")

    # font_family ends up inside an ffmpeg filter expression; drop characters
    # such as quotes, commas, colons and backslashes that would break out of
    # the force_style value.
    safe_font = re.sub(r"[^\w \-]", "", str(font_family)).strip()

    list_path = "images.txt"
    temp_video = "temp_video.mp4"
    try:
        with open(list_path, "w", encoding="utf-8") as f:
            for path in image_paths:
                f.write(f"file '{os.path.abspath(path)}'\n")
                f.write(f"duration {duration_per_image}\n")
            # The last image is listed once more without a duration —
            # presumably so the concat demuxer honours the final duration.
            f.write(f"file '{os.path.abspath(image_paths[-1])}'\n")

        subprocess.run([
            "ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", list_path,
            "-vsync", "vfr", "-pix_fmt", "yuv420p", "-vf", "scale=720:1280", temp_video
        ], check=True)
        subprocess.run([
            "ffmpeg", "-y", "-i", temp_video, "-i", audio_path, "-vf",
            f"subtitles={srt_path}:force_style='FontName={safe_font},FontSize={font_size},BorderStyle=3,Outline=1,Shadow=0,Alignment=2'",
            "-c:a", "aac", "-b:a", "192k", output
        ], check=True)
    finally:
        # Intermediate files are only needed while ffmpeg runs.
        for leftover in (list_path, temp_video):
            try:
                os.remove(leftover)
            except OSError:
                pass
    return output
# ========== TELEGRAM BOT ==========
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Turn a "prompt;word_count;font_size;font_name" message into a narrated video.

    Pipeline: generate a script, split it into sentences, download one image
    per sentence, synthesize audio plus subtitles, render the video with
    ffmpeg, and reply with the result. Progress and errors are reported back
    to the chat.

    Args:
        update: Incoming Telegram update.
        context: Handler context (unused).
    """
    # effective_message also covers edited messages and channel posts, for
    # which update.message is None.
    message = update.effective_message
    if message is None or not message.text:
        return

    try:
        text = message.text.strip()
        # Split from the right so the prompt itself may contain semicolons.
        fields = [x.strip() for x in text.rsplit(";", 3)]
        if len(fields) != 4:
            await message.reply_text("Send prompt in format: `prompt;word_count;font_size;font_name`", parse_mode="Markdown")
            return
        prompt, word_count, font_size, font_family = fields
        try:
            word_count = int(word_count)
            font_size = int(font_size)
        except ValueError:
            await message.reply_text("word_count and font_size must be whole numbers.")
            return
        if not prompt or word_count <= 0 or font_size <= 0:
            await message.reply_text("The prompt must not be empty, and word_count and font_size must be positive.")
            return

        # Network requests and ffmpeg runs are blocking; run them in the
        # default executor so the bot's event loop stays responsive.
        loop = asyncio.get_running_loop()

        await message.reply_text("Generating script...")
        script = await loop.run_in_executor(None, get_script, prompt, word_count)
        sentences = [s for s in split_sentences(script) if s.strip()]
        if not sentences:
            await message.reply_text("Error: the generated script was empty.")
            return
        image_paths = await loop.run_in_executor(None, download_images, sentences)

        await message.reply_text("Generating audio & subtitles...")
        audio_path, srt_path, duration_per_image = await generate_audio_and_sentence_subs(sentences)

        await message.reply_text("Creating video...")
        video_path = await loop.run_in_executor(
            None, make_video, image_paths, audio_path, srt_path,
            duration_per_image, font_size, font_family,
        )

        # InputFile treats a str argument as the file *content*, so the video
        # must be handed over as an open binary file, not as its path.
        with open(video_path, "rb") as video_file:
            await message.reply_video(
                video=InputFile(video_file, filename=os.path.basename(video_path)),
                caption="Here is your video",
            )
    except Exception as e:
        # Top-level boundary: keep the traceback in the log and tell the user.
        logging.getLogger(__name__).exception("Video generation failed")
        await message.reply_text(f"Error: {e}")
if __name__ == "__main__":
    import logging

    logging.basicConfig(level=logging.INFO)

    # SECURITY: the bot token is a credential and must come from the
    # environment only. The token that used to be hard-coded here as a
    # fallback has been published with this file and should be revoked.
    TOKEN = os.getenv("BOT_TOKEN")
    if not TOKEN:
        raise SystemExit("BOT_TOKEN environment variable is not set")

    app = ApplicationBuilder().token(TOKEN).build()
    # Handle plain text messages only; commands are excluded.
    app.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), handle_message))
    app.run_polling()