|
import os
import json
import logging
from typing import Any, Dict, Set

import redis
from openai import OpenAI

from llama_index.core.settings import Settings
from llama_index.core.vector_stores.types import (
    FilterOperator,
    MetadataFilter,
    MetadataFilters,
    VectorStoreQuery,
)
from llama_index.embeddings.huggingface import HuggingFaceEmbedding

from components.indexers.news_indexer import get_upstash_vector_store

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
REDIS_URL = os.environ.get("UPSTASH_REDIS_URL", "redis://localhost:6379")

try:
    detailed_explainer_redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True)
    detailed_explainer_redis_client.ping()
    logging.info("Redis client initialized for detailed_explainer.py.")
except Exception as e:
    logging.critical(f"❌ FATAL ERROR: Could not connect to Redis in detailed_explainer.py: {e}")
    raise

DETAILED_FEED_CACHE_KEY = "detailed_news_feed_cache"
|
|
try:
    if not hasattr(Settings, 'embed_model') or Settings.embed_model is None:
        logging.info("Settings.embed_model not yet configured, initializing with default HuggingFaceEmbedding.")
        Settings.embed_model = HuggingFaceEmbedding(model_name="sentence-transformers/paraphrase-MiniLM-L3-v2")
except Exception as e:
    logging.error(f"Failed to initialize Settings.embed_model in detailed_explainer: {e}")


EXPLAINER_PROMPT = (
    "You are an expert news analyst. Based on the following article content, "
    "generate a concise, detailed explanation (50-60 words) for the headline provided. "
    "Focus on the 'why it matters' and key context. Do not include any introductory phrases, just the explanation itself."
    "\n\nHeadline: {headline}"
    "\n\nArticle Content:\n{article_content}"
    "\n\nDetailed Explanation (50-60 words):"
)
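
# Illustrative fill of the template above (the values here are made up for the
# example; at runtime they come from the summary item and the vector store):
#
#   EXPLAINER_PROMPT.format(
#       headline="Fed holds rates steady",
#       article_content="Full article text retrieved from the vector store...",
#   )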
|
|
|
async def get_detailed_explanation_from_vector(
    summary_item: Dict[str, Any],
    vector_store_client: Any,
) -> Dict[str, Any]:
    """
    Takes a summary item, queries the vector store for its original article content,
    and generates a detailed explanation using an LLM.
    """
    headline_text = summary_item["summary"]
    representative_article_link = summary_item["article_link"]
    representative_title = summary_item["representative_title"]

    detailed_content = ""
    sources_found: Set[str] = set()

    logging.info(f"Retrieving detailed content for headline: '{headline_text}' (from {representative_article_link})")
|
    try:
        # Embed a query built from the representative title (when available) plus the URL.
        query_text = f"{representative_title} {representative_article_link}" if representative_title else representative_article_link
        query_embedding = Settings.embed_model.get_query_embedding(query_text)

        # Restrict retrieval to chunks whose "url" metadata matches the source article exactly.
        filters = MetadataFilters(
            filters=[MetadataFilter(key="url", value=representative_article_link, operator=FilterOperator.EQ)]
        )
        query = VectorStoreQuery(
            query_embedding=query_embedding,
            similarity_top_k=5,
            filters=filters,
        )
        result = vector_store_client.query(query)

        if result.nodes:
            for node in result.nodes:
                node_content = node.get_content().strip()
                if node_content:
                    detailed_content += node_content + "\n\n"
                    if "source" in node.metadata:
                        sources_found.add(node.metadata["source"])

            if not detailed_content:
                logging.warning(f"No usable content found in nodes retrieved for URL: {representative_article_link}. Falling back to title+url context.")
                detailed_content = representative_title + " " + representative_article_link
        else:
            logging.warning(f"No original article found in vector store for URL: {representative_article_link}. Using summary as context.")
            detailed_content = summary_item["summary"] + ". " + summary_item.get("explanation", "")
    except Exception as e:
        logging.error(f"❌ Error querying vector store for detailed content for '{representative_article_link}': {e}", exc_info=True)
        detailed_content = summary_item["summary"] + ". " + summary_item.get("explanation", "")
|
detailed_explanation_text = "" |
|
try: |
|
client = OpenAI(api_key=OPENAI_API_KEY) |
|
if not OPENAI_API_KEY: |
|
raise ValueError("OPENAI_API_KEY is not set.") |
|
|
|
llm_response = client.chat.completions.create( |
|
model="gpt-4o", |
|
messages=[ |
|
{"role": "system", "content": "You are a concise and informative news explainer."}, |
|
{"role": "user", "content": EXPLAINER_PROMPT.format( |
|
headline=headline_text, |
|
article_content=detailed_content |
|
)}, |
|
], |
|
max_tokens=100, |
|
temperature=0.4, |
|
) |
|
detailed_explanation_text = llm_response.choices[0].message.content.strip() |
|
logging.info(f"Generated detailed explanation for '{headline_text}'.") |
|
|
|
except Exception as e: |
|
logging.error(f"β Error generating detailed explanation for '{headline_text}': {e}", exc_info=True) |
|
detailed_explanation_text = summary_item.get("explanation", "Could not generate a detailed explanation.") |
|
|
|
return { |
|
"title": headline_text, |
|
"description": detailed_explanation_text, |
|
"sources": list(sources_found) if sources_found else ["General News Sources"] |
|
} |
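
# For reference, a minimal sketch of the summary_item shape this function reads
# (field names inferred from the lookups above; the values are illustrative):
#
#   summary_item = {
#       "summary": "Fed holds rates steady",
#       "article_link": "https://example.com/fed-rates",
#       "representative_title": "Federal Reserve leaves rates unchanged",
#       "explanation": "Optional short blurb, used as a fallback.",
#   }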
|
|
|
async def generate_detailed_feed(
    cached_feed: Dict[str, Dict[int, Dict[str, Any]]]
) -> Dict[str, Dict[int, Dict[str, Any]]]:
    """
    Generates detailed explanations for each summary in the cached feed.
    Does NOT cache the result internally; the caller is responsible for caching.
    """
    if not cached_feed:
        logging.info("No cached feed found to generate detailed explanations from.")
        return {}

    detailed_feed_structured: Dict[str, Dict[int, Dict[str, Any]]] = {}
    vector_store = get_upstash_vector_store()

    for topic_key, summaries_map in cached_feed.items():
        logging.info(f"Processing detailed explanations for topic: {topic_key}")
        detailed_summaries_for_topic: Dict[int, Dict[str, Any]] = {}

        for summary_id in sorted(summaries_map.keys()):
            summary_item = summaries_map[summary_id]
            detailed_item = await get_detailed_explanation_from_vector(summary_item, vector_store)
            detailed_summaries_for_topic[summary_id] = detailed_item

        detailed_feed_structured[topic_key] = detailed_summaries_for_topic

    logging.info("✅ Detailed explanation generation complete.")
    return detailed_feed_structured
|
|
def cache_detailed_feed(feed_data: Dict[str, Dict[int, Dict[str, Any]]]):
    """Caches the given detailed feed data to Redis using its dedicated client."""
    try:
        # Set the value and its 24-hour expiry atomically in one call.
        detailed_explainer_redis_client.set(
            DETAILED_FEED_CACHE_KEY,
            json.dumps(feed_data, ensure_ascii=False),
            ex=86400,
        )
        logging.info(f"✅ Detailed feed cached under key '{DETAILED_FEED_CACHE_KEY}' with 24-hour expiry.")
    except Exception as e:
        logging.error(f"❌ [Redis detailed feed caching error]: {e}", exc_info=True)
        raise
|
|
def get_cached_detailed_feed() -> Dict[str, Dict[int, Dict[str, Any]]]:
    """Retrieves the cached detailed feed from Redis using its dedicated client."""
    try:
        cached_raw = detailed_explainer_redis_client.get(DETAILED_FEED_CACHE_KEY)
        if cached_raw:
            logging.info(f"✅ Retrieved cached detailed feed from '{DETAILED_FEED_CACHE_KEY}'.")
            # Note: JSON object keys are always strings, so the numeric summary IDs
            # come back as str keys after the round-trip through Redis.
            return json.loads(cached_raw)
        logging.info(f"ℹ️ No cached detailed feed found under key '{DETAILED_FEED_CACHE_KEY}'.")
        return {}
    except Exception as e:
        logging.error(f"❌ [Redis detailed feed retrieval error]: {e}", exc_info=True)
        return {}
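

if __name__ == "__main__":
    # Minimal smoke-test sketch, not part of the pipeline proper: it assumes the
    # vector store, Redis, and OPENAI_API_KEY are all configured, and feeds in a
    # made-up one-item feed in the shape generate_detailed_feed expects.
    import asyncio

    _sample_feed = {
        "technology": {
            0: {
                "summary": "Example headline",
                "article_link": "https://example.com/article",
                "representative_title": "Example article title",
                "explanation": "Short fallback blurb.",
            }
        }
    }

    _detailed = asyncio.run(generate_detailed_feed(_sample_feed))
    if _detailed:
        cache_detailed_feed(_detailed)
    print(json.dumps(get_cached_detailed_feed(), indent=2, ensure_ascii=False))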