# fastAPIv2/components/generators/detailed_explainer.py
import json
import logging
import os
from typing import Any, Dict, Set

import redis
from openai import OpenAI
from llama_index.core.settings import Settings
from llama_index.core.vector_stores.types import (
    FilterOperator,
    MetadataFilter,
    MetadataFilters,
    VectorStoreQuery,
)
from llama_index.embeddings.huggingface import HuggingFaceEmbedding

from components.indexers.news_indexer import get_upstash_vector_store
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# πŸ” Environment variables for this module
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
REDIS_URL = os.environ.get("UPSTASH_REDIS_URL", "redis://localhost:6379")
# βœ… Redis client for this module
try:
detailed_explainer_redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True)
detailed_explainer_redis_client.ping()
logging.info("Redis client initialized for detailed_explainer.py.")
except Exception as e:
logging.critical(f"❌ FATAL ERROR: Could not connect to Redis in detailed_explainer.py: {e}")
raise
# Cache Key specific to detailed explanations
DETAILED_FEED_CACHE_KEY = "detailed_news_feed_cache"
# Ensure Settings.embed_model is configured globally.
try:
    # Reading Settings.embed_model may itself attempt to resolve a default
    # (OpenAI) embedding and raise when no API key is configured, so treat
    # any failure here as "not configured" and fall back to a local model.
    needs_embed_model = Settings.embed_model is None
except Exception:
    needs_embed_model = True
if needs_embed_model:
    logging.info("Settings.embed_model not yet configured, initializing with default HuggingFaceEmbedding.")
    try:
        Settings.embed_model = HuggingFaceEmbedding(model_name="sentence-transformers/paraphrase-MiniLM-L3-v2")
    except Exception as e:
        logging.error(f"Failed to initialize Settings.embed_model in detailed_explainer: {e}")
# LLM prompt for detailed explanation
EXPLAINER_PROMPT = (
"You are an expert news analyst. Based on the following article content, "
"generate a concise, detailed explanation (50-60 words) for the headline provided. "
"Focus on the 'why it matters' and key context. Do not include any introductory phrases, just the explanation itself."
"\n\nHeadline: {headline}"
"\n\nArticle Content:\n{article_content}"
"\n\nDetailed Explanation (50-60 words):"
)
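# Rough sizing note (an assumption, not measured): 50-60 English words is about
# 65-80 tokens by the common ~1.3 tokens-per-word rule of thumb, so the
# max_tokens=100 used in the completion call below should leave headroom.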
async def get_detailed_explanation_from_vector(
summary_item: Dict[str, Any],
vector_store_client: Any
) -> Dict[str, Any]:
"""
Takes a summary item, queries the vector store for its original article content,
and generates a detailed explanation using an LLM.
"""
headline_text = summary_item["summary"]
representative_article_link = summary_item["article_link"]
representative_title = summary_item["representative_title"]
detailed_content = ""
sources_found: Set[str] = set()
logging.info(f"Retrieving detailed content for headline: '{headline_text}' (from {representative_article_link})")
try:
        query_text = f"{representative_title} {representative_article_link}" if representative_title else representative_article_link
        # llama_index embedding models expose get_query_embedding();
        # embed_query is the LangChain method name, not llama_index's.
        query_embedding = Settings.embed_model.get_query_embedding(query_text)
filters = MetadataFilters(
filters=[MetadataFilter(key="url", value=representative_article_link, operator=FilterOperator.EQ)]
)
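        # Fetch up to five chunks of the original article; non-empty chunk
        # text is concatenated below to form the LLM context.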
query = VectorStoreQuery(
query_embedding=query_embedding,
similarity_top_k=5,
filters=filters
)
result = vector_store_client.query(query)
if result.nodes:
for node in result.nodes:
node_content = node.get_content().strip()
if node_content:
detailed_content += node_content + "\n\n"
if "source" in node.metadata:
sources_found.add(node.metadata["source"])
if not detailed_content:
logging.warning(f"No usable content found in nodes retrieved for URL: {representative_article_link}. Falling back to title+url context.")
detailed_content = representative_title + " " + representative_article_link
else:
logging.warning(f"No original article found in vector store for URL: {representative_article_link}. Using summary as context.")
detailed_content = summary_item["summary"] + ". " + summary_item.get("explanation", "")
except Exception as e:
logging.error(f"❌ Error querying vector store for detailed content for '{representative_article_link}': {e}", exc_info=True)
detailed_content = summary_item["summary"] + ". " + summary_item.get("explanation", "")
# Generate detailed explanation using LLM
detailed_explanation_text = ""
try:
        # Validate the key before constructing the client so a missing key fails fast.
        if not OPENAI_API_KEY:
            raise ValueError("OPENAI_API_KEY is not set.")
        client = OpenAI(api_key=OPENAI_API_KEY)
llm_response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "You are a concise and informative news explainer."},
{"role": "user", "content": EXPLAINER_PROMPT.format(
headline=headline_text,
article_content=detailed_content
)},
],
max_tokens=100,
temperature=0.4,
)
detailed_explanation_text = llm_response.choices[0].message.content.strip()
logging.info(f"Generated detailed explanation for '{headline_text}'.")
except Exception as e:
logging.error(f"❌ Error generating detailed explanation for '{headline_text}': {e}", exc_info=True)
detailed_explanation_text = summary_item.get("explanation", "Could not generate a detailed explanation.")
return {
"title": headline_text,
"description": detailed_explanation_text,
"sources": list(sources_found) if sources_found else ["General News Sources"]
}
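# Example of a single returned item (values are illustrative):
# {
#     "title": "RBI holds repo rate steady",
#     "description": "A 50-60 word explanation generated by the LLM ...",
#     "sources": ["Reuters", "The Hindu"]
# }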
async def generate_detailed_feed(
cached_feed: Dict[str, Dict[int, Dict[str, Any]]]
) -> Dict[str, Dict[int, Dict[str, Any]]]:
"""
Generates detailed explanations for each summary in the cached feed.
Does NOT cache the result internally. The caller is responsible for caching.
"""
if not cached_feed:
logging.info("No cached feed found to generate detailed explanations from.")
return {}
detailed_feed_structured: Dict[str, Dict[int, Dict[str, Any]]] = {}
vector_store = get_upstash_vector_store()
for topic_key, summaries_map in cached_feed.items():
logging.info(f"Processing detailed explanations for topic: {topic_key}")
detailed_summaries_for_topic: Dict[int, Dict[str, Any]] = {}
        # summary_id keys may be ints in-process or numeric strings after a
        # JSON round-trip; key=int sorts both numerically.
        for summary_id in sorted(summaries_map.keys(), key=int):
summary_item = summaries_map[summary_id]
detailed_item = await get_detailed_explanation_from_vector(summary_item, vector_store)
detailed_summaries_for_topic[summary_id] = detailed_item
detailed_feed_structured[topic_key] = detailed_summaries_for_topic
logging.info("βœ… Detailed explanation generation complete.")
return detailed_feed_structured
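# Illustrative shape of the returned feed (topic -> summary_id -> detailed item):
# {
#     "technology": {
#         0: {"title": "...", "description": "...", "sources": ["Reuters"]},
#         1: {"title": "...", "description": "...", "sources": ["General News Sources"]},
#     }
# }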
def cache_detailed_feed(feed_data: Dict[str, Dict[int, Dict[str, Any]]]):
"""Caches the given detailed feed data to Redis using its dedicated client."""
try:
        # Set the value and its 24-hour TTL atomically instead of SET followed by EXPIRE.
        detailed_explainer_redis_client.set(
            DETAILED_FEED_CACHE_KEY,
            json.dumps(feed_data, ensure_ascii=False),
            ex=86400,
        )
logging.info(f"βœ… Detailed feed cached under key '{DETAILED_FEED_CACHE_KEY}' with 24-hour expiry.")
except Exception as e:
logging.error(f"❌ [Redis detailed feed caching error]: {e}", exc_info=True)
raise
def get_cached_detailed_feed() -> Dict[str, Dict[int, Dict[str, Any]]]:
"""Retrieves the cached detailed feed from Redis using its dedicated client."""
try:
cached_raw = detailed_explainer_redis_client.get(DETAILED_FEED_CACHE_KEY)
if cached_raw:
logging.info(f"βœ… Retrieved cached detailed feed from '{DETAILED_FEED_CACHE_KEY}'.")
return json.loads(cached_raw)
else:
logging.info(f"ℹ️ No cached detailed feed found under key '{DETAILED_FEED_CACHE_KEY}'.")
return {}
except Exception as e:
logging.error(f"❌ [Redis detailed feed retrieval error]: {e}", exc_info=True)
return {}
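
# Minimal usage sketch, not part of the module's public surface. It assumes a
# reachable Redis instance, a populated Upstash vector store, and a valid
# OPENAI_API_KEY; the sample feed below is illustrative data, not real cache
# contents.
if __name__ == "__main__":
    import asyncio

    sample_feed = {
        "technology": {
            0: {
                "summary": "Chipmaker unveils new AI accelerator",
                "article_link": "https://example.com/ai-accelerator",
                "representative_title": "New AI accelerator announced",
                "explanation": "Short upstream explanation.",
            }
        }
    }
    detailed = asyncio.run(generate_detailed_feed(sample_feed))
    cache_detailed_feed(detailed)
    print(json.dumps(get_cached_detailed_feed(), indent=2, ensure_ascii=False))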