# pdf_to_poadcast / app.py
# mlokendra's picture
# update
# bc28198 verified
import os
import re
import heapq
import uuid
import asyncio
import edge_tts
import gradio as gr
import nest_asyncio
from PyPDF2 import PdfReader
from pydub import AudioSegment
from transformers import pipeline
import concurrent.futures
from edge_tts import VoicesManager
import random
import time
from pydub.exceptions import CouldntDecodeError
import fitz
# Patch asyncio so nested event loops work (needed in Jupyter/Colab).
nest_asyncio.apply()

# Text-generation pipeline that writes the podcast dialogue; runs on CPU.
_GENERATOR_OPTIONS = {
    "model": "unsloth/gemma-3-1b-it",
    "device_map": 'cpu',
    "max_new_tokens": 350,
    "do_sample": True,
    "temperature": 0.7,
}
generator = pipeline("text-generation", **_GENERATOR_OPTIONS)
async def get_english_voices():
    """Pick one random English male voice and one random English female voice.

    The edge-tts voice catalogue is filtered by ``Gender`` and by a ``Locale``
    that starts with ``"en"``.

    Returns:
        A ``(male_voice_name, female_voice_name)`` tuple. If the catalogue has
        no English voice for a gender, a fixed English default is returned
        for that gender instead.
    """
    manager = await VoicesManager.create()
    english = [v for v in manager.voices if v['Locale'].startswith("en")]
    male_names = [v['Name'] for v in english if v['Gender'] == 'Male']
    female_names = [v['Name'] for v in english if v['Gender'] == 'Female']

    male_voice = random.choice(male_names) if male_names else "en-US-GuyNeural"
    # The fallback has to be English as well; a Spanish voice would read the
    # English script with the wrong pronunciation.
    female_voice = random.choice(female_names) if female_names else "en-US-JennyNeural"
    return male_voice, female_voice


# Resolve the two voices once, at import time. asyncio.run() is used because
# this runs at module level, outside any coroutine.
MALE_VOICE, FEMALE_VOICE = asyncio.run(get_english_voices())
# Prosody offsets for the two speakers: rate in percent, pitch in Hz.
rate_male = -12
pitch_male = -10
pitch_female = 5
rate_female = -15

# Format with an explicit sign ("-15%", "+5Hz"), the same shape as the
# "+0%" / "+0Hz" defaults used by generate_voice_line.
rate_female_str = f"{rate_female:+d}%"
pitch_female_str = f"{pitch_female:+d}Hz"
# The male strings are derived from the male offsets (they were previously
# built from the female values, leaving rate_male/pitch_male unused).
rate_male_str = f"{rate_male:+d}%"
pitch_male_str = f"{pitch_male:+d}Hz"
# Lower-case words that raise a sentence's score in
# summarize_section_by_heuristics.
KEY_TERMS = [
    # contribution / design vocabulary
    "model", "propose", "architecture",
    "performance", "accuracy", "experiment",
    "framework", "design", "method",
    "network", "approach", "outperform",
    # training / evaluation vocabulary
    "layer", "training", "results",
    "learning", "evaluate", "baseline",
    # metrics
    "precision", "recall", "f1",
    "error", "metric", "loss",
    "time", "weight", "speed",
]
def split_sentences(text):
    """Split *text* into a list of sentences.

    Surrounding whitespace is removed first; the text is then cut at every
    run of whitespace that directly follows ``.``, ``!`` or ``?``. The
    punctuation stays attached to the sentence it ends.
    """
    sentence_boundary = re.compile(r'(?<=[.!?])\s+')
    trimmed = text.strip()
    return sentence_boundary.split(trimmed)
