Spaces:
Running
Running
import os | |
import logging | |
import requests | |
import tempfile | |
import uuid | |
import torch | |
from transformers import pipeline | |
from faster_whisper import WhisperModel | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain_openai import OpenAIEmbeddings | |
from langchain_core.documents import Document | |
from supabase.client import create_client | |
try: | |
import av # Optional: used to pre-check audio streams for robustness | |
except Exception: # pragma: no cover | |
av = None | |
from sqlalchemy import select | |
from sqlalchemy.ext.asyncio import AsyncSession | |
from app.models import User | |
# Module logger. The level is pinned to INFO; a timestamped stream handler is
# attached only when none exists yet, so re-importing does not duplicate output.
logger = logging.getLogger("app.utils.whisper_llm")
logger.setLevel(logging.INFO)
if not logger.handlers:
    formatter = logging.Formatter("[%(asctime)s] %(levelname)s - %(message)s")
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
# Whisper model initialization
def get_whisper_model():
    """Create the faster-whisper "base" model.

    Device: ``FASTER_WHISPER_DEVICE`` when set (non-empty); otherwise "cuda"
    if ``torch.cuda.is_available()``, else "cpu".
    Compute type: ``FASTER_WHISPER_COMPUTE`` when set (non-empty); otherwise
    "float32" on "cuda" and "int8" on anything else.

    Raises:
        Exception: whatever ``WhisperModel`` raises while loading; it is
            logged first and then re-raised unchanged.
    """
    device = os.getenv("FASTER_WHISPER_DEVICE")
    if device:
        logger.info(f"Using device from env FASTER_WHISPER_DEVICE={device}")
    elif torch.cuda.is_available():
        device = "cuda"
        logger.info("GPU detected: Using CUDA")
    else:
        device = "cpu"
        logger.warning("GPU not available: Falling back to CPU")

    compute_type = os.getenv("FASTER_WHISPER_COMPUTE")
    if compute_type:
        logger.info(f"Using compute_type from env FASTER_WHISPER_COMPUTE={compute_type}")
    else:
        compute_type = "float32" if device == "cuda" else "int8"

    try:
        model = WhisperModel("base", device=device, compute_type=compute_type)
    except Exception as e:
        logger.error(f"Failed to load Whisper model: {e}")
        raise
    logger.info(f"Loaded Faster-Whisper model on {device} with compute_type={compute_type}")
    return model
# Shared Whisper model, built once at import time.
whisper_model = get_whisper_model()

# Supabase initialization: credentials come from the environment, and the
# module refuses to import when either one is missing or client creation fails.
supabase_url = os.getenv("SUPABASE_URL")
supabase_key = os.getenv("SUPABASE_KEY")
if not (supabase_url and supabase_key):
    logger.error("❌ SUPABASE_URL or SUPABASE_KEY is not set in the environment.")
    raise RuntimeError("SUPABASE_URL and SUPABASE_KEY must be set in .env or environment variables.")

try:
    supabase_client = create_client(supabase_url, supabase_key)
except Exception:
    logger.exception("❌ Failed to initialize Supabase client.")
    raise
else:
    logger.info("✅ Supabase client initialized successfully.")
# Summarization pipeline, loaded once at import time and used by
# summarize_in_chunks. A load failure is logged and aborts the import.
_SUMMARIZER_TASK = "summarization"
_SUMMARIZER_MODEL = "facebook/bart-large-cnn"

try:
    summarizer = pipeline(_SUMMARIZER_TASK, model=_SUMMARIZER_MODEL)
except Exception as e:
    logger.error(f"Failed to load summarization pipeline: {e}")
    raise
