import os
import logging
import tempfile
import uuid

import requests
import torch
from transformers import pipeline
from faster_whisper import WhisperModel
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_core.documents import Document
from supabase.client import create_client
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

try:
    import av  # Optional: used to pre-check audio streams for robustness
except Exception:  # pragma: no cover
    av = None

from app.models import User

# Setup logger
logger = logging.getLogger("app.utils.whisper_llm")
logger.setLevel(logging.INFO)
if not logger.handlers:
    handler = logging.StreamHandler()
    formatter = logging.Formatter("[%(asctime)s] %(levelname)s - %(message)s")
    handler.setFormatter(formatter)
    logger.addHandler(handler)


# Whisper Model Initialization
def get_whisper_model():
    # Allow overrides via env vars
    env_device = os.getenv("FASTER_WHISPER_DEVICE")
    env_compute = os.getenv("FASTER_WHISPER_COMPUTE")

    if env_device:
        device = env_device
        logger.info(f"Using device from env FASTER_WHISPER_DEVICE={env_device}")
    else:
        if torch.cuda.is_available():
            device = "cuda"
            logger.info("GPU detected: Using CUDA")
        else:
            device = "cpu"
            logger.warning("GPU not available: Falling back to CPU")

    if env_compute:
        compute_type = env_compute
        logger.info(f"Using compute_type from env FASTER_WHISPER_COMPUTE={env_compute}")
    else:
        compute_type = "float32" if device == "cuda" else "int8"

    try:
        model = WhisperModel("base", device=device, compute_type=compute_type)
        logger.info(f"Loaded Faster-Whisper model on {device} with compute_type={compute_type}")
        return model
    except Exception as e:
        logger.error(f"Failed to load Whisper model: {e}")
        raise


whisper_model = get_whisper_model()

# Supabase Initialization
supabase_url = os.getenv("SUPABASE_URL")
supabase_key = os.getenv("SUPABASE_KEY")

if not supabase_url or not supabase_key:
    logger.error("❌ SUPABASE_URL or SUPABASE_KEY is not set in the environment.")
    raise RuntimeError("SUPABASE_URL and SUPABASE_KEY must be set in .env or environment variables.")

try:
    supabase_client = create_client(supabase_url, supabase_key)
    logger.info("✅ Supabase client initialized successfully.")
except Exception:
    logger.exception("❌ Failed to initialize Supabase client.")
    raise

# Summarizer
try:
    summarizer = pipeline("summarization", model="facebook/bart-large-cnn")
    logger.info("Hugging Face summarizer pipeline loaded successfully.")
except Exception as e:
    logger.error(f"Failed to load summarization pipeline: {e}")
    raise
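
# Note on the pipeline's return shape (relied on by the chunked summarizer
# below): a Hugging Face summarization pipeline call returns a list with one
# dict per input, keyed by "summary_text". Minimal illustration, not executed
# here; `transcript_text` is a placeholder, parameter values mirror those used
# below:
#
#   out = summarizer(transcript_text, max_length=512, min_length=128, do_sample=False)
#   summary = out[0]["summary_text"]
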
""" if not text or len(text.strip()) == 0: return "No content to summarize" # For very short texts, return as is if len(text.strip()) < 200: return text.strip() summaries = [] words = text.split() # If text is short enough, summarize in one go if len(words) <= chunk_size: try: result = summarizer(text, max_length=512, min_length=128, do_sample=False) return result[0]['summary_text'] except Exception as e: logger.error(f"Single chunk summarization failed: {e}") return text.strip() # For longer texts, use chunked approach with better parameters step = chunk_size - overlap for i in range(0, len(words), step): chunk = " ".join(words[i:i + chunk_size]) if len(chunk.strip()) == 0: continue try: # Use larger max_length for more comprehensive summaries result = summarizer( chunk, max_length=512, # Increased from 256 min_length=128, # Increased from 64 do_sample=False ) summaries.append(result[0]['summary_text']) except Exception as e: logger.error(f"Chunk summarization failed for chunk {i//step + 1}: {e}") # Include the chunk text as fallback summaries.append(chunk[:200] + "..." if len(chunk) > 200 else chunk) # Combine all summaries combined_summary = " ".join(summaries) # If the combined summary is still very long, do a final summarization if len(combined_summary.split()) > 1000: try: final_result = summarizer( combined_summary, max_length=800, # Allow longer final summary min_length=200, do_sample=False ) return final_result[0]['summary_text'] except Exception as e: logger.error(f"Final sum marization failed: {e}") return combined_summary[:1500] + "..." if len(combined_summary) > 1500 else combined_summary return combined_summary # Async user fetch using AsyncSession async def get_user(user_id: int, db: AsyncSession): result = await db.execute(select(User).where(User.id == user_id)) return result.scalar_one_or_none() # Core analyzer function with per-user FAISS ingestion async def analyze(video_url: str, user_id: int, db: AsyncSession): user = await get_user(user_id, db) if not user: raise ValueError(f"User with ID {user_id} not found in database.") logger.info(f"Starting video analysis for user: {user.email} (ID: {user.id})") # Step 1: Download video to temp file try: with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp: with requests.get(video_url, stream=True, timeout=60) as response: response.raise_for_status() for chunk in response.iter_content(chunk_size=8192): tmp.write(chunk) tmp_path = tmp.name # Validate the downloaded file if not os.path.exists(tmp_path) or os.path.getsize(tmp_path) == 0: raise ValueError("Downloaded video file is empty or missing") logger.info(f"Video saved to temp file: {tmp_path} (size: {os.path.getsize(tmp_path)} bytes)") except Exception as e: logger.error(f"Failed to download video: {e}") raise # Step 2: Transcribe try: # Optional pre-check: ensure the file has an audio stream if av is not None: try: with av.open(tmp_path) as container: has_audio = any(s.type == "audio" for s in container.streams) if not has_audio: logger.error("No valid audio stream in file; skipping transcription") raise IndexError("No audio stream") except IndexError: raise except Exception: # If PyAV check fails, continue and let transcribe attempt pass logger.info("Transcribing audio with Faster-Whisper...") # Get transcription result result = whisper_model.transcribe(tmp_path) # Handle different return formats from faster-whisper if isinstance(result, tuple): segments, info = result else: segments = result info = None # Extract text from segments if segments: text = " 
".join(segment.text for segment in segments if hasattr(segment, 'text') and segment.text) else: text = "" logger.info(f"Transcription completed. Length: {len(text)} characters.") # Log additional info if available if info: logger.info(f"Transcription info: language={getattr(info, 'language', 'unknown')}, language_probability={getattr(info, 'language_probability', 'unknown')}") # Handle empty transcription if not text or len(text.strip()) == 0: logger.warning("Transcription resulted in empty text, using fallback") text = "No speech detected in video" except IndexError: logger.error("No valid audio stream in file; skipping transcription") text = "Transcription failed - video may be corrupted or have no audio" except Exception as e: logger.error(f"Transcription failed: {e}") logger.error(f"Error type: {type(e)}") import traceback logger.error(f"Traceback: {traceback.format_exc()}") # Provide fallback text instead of failing completely logger.warning("Using fallback text due to transcription failure") text = "Transcription failed - video may be corrupted or have no audio" finally: # Always attempt to clean up temp file try: os.unlink(tmp_path) except Exception: pass # Step 3: Summarize try: logger.info("Summarizing transcript with Hugging Face model...") # Always generate summary regardless of text length # The summarize_in_chunks function handles short texts appropriately summary = summarize_in_chunks(text) logger.info(f"Summarization completed. Summary length: {len(summary)} characters.") except Exception as e: logger.error(f"Summarization failed: {e}") logger.warning("Using original text as summary due to summarization failure") summary = text # Use original text as fallback # Clean up temp file try: os.unlink(tmp_path) except: pass # Step 4: Save to Supabase vector store (explicit user_id) try: logger.info("Saving summary to Supabase vector store for user...") if not summary or not summary.strip(): logger.warning("Empty summary; skipping Supabase insert") else: embeddings = OpenAIEmbeddings() embedding_vector = embeddings.embed_query(summary) document_id = str(uuid.uuid4()) payload = { "id": document_id, "user_id": user_id, "content": summary, "embedding": embedding_vector, "metadata": {"user_id": user_id, "video_url": video_url}, } supabase_client.table("documents").insert(payload).execute() logger.info(f"Summary saved to Supabase for user: {user_id}") except Exception as e: logger.error(f"Failed to save to Supabase vector store: {e}") raise # Clean up temp file try: os.unlink(tmp_path) except: pass return text, summary