def extract_sections_from_pdf(pdf_path):
    """Locate the main sections of a paper in a PDF and return their text.

    The text of all pages is joined, runs of newlines are collapsed to a
    single newline, and each regex in the section table is searched once
    (case-insensitively) in the whole text. A section's text runs from the
    end of its match up to the start of the next match by position, or to
    the end of the document for the last one.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        A ``(sections, section_patterns)`` tuple. ``sections`` maps the label
        of every section whose pattern matched to the output of
        ``extract_paragraphs(..., max_paragraphs=4)`` for that section.
        ``section_patterns`` is the full label -> regex table in its
        declaration order. The labels double as prompt text for the script
        generator, so they must not be edited casually.
    """
    reader = PdfReader(pdf_path)
    # Extract every page exactly once; pages that yield no text are dropped.
    page_texts = [page.extract_text() for page in reader.pages]
    full_text = "\n".join(page_text for page_text in page_texts if page_text)
    # After this substitution the text contains no blank lines, so
    # extract_paragraphs falls back to grouping lines.
    full_text = re.sub(r'\n+', '\n', full_text)
    print("orignial text", full_text)
    section_patterns = {
        "Start of podcast with first section of paper as abstract": r"\babstract\b",
        "second section continuing from abstract to Overview and no required to start introductuion between host & guest directly continue in discussion": r"\bintroduction\b",
        "third section continuing from Overview to methodology and no required to start introductuion between host & guest directly continue in discussion": r"\b(method(?:ology)?|proposed method|approach|model architecture|architecture|experimental setup|network design|implementation details|techniques|framework|learning algorithm|system description)\b",
        "fourth and the last section continuing from methodology to conclusion and no required to start introductuion between host & guest directly continue in discussion and this is the end of conversation so conclude add thank remarks": r"\bconclusion(?:s)?\b|\bsummary\b|final thoughts\b|result(?:s)?",
    }

    # First occurrence of every section pattern, as (start, end, label).
    matches = []
    for name, pattern in section_patterns.items():
        match = re.search(pattern, full_text, re.IGNORECASE | re.MULTILINE)
        if match:
            matches.append((match.start(), match.end(), name))
    # Sort by position in the document so neighbouring matches delimit
    # each other.
    matches.sort()

    sections = {}
    for i, (_, section_start, name) in enumerate(matches):
        if i + 1 < len(matches):
            section_end = matches[i + 1][0]
        else:
            section_end = len(full_text)
        section_text = full_text[section_start:section_end].strip()
        sections[name] = extract_paragraphs(section_text, max_paragraphs=4)
    return sections, section_patterns
def extract_paragraphs(text, max_paragraphs=4):
    """Return the first *max_paragraphs* paragraphs of *text*.

    Paragraphs are separated by a blank line (``"\\n\\n"``). When *text* has
    no blank line at all, every group of four consecutive lines is treated
    as one paragraph. The kept paragraphs are joined with a blank line.
    """
    separator = "\n\n"
    if separator not in text:
        # No explicit paragraph breaks: bundle the lines four at a time.
        rows = text.splitlines()
        chunks = []
        for offset in range(0, len(rows), 4):
            chunks.append('\n'.join(rows[offset:offset + 4]))
    else:
        chunks = text.split(separator)
    kept = chunks[:max_paragraphs]
    return separator.join(kept)
def summarize_section_by_heuristics(text, max_sentences=5):
    """Shrink *text* to its *max_sentences* highest-scoring sentences.

    Each sentence is scored as follows:
      * +1 for every word that is a key term (surrounding punctuation is
        ignored, so "results." counts as "results");
      * +2 if the sentence contains a number (integer, decimal or percentage);
      * +1 if it has more than 10 and fewer than 50 words;
      * +1 if it is one of the first two sentences.

    Args:
        text: The section text to summarise.
        max_sentences: Maximum number of sentences to keep.

    Returns:
        *text* unchanged when it has at most *max_sentences* sentences;
        otherwise the selected sentences joined by single spaces, in their
        original order.
    """
    sentences = split_sentences(text)
    if len(sentences) <= max_sentences:
        return text

    key_terms = set(KEY_TERMS)
    # captures decimals, integers, percentages
    number_pattern = re.compile(r'\b\d+(\.\d+)?%?\b')
    # Punctuation that may be glued to a word and would hide a key term.
    edge_punctuation = ".,;:!?()[]{}'\""

    scored = []
    for idx, sent in enumerate(sentences):
        words = sent.lower().split()
        score = sum(1 for word in words if word.strip(edge_punctuation) in key_terms)
        # Give more weight to sentences with numbers (e.g. 85%, 0.97, etc.)
        if number_pattern.search(sent):
            score += 2
        # Short, information-dense sentences
        if 10 < len(words) < 50:
            score += 1
        # Sentence position (early sentences are usually summary-like)
        if idx < 2:
            score += 1
        # Keep the position with the sentence so the original order can be
        # restored without searching the list (which was quadratic and put
        # repeated sentences in the wrong place).
        scored.append((score, sent, idx))

    top_sentences = heapq.nlargest(max_sentences, scored)
    top_sentences.sort(key=lambda item: item[2])
    return " ".join(sent for _, sent, _ in top_sentences)