logger.info("Hugging Face summarizer pipeline loaded successfully.")
# Chunked summarization: long text is split into overlapping word windows, each
# window is summarized, and the pieces are joined (then condensed once more if
# the joined result is still long).
def _length_bounds(n_words, max_length, min_length):
    """Clamp (min_length, max_length) so a summary is never forced to outgrow its input.

    The word count is used as a cheap proxy for the input's size. For inputs
    of at least ``2 * min_length`` words (and ``max_length`` words) the
    requested bounds are returned unchanged.
    """
    upper = max(1, min(max_length, n_words))
    lower = min(min_length, upper // 2)
    return lower, upper


def _run_summarizer(summarizer_fn, text, max_length, min_length):
    """Call a HF-style summarization callable on *text* and return its summary string."""
    lower, upper = _length_bounds(len(text.split()), max_length, min_length)
    # truncation=True: lengths here are counted in words, which can exceed the
    # model's token window; clip oversize inputs instead of erroring out.
    result = summarizer_fn(
        text,
        max_length=upper,
        min_length=lower,
        do_sample=False,
        truncation=True,
    )
    return result[0]["summary_text"]


def summarize_in_chunks(text, chunk_size=1024, overlap=200, *, summarizer_fn=None):
    """Summarize *text*, splitting it into overlapping chunks when it is long.

    Args:
        text: Text to summarize (``None`` is treated as empty).
        chunk_size: Words per chunk (whitespace-split words, not model tokens).
        overlap: Words shared by consecutive chunks; must satisfy
            ``0 <= overlap < chunk_size`` when chunking is needed.
        summarizer_fn: Callable with the Hugging Face summarization-pipeline
            calling convention. Defaults to the module-level ``summarizer``.

    Returns:
        - ``"No content to summarize"`` for empty or blank input.
        - The stripped input when it is shorter than 200 characters.
        - Otherwise a summary string.

        Fallbacks on failure:
        - A failed single-pass summary returns the stripped input.
        - A failed chunk contributes its first 200 characters.
        - A failed final condensing pass returns the joined chunk summaries,
          cut to 1500 characters.

    Raises:
        ValueError: if chunking is needed and ``overlap`` is not in
            ``[0, chunk_size)``. Such a value would otherwise produce a zero or
            negative window step.
    """
    stripped = text.strip() if text else ""
    if not stripped:
        return "No content to summarize"
    # Very short texts are returned as-is; summarizing them adds nothing.
    if len(stripped) < 200:
        return stripped

    if summarizer_fn is None:
        summarizer_fn = summarizer

    words = text.split()

    # Short enough for a single pass.
    if len(words) <= chunk_size:
        try:
            return _run_summarizer(summarizer_fn, text, 512, 128)
        except Exception as e:
            logger.error(f"Single chunk summarization failed: {e}")
            return stripped

    if not 0 <= overlap < chunk_size:
        raise ValueError(
            f"overlap must be in [0, chunk_size); got overlap={overlap}, chunk_size={chunk_size}"
        )
    step = chunk_size - overlap

    summaries = []
    for index, start in enumerate(range(0, len(words), step), start=1):
        chunk = " ".join(words[start:start + chunk_size])
        try:
            summaries.append(_run_summarizer(summarizer_fn, chunk, 512, 128))
        except Exception as e:
            logger.error(f"Chunk summarization failed for chunk {index}: {e}")
            # Keep a short excerpt of the chunk so its content is not lost entirely.
            summaries.append(chunk[:200] + "..." if len(chunk) > 200 else chunk)
        if start + chunk_size >= len(words):
            # This window already reached the last word; another window would
            # only re-summarize words that are already covered.
            break

    combined_summary = " ".join(summaries)

    # Condense once more if the joined chunk summaries are still long.
    if len(combined_summary.split()) > 1000:
        try:
            return _run_summarizer(summarizer_fn, combined_summary, 800, 200)
        except Exception as e:
            logger.error(f"Final summarization failed: {e}")
            return combined_summary[:1500] + "..." if len(combined_summary) > 1500 else combined_summary
    return combined_summary
# Fetch one User row by id through the async SQLAlchemy session.
async def get_user(user_id: int, db: AsyncSession):
    """Return the ``User`` whose ``id`` equals *user_id*, or ``None`` when no row matches."""
    query = select(User).where(User.id == user_id)
    rows = await db.execute(query)
    return rows.scalar_one_or_none()
# Core analyzer: download -> transcribe -> summarize -> store the summary and
# its embedding in the Supabase "documents" table, tagged with the user's id.
_TRANSCRIPTION_FAILED_TEXT = "Transcription failed - video may be corrupted or have no audio"


def _remove_file(path):
    """Best-effort delete of a temp file; an already-missing file is ignored."""
    try:
        os.unlink(path)
    except FileNotFoundError:
        pass
    except OSError as e:
        logger.warning(f"Could not remove temp file {path}: {e}")


def _download_to_temp(video_url):
    """Stream *video_url* into a new ``.mp4`` temp file and return its path.

    The caller owns the returned file and must delete it. On any failure
    (HTTP error, timeout, empty body) the error is logged, the partial file is
    removed, and the exception is re-raised, so no temp file is leaked.
    """
    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp:
            tmp_path = tmp.name
            with requests.get(video_url, stream=True, timeout=60) as response:
                response.raise_for_status()
                for chunk in response.iter_content(chunk_size=8192):
                    tmp.write(chunk)
        # Validated after the `with` closes (and flushes) the file.
        if not os.path.exists(tmp_path) or os.path.getsize(tmp_path) == 0:
            raise ValueError("Downloaded video file is empty or missing")
    except Exception as e:
        logger.error(f"Failed to download video: {e}")
        if tmp_path is not None:
            _remove_file(tmp_path)
        raise
    logger.info(f"Video saved to temp file: {tmp_path} (size: {os.path.getsize(tmp_path)} bytes)")
    return tmp_path


def _has_audio_stream(path):
    """Return False only when PyAV positively finds no audio stream in *path*.

    If PyAV is not installed or cannot open the file, return True so that
    faster-whisper still gets a chance to decode it.
    """
    if av is None:
        return True
    try:
        with av.open(path) as container:
            return any(stream.type == "audio" for stream in container.streams)
    except Exception:
        return True


def _transcribe(path):
    """Transcribe the media file at *path* with the shared Whisper model.

    Never raises for decoding or model errors; a fixed fallback message is
    returned instead so the rest of the pipeline can continue. Returns
    ``"No speech detected in video"`` when the transcript is blank.
    """
    try:
        if not _has_audio_stream(path):
            raise IndexError("No audio stream")
        logger.info("Transcribing audio with Faster-Whisper...")
        result = whisper_model.transcribe(path)
        # Accept either a (segments, info) tuple or a bare segments value.
        if isinstance(result, tuple):
            segments, info = result
        else:
            segments, info = result, None
        # If `segments` is lazy (e.g. a generator), decoding errors surface
        # while joining, which is why the join stays inside this try.
        text = " ".join(seg.text for seg in (segments or ()) if getattr(seg, "text", None))
        logger.info(f"Transcription completed. Length: {len(text)} characters.")
        if info:
            logger.info(f"Transcription info: language={getattr(info, 'language', 'unknown')}, language_probability={getattr(info, 'language_probability', 'unknown')}")
    except IndexError:
        # Raised by the PyAV pre-check above; presumably also what the decoder
        # raises for a container without audio (TODO confirm).
        logger.error("No valid audio stream in file; skipping transcription")
        return _TRANSCRIPTION_FAILED_TEXT
    except Exception as e:
        logger.exception(f"Transcription failed ({type(e).__name__}): {e}")
        logger.warning("Using fallback text due to transcription failure")
        return _TRANSCRIPTION_FAILED_TEXT

    if not text.strip():
        logger.warning("Transcription resulted in empty text, using fallback")
        return "No speech detected in video"
    return text


def _summarize_transcript(text):
    """Summarize *text*; if summarization raises, use the text itself as the summary."""
    try:
        logger.info("Summarizing transcript with Hugging Face model...")
        # summarize_in_chunks handles short texts itself, so always call it.
        summary = summarize_in_chunks(text)
    except Exception as e:
        logger.error(f"Summarization failed: {e}")
        logger.warning("Using original text as summary due to summarization failure")
        return text
    logger.info(f"Summarization completed. Summary length: {len(summary)} characters.")
    return summary


def _store_summary(summary, user_id, video_url):
    """Embed *summary* and insert it into the Supabase ``documents`` table.

    Blank summaries are skipped with a warning. Embedding or insert errors are
    logged and re-raised.
    """
    try:
        logger.info("Saving summary to Supabase vector store for user...")
        if not summary or not summary.strip():
            logger.warning("Empty summary; skipping Supabase insert")
            return
        embedding_vector = OpenAIEmbeddings().embed_query(summary)
        payload = {
            "id": str(uuid.uuid4()),
            "user_id": user_id,
            "content": summary,
            "embedding": embedding_vector,
            "metadata": {"user_id": user_id, "video_url": video_url},
        }
        supabase_client.table("documents").insert(payload).execute()
        logger.info(f"Summary saved to Supabase for user: {user_id}")
    except Exception as e:
        logger.error(f"Failed to save to Supabase vector store: {e}")
        raise


async def analyze(video_url: str, user_id: int, db: AsyncSession):
    """Download, transcribe, summarize and store the video at *video_url* for a user.

    Args:
        video_url: URL fetched with ``requests.get``.
        user_id: ID of an existing ``User``; stored alongside the summary.
        db: Async SQLAlchemy session used to look the user up.

    Returns:
        ``(text, summary)``:
        - ``text`` is the transcript, or a fallback message when transcription
          fails or finds no speech.
        - ``summary`` is the summary of ``text``.

    Raises:
        ValueError: if the user does not exist or the downloaded file is empty.
        Exception: download errors and embedding/Supabase errors are logged and
            re-raised.
    """
    user = await get_user(user_id, db)
    if not user:
        raise ValueError(f"User with ID {user_id} not found in database.")
    logger.info(f"Starting video analysis for user: {user.email} (ID: {user.id})")

    # NOTE(review): the steps below call synchronous APIs (requests, Whisper,
    # the HF pipeline, OpenAI embeddings, the Supabase client) directly from
    # this coroutine. Each one blocks the event loop while it runs; consider
    # offloading them to an executor once the shared models' thread-safety is
    # confirmed.
    tmp_path = _download_to_temp(video_url)
    try:
        text = _transcribe(tmp_path)
    finally:
        # The temp file is only needed for transcription; always remove it.
        _remove_file(tmp_path)

    summary = _summarize_transcript(text)
    # NOTE(review): fallback messages (e.g. the transcription-failed text) are
    # embedded and stored like real summaries; confirm that is intended.
    _store_summary(summary, user_id, video_url)
    return text, summary