# app/utils/whisper_llm.py
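"""Video analysis pipeline utilities.

Downloads a video, transcribes it with Faster-Whisper, summarizes the transcript
with a Hugging Face BART pipeline, and stores the summary (with an OpenAI
embedding) in a per-user Supabase vector table.
"""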
import os
import logging
import requests
import tempfile
import uuid
import torch
from transformers import pipeline
from faster_whisper import WhisperModel
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_core.documents import Document
from supabase.client import create_client
try:
import av # Optional: used to pre-check audio streams for robustness
except Exception: # pragma: no cover
av = None
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models import User
# Setup logger
logger = logging.getLogger("app.utils.whisper_llm")
logger.setLevel(logging.INFO)
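# Attach a stream handler only once so repeated imports or reloads don't duplicate log lines.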
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter("[%(asctime)s] %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
# Whisper Model Initialization
def get_whisper_model():
# Allow overrides via env vars
env_device = os.getenv("FASTER_WHISPER_DEVICE")
env_compute = os.getenv("FASTER_WHISPER_COMPUTE")
if env_device:
device = env_device
logger.info(f"Using device from env FASTER_WHISPER_DEVICE={env_device}")
else:
if torch.cuda.is_available():
device = "cuda"
logger.info("GPU detected: Using CUDA")
else:
device = "cpu"
logger.warning("GPU not available: Falling back to CPU")
if env_compute:
compute_type = env_compute
logger.info(f"Using compute_type from env FASTER_WHISPER_COMPUTE={env_compute}")
else:
compute_type = "float32" if device == "cuda" else "int8"
try:
model = WhisperModel("base", device=device, compute_type=compute_type)
logger.info(f"Loaded Faster-Whisper model on {device} with compute_type={compute_type}")
return model
except Exception as e:
logger.error(f"Failed to load Whisper model: {e}")
raise
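# Example overrides (illustrative values; both variables are optional):
#   export FASTER_WHISPER_DEVICE=cuda
#   export FASTER_WHISPER_COMPUTE=float16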
whisper_model = get_whisper_model()
# Supabase Initialization
supabase_url = os.getenv("SUPABASE_URL")
supabase_key = os.getenv("SUPABASE_KEY")
if not supabase_url or not supabase_key:
logger.error("❌ SUPABASE_URL or SUPABASE_KEY is not set in the environment.")
raise RuntimeError("SUPABASE_URL and SUPABASE_KEY must be set in .env or environment variables.")
try:
supabase_client = create_client(supabase_url, supabase_key)
logger.info("✅ Supabase client initialized successfully.")
except Exception as e:
logger.exception("❌ Failed to initialize Supabase client.")
raise
# Summarizer
try:
summarizer = pipeline("summarization", model="facebook/bart-large-cnn")
logger.info("Hugging Face summarizer pipeline loaded successfully.")
except Exception as e:
logger.error(f"Failed to load summarization pipeline: {e}")
raise
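# Note: bart-large-cnn accepts roughly 1024 tokens of input, which is why long
# transcripts are summarized in overlapping chunks below.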
# Chunked summarization with no word limits
def summarize_in_chunks(text, chunk_size=1024, overlap=200):
"""
Generate comprehensive summary without word restrictions.
Uses larger chunks and better overlap for more complete summaries.
"""
if not text or len(text.strip()) == 0:
return "No content to summarize"
# For very short texts, return as is
if len(text.strip()) < 200:
return text.strip()
summaries = []
words = text.split()
# If text is short enough, summarize in one go
if len(words) <= chunk_size:
try:
result = summarizer(text, max_length=512, min_length=128, do_sample=False)
return result[0]['summary_text']
except Exception as e:
logger.error(f"Single chunk summarization failed: {e}")
return text.strip()
# For longer texts, use chunked approach with better parameters
step = chunk_size - overlap
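    # With the defaults (chunk_size=1024, overlap=200) each window spans 1024 words
    # and advances by 824, so consecutive chunks share 200 words of context.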
for i in range(0, len(words), step):
chunk = " ".join(words[i:i + chunk_size])
if len(chunk.strip()) == 0:
continue
try:
# Use larger max_length for more comprehensive summaries
result = summarizer(
chunk,
max_length=512, # Increased from 256
min_length=128, # Increased from 64
do_sample=False
)
summaries.append(result[0]['summary_text'])
except Exception as e:
logger.error(f"Chunk summarization failed for chunk {i//step + 1}: {e}")
# Include the chunk text as fallback
summaries.append(chunk[:200] + "..." if len(chunk) > 200 else chunk)
# Combine all summaries
combined_summary = " ".join(summaries)
# If the combined summary is still very long, do a final summarization
if len(combined_summary.split()) > 1000:
try:
final_result = summarizer(
combined_summary,
max_length=800, # Allow longer final summary
min_length=200,
do_sample=False
)
return final_result[0]['summary_text']
except Exception as e:
            logger.error(f"Final summarization failed: {e}")
return combined_summary[:1500] + "..." if len(combined_summary) > 1500 else combined_summary
return combined_summary
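# Illustrative calls (not executed here):
#   summarize_in_chunks(transcript)            # defaults: 1024-word windows, 200-word overlap
#   summarize_in_chunks(transcript, 512, 100)  # smaller windows and tighter overlap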
# Async user fetch using AsyncSession
async def get_user(user_id: int, db: AsyncSession):
result = await db.execute(select(User).where(User.id == user_id))
return result.scalar_one_or_none()
# Core analyzer function with per-user Supabase vector ingestion
async def analyze(video_url: str, user_id: int, db: AsyncSession):
user = await get_user(user_id, db)
if not user:
raise ValueError(f"User with ID {user_id} not found in database.")
logger.info(f"Starting video analysis for user: {user.email} (ID: {user.id})")
# Step 1: Download video to temp file
try:
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp:
with requests.get(video_url, stream=True, timeout=60) as response:
response.raise_for_status()
for chunk in response.iter_content(chunk_size=8192):
tmp.write(chunk)
tmp_path = tmp.name
# Validate the downloaded file
if not os.path.exists(tmp_path) or os.path.getsize(tmp_path) == 0:
raise ValueError("Downloaded video file is empty or missing")
logger.info(f"Video saved to temp file: {tmp_path} (size: {os.path.getsize(tmp_path)} bytes)")
except Exception as e:
logger.error(f"Failed to download video: {e}")
raise
# Step 2: Transcribe
try:
# Optional pre-check: ensure the file has an audio stream
if av is not None:
try:
with av.open(tmp_path) as container:
has_audio = any(s.type == "audio" for s in container.streams)
if not has_audio:
logger.error("No valid audio stream in file; skipping transcription")
raise IndexError("No audio stream")
except IndexError:
raise
except Exception:
# If PyAV check fails, continue and let transcribe attempt
pass
logger.info("Transcribing audio with Faster-Whisper...")
        # faster-whisper's transcribe() returns a (segments, info) tuple; segments is a
        # lazy generator, so materialize it before extracting the text.
        result = whisper_model.transcribe(tmp_path)
        if isinstance(result, tuple):
            segments, info = result
        else:
            segments = result
            info = None
        segment_list = list(segments) if segments is not None else []
        text = " ".join(
            segment.text.strip()
            for segment in segment_list
            if getattr(segment, "text", None)
        )
logger.info(f"Transcription completed. Length: {len(text)} characters.")
# Log additional info if available
if info:
logger.info(f"Transcription info: language={getattr(info, 'language', 'unknown')}, language_probability={getattr(info, 'language_probability', 'unknown')}")
# Handle empty transcription
if not text or len(text.strip()) == 0:
logger.warning("Transcription resulted in empty text, using fallback")
text = "No speech detected in video"
except IndexError:
logger.error("No valid audio stream in file; skipping transcription")
text = "Transcription failed - video may be corrupted or have no audio"
except Exception as e:
logger.error(f"Transcription failed: {e}")
logger.error(f"Error type: {type(e)}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
# Provide fallback text instead of failing completely
logger.warning("Using fallback text due to transcription failure")
text = "Transcription failed - video may be corrupted or have no audio"
finally:
# Always attempt to clean up temp file
try:
os.unlink(tmp_path)
except Exception:
pass
# Step 3: Summarize
try:
logger.info("Summarizing transcript with Hugging Face model...")
# Always generate summary regardless of text length
# The summarize_in_chunks function handles short texts appropriately
summary = summarize_in_chunks(text)
logger.info(f"Summarization completed. Summary length: {len(summary)} characters.")
except Exception as e:
logger.error(f"Summarization failed: {e}")
logger.warning("Using original text as summary due to summarization failure")
summary = text # Use original text as fallback
    # Temp file was already removed in the finally block above; no further cleanup needed here.
# Step 4: Save to Supabase vector store (explicit user_id)
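    # Assumed schema: a "documents" table with columns id (uuid), user_id, content (text),
    # embedding (pgvector; typically 1536 dimensions for the default OpenAI embedding model),
    # and metadata (jsonb). OpenAIEmbeddings reads OPENAI_API_KEY from the environment.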
try:
logger.info("Saving summary to Supabase vector store for user...")
if not summary or not summary.strip():
logger.warning("Empty summary; skipping Supabase insert")
else:
embeddings = OpenAIEmbeddings()
embedding_vector = embeddings.embed_query(summary)
document_id = str(uuid.uuid4())
payload = {
"id": document_id,
"user_id": user_id,
"content": summary,
"embedding": embedding_vector,
"metadata": {"user_id": user_id, "video_url": video_url},
}
supabase_client.table("documents").insert(payload).execute()
logger.info(f"Summary saved to Supabase for user: {user_id}")
except Exception as e:
logger.error(f"Failed to save to Supabase vector store: {e}")
raise
return text, summary
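# Usage sketch (illustrative; `async_session_maker` is a hypothetical async SQLAlchemy
# session factory defined elsewhere in the app):
#
#   import asyncio
#
#   async def _example():
#       async with async_session_maker() as db:
#           transcript, summary = await analyze(
#               "https://example.com/video.mp4", user_id=1, db=db
#           )
#           print(summary)
#
#   asyncio.run(_example())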