# Spaces: Runtime error  (page-header residue captured with this file; not program code)
import asyncio
import logging
import os
import re
import subprocess
from urllib.parse import quote

import edge_tts
import requests
from telegram import Update, InputFile
from telegram.ext import ApplicationBuilder, CommandHandler, MessageHandler, filters, ContextTypes
# ========== SCRIPT GENERATION ==========
def get_script(prompt, word_count, timeout=60):
    """Fetch a generated script for *prompt* from the Pollinations text endpoint.

    Args:
        prompt: Free-form topic or instruction text.
        word_count: Desired script length; appended to the prompt as
            ``"(<word_count> words)"``.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        The response body with surrounding whitespace removed.

    Raises:
        requests.RequestException: On network failure, timeout, or a
            non-2xx HTTP status.
    """
    # Percent-encode the whole request text so characters such as "/", "?"
    # and "#" in the prompt stay inside the single path segment instead of
    # being interpreted as URL structure.
    request_text = quote(f"{prompt} ({word_count} words)", safe="")
    response = requests.get(
        f"https://text.pollinations.ai/{request_text}", timeout=timeout
    )
    # Fail loudly on an error status rather than returning the error page
    # as if it were the script.
    response.raise_for_status()
    return response.text.strip()
# ========== TEXT SPLITTING ==========
def split_sentences(text):
    """Split *text* into sentences.

    A sentence boundary is any run of whitespace that follows ``.``, ``?``,
    ``!`` or the danda (``।``). The terminator stays attached to its sentence.

    Args:
        text: The text to split.

    Returns:
        A list of non-empty sentence strings; an empty list when *text* is
        empty or whitespace-only.
    """
    fragments = re.split(r'(?<=[.?!।])\s+', text.strip())
    # re.split returns [''] for blank input; drop empty fragments so callers
    # never see a phantom empty sentence.
    return [fragment for fragment in fragments if fragment]
# ========== IMAGE GENERATION ==========
def download_images(sentences, timeout=120):
    """Download one generated image per sentence into the ``images`` directory.

    Each sentence is appended to a fixed style prefix and sent to the
    Pollinations image endpoint. Files are written as
    ``images/img_000.jpg``, ``images/img_001.jpg``, ... in sentence order.

    Args:
        sentences: Iterable of sentence strings.
        timeout: Seconds to wait for each HTTP response before giving up.

    Returns:
        List of the written image paths, in the same order as *sentences*.

    Raises:
        requests.RequestException: On network failure, timeout, or a
            non-2xx HTTP status for any image.
    """
    os.makedirs("images", exist_ok=True)
    image_paths = []
    for idx, sentence in enumerate(sentences):
        # Sentences routinely end in "?" and may contain "/" or "#";
        # percent-encode so the whole prompt stays in the URL path.
        image_prompt = quote(f"stunning 3d render styled images {sentence}", safe="")
        url = f"https://image.pollinations.ai/prompt/{image_prompt}?nologo=True"
        img_path = f"images/img_{idx:03}.jpg"
        response = requests.get(url, timeout=timeout)
        # Do not save an HTTP error body under a .jpg name; the later
        # ffmpeg step would fail on it with a far less useful message.
        response.raise_for_status()
        with open(img_path, "wb") as f:
            f.write(response.content)
        image_paths.append(img_path)
    return image_paths
