File size: 8,573 Bytes
994a0a2 af23d2f 994a0a2 af23d2f 994a0a2 af23d2f 994a0a2 af23d2f 994a0a2 af23d2f 994a0a2 af23d2f 994a0a2 af23d2f 994a0a2 af23d2f 994a0a2 af23d2f 994a0a2 af23d2f 994a0a2 af23d2f 994a0a2 af23d2f 994a0a2 af23d2f 994a0a2 af23d2f 994a0a2 af23d2f 994a0a2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 |
import os
import json
import numpy as np
import redis
from typing import List, Dict, Any, Optional, Set
from openai import OpenAI
from llama_index.core.vector_stores.types import VectorStoreQuery, MetadataFilter, MetadataFilters, FilterOperator
from llama_index.core.schema import TextNode
from components.indexers.news_indexer import get_upstash_vector_store
import logging
from llama_index.core.settings import Settings
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Environment variables for this module
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
REDIS_URL = os.environ.get("UPSTASH_REDIS_URL", "redis://localhost:6379")

# Redis client for this module. Fail fast at import time: the caching helpers
# below are useless without a reachable Redis, so a broken connection should
# stop the process here rather than surface later as per-request errors.
try:
    detailed_explainer_redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True)
    detailed_explainer_redis_client.ping()
    logging.info("Redis client initialized for detailed_explainer.py.")
except Exception as e:
    logging.critical(f"FATAL ERROR: Could not connect to Redis in detailed_explainer.py: {e}")
    raise

# Cache key specific to detailed explanations
DETAILED_FEED_CACHE_KEY = "detailed_news_feed_cache"
# Configure a process-wide default embedding model if the application has not
# already set one. The model name must match the one used at ingestion time
# (news_ingest.py) so query vectors are comparable to the stored vectors.
try:
    if getattr(Settings, 'embed_model', None) is None:
        logging.info("Settings.embed_model not yet configured, initializing with default HuggingFaceEmbedding.")
        Settings.embed_model = HuggingFaceEmbedding(model_name="sentence-transformers/paraphrase-MiniLM-L3-v2")
except Exception as e:
    logging.error(f"Failed to initialize Settings.embed_model in detailed_explainer: {e}")
# LLM prompt for the detailed explanation.
# Format placeholders: {headline} = the cached summary text used as headline;
# {article_content} = concatenated article chunks retrieved from the vector store.
EXPLAINER_PROMPT = (
    "You are an expert news analyst. Based on the following article content, "
    "generate a concise, detailed explanation (50-60 words) for the headline provided. "
    "Focus on the 'why it matters' and key context. Do not include any introductory phrases, just the explanation itself."
    "\n\nHeadline: {headline}"
    "\n\nArticle Content:\n{article_content}"
    "\n\nDetailed Explanation (50-60 words):"
)
async def get_detailed_explanation_from_vector(
    summary_item: Dict[str, Any],
    vector_store_client: Any
) -> Dict[str, Any]:
    """
    Build one detailed-feed entry from a cached summary item.

    Queries the vector store for the original article chunks (filtered by the
    article URL), concatenates their text as LLM context, and asks the LLM for
    a 50-60 word explanation of the headline.

    Args:
        summary_item: Cached summary dict; must contain "summary",
            "article_link" and "representative_title"; may contain
            "explanation" (used as fallback context and fallback result).
        vector_store_client: Vector store exposing ``query(VectorStoreQuery)``.

    Returns:
        Dict with "title", "description" and "sources". Never raises: every
        failure falls back to the cached summary/explanation text.
    """
    headline_text = summary_item["summary"]
    representative_article_link = summary_item["article_link"]
    representative_title = summary_item["representative_title"]
    detailed_content = ""
    sources_found: Set[str] = set()
    logging.info(f"Retrieving detailed content for headline: '{headline_text}' (from {representative_article_link})")
    try:
        # The metadata filter on "url" does the real narrowing; the embedded
        # title+URL text only ranks chunks within that article.
        query_text = f"{representative_title} {representative_article_link}" if representative_title else representative_article_link
        # Use the query-side embedding API of the globally configured model.
        query_embedding = Settings.embed_model.get_query_embedding(query_text)
        filters = MetadataFilters(
            filters=[MetadataFilter(key="url", value=representative_article_link, operator=FilterOperator.EQ)]
        )
        query = VectorStoreQuery(
            query_embedding=query_embedding,
            similarity_top_k=5,
            filters=filters
        )
        result = vector_store_client.query(query)
        if result.nodes:
            for node in result.nodes:
                node_content = node.get_content().strip()
                if node_content:
                    detailed_content += node_content + "\n\n"
                if "source" in node.metadata:
                    sources_found.add(node.metadata["source"])
            if not detailed_content:
                logging.warning(f"No usable content found in nodes retrieved for URL: {representative_article_link}. Falling back to title+url context.")
                detailed_content = representative_title + " " + representative_article_link
        else:
            logging.warning(f"No original article found in vector store for URL: {representative_article_link}. Using summary as context.")
            detailed_content = summary_item["summary"] + ". " + summary_item.get("explanation", "")
    except Exception as e:
        logging.error(f"Error querying vector store for detailed content for '{representative_article_link}': {e}", exc_info=True)
        detailed_content = summary_item["summary"] + ". " + summary_item.get("explanation", "")
    # Generate the detailed explanation using the LLM.
    detailed_explanation_text = ""
    try:
        # Validate the key BEFORE constructing the client, so a missing key
        # raises our explicit ValueError rather than an opaque client error.
        if not OPENAI_API_KEY:
            raise ValueError("OPENAI_API_KEY is not set.")
        client = OpenAI(api_key=OPENAI_API_KEY)
        llm_response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": "You are a concise and informative news explainer."},
                {"role": "user", "content": EXPLAINER_PROMPT.format(
                    headline=headline_text,
                    article_content=detailed_content
                )},
            ],
            max_tokens=100,
            temperature=0.4,
        )
        detailed_explanation_text = llm_response.choices[0].message.content.strip()
        logging.info(f"Generated detailed explanation for '{headline_text}'.")
    except Exception as e:
        logging.error(f"Error generating detailed explanation for '{headline_text}': {e}", exc_info=True)
        detailed_explanation_text = summary_item.get("explanation", "Could not generate a detailed explanation.")
    return {
        "title": headline_text,
        "description": detailed_explanation_text,
        "sources": list(sources_found) if sources_found else ["General News Sources"]
    }