def generate_podcast_script(section_name, section_text):
    """Ask the language model for a Host/Guest dialogue about one section.

    Args:
        section_name: Label of the section; inserted verbatim into the prompt.
        section_text: Text of the section the conversation should cover.

    Returns:
        The ``"generated_text"`` entry of the first pipeline result.
        NOTE(review): callers index it as a list of chat messages
        (``[1]["content"]``); confirm against the pipeline's chat output.
    """
    prompt = f"""
You are hosting a podcast episode where two characters are having a detailed conversation about a research paper section.
Characters:
- Host: A curious and articulate individual who has read the research paper. The host asks thoughtful questions, adds light commentary, and tries to simplify the topic for listeners.
- Guest: The primary **researcher** or **author** of the paper. The guest explains the section in detail, offering technical insights, motivations, and clarifications.
Goal:
Create a **friendly, engaging, and informative** podcast-style conversation (8–10 sentences total) between the **Host** and **Guest**, focused on the section: **{section_name}**.
Section Content:
\"\"\"
{section_text}
\"\"\"
Format:
Host: ...
Guest: ...
"""
    # Single-turn chat: the whole prompt is sent as one user message.
    chat = [{"role": "user", "content": prompt}]
    outputs = generator(chat, max_new_tokens=350, do_sample=True, temperature=0.7)
    first_output = outputs[0]
    return first_output["generated_text"]
async def generate_voice_line(text, voice, filename, rate="+0%", pitch="+0Hz"):
    """Synthesize *text* with edge-tts and save the audio to *filename*.

    Args:
        text: The sentence(s) to speak.
        voice: Name of the edge-tts voice to use.
        filename: Path of the audio file to write.
        rate: Signed speaking-rate offset, e.g. ``"-12%"``.
        pitch: Signed pitch offset, e.g. ``"+5Hz"``.
    """
    # Pass the prosody settings to the constructor rather than assigning
    # attributes afterwards: the constructor is the supported way to set
    # them, and attributes assigned later are not guaranteed to be used.
    communicate = edge_tts.Communicate(text, voice, rate=rate, pitch=pitch)
    await communicate.save(filename)
async def tts_edge_line_by_line(script):
    """Turn a Host/Guest script into a list of audio segments.

    Every line containing ``Host:`` or ``Guest:`` is split at its first
    colon; the text after the colon is synthesized with the male voice when
    the part before the colon is exactly "host" (case-insensitive), and with
    the female voice otherwise. All lines are synthesized concurrently into
    temporary MP3 files, which are loaded in script order and then deleted.

    Args:
        script: The dialogue, one utterance per line.

    Returns:
        A list of ``AudioSegment`` objects in script order. Files that are
        missing, empty or cannot be decoded are skipped with a warning.
    """
    tasks = []
    filenames = []
    # Prepare all tasks
    for line in script.split('\n'):
        if 'Host:' not in line and 'Guest:' not in line:
            continue
        speaker, content = line.split(':', 1)
        content = content.strip()
        if not content:
            # Nothing to speak on this line; do not send an empty request.
            continue
        is_host = speaker.strip().lower() == 'host'
        voice = MALE_VOICE if is_host else FEMALE_VOICE
        pitch_str = pitch_male_str if is_host else pitch_female_str
        rate_str = rate_male_str if is_host else rate_female_str
        filename = f"segment_{uuid.uuid4().hex}.mp3"
        filenames.append(filename)
        tasks.append(generate_voice_line(content, voice, filename, rate=rate_str, pitch=pitch_str))

    segments = []
    try:
        # Run all TTS tasks
        await asyncio.gather(*tasks)
        # Keep the short grace period, but yield to the event loop instead of
        # blocking it the way time.sleep() would inside a coroutine.
        await asyncio.sleep(0.3)
        # Load audio files safely
        for filename in filenames:
            if not os.path.exists(filename) or os.path.getsize(filename) == 0:
                print(f"⚠️ Skipping corrupt or empty file: {filename}")
                continue
            try:
                segments.append(AudioSegment.from_mp3(filename))
            except CouldntDecodeError:
                print(f"⚠️ Skipping corrupt or empty file: {filename}")
    finally:
        # The segments are held in memory once loaded, so the per-line files
        # are no longer needed; remove them even if synthesis failed.
        for filename in filenames:
            try:
                os.remove(filename)
            except OSError:
                pass
    return segments
