# Hugging Face Space: PDF to Podcast
# (The "Spaces: Running" lines were non-Python residue scraped from the
# Spaces status page, not part of the program.)
import os | |
import re | |
import heapq | |
import uuid | |
import asyncio | |
import edge_tts | |
import gradio as gr | |
import nest_asyncio | |
from PyPDF2 import PdfReader | |
from pydub import AudioSegment | |
from transformers import pipeline | |
import concurrent.futures | |
from edge_tts import VoicesManager | |
import random | |
import time | |
from pydub.exceptions import CouldntDecodeError | |
import fitz | |
# Apply nested event loop patch for Jupyter/Colab | |
nest_asyncio.apply() | |
# Text-generation LLM used to turn paper sections into podcast dialogue.
# NOTE(review): these sampling parameters are also passed per-call in
# generate_podcast_script; the call-site values take effect there.
generator = pipeline(
    "text-generation",
    model="unsloth/gemma-3-1b-it",
    device_map='cpu',
    max_new_tokens=350,
    do_sample=True,
    temperature=0.7,
)
# Async function to get voices | |
async def get_english_voices():
    """Pick one random English male and one English female edge-tts voice.

    Returns:
        tuple[str, str]: (male_voice_name, female_voice_name).

    Queries the live edge-tts voice catalogue; if no matching voice is
    found, falls back to a well-known built-in English voice.
    """
    voices = await VoicesManager.create()
    males = [v for v in voices.voices
             if v['Gender'] == 'Male' and v['Locale'].startswith("en")]
    females = [v for v in voices.voices
               if v['Gender'] == 'Female' and v['Locale'].startswith("en")]
    male_voice = random.choice(males)['Name'] if males else "en-US-GuyNeural"
    # Bug fix: the fallback must be an ENGLISH voice; the original fell
    # back to the Spanish "es-ES-ElviraNeural", contradicting the
    # English-only filter above.
    female_voice = random.choice(females)['Name'] if females else "en-US-JennyNeural"
    return male_voice, female_voice
# Resolve the two speaker voices once at startup. asyncio.run works here
# even inside Jupyter/Colab because nest_asyncio.apply() patched the loop.
MALE_VOICE, FEMALE_VOICE = asyncio.run(get_english_voices())
# Prosody settings per speaker: rate is a percent delta, pitch a Hz delta.
rate_male = -12
pitch_male = -10
pitch_female = 5
rate_female = -15
rate_female_str = f"{rate_female:+d}%"
pitch_female_str = f"{pitch_female:+d}Hz"
# Bug fix: the male strings were previously built from the *female*
# values, so both speakers got identical prosody.
rate_male_str = f"{rate_male:+d}%"
pitch_male_str = f"{pitch_male:+d}Hz"
# Vocabulary of research-paper terms used by the heuristic summarizer;
# each exact word match adds one point to a sentence's score.
KEY_TERMS = [
    "model", "propose", "architecture", "performance", "accuracy",
    "experiment", "framework", "design", "method", "network", "approach",
    "outperform", "layer", "training", "results", "learning", "evaluate",
    "baseline", "precision", "recall", "f1", "error", "metric", "loss",
    "time", "weight", "speed",
]
# Compiled once: whitespace that follows a sentence-ending ., ! or ?.
_SENTENCE_BOUNDARY = re.compile(r'(?<=[.!?])\s+')


def split_sentences(text):
    """Split *text* into sentences at whitespace after '.', '!' or '?'."""
    return _SENTENCE_BOUNDARY.split(text.strip())
def extract_sections_from_pdf(pdf_path):
    """Locate the main sections of a research-paper PDF.

    Args:
        pdf_path: filesystem path to the PDF to parse.

    Returns:
        tuple: (sections, section_patterns) where *sections* maps a
        prompt-style section label to at most four paragraphs of its
        text, and *section_patterns* preserves the canonical order.
    """
    reader = PdfReader(pdf_path)
    # Concatenate text of every page that yielded any text at all.
    full_text = "\n".join(page.extract_text() for page in reader.pages
                          if page.extract_text())
    full_text = re.sub(r'\n+', '\n', full_text)
    print("original text", full_text)  # typo fixed ("orignial")
    # The dict keys double as LLM prompt instructions downstream, so
    # they are long sentences rather than plain section names.
    section_patterns = {
        "Start of podcast with first section of paper as abstract": r"\babstract\b",
        "second section continuing from abstract to Overview and no required to start introductuion between host & guest directly continue in discussion": r"\bintroduction\b",
        "third section continuing from Overview to methodology and no required to start introductuion between host & guest directly continue in discussion": r"\b(method(?:ology)?|proposed method|approach|model architecture|architecture|experimental setup|network design|implementation details|techniques|framework|learning algorithm|system description)\b",
        "fourth and the last section continuing from methodology to conclusion and no required to start introductuion between host & guest directly continue in discussion and this is the end of conversation so conclude add thank remarks": r"\bconclusion(?:s)?\b|\bsummary\b|final thoughts\b|result(?:s)?",
    }
    matches = []
    for name, pattern in section_patterns.items():
        match = re.search(pattern, full_text, re.IGNORECASE | re.MULTILINE)
        if match:
            matches.append((match.start(), match.end(), name))
    matches.sort()
    sections = {}
    for i, (start, end, name) in enumerate(matches):
        # A section's body runs from the end of its heading match to the
        # start of the next matched heading (or EOF for the last one).
        section_start = end
        section_end = matches[i + 1][0] if i + 1 < len(matches) else len(full_text)
        section_text = full_text[section_start:section_end].strip()
        # Dead code removed: a paragraph-limited copy was computed here
        # but never used; extract_paragraphs already caps at 4 paragraphs.
        sections[name] = extract_paragraphs(section_text, max_paragraphs=4)
    return sections, section_patterns
