# Streamlit app: ask questions about a YouTube video using its transcript (RAG).
#
# Flow: extract the video ID -> list transcript languages (RapidAPI) ->
# fetch the transcript -> build a FAISS index over transcript chunks ->
# retrieve relevant chunks -> ask a Hugging Face hosted chat model.

import os
import re
from typing import Optional

import requests
import streamlit as st
from langchain.prompts import PromptTemplate
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint, HuggingFaceEmbeddings

# -------------------------------------------------
# Environment variables
# -------------------------------------------------
api_key = os.getenv("HF_API_KEY")
RAPIDAPI_KEY = (os.getenv("RAPIDAPI_KEY") or "").strip()
if not RAPIDAPI_KEY:
    st.error("RAPIDAPI_KEY not set")

# -------------------------------------------------
# Constants
# -------------------------------------------------
RAPIDAPI_HOST = "youtube-transcript3.p.rapidapi.com"
RAPIDAPI_BASE_URL = f"https://{RAPIDAPI_HOST}/api"
REQUEST_TIMEOUT_SECONDS = 10

# Offered when the languages endpoint cannot be queried.
FALLBACK_LANGUAGES = (
    ("en", "English (en)"),
    ("hi", "Hindi (hi)"),
    ("es", "Spanish (es)"),
    ("fr", "French (fr)"),
    ("de", "German (de)"),
    ("ja", "Japanese (ja)"),
    ("pt", "Portuguese (pt)"),
    ("ru", "Russian (ru)"),
)

# UI label -> Hugging Face repo id. Insertion order is the radio-button order.
# NOTE: the "Gemma-2-3B" label points at the 2B instruct checkpoint; the label
# is kept unchanged so the UI stays the same.
MODEL_REPOS = {
    "Llama-3.2-1B": "meta-llama/Llama-3.2-1B-Instruct",
    "Gemma-2-3B": "google/gemma-2-2b-it",
    "DeepSeek-685B": "deepseek-ai/DeepSeek-V3.2-Exp",
    "OpenAI-20B": "openai/gpt-oss-20b",
}

# A YouTube video ID is 11 characters from this alphabet.
_BARE_VIDEO_ID_RE = re.compile(r"[0-9A-Za-z_-]{11}")
# ID preceded by "v=" or "/", and NOT followed by another ID character, so a
# longer path/host segment (e.g. "youtube-nocookie") is not mistaken for an ID.
_URL_VIDEO_ID_RE = re.compile(r"(?:v=|/)([0-9A-Za-z_-]{11})(?![0-9A-Za-z_-])")


class TranscriptAPIError(Exception):
    """Raised when the transcript API answers but the answer is unusable.

    ``status_code`` is set for non-200 HTTP responses and is ``None`` when the
    response was 200 but did not have the expected JSON shape.
    """

    def __init__(self, message: str, status_code: Optional[int] = None):
        super().__init__(message)
        self.status_code = status_code