def merge_segments(segments, output="podcast_output.mp3"):
    """Concatenate *segments* into one MP3 file written to *output*.

    Each segment is followed by 300 ms of silence. A confirmation line is
    printed once the file has been exported.
    """
    episode = AudioSegment.empty()
    for clip in segments:
        pause = AudioSegment.silent(duration=300)
        episode = episode + (clip + pause)
    episode.export(output, format="mp3")
    print(f"Podcast saved as {output}")
def process_section(section_summary_pair):
    """Generate the cleaned dialogue block for one ``(section, summary)`` pair.

    The model reply is taken from the second message of the generated chat.
    Only lines containing ``Host:`` or ``Guest:`` are kept, and Markdown bold
    markers (``**``) are removed.

    Returns:
        The dialogue preceded by a ``=== SECTION NAME ===`` header line.
    """
    section, summary = section_summary_pair
    reply = generate_podcast_script(section, summary)[1]["content"]
    spoken_lines = []
    for line in reply.split('\n'):
        if 'Host:' in line or 'Guest:' in line:
            spoken_lines.append(line)
    dialogue_fine = "\n".join(spoken_lines).replace("**", "")
    return f"\n\n=== {section.upper()} ===\n{dialogue_fine}\n"
def process_pdf(pdf_file):
    """Convert an uploaded research-paper PDF into a podcast MP3.

    Pipeline: copy the upload to a working file, extract the paper's
    sections, summarise each one, generate a Host/Guest dialogue per section,
    synthesize every line and merge the audio.

    Args:
        pdf_file: The upload handed over by the Gradio file input; either an
            object with a ``name`` attribute holding the path, or the path
            itself.

    Returns:
        Path of the generated MP3 file.

    Raises:
        gr.Error: If no file was uploaded.
    """
    if pdf_file is None:
        raise gr.Error("Please upload a PDF file.")
    # Accept both an object exposing ``.name`` and a plain path string.
    source_path = getattr(pdf_file, "name", pdf_file)

    # A unique working copy keeps concurrent requests from overwriting each
    # other's upload; the finally block removes it even when extraction fails.
    pdf_path = f"uploaded_{uuid.uuid4().hex}.pdf"
    try:
        with open(source_path, "rb") as infile, open(pdf_path, "wb") as outfile:
            outfile.write(infile.read())
        sections, section_patterns = extract_sections_from_pdf(pdf_path)
    finally:
        if os.path.exists(pdf_path):
            os.remove(pdf_path)
    print("Original text extrated \n\n\n", sections)

    summarized_sections = {
        name: summarize_section_by_heuristics(content)
        for name, content in sections.items()
    }
    # Sections were collected in document order; put them back into the
    # order in which the pattern table declares them.
    reordered_summarized_sections = {
        key: summarized_sections[key]
        for key in section_patterns
        if key in summarized_sections
    }
    print("Summrized text . \n\n\n", reordered_summarized_sections)

    section_summary_pairs = list(reordered_summarized_sections.items())
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # executor.map yields results in input order, so the script keeps
        # the section order.
        final_script = "".join(executor.map(process_section, section_summary_pairs))
    print("Script final taken \n\n\n", final_script)

    segments = asyncio.run(tts_edge_line_by_line(final_script))
    output_audio_path = "podcast_output.mp3"
    merge_segments(segments, output=output_audio_path)
    return output_audio_path
# Gradio UI: a PDF upload goes in, the generated podcast audio comes out.
_pdf_input = gr.File(label="Upload a PDF file")
_audio_output = gr.Audio(label="Generated Podcast Audio")
iface = gr.Interface(
    fn=process_pdf,
    inputs=_pdf_input,
    outputs=_audio_output,
    title="PDF to Podcast",
    description="Upload a Research Paper PDF and get a podcast-style audio summary.",
)
# Start the web app as soon as the script is executed.
iface.launch(debug=True)