def extract_paragraphs(text, max_paragraphs=4):
    """Return at most *max_paragraphs* paragraphs of *text*, blank-line joined.

    Paragraphs are taken from double-newline breaks when present;
    otherwise every 4 physical lines are grouped into one paragraph.
    """
    if "\n\n" in text:
        chunks = text.split("\n\n")
    else:
        rows = text.splitlines()
        chunks = ['\n'.join(rows[start:start + 4])
                  for start in range(0, len(rows), 4)]
    return "\n\n".join(chunks[:max_paragraphs])
def summarize_section_by_heuristics(text, max_sentences=5):
    """Extractive summary: keep the *max_sentences* highest-scoring sentences.

    Score per sentence: +1 per KEY_TERMS hit, +2 if it contains a number
    (integers, decimals, percentages), +1 for moderate length, +1 for
    being one of the first two sentences. Selected sentences are joined
    in their original document order.
    """
    sentences = split_sentences(text)
    if len(sentences) <= max_sentences:
        return text
    scored = []
    for idx, sent in enumerate(sentences):
        score = 0
        words = sent.lower().split()
        # Keyword match against the domain vocabulary.
        score += sum(1 for word in words if word in KEY_TERMS)
        # Numbers (85%, 0.97, ...) usually signal concrete results.
        if re.search(r'\b\d+(\.\d+)?%?\b', sent):
            score += 2
        # Moderately long sentences tend to be information-dense.
        if 10 < len(words) < 50:
            score += 1
        # Opening sentences are often summary-like.
        if idx in [0, 1]:
            score += 1
        # Bug fix: carry the index in the tuple. The original re-located
        # each winner with sentences.index(...), which returns the FIRST
        # occurrence and so misorders duplicate sentences; it also let
        # nlargest break score ties by comparing sentence text.
        scored.append((score, idx, sent))
    top = heapq.nlargest(max_sentences, scored, key=lambda item: item[0])
    top.sort(key=lambda item: item[1])  # restore document order
    return " ".join(sent for _, _, sent in top)
def generate_podcast_script(section_name, section_text):
    """Ask the LLM for a Host/Guest podcast dialogue about one section.

    Returns the pipeline's ``generated_text`` field, i.e. the chat
    transcript including the model's reply.
    """
    prompt = f"""
You are hosting a podcast episode where two characters are having a detailed conversation about a research paper section.
Characters:
- Host: A curious and articulate individual who has read the research paper. The host asks thoughtful questions, adds light commentary, and tries to simplify the topic for listeners.
- Guest: The primary **researcher** or **author** of the paper. The guest explains the section in detail, offering technical insights, motivations, and clarifications.
Goal:
Create a **friendly, engaging, and informative** podcast-style conversation (8–10 sentences total) between the **Host** and **Guest**, focused on the section: **{section_name}**.
Section Content:
\"\"\"
{section_text}
\"\"\"
Format:
Host: ...
Guest: ...
"""
    chat = [{"role": "user", "content": prompt}]
    reply = generator(chat, max_new_tokens=350, do_sample=True, temperature=0.7)
    return reply[0]["generated_text"]
async def generate_voice_line(text, voice, filename, rate="+0%", pitch="+0Hz"):
    """Synthesize *text* with the given edge-tts voice into *filename* (mp3).

    Args:
        text: the line to speak.
        voice: edge-tts voice short name (e.g. "en-US-GuyNeural").
        filename: output mp3 path.
        rate/pitch: edge-tts prosody strings, e.g. "-12%" / "+5Hz".
    """
    # Bug fix: rate/pitch must be passed to the Communicate constructor;
    # assigning them as attributes after construction has no effect on
    # the SSML request edge-tts sends.
    communicate = edge_tts.Communicate(text, voice, rate=rate, pitch=pitch)
    await communicate.save(filename)