def _rapidapi_get_json(endpoint: str, params: dict):
    """GET ``endpoint`` on the transcript API and return the decoded JSON.

    Raises:
        TranscriptAPIError: the HTTP status was not 200.
        Exception: whatever ``requests`` or JSON decoding raises.
    """
    headers = {
        "x-rapidapi-key": RAPIDAPI_KEY,
        "x-rapidapi-host": RAPIDAPI_HOST,
    }
    response = requests.get(
        f"{RAPIDAPI_BASE_URL}/{endpoint}",
        headers=headers,
        params=params,
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    if response.status_code != 200:
        raise TranscriptAPIError(
            f"API Error: {response.status_code}", status_code=response.status_code
        )
    return response.json()


# -------------------------------------------------
# 1️⃣ Available languages (RapidAPI)
# -------------------------------------------------
# Only successful lookups are cached: the helper raises on failure, and an
# exception is never stored in the cache, so a transient API error is retried
# on the next rerun instead of being replayed.
@st.cache_data(show_spinner=False)
def _fetch_languages(video_id):
    """Return ``[(code, "Name (code)"), ...]`` for ``video_id`` or raise."""
    data = _rapidapi_get_json("languages", {"videoId": video_id})
    if not (data.get("success") and "languages" in data):
        raise TranscriptAPIError("Unexpected API response format")

    languages = []
    for lang in data["languages"]:
        code = lang.get("code", "")
        name = lang.get("name", "")
        languages.append((code, f"{name} ({code})"))
    return languages


def get_available_languages(video_id):
    """Check available transcript languages for a video via RapidAPI.

    Args:
        video_id: YouTube video ID.

    Returns:
        A list of ``(language_code, display_label)`` tuples. If the API
        cannot be used, a fixed list of common languages is returned instead.
    """
    try:
        return _fetch_languages(video_id)
    except TranscriptAPIError:
        # The API answered but not usefully: fall back silently.
        return list(FALLBACK_LANGUAGES)
    except Exception as e:  # UI boundary: never crash the page on a lookup
        st.warning(f"Could not fetch languages: {e}. Using common languages.")
        return list(FALLBACK_LANGUAGES)


# -------------------------------------------------
# 2️⃣ Transcript fetcher and vector store
# -------------------------------------------------
@st.cache_data(show_spinner=False)
def _fetch_transcript(video_id, language_code):
    """Return the transcript text for ``video_id`` or raise (success-only cache)."""
    data = _rapidapi_get_json(
        "transcript", {"videoId": video_id, "lang": language_code}
    )
    if not (data.get("success") and "transcript" in data):
        raise TranscriptAPIError("Unexpected API response format")
    # `or ''` guards against segments whose "text" is present but null.
    return ' '.join(item.get('text') or '' for item in data["transcript"])


def get_transcript(video_id, language_code="en"):
    """Fetch the transcript of a video as one space-joined string.

    Args:
        video_id: YouTube video ID.
        language_code: Transcript language code to request.

    Returns:
        The transcript text, or ``None`` if it could not be fetched (the
        reason is reported in the UI via ``st.error`` / ``st.warning``).
    """
    try:
        return _fetch_transcript(video_id, language_code)
    except TranscriptAPIError as e:
        if e.status_code is not None:
            st.error(f"API Error: {e.status_code}")
        else:
            st.warning("Unexpected API response format")
        return None
    except Exception as e:  # UI boundary: report and carry on
        st.error(f"Error: {str(e)}")
        return None


@st.cache_resource(show_spinner=False)
def _load_embeddings():
    """Load the embedding model once and share it across reruns."""
    return HuggingFaceEmbeddings(
        model_name="intfloat/multilingual-e5-base",
        model_kwargs={"device": "cpu"},
    )


# cache_resource rather than cache_data: cache_data serialises the return
# value, and a FAISS store with its embedding model is a resource to share,
# not data to pickle.
@st.cache_resource(show_spinner=False)
def create_vector_store(transcript):
    """Split ``transcript`` into overlapping chunks and index them in FAISS.

    Args:
        transcript: Full transcript text.

    Returns:
        A FAISS vector store over the transcript chunks. The same object is
        returned for the same transcript, so treat it as read-only.
    """
    splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    docs = splitter.create_documents([transcript])
    return FAISS.from_documents(docs, _load_embeddings())


# -------------------------------------------------
# 3️⃣ Model Builder
# -------------------------------------------------
def build_model(model_choice, temperature=0.7):
    """Build the chat model for a UI model label.

    Args:
        model_choice: One of the keys of ``MODEL_REPOS``.
        temperature: Sampling temperature passed to the endpoint and the
            chat wrapper.

    Returns:
        ``(model, is_chat)``; ``is_chat`` is ``True`` for every model
        currently configured.

    Raises:
        ValueError: ``model_choice`` is not a known label.
    """
    repo_id = MODEL_REPOS.get(model_choice)
    if repo_id is None:
        raise ValueError(
            f"Unknown model choice: {model_choice!r}. "
            f"Expected one of: {', '.join(MODEL_REPOS)}"
        )

    llm = HuggingFaceEndpoint(
        repo_id=repo_id,
        huggingfacehub_api_token=api_key,
        task="text-generation",
        max_new_tokens=500,
        temperature=temperature,
    )
    return ChatHuggingFace(llm=llm, temperature=temperature), True  # (model, is_chat)


# -------------------------------------------------
# 4️⃣ Prompt Template
# -------------------------------------------------
prompt_template = PromptTemplate(
    template=(
        "You are a helpful assistant.\n\n"
        "Answer the question using the context provided below.\n"
        "If the context does not mention the topic, say clearly: 'There is no mention of the topic in the video you provided.'\n"
        "Then, based on your own knowledge, try to answer the question.\n"
        "If both the context and your knowledge are insufficient, say: 'I don't know.'\n\n"
        "Keep the answer format neat, clean, and human-readable.\n\n"
        "Context:\n{context}\n\n"
        "Question:\n{question}"
    ),
    input_variables=["context", "question"]
)


# -------------------------------------------------
# 5️⃣ Streamlit App UI
# -------------------------------------------------
def extract_video_id(url: str) -> Optional[str]:
    """Extract the 11-character video ID from a YouTube URL or bare ID.

    Handles ``youtube.com/watch?v=...``, ``youtu.be/...`` and other
    ``/<id>`` path forms, as well as an ID pasted on its own.

    Args:
        url: Text entered by the user.

    Returns:
        The video ID, or ``None`` if none can be found.
    """
    if not url:
        return None
    candidate = url.strip()
    # A bare ID has no "v=" or "/" prefix for the URL pattern to anchor on.
    if _BARE_VIDEO_ID_RE.fullmatch(candidate):
        return candidate
    match = _URL_VIDEO_ID_RE.search(candidate)
    return match.group(1) if match else None


st.title("🎬 YouTube Transcript Chatbot (RAG)")

video_url = st.text_input("Enter YouTube Video URL", value="lv1_-RER4_I")
video_id = extract_video_id(video_url)

query = st.text_area("Your Query", value="What is RAG?")
model_choice = st.radio("Model to Use", list(MODEL_REPOS))
# The slider works in whole percent; the model expects 0.0-1.0.
temperature = st.slider("Temperature", 0, 100, value=50) / 100.0

# Get available languages for this video
language_code = None
if video_id:
    with st.spinner("Checking available languages..."):
        available_languages = get_available_languages(video_id)

    if available_languages:
        st.success(f"Found {len(available_languages)} language(s)")
        lang_options = {label: code for code, label in available_languages}
        selected_label = st.selectbox("Select Language", options=list(lang_options.keys()))
        language_code = lang_options[selected_label]
    else:
        st.warning("No languages found for this video.")

# -------------------------------------------------
# 6️⃣ Run Chatbot
# -------------------------------------------------
if st.button("Run Chatbot"):
    if not video_id or not query or not language_code:
        st.warning("⚠️ Please fill in all fields and select a language.")
    else:
        with st.spinner("Fetching transcript..."):
            transcript = get_transcript(video_id, language_code)

        if not transcript:
            st.error("❌ Could not fetch transcript.")
        else:
            st.success(f"✅ Transcript fetched ({len(transcript)} characters).")

            with st.spinner("Creating knowledge base..."):
                retriever = create_vector_store(transcript).as_retriever(
                    search_type="mmr", search_kwargs={"k": 5}
                )
                relevant_docs = retriever.invoke(query)
                context_text = "\n\n".join(doc.page_content for doc in relevant_docs)
                prompt = prompt_template.invoke({'context': context_text, 'question': query})

            with st.spinner(f"Generating response using {model_choice}..."):
                try:
                    # Built inside the try so construction errors (bad token,
                    # unknown model) are reported like generation errors.
                    model, is_chat = build_model(model_choice, temperature)

                    if is_chat:
                        response = model.invoke(prompt)
                        response_text = (
                            response.content if hasattr(response, "content") else str(response)
                        )
                    else:
                        # Non-chat pipeline path; unused while every
                        # configured model is chat-based.
                        response = model(prompt)
                        if (
                            isinstance(response, list)
                            and response
                            and "generated_text" in response[0]
                        ):
                            response_text = response[0]["generated_text"]
                        else:
                            response_text = str(response)

                    st.text_area("🧠 Model Response", value=response_text, height=400)

                except Exception as e:
                    st.error(f"Model generation failed: {e}")

## answer