Spaces:
Runtime error
Runtime error
File size: 5,124 Bytes
f529d4f 2332137 f529d4f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
import os
import requests
import asyncio
import edge_tts
import subprocess
import re
from telegram import Update, InputFile
from telegram.ext import ApplicationBuilder, CommandHandler, MessageHandler, filters, ContextTypes
# ========== SCRIPT GENERATION ==========
def get_script(prompt, word_count):
url = f"https://text.pollinations.ai/{prompt} ({word_count} words)"
response = requests.get(url)
return response.text.strip()
# ========== TEXT SPLITTING ==========
def split_sentences(text):
return re.split(r'(?<=[.?!।])\s+', text.strip())
# ========== IMAGE GENERATION ==========
def download_images(sentences):
os.makedirs("images", exist_ok=True)
image_paths = []
for idx, sentence in enumerate(sentences):
url = f"https://image.pollinations.ai/prompt/ stunning 3d render styled images {sentence}?&nologo=True"
img_path = f"images/img_{idx:03}.jpg"
response = requests.get(url)
with open(img_path, "wb") as f:
f.write(response.content)
image_paths.append(img_path)
return image_paths
# ========== AUDIO DURATION ==========
def get_audio_duration(audio_path):
result = subprocess.run(
["ffprobe", "-v", "error", "-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1", audio_path],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
return float(result.stdout)
# ========== TTS + SUBTITLES ==========
async def generate_audio_and_sentence_subs(sentences, voice="hi-IN-SwaraNeural"):
audio_path = "output.mp3"
srt_path = "subtitles.srt"
full_text = " ".join(sentences)
communicate = edge_tts.Communicate(full_text, voice)
with open(audio_path, "wb") as f:
async for chunk in communicate.stream():
if chunk["type"] == "audio":
f.write(chunk["data"])
duration = get_audio_duration(audio_path)
per_sentence = duration / len(sentences)
with open(srt_path, "w", encoding="utf-8") as f_srt:
for i, sentence in enumerate(sentences):
start = i * per_sentence
end = (i + 1) * per_sentence
def format_time(seconds):
ms = int((seconds - int(seconds)) * 1000)
h = int(seconds // 3600)
m = int((seconds % 3600) // 60)
s = int(seconds % 60)
return f"{h:02}:{m:02}:{s:02},{ms:03}"
f_srt.write(f"{i+1}\n")
f_srt.write(f"{format_time(start)} --> {format_time(end)}\n")
f_srt.write(f"{sentence.strip()}\n\n")
return audio_path, srt_path, per_sentence
# ========== VIDEO CREATION ==========
def make_video(image_paths, audio_path, srt_path, duration_per_image, font_size, font_family, output="final_video.mp4"):
with open("images.txt", "w") as f:
for path in image_paths:
f.write(f"file '{os.path.abspath(path)}'\n")
f.write(f"duration {duration_per_image}\n")
f.write(f"file '{os.path.abspath(image_paths[-1])}'\n")
subprocess.call([
"ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", "images.txt",
"-vsync", "vfr", "-pix_fmt", "yuv420p", "-vf", "scale=720:1280", "temp_video.mp4"
])
subprocess.call([
"ffmpeg", "-y", "-i", "temp_video.mp4", "-i", audio_path, "-vf",
f"subtitles={srt_path}:force_style='FontName={font_family},FontSize={font_size},BorderStyle=3,Outline=1,Shadow=0,Alignment=2'",
"-c:a", "aac", "-b:a", "192k", output
])
return output
# ========== TELEGRAM BOT ==========
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
try:
text = update.message.text.strip()
if ";" not in text:
await update.message.reply_text("Send prompt in format: `prompt;word_count;font_size;font_name`", parse_mode="Markdown")
return
prompt, word_count, font_size, font_family = [x.strip() for x in text.split(";")]
word_count = int(word_count)
font_size = int(font_size)
await update.message.reply_text("Generating script...")
script = get_script(prompt, word_count)
sentences = split_sentences(script)
image_paths = download_images(sentences)
await update.message.reply_text("Generating audio & subtitles...")
audio_path, srt_path, duration_per_image = await generate_audio_and_sentence_subs(sentences)
await update.message.reply_text("Creating video...")
video_path = make_video(image_paths, audio_path, srt_path, duration_per_image, font_size, font_family)
await update.message.reply_video(video=InputFile(video_path), caption="Here is your video")
except Exception as e:
await update.message.reply_text(f"Error: {e}")
if __name__ == "__main__":
import logging
logging.basicConfig(level=logging.INFO)
TOKEN = os.getenv("BOT_TOKEN") or "7497022424:AAG-TeECKpf9NCDZQWCIDnGHrKe2MEbk1I8"
app = ApplicationBuilder().token(TOKEN).build()
app.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), handle_message))
app.run_polling()
|