async def generate_detailed_feed(
    cached_feed: Dict[str, Dict[int, Dict[str, Any]]]
) -> Dict[str, Dict[int, Dict[str, Any]]]:
    """
    Generate detailed explanations for every summary in the cached feed.

    Does NOT cache the result internally; the caller is responsible for
    persisting the returned structure (see cache_detailed_feed).

    Args:
        cached_feed: Mapping of topic -> {summary_id -> summary item}.

    Returns:
        Mapping of topic -> {summary_id -> detailed item}, or {} when the
        input feed is empty/falsy.
    """
    if not cached_feed:
        logging.info("No cached feed found to generate detailed explanations from.")
        return {}
    detailed_feed_structured: Dict[str, Dict[int, Dict[str, Any]]] = {}
    vector_store = get_upstash_vector_store()
    for topic_key, summaries_map in cached_feed.items():
        logging.info(f"Processing detailed explanations for topic: {topic_key}")
        detailed_summaries_for_topic: Dict[int, Dict[str, Any]] = {}
        # Iterate in ascending summary-id order so output ordering is stable.
        for summary_id in sorted(summaries_map.keys()):
            summary_item = summaries_map[summary_id]
            detailed_item = await get_detailed_explanation_from_vector(summary_item, vector_store)
            detailed_summaries_for_topic[summary_id] = detailed_item
        detailed_feed_structured[topic_key] = detailed_summaries_for_topic
    logging.info("Detailed explanation generation complete.")
    return detailed_feed_structured
def cache_detailed_feed(feed_data: Dict[str, Dict[int, Dict[str, Any]]]):
    """Cache the detailed feed to Redis with a 24-hour expiry.

    Uses a single atomic SET with ``ex`` so the key can never exist without a
    TTL (the previous set-then-expire pair could leave a persistent key if the
    process died between the two calls).

    Args:
        feed_data: Structure produced by generate_detailed_feed.

    Raises:
        Exception: re-raised after logging when the Redis write fails.
    """
    try:
        detailed_explainer_redis_client.set(
            DETAILED_FEED_CACHE_KEY,
            json.dumps(feed_data, ensure_ascii=False),
            ex=86400,
        )
        logging.info(f"Detailed feed cached under key '{DETAILED_FEED_CACHE_KEY}' with 24-hour expiry.")
    except Exception as e:
        logging.error(f"[Redis detailed feed caching error]: {e}", exc_info=True)
        raise
def get_cached_detailed_feed() -> Dict[str, Dict[int, Dict[str, Any]]]:
    """Retrieve the cached detailed feed from Redis.

    Returns:
        The deserialized feed structure, or {} when the key is absent or any
        Redis/JSON error occurs (errors are logged, never raised).
    """
    try:
        cached_raw = detailed_explainer_redis_client.get(DETAILED_FEED_CACHE_KEY)
        if cached_raw:
            logging.info(f"Retrieved cached detailed feed from '{DETAILED_FEED_CACHE_KEY}'.")
            return json.loads(cached_raw)
        else:
            logging.info(f"No cached detailed feed found under key '{DETAILED_FEED_CACHE_KEY}'.")
            return {}
    except Exception as e:
        logging.error(f"[Redis detailed feed retrieval error]: {e}", exc_info=True)
        return {}