# playlist_details.py
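"""Scrape a YouTube playlist and enrich it with LLM-generated content.

Loads the playlist page with headless Playwright, scrolls until every video
row is rendered, parses the final DOM with BeautifulSoup, and then generates
a description, MCQs, and learning outcomes for each video (plus
playlist-level MCQs and outcomes) via the utils/ and aws_llm helpers.
"""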
import os
import time
import json

from playwright.sync_api import sync_playwright
from bs4 import BeautifulSoup

from aws_llm import llm_response
from utils.generator import generate_learning_outcomes, generate_mcqs, generate_playlist_mcqs
from utils.transcript import fetch_transcript
from helper.helpers import get_thumbnail_url, parse_video_id, sanitize_filename, duration_in_range
# Prompts for description generation
DESCRIPTION_SYSTEM_PROMPT = (
"You are a professional writing assistant.\n"
"Your task is to transform the following YouTube video description into a clean, concise, and informative summary.About the Course section—similar to what you'd find on a professional course page.\n"
"Present the content as clear, well-written bullet points, each conveying one key idea.\n"
"Eliminate timestamps, repetitive phrases, promotional content, and irrelevant information.\n"
"Ensure the language is natural, professional, and sounds like it was written by a human expert.\n"
"If only the title is available, infer the likely content and generate a meaningful, accurate summary based on it.\n"
"**IMPORTANT** Do not include any introductions, explanations, or labels such as 'Summary' or 'Cleaned Description.'"
"Always provide the best possible output using your reasoning and language skills, regardless of the input quality."
)
DESCRIPTION_USER_PROMPT = (
"Material:\n{material}"
)
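# {material} is filled in with the video transcript (or just the title when
# no transcript is available) via str.format at the call site below.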
def get_youtube_playlist_videos(url: str, mcq_per_video: int = 5, playlist_mcqs: int = 10):
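    """Scrape every video in the playlist at `url` and enrich it with LLM output.

    Args:
        url: Full YouTube playlist URL.
        mcq_per_video: Number of MCQs to generate per video.
        playlist_mcqs: Number of playlist-level MCQs to generate.

    Returns:
        A dict with "playlist_info", "videos", and "playlist_questions" keys
        (see the illustrative shape after this function).
    """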
output = {
"playlist_info": {
"title": "",
"channel": "",
"url": url,
"channel_icon": "",
"what_you_learn": []
},
"videos": [],
"playlist_questions": []
}
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
context = browser.new_context(
viewport={"width": 1280, "height": 800},
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
)
page = context.new_page()
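        # Block heavyweight static assets (images, fonts) to speed up page
        # loads, but let video thumbnails from i.ytimg.com/vi through so
        # thumbnail URLs stay resolvable.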
page.route("**/*.{png,jpg,jpeg,gif,svg,woff,woff2}",
lambda route: route.abort() if "i.ytimg.com/vi" not in route.request.url else route.continue_())
page.goto(url, wait_until="networkidle", timeout=30000)
# Extract playlist title
try:
playlist_title = page.evaluate(
"() => document.querySelector('h1#title yt-formatted-string')?.textContent.trim()"
)
if playlist_title:
output["playlist_info"]["title"] = playlist_title
else:
# Fallback: Try BeautifulSoup if Playwright fails
soup_title = BeautifulSoup(page.content(), "html.parser")
title_el = soup_title.select_one("h1#title yt-formatted-string")
if title_el:
output["playlist_info"]["title"] = title_el.text.strip()
except Exception as e:
print(f"⚠️ Could not extract playlist title: {e}")
# Get total video count
try:
page.wait_for_selector(".metadata-stats.style-scope.ytd-playlist-byline-renderer", timeout=10000)
video_count_text = page.evaluate(
"() => document.querySelector('.metadata-stats.style-scope.ytd-playlist-byline-renderer yt-formatted-string.byline-item span')?.textContent.trim() || '0'"
)
            count_digits = video_count_text.replace(",", "")
            total_videos = int(count_digits) if count_digits.isdigit() else 0
output["playlist_info"]["video_count"] = total_videos
print(f"📋 Playlist contains {total_videos} videos according to YouTube")
# Improved dynamic scrolling
max_attempts = min((total_videos // 5) + 50, 200) # Cap max attempts for very large playlists
attempt = 0
prev_loaded = 0
no_change_count = 0
print(f"🖱️ Scrolling dynamically to load all videos...")
while True:
attempt += 1
# Scroll to the bottom more precisely using JavaScript
page.evaluate("""
() => {
const container = document.querySelector('ytd-playlist-video-list-renderer');
if (container) {
container.scrollTop = container.scrollHeight;
window.scrollTo(0, document.body.scrollHeight);
} else {
window.scrollTo(0, document.body.scrollHeight);
}
}
""")
# Wait for network to be idle (more robust than fixed time)
page.wait_for_timeout(1000) # Initial wait for scroll action
try:
page.wait_for_load_state("networkidle", timeout=3000) # Wait for network activity to settle
                except Exception:
                    pass  # network may never fully settle on YouTube; keep scrolling
# Count loaded videos
loaded_count = page.evaluate(
"() => document.querySelectorAll('ytd-playlist-video-renderer').length"
)
print(f" ↳ Attempt {attempt}: loaded {loaded_count}/{total_videos}")
# Check for completion or stalled loading
                if total_videos and loaded_count >= total_videos:  # fall back to stall detection when the count is unknown
print("✅ All videos loaded!")
break
if loaded_count == prev_loaded:
no_change_count += 1
if no_change_count >= 5: # If no new videos loaded after 5 attempts
print(f"⚠️ Scrolling stalled at {loaded_count}/{total_videos} videos")
# Try one more aggressive scroll technique
try:
print("Attempting more aggressive scrolling technique...")
# Click on the last visible video to ensure it's in view
page.evaluate("""
() => {
const videos = document.querySelectorAll('ytd-playlist-video-renderer');
if (videos.length > 0) {
const lastVideo = videos[videos.length - 1];
lastVideo.scrollIntoView({behavior: 'smooth', block: 'end'});
}
}
""")
page.wait_for_timeout(2000)
# Force scroll beyond the current view
page.evaluate("""
() => {
window.scrollBy(0, window.innerHeight * 2);
}
""")
page.wait_for_timeout(2000)
# Check if this helped
new_count = page.evaluate(
"() => document.querySelectorAll('ytd-playlist-video-renderer').length"
)
if new_count > loaded_count:
print(f" ↳ Aggressive scroll worked! Now at {new_count} videos")
no_change_count = 0
prev_loaded = new_count
continue
                        except Exception:
                            pass
print(f"⚠️ Giving up after {attempt} attempts; proceeding with {loaded_count} videos")
break
else:
no_change_count = 0
prev_loaded = loaded_count
if attempt >= max_attempts:
print(f"⚠️ Max scroll attempts reached ({max_attempts}); proceeding with {loaded_count} videos")
break
except Exception as e:
print(f"⚠️ Error during scrolling: {e}")
# Final content load and parse
# Ensure we're at the very bottom one last time
page.evaluate("""
() => {
window.scrollTo(0, document.body.scrollHeight * 2);
const container = document.querySelector('ytd-playlist-video-list-renderer');
if (container) container.scrollTop = container.scrollHeight * 2;
}
""")
page.wait_for_timeout(2000) # Give time for final items to load
loaded_video_count = page.evaluate(
"() => document.querySelectorAll('ytd-playlist-video-renderer').length"
)
print(f"🔢 Actually loaded {loaded_video_count} videos")
soup = BeautifulSoup(page.content(), "html.parser")
video_elements = soup.select("ytd-playlist-video-renderer")
        channel_el = soup.select_one("ytd-channel-name div#text-container a")
        if channel_el:
            output["playlist_info"]["channel"] = channel_el.text.strip()
# Robust channel_icon extraction with debug output
# 1. Try playlist header avatar (most reliable)
header_avatar = soup.select_one("img.yt-core-image.yt-spec-avatar-shape__image")
if header_avatar and header_avatar.get("src"):
output["playlist_info"]["channel_icon"] = header_avatar["src"]
print("[DEBUG] channel_icon found from playlist header avatar selector (.yt-core-image.yt-spec-avatar-shape__image)")
else:
found = False
# 2. Try first video's channel avatar
if video_elements:
a = video_elements[0].select_one("a#video-title")
if a:
first_video_url = "https://www.youtube.com" + a["href"]
try:
icon_page = context.new_page()
icon_page.goto(first_video_url, wait_until="domcontentloaded", timeout=30000)
icon_page.wait_for_selector("yt-img-shadow#avatar img#img", timeout=10000)
icon_el = icon_page.query_selector("yt-img-shadow#avatar img#img")
if icon_el:
icon_src = icon_el.get_attribute("src")
if icon_src:
output["playlist_info"]["channel_icon"] = icon_src
found = True
print("[DEBUG] channel_icon found from first video owner selector (yt-img-shadow#avatar img#img)")
icon_page.close()
except Exception as e:
print(f"[DEBUG] channel_icon NOT found in first video owner selector: {e}")
if not found:
# 3. Fallback to default icon
output["playlist_info"]["channel_icon"] = "https://www.youtube.com/img/desktop/yt_1200.png"
print("[DEBUG] channel_icon fallback to default icon")
# For collecting video summaries for later playlist-level MCQ generation
all_video_summaries = []
all_transcripts = []
# Process videos
print(f"🔍 Found {len(video_elements)} videos to process")
for idx, vid in enumerate(video_elements):
a = vid.select_one("a#video-title")
            if not a:
                continue
title = a["title"].strip()
href = a["href"]
full_url = "https://www.youtube.com" + href
thumb = get_thumbnail_url(href)
            badge = vid.select_one("badge-shape .badge-shape-wiz__text")
            raw_dur = badge.text.strip() if badge else "0:00"
if not duration_in_range(raw_dur):
print(f"⏭️ Skipping '{title}'—duration {raw_dur}")
continue
vid_id = parse_video_id(full_url)
transcript = ""
try:
transcript = fetch_transcript(vid_id)
except Exception as e:
print(f"⚠️ Transcript failed for {title}: {e}")
material = transcript.strip() or title
all_transcripts.append(material)
# Generate video description
user_prompt = DESCRIPTION_USER_PROMPT.format(material=material)
description, cost = llm_response(DESCRIPTION_SYSTEM_PROMPT, user_prompt)
# Generate MCQs for this video
print(f"🧩 Generating {mcq_per_video} MCQs for video: {title}")
mcqs, mcq_cost = generate_mcqs(material, mcq_per_video)
# Generate "What You'll Learn" points for this video
print(f"📝 Generating learning outcomes for video: {title}")
learning_outcomes, learn_cost = generate_learning_outcomes(material)
# Store video summary for playlist-level MCQs
all_video_summaries.append({
"title": title,
"description": description
})
            if idx == 0 and not output["playlist_info"]["channel_icon"]:
                try:
                    icon_page = context.new_page()
                    icon_page.goto(full_url, wait_until="domcontentloaded", timeout=15000)
                    icon_page.wait_for_selector("ytd-video-owner-renderer img#img", timeout=5000)
                    icon_el = icon_page.query_selector("ytd-video-owner-renderer img#img")
                    if icon_el and icon_el.get_attribute("src"):
                        output["playlist_info"]["channel_icon"] = icon_el.get_attribute("src")
                    icon_page.close()
                except Exception:
                    pass
output["videos"].append({
"title": title,
"url": full_url,
"thumbnail": thumb,
"duration": raw_dur,
"description": description,
"questions": mcqs,
"what_you_learn": learning_outcomes
})
if (idx + 1) % 10 == 0 or idx == 0 or idx == len(video_elements) - 1:
print(f"⏱️ Processed {idx+1}/{len(video_elements)} videos ({((idx+1)/len(video_elements)*100):.1f}%)")
# After processing all videos, generate playlist-level content
print(f"🧩 Generating {playlist_mcqs} MCQs for the entire playlist")
playlist_title = output["playlist_info"]["title"]
# Generate playlist-level MCQs
playlist_mcqs_result, playlist_mcq_cost = generate_playlist_mcqs(
playlist_title,
all_video_summaries,
playlist_mcqs
)
output["playlist_questions"] = playlist_mcqs_result
# Generate overall learning outcomes for the playlist
combined_material = "\n\n".join([
f"Title: {output['playlist_info']['title']}",
*[summary["description"] for summary in all_video_summaries[:10]] # Use first 10 videos to avoid token limits
])
playlist_outcomes, playlist_learn_cost = generate_learning_outcomes(combined_material)
output["playlist_info"]["what_you_learn"] = playlist_outcomes
context.close()
browser.close()
# Ensure channel_icon is never empty
if not output["playlist_info"].get("channel_icon"):
output["playlist_info"]["channel_icon"] = "https://www.youtube.com/img/desktop/yt_1200.png"
return output
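
# Illustrative shape of the returned dict (keys match the code above; the
# values shown are placeholders, not real data):
# {
#   "playlist_info": {"title": "...", "channel": "...", "url": "...",
#                     "channel_icon": "...", "video_count": 0,
#                     "what_you_learn": [...]},
#   "videos": [{"title": "...", "url": "...", "thumbnail": "...",
#               "duration": "12:34", "description": "...",
#               "questions": [...], "what_you_learn": [...]}],
#   "playlist_questions": [...]
# }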
if __name__ == "__main__":
start_time = time.time()
os.environ["PYTHONUNBUFFERED"] = "1"
PLAYLIST_URL = os.getenv("PLAYLIST_URL", "https://www.youtube.com/playlist?list=PLeo1K3hjS3uuKaU2nBDwr6zrSOTzNCs0l")
# MCQ generation parameters - can be configured via environment variables
MCQ_PER_VIDEO = int(os.getenv("MCQ_PER_VIDEO", "5"))
PLAYLIST_MCQS = int(os.getenv("PLAYLIST_MCQS", "10"))
print(f"🔍 Scraping playlist: {PLAYLIST_URL}")
print(f"🧩 Will generate {MCQ_PER_VIDEO} MCQs per video and {PLAYLIST_MCQS} for the playlist")
data = get_youtube_playlist_videos(
PLAYLIST_URL,
mcq_per_video=MCQ_PER_VIDEO,
playlist_mcqs=PLAYLIST_MCQS
)
title = data["playlist_info"]["title"] or "playlist"
filename = sanitize_filename(title) + ".json"
os.makedirs("outputs", exist_ok=True)
filepath = os.path.join("outputs", filename)
elapsed = time.time() - start_time
print(f"✅ Completed in {elapsed:.2f} seconds")
print(f"📄 Saved {len(data['videos'])} videos (with descriptions, MCQs, and learning outcomes) to {filepath}")
with open(filepath, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)