import os import json import numpy as np import redis from typing import List, Dict, Any, Optional, Set from openai import OpenAI from llama_index.core.vector_stores.types import VectorStoreQuery, MetadataFilter, MetadataFilters, FilterOperator from llama_index.core.schema import TextNode from components.indexers.news_indexer import get_upstash_vector_store import logging from llama_index.core.settings import Settings from llama_index.embeddings.huggingface import HuggingFaceEmbedding logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # 🔐 Environment variables for this module OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY") REDIS_URL = os.environ.get("UPSTASH_REDIS_URL", "redis://localhost:6379") # ✅ Redis client for this module try: detailed_explainer_redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True) detailed_explainer_redis_client.ping() logging.info("Redis client initialized for detailed_explainer.py.") except Exception as e: logging.critical(f"❌ FATAL ERROR: Could not connect to Redis in detailed_explainer.py: {e}") raise # Cache Key specific to detailed explanations DETAILED_FEED_CACHE_KEY = "detailed_news_feed_cache" # Ensure Settings.embed_model is configured globally. try: if not hasattr(Settings, 'embed_model') or Settings.embed_model is None: logging.info("Settings.embed_model not yet configured, initializing with default HuggingFaceEmbedding.") Settings.embed_model = HuggingFaceEmbedding(model_name="sentence-transformers/paraphrase-MiniLM-L3-v2") except Exception as e: logging.error(f"Failed to initialize Settings.embed_model in detailed_explainer: {e}") # LLM prompt for detailed explanation EXPLAINER_PROMPT = ( "You are an expert news analyst. Based on the following article content, " "generate a concise, detailed explanation (50-60 words) for the headline provided. " "Focus on the 'why it matters' and key context. Do not include any introductory phrases, just the explanation itself." "\n\nHeadline: {headline}" "\n\nArticle Content:\n{article_content}" "\n\nDetailed Explanation (50-60 words):" ) async def get_detailed_explanation_from_vector( summary_item: Dict[str, Any], vector_store_client: Any ) -> Dict[str, Any]: """ Takes a summary item, queries the vector store for its original article content, and generates a detailed explanation using an LLM. """ headline_text = summary_item["summary"] representative_article_link = summary_item["article_link"] representative_title = summary_item["representative_title"] detailed_content = "" sources_found: Set[str] = set() logging.info(f"Retrieving detailed content for headline: '{headline_text}' (from {representative_article_link})") try: query_text = f"{representative_title} {representative_article_link}" if representative_title else representative_article_link query_embedding = Settings.embed_model.embed_query(query_text) filters = MetadataFilters( filters=[MetadataFilter(key="url", value=representative_article_link, operator=FilterOperator.EQ)] ) query = VectorStoreQuery( query_embedding=query_embedding, similarity_top_k=5, filters=filters ) result = vector_store_client.query(query) if result.nodes: for node in result.nodes: node_content = node.get_content().strip() if node_content: detailed_content += node_content + "\n\n" if "source" in node.metadata: sources_found.add(node.metadata["source"]) if not detailed_content: logging.warning(f"No usable content found in nodes retrieved for URL: {representative_article_link}. Falling back to title+url context.") detailed_content = representative_title + " " + representative_article_link else: logging.warning(f"No original article found in vector store for URL: {representative_article_link}. Using summary as context.") detailed_content = summary_item["summary"] + ". " + summary_item.get("explanation", "") except Exception as e: logging.error(f"❌ Error querying vector store for detailed content for '{representative_article_link}': {e}", exc_info=True) detailed_content = summary_item["summary"] + ". " + summary_item.get("explanation", "") # Generate detailed explanation using LLM detailed_explanation_text = "" try: client = OpenAI(api_key=OPENAI_API_KEY) if not OPENAI_API_KEY: raise ValueError("OPENAI_API_KEY is not set.") llm_response = client.chat.completions.create( model="gpt-4o", messages=[ {"role": "system", "content": "You are a concise and informative news explainer."}, {"role": "user", "content": EXPLAINER_PROMPT.format( headline=headline_text, article_content=detailed_content )}, ], max_tokens=100, temperature=0.4, ) detailed_explanation_text = llm_response.choices[0].message.content.strip() logging.info(f"Generated detailed explanation for '{headline_text}'.") except Exception as e: logging.error(f"❌ Error generating detailed explanation for '{headline_text}': {e}", exc_info=True) detailed_explanation_text = summary_item.get("explanation", "Could not generate a detailed explanation.") return { "title": headline_text, "description": detailed_explanation_text, "sources": list(sources_found) if sources_found else ["General News Sources"] } async def generate_detailed_feed( cached_feed: Dict[str, Dict[int, Dict[str, Any]]] ) -> Dict[str, Dict[int, Dict[str, Any]]]: """ Generates detailed explanations for each summary in the cached feed. Does NOT cache the result internally. The caller is responsible for caching. """ if not cached_feed: logging.info("No cached feed found to generate detailed explanations from.") return {} detailed_feed_structured: Dict[str, Dict[int, Dict[str, Any]]] = {} vector_store = get_upstash_vector_store() for topic_key, summaries_map in cached_feed.items(): logging.info(f"Processing detailed explanations for topic: {topic_key}") detailed_summaries_for_topic: Dict[int, Dict[str, Any]] = {} for summary_id in sorted(summaries_map.keys()): summary_item = summaries_map[summary_id] detailed_item = await get_detailed_explanation_from_vector(summary_item, vector_store) detailed_summaries_for_topic[summary_id] = detailed_item detailed_feed_structured[topic_key] = detailed_summaries_for_topic logging.info("✅ Detailed explanation generation complete.") return detailed_feed_structured def cache_detailed_feed(feed_data: Dict[str, Dict[int, Dict[str, Any]]]): """Caches the given detailed feed data to Redis using its dedicated client.""" try: detailed_explainer_redis_client.set(DETAILED_FEED_CACHE_KEY, json.dumps(feed_data, ensure_ascii=False)) detailed_explainer_redis_client.expire(DETAILED_FEED_CACHE_KEY, 86400) logging.info(f"✅ Detailed feed cached under key '{DETAILED_FEED_CACHE_KEY}' with 24-hour expiry.") except Exception as e: logging.error(f"❌ [Redis detailed feed caching error]: {e}", exc_info=True) raise def get_cached_detailed_feed() -> Dict[str, Dict[int, Dict[str, Any]]]: """Retrieves the cached detailed feed from Redis using its dedicated client.""" try: cached_raw = detailed_explainer_redis_client.get(DETAILED_FEED_CACHE_KEY) if cached_raw: logging.info(f"✅ Retrieved cached detailed feed from '{DETAILED_FEED_CACHE_KEY}'.") return json.loads(cached_raw) else: logging.info(f"â„šī¸ No cached detailed feed found under key '{DETAILED_FEED_CACHE_KEY}'.") return {} except Exception as e: logging.error(f"❌ [Redis detailed feed retrieval error]: {e}", exc_info=True) return {}