# ========== AUDIO DURATION ==========
def get_audio_duration(audio_path):
    """Return the duration of *audio_path* in seconds, as reported by ffprobe.

    Args:
        audio_path: Path to the audio file to inspect.

    Returns:
        The container duration as a float number of seconds.

    Raises:
        FileNotFoundError: If the ``ffprobe`` executable is not available.
        ValueError: If ffprobe fails or does not print a numeric duration.
    """
    result = subprocess.run(
        ["ffprobe", "-v", "error", "-show_entries", "format=duration",
         "-of", "default=noprint_wrappers=1:nokey=1", audio_path],
        # Keep stderr separate so diagnostics are never parsed as a number.
        stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    reported = result.stdout.strip()
    if result.returncode != 0 or not reported:
        detail = result.stderr.strip() or "no duration reported"
        raise ValueError(
            f"ffprobe could not read the duration of {audio_path!r}: {detail}"
        )
    return float(reported)
# ========== TTS + SUBTITLES ==========
async def generate_audio_and_sentence_subs(sentences, voice="hi-IN-SwaraNeural"):
    """Synthesize speech for *sentences* and write a matching SRT file.

    The sentences are joined with spaces and streamed through edge-tts into
    ``output.mp3``. The audio duration is then divided evenly between the
    sentences to produce one subtitle cue per sentence in ``subtitles.srt``.
    Cue timings are therefore an equal-share approximation, not aligned to
    the actual speech.

    Args:
        sentences: Non-empty list of sentence strings.
        voice: edge-tts voice name used for synthesis.

    Returns:
        Tuple ``(audio_path, srt_path, per_sentence)`` where *per_sentence*
        is the number of seconds allotted to each sentence.

    Raises:
        ValueError: If *sentences* is empty (checked before any synthesis),
            or if the audio duration cannot be determined.
    """
    if not sentences:
        # Without this guard the duration split below divides by zero, and
        # only after the TTS request has already been made.
        raise ValueError("sentences must contain at least one sentence")

    audio_path = "output.mp3"
    srt_path = "subtitles.srt"
    full_text = " ".join(sentences)

    communicate = edge_tts.Communicate(full_text, voice)
    with open(audio_path, "wb") as f:
        async for chunk in communicate.stream():
            # The stream interleaves audio with other chunk types; only the
            # audio payload belongs in the mp3 file.
            if chunk["type"] == "audio":
                f.write(chunk["data"])

    duration = get_audio_duration(audio_path)
    per_sentence = duration / len(sentences)

    with open(srt_path, "w", encoding="utf-8") as f_srt:
        for i, sentence in enumerate(sentences):
            start = i * per_sentence
            end = (i + 1) * per_sentence
            f_srt.write(f"{i+1}\n")
            f_srt.write(f"{_format_srt_time(start)} --> {_format_srt_time(end)}\n")
            f_srt.write(f"{sentence.strip()}\n\n")
    return audio_path, srt_path, per_sentence


def _format_srt_time(seconds):
    """Render *seconds* as an SRT timestamp ``HH:MM:SS,mmm``."""
    # Work in whole milliseconds so float noise (e.g. 2.3 -> 299.999 ms)
    # cannot truncate a cue boundary to the previous millisecond.
    total_ms = int(round(seconds * 1000))
    total_seconds, ms = divmod(total_ms, 1000)
    total_minutes, s = divmod(total_seconds, 60)
    h, m = divmod(total_minutes, 60)
    return f"{h:02}:{m:02}:{s:02},{ms:03}"
# ========== VIDEO CREATION ==========
def make_video(image_paths, audio_path, srt_path, duration_per_image, font_size, font_family, output="final_video.mp4"):
    """Assemble a slideshow video with audio and burned-in subtitles.

    Runs ffmpeg twice: first to turn the images into a silent 720x1280
    slideshow (``temp_video.mp4``, driven by a concat list written to
    ``images.txt``), then to add the audio track and render the subtitles.

    Args:
        image_paths: Non-empty list of image file paths, in display order.
        audio_path: Path to the audio track.
        srt_path: Path to the SRT subtitle file. It is placed in the ffmpeg
            filter string as-is, so it must not contain filter
            metacharacters such as ``:`` or ``'``.
        duration_per_image: Seconds each image stays on screen.
        font_size: Subtitle font size (converted with ``int``).
        font_family: Subtitle font name; only letters, digits, underscore,
            space, ``.`` and ``-`` are accepted.
        output: Path of the final video file.

    Returns:
        The *output* path.

    Raises:
        ValueError: If *image_paths* is empty, *font_size* is not an
            integer, or *font_family* contains unsupported characters.
        subprocess.CalledProcessError: If either ffmpeg invocation fails.
    """
    if not image_paths:
        raise ValueError("image_paths must contain at least one image")
    font_size = int(font_size)
    # font_family ends up inside the ffmpeg filter expression. Characters
    # like "'", "," or ":" would terminate force_style and let the value
    # inject arbitrary filter options, so reject anything outside a
    # conservative font-name alphabet.
    if not re.fullmatch(r"[\w .\-]+", str(font_family)):
        raise ValueError(f"Unsupported characters in font name: {font_family!r}")

    with open("images.txt", "w", encoding="utf-8") as f:
        for path in image_paths:
            f.write(_concat_file_entry(path))
            f.write(f"duration {duration_per_image}\n")
        # The concat demuxer only honours the last "duration" when the final
        # image is listed once more after it.
        f.write(_concat_file_entry(image_paths[-1]))

    # check=True: a failed ffmpeg run must surface as an error instead of
    # silently returning a path to a missing or stale video.
    subprocess.run([
        "ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", "images.txt",
        "-vsync", "vfr", "-pix_fmt", "yuv420p", "-vf", "scale=720:1280", "temp_video.mp4"
    ], check=True)
    subprocess.run([
        "ffmpeg", "-y", "-i", "temp_video.mp4", "-i", audio_path, "-vf",
        f"subtitles={srt_path}:force_style='FontName={font_family},FontSize={font_size},BorderStyle=3,Outline=1,Shadow=0,Alignment=2'",
        "-c:a", "aac", "-b:a", "192k", output
    ], check=True)
    return output


def _concat_file_entry(path):
    """Return a concat-demuxer ``file`` line for *path*, with quotes escaped."""
    # Inside a single-quoted concat entry a literal quote is written '\''.
    escaped = os.path.abspath(path).replace("'", "'\\''")
    return f"file '{escaped}'\n"
# ========== TELEGRAM BOT ==========
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle a text message of the form ``prompt;word_count;font_size;font_name``.

    Generates a script, one image per sentence, narration audio and
    subtitles, assembles them into a video and replies with it. Progress and
    any failure are reported back to the chat as text replies.

    Args:
        update: Incoming Telegram update.
        context: Handler context (unused).
    """
    message = update.message
    if message is None or not message.text:
        # Updates without a plain message (e.g. edits) carry no
        # ``update.message``; there is nothing to reply to.
        return

    usage = "Send prompt in format: `prompt;word_count;font_size;font_name`"
    fields = [x.strip() for x in message.text.strip().split(";")]
    if len(fields) != 4:
        await message.reply_text(usage, parse_mode="Markdown")
        return
    prompt, word_count, font_size, font_family = fields
    try:
        word_count = int(word_count)
        font_size = int(font_size)
    except ValueError:
        await message.reply_text(
            "word_count and font_size must be whole numbers.\n" + usage,
            parse_mode="Markdown",
        )
        return

    # The HTTP downloads and ffmpeg runs are blocking; push them to the
    # default executor so the bot keeps serving other updates meanwhile.
    loop = asyncio.get_running_loop()
    try:
        await message.reply_text("Generating script...")
        script = await loop.run_in_executor(None, get_script, prompt, word_count)
        sentences = [s for s in split_sentences(script) if s.strip()]
        if not sentences:
            await message.reply_text("Error: the generated script was empty")
            return
        image_paths = await loop.run_in_executor(None, download_images, sentences)
        await message.reply_text("Generating audio & subtitles...")
        audio_path, srt_path, duration_per_image = await generate_audio_and_sentence_subs(sentences)
        await message.reply_text("Creating video...")
        video_path = await loop.run_in_executor(
            None, make_video, image_paths, audio_path, srt_path,
            duration_per_image, font_size, font_family,
        )
        # Pass an open binary file: InputFile() treats a plain str as the
        # file *content*, so wrapping the path would upload the path text
        # instead of the video.
        with open(video_path, "rb") as video_file:
            await message.reply_video(video=video_file, caption="Here is your video")
    except Exception as e:
        # Top-level boundary: keep the traceback in the log and tell the
        # user what went wrong.
        logging.getLogger(__name__).exception("Video generation failed")
        await message.reply_text(f"Error: {e}")
if __name__ == "__main__":
    import logging
    logging.basicConfig(level=logging.INFO)
    # The token must come from the environment. Never fall back to a token
    # embedded in source: a credential committed to a file is effectively
    # public and lets anyone control the bot.
    TOKEN = os.getenv("BOT_TOKEN")
    if not TOKEN:
        raise SystemExit("BOT_TOKEN environment variable is not set")
    app = ApplicationBuilder().token(TOKEN).build()
    # Plain text messages only; commands are excluded from this handler.
    app.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), handle_message))
    app.run_polling()