async def tts_edge_line_by_line(script):
    """Synthesize every Host/Guest line of *script*; return AudioSegments.

    Lines not attributed to Host or Guest are skipped. All TTS requests
    run concurrently via asyncio.gather; empty or missing output files
    are skipped with a warning instead of crashing the merge step.
    """
    segments = []
    tasks = []
    filenames = []
    for line in script.split('\n'):
        if 'Host:' not in line and 'Guest:' not in line:
            continue
        speaker, content = line.split(':', 1)
        is_host = speaker.strip().lower() == 'host'
        voice = MALE_VOICE if is_host else FEMALE_VOICE
        pitch_str = pitch_male_str if is_host else pitch_female_str
        rate_str = rate_male_str if is_host else rate_female_str
        # Unique name per segment so concurrent writes never collide.
        filename = f"segment_{uuid.uuid4().hex}.mp3"
        filenames.append(filename)
        tasks.append(generate_voice_line(content.strip(), voice, filename,
                                         rate=rate_str, pitch=pitch_str))
    await asyncio.gather(*tasks)
    # Give the filesystem a moment to flush. Bug fix: time.sleep inside a
    # coroutine blocks the whole event loop; use asyncio.sleep instead.
    await asyncio.sleep(0.3)
    for filename in filenames:
        if not os.path.exists(filename) or os.path.getsize(filename) == 0:
            # Bug fix: the original f-string printed a literal placeholder
            # instead of the offending file's name.
            print(f"⚠️ Skipping corrupt or empty file: {filename}")
            continue
        segments.append(AudioSegment.from_mp3(filename))
    return segments
def merge_segments(segments, output="podcast_output.mp3"):
    """Concatenate audio *segments*, 300 ms of silence after each, into one mp3."""
    gap = AudioSegment.silent(duration=300)
    podcast = AudioSegment.empty()
    for part in segments:
        podcast = podcast + part + gap
    podcast.export(output, format="mp3")
    print(f"Podcast saved as {output}")
def process_section(section_summary_pair):
    """Produce a cleaned Host/Guest dialogue block for one (section, summary) pair."""
    section, summary = section_summary_pair
    transcript = generate_podcast_script(section, summary)
    # The generated transcript is a chat list; index 1 is the model reply.
    reply_text = transcript[1]["content"]
    kept_lines = [ln for ln in reply_text.split('\n')
                  if 'Host:' in ln or 'Guest:' in ln]
    dialogue_fine = "\n".join(kept_lines).replace("**", "")
    return f"\n\n=== {section.upper()} ===\n{dialogue_fine}\n"
def process_pdf(pdf_file):
    """Gradio handler: uploaded PDF -> path of the generated podcast mp3.

    Pipeline: copy upload -> extract sections -> heuristic summary per
    section -> LLM dialogue per section (thread pool) -> TTS -> merge.
    """
    pdf_path = "uploaded_pdf.pdf"
    # Work on a local copy so Gradio's temp upload can be released.
    with open(pdf_file.name, "rb") as infile, open(pdf_path, "wb") as outfile:
        outfile.write(infile.read())
    sections, section_patterns = extract_sections_from_pdf(pdf_path)
    print("Original text extracted \n\n\n", sections)  # typo fixed
    summarized_sections = {
        name: summarize_section_by_heuristics(content)
        for name, content in sections.items()
    }
    # Re-emit summaries in the canonical order given by section_patterns
    # (dicts preserve insertion order).
    reordered_summarized_sections = {
        key: summarized_sections[key]
        for key in section_patterns
        if key in summarized_sections
    }
    print("Summarized text . \n\n\n", reordered_summarized_sections)  # typo fixed
    section_summary_pairs = list(reordered_summarized_sections.items())
    # One LLM call per section; the thread pool overlaps them.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = executor.map(process_section, section_summary_pairs)
        # Bug fix: executor.map is lazy — consume it inside the `with`
        # rather than after the pool has shut down.
        final_script = "".join(results)
    print("Script final taken \n\n\n", final_script)
    segments = asyncio.run(tts_edge_line_by_line(final_script))
    output_audio_path = "podcast_output.mp3"
    merge_segments(segments, output=output_audio_path)
    os.remove(pdf_path)
    return output_audio_path
# Gradio UI: a single PDF upload in, the synthesized podcast audio out.
iface = gr.Interface(
    fn=process_pdf,
    inputs=gr.File(label="Upload a PDF file"),
    outputs=gr.Audio(label="Generated Podcast Audio"),
    title="PDF to Podcast",
    description="Upload a Research Paper PDF and get a podcast-style audio summary.",
)

iface.launch(debug=True)