import os
import re
import heapq
import uuid
import asyncio
import random
import time
import concurrent.futures

import edge_tts
import gradio as gr
import nest_asyncio
from edge_tts import VoicesManager
from PyPDF2 import PdfReader
from pydub import AudioSegment
from pydub.exceptions import CouldntDecodeError
from transformers import pipeline

# Apply nested event loop patch for Jupyter/Colab
nest_asyncio.apply()

# Load LLM
generator = pipeline(
    "text-generation",
    model="unsloth/gemma-3-1b-it",
    device_map="cpu",
    max_new_tokens=350,
    do_sample=True,
    temperature=0.7,
)

# Pick one random English male and female voice for the two speakers
async def get_english_voices():
    voices = await VoicesManager.create()
    voice_male = [v for v in voices.voices if v["Gender"] == "Male" and v["Locale"].startswith("en")]
    voice_female = [v for v in voices.voices if v["Gender"] == "Female" and v["Locale"].startswith("en")]
    male_voice = random.choice(voice_male)["Name"] if voice_male else "en-US-GuyNeural"
    female_voice = random.choice(voice_female)["Name"] if voice_female else "en-US-JennyNeural"
    return male_voice, female_voice

# Must be called from an async context or via asyncio.run()
MALE_VOICE, FEMALE_VOICE = asyncio.run(get_english_voices())

# Speaking-rate (%) and pitch (Hz) offsets for each speaker
rate_male = -12
pitch_male = -10
rate_female = -15
pitch_female = 5
rate_male_str = f"{rate_male:+d}%"
pitch_male_str = f"{pitch_male:+d}Hz"
rate_female_str = f"{rate_female:+d}%"
pitch_female_str = f"{pitch_female:+d}Hz"

KEY_TERMS = [
    "model", "propose", "architecture", "performance", "accuracy", "experiment",
    "framework", "design", "method", "network", "approach", "outperform", "layer",
    "training", "results", "learning", "evaluate", "baseline", "precision",
    "recall", "f1", "error", "metric", "loss", "time", "weight", "speed",
]

def split_sentences(text):
    return re.split(r'(?<=[.!?])\s+', text.strip())

def extract_sections_from_pdf(pdf_path):
    reader = PdfReader(pdf_path)
    full_text = "\n".join([page.extract_text() for page in reader.pages if page.extract_text()])
    full_text = re.sub(r'\n+', '\n', full_text)
    print("Original text", full_text)

    # The dictionary keys double as the section names passed to the LLM prompt,
    # so they carry staging instructions for the podcast conversation.
    section_patterns = {
        "Start of podcast with the first section of the paper, the abstract":
            r"\babstract\b",
        "Second section, continuing from the abstract to the overview; no need to re-introduce host and guest, continue the discussion directly":
            r"\bintroduction\b",
        "Third section, continuing from the overview to the methodology; no need to re-introduce host and guest, continue the discussion directly":
            r"\b(method(?:ology)?|proposed method|approach|model architecture|architecture|experimental setup|network design|implementation details|techniques|framework|learning algorithm|system description)\b",
        "Fourth and last section, continuing from the methodology to the conclusion; no need to re-introduce host and guest, continue the discussion directly; this is the end of the conversation, so conclude and add thank-you remarks":
            r"\bconclusion(?:s)?\b|\bsummary\b|\bfinal thoughts\b|\bresult(?:s)?\b",
    }

    sections = {}
    matches = []
    for name, pattern in section_patterns.items():
        match = re.search(pattern, full_text, re.IGNORECASE | re.MULTILINE)
        if match:
            matches.append((match.start(), match.end(), name))
    matches.sort()

    for i, (start, end, name) in enumerate(matches):
        section_start = end
        section_end = matches[i + 1][0] if i + 1 < len(matches) else len(full_text)
        section_text = full_text[section_start:section_end].strip()
        # Keep at most 4 paragraphs of each section
        sections[name] = extract_paragraphs(section_text, max_paragraphs=4)
    return sections, section_patterns
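# Hypothetical usage sketch (not part of the pipeline): running the section
# extractor on a locally saved paper. "example_paper.pdf" is a placeholder
# filename; the call returns the section dict plus the pattern dict so the
# caller can preserve the intended section order.
#
#   sections, patterns = extract_sections_from_pdf("example_paper.pdf")
#   for name, text in sections.items():
#       print(name[:60], "->", len(text), "characters")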
"\n\n".join(paragraphs[:4]) sections[name] = extract_paragraphs(section_text, max_paragraphs=4) return sections,section_patterns def extract_paragraphs(text, max_paragraphs=4): # Use double newlines if present if "\n\n" in text: paras = text.split("\n\n") else: # If no clear paragraphs, group every 4 lines as one paragraph lines = text.splitlines() paras = ['\n'.join(lines[i:i+4]) for i in range(0, len(lines), 4)] return "\n\n".join(paras[:max_paragraphs]) def summarize_section_by_heuristics(text, max_sentences=5): sentences = split_sentences(text) if len(sentences) <= max_sentences: return text scored = [] for idx, sent in enumerate(sentences): score = 0 lower_sent = sent.lower() words = lower_sent.split() # Keyword match score += sum(1 for word in words if word in KEY_TERMS) # Give more weight to sentences with numbers (e.g. 85%, 0.97, etc.) if re.search(r'\b\d+(\.\d+)?%?\b', sent): # captures decimals, integers, percentages score += 2 # Short, information-dense sentences if 10 < len(words) < 50: score += 1 # Sentence position (early sentences are usually summary-like) if idx in [0, 1]: score += 1 scored.append((score, sent)) # Pick top sentences, preserving original order top_sentences = heapq.nlargest(max_sentences, scored) top_sentences = [s for _, s in sorted(top_sentences, key=lambda x: sentences.index(x[1]))] return " ".join(top_sentences) def generate_podcast_script(section_name, section_text): user_prompt = f""" You are hosting a podcast episode where two characters are having a detailed conversation about a research paper section. Characters: - Host: A curious and articulate individual who has read the research paper. The host asks thoughtful questions, adds light commentary, and tries to simplify the topic for listeners. - Guest: The primary **researcher** or **author** of the paper. The guest explains the section in detail, offering technical insights, motivations, and clarifications. Goal: Create a **friendly, engaging, and informative** podcast-style conversation (8–10 sentences total) between the **Host** and **Guest**, focused on the section: **{section_name}**. Section Content: \"\"\" {section_text} \"\"\" Format: Host: ... Guest: ... 
""" messages = [{"role": "user", "content": user_prompt}] response = generator(messages, max_new_tokens=350, do_sample=True, temperature=0.7) return response[0]["generated_text"] async def generate_voice_line(text, voice, filename, rate="+0%", pitch="+0Hz"): communicate = edge_tts.Communicate(text, voice) communicate.rate = rate communicate.pitch = pitch await communicate.save(filename) async def tts_edge_line_by_line(script): lines = script.split('\n') segments = [] tasks = [] filenames = [] # Prepare all tasks for i, line in enumerate(lines): if 'Host:' in line or 'Guest:' in line: speaker, content = line.split(':', 1) speaker = speaker.strip().lower() voice = MALE_VOICE if speaker == 'host' else FEMALE_VOICE pitch_str = pitch_male_str if speaker == 'host' else pitch_female_str rate_str = rate_male_str if speaker == 'host' else rate_female_str filename = f"segment_{uuid.uuid4().hex}.mp3" filenames.append(filename) tasks.append(generate_voice_line(content.strip(), voice, filename, rate=rate_str, pitch=pitch_str)) # Run all TTS tasks await asyncio.gather(*tasks) # Wait briefly to ensure files are written time.sleep(0.3) # Load audio files safely for filename in filenames: if not os.path.exists(filename) or os.path.getsize(filename) == 0: print(f"⚠️ Skipping corrupt or empty file: {filename}") continue segment = AudioSegment.from_mp3(filename) segments.append(segment) return segments def merge_segments(segments, output="podcast_output.mp3"): podcast = AudioSegment.empty() for segment in segments: podcast += segment + AudioSegment.silent(duration=300) podcast.export(output, format="mp3") print(f"Podcast saved as {output}") def process_section(section_summary_pair): section, summary = section_summary_pair dialogue = generate_podcast_script(section, summary) dialogue_content = dialogue[1]["content"] lines = dialogue_content.split('\n') dialogue_fine = "\n".join([line for line in lines if 'Host:' in line or 'Guest:' in line]).replace("**", "") return f"\n\n=== {section.upper()} ===\n{dialogue_fine}\n" def process_pdf(pdf_file): pdf_path = "uploaded_pdf.pdf" with open(pdf_file.name, "rb") as infile, open(pdf_path, "wb") as outfile: outfile.write(infile.read()) sections,section_patterns = extract_sections_from_pdf(pdf_path) print("Original text extrated \n\n\n",sections) summarized_sections = { name: summarize_section_by_heuristics(content) for name, content in sections.items() } reordered_summarized_sections = {} for key in section_patterns: if key in summarized_sections: # Ensure the key exists in data_dict reordered_summarized_sections[key] = summarized_sections[key] print(reordered_summarized_sections) print("Summrized text . \n\n\n",reordered_summarized_sections) section_summary_pairs = list(reordered_summarized_sections.items()) with concurrent.futures.ThreadPoolExecutor() as executor: results = executor.map(process_section, section_summary_pairs) final_script = "".join(results) print("Script final taken \n\n\n",final_script) segments = asyncio.run(tts_edge_line_by_line(final_script)) output_audio_path = "podcast_output.mp3" merge_segments(segments, output=output_audio_path) os.remove(pdf_path) return output_audio_path iface = gr.Interface( fn=process_pdf, inputs=gr.File(label="Upload a PDF file"), outputs=gr.Audio(label="Generated Podcast Audio"), title="PDF to Podcast", description="Upload a Research Paper PDF and get a podcast-style audio summary." ) iface.launch(debug=True)