import os
import json
import redis
import numpy as np
from typing import List, Dict, Any, Tuple
from openai import OpenAI
from components.indexers.news_indexer import get_upstash_vector_store
from llama_index.core.vector_stores.types import VectorStoreQuery, MetadataFilter, MetadataFilters, FilterOperator
import logging
import re
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Environment variables
REDIS_URL = os.environ.get("UPSTASH_REDIS_URL", "redis://localhost:6379")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
# Redis client
try:
redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True)
except Exception as e:
logging.error(f"β [Redis Init Error]: {e}")
raise # It's critical for caching, so raising is appropriate here
# Topics
TOPICS = ["India news", "World news", "Tech news", "Finance news", "Sports news"]
TOPIC_KEYS = [t.lower().replace(" news", "") for t in TOPICS]
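# e.g. TOPIC_KEYS == ["india", "world", "tech", "finance", "sports"]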
# Summarization Prompt
BASE_PROMPT = (
"You are Nuseβs editorial summarizer. Read the excerpts below and extract the most important stories. "
"Generate exactly 3 punchy headlines, each under 20 words. Each headline must be immediately followed by a concise, single-sentence explanation of why the story matters. Do NOT include phrases like 'this is important because', 'why this matters', 'explanation:', etc. Just state the logic directly."
"\n\nFormat your output ONLY as a list of items. Each item must be formatted as: 'Headline -- Explanation'."
"\n\nExample:"
"\n- Global Markets Rally -- Investor confidence surges on positive economic data."
"\n- Tech Giant Faces Antitrust Probe -- Company dominance scrutinized amidst regulatory pressure."
"\n- Climate Summit Yields No Deal -- Disagreement over carbon targets stalls progress."
"\n\nIf you cannot find 3 suitable stories, generate fewer lines following the same format. Do not add any other introductory or concluding text."
)
# Load documents and metadata
# Returns only 'text', 'title', 'url', and 'source' per document.
# 'headline_id' is omitted here because summary IDs are generated sequentially downstream.
def load_docs_by_topic_with_refs() -> Dict[str, List[Dict]]:
topic_docs = {key: [] for key in TOPIC_KEYS}
logging.info("Starting to load documents by topic from Upstash Vector Store...")
try:
vector_store = get_upstash_vector_store()
for full_topic_name, topic_key_for_filter in zip(TOPICS, TOPIC_KEYS):
filters = MetadataFilters(
filters=[MetadataFilter(key="topic", value=topic_key_for_filter, operator=FilterOperator.EQ)]
)
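            # NOTE: VectorStoreQuery requires a query embedding even when we only rely on the
            # metadata filter, so a random vector is used; 384 is assumed to match the
            # embedding dimension of the underlying index.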
dummy_vector = np.random.rand(384).tolist()
query = VectorStoreQuery(query_embedding=dummy_vector, similarity_top_k=50, filters=filters)
logging.info(f"π Querying for topic '{full_topic_name}' with filter value '{topic_key_for_filter}'.")
result = vector_store.query(query)
logging.info(f"β‘οΈ Found {len(result.nodes)} nodes for topic '{full_topic_name}'.")
for node in result.nodes:
content = node.get_content().strip()
# We no longer need to retrieve headline_id here for the summarizer's purpose
# headline_id = node.metadata.get("headline_id")
title = node.metadata.get("title", "No Title")
url = node.metadata.get("url", "#")
source = node.metadata.get("source", "Unknown Source")
if content: # No longer checking for headline_id here
topic_docs[topic_key_for_filter].append({
"text": content,
# "headline_id": headline_id, # Removed
"title": title,
"url": url,
"source": source
})
# Removed the warning for missing headline_id since we are not relying on it here
except Exception as e:
logging.error(f"β [load_docs_by_topic_with_refs Error]: {e}", exc_info=True)
return topic_docs
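# Illustrative shape of the returned mapping (values are hypothetical):
#   {"india": [{"text": "...", "title": "...", "url": "...", "source": "..."}, ...],
#    "world": [...], "tech": [...], "finance": [...], "sports": [...]}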
# Topic summarizer
# Now accepts 'current_global_id' to assign sequential IDs
def summarize_topic(topic_key: str, docs: List[Dict], current_global_id: int) -> Tuple[List[Dict], int]:
if not docs:
logging.warning(f"β οΈ No docs for topic: {topic_key}, skipping summarization.")
return [], current_global_id # Return empty list and unchanged ID
# These representative fields are for generic summary context if no specific link
representative_article_link = docs[0].get("url") if docs else f"https://google.com/search?q={topic_key}+news"
representative_title = docs[0].get("title") if docs else f"Summary for {topic_key}"
content = "\n\n---\n\n".join([str(d["text"]) for d in docs if "text" in d and d["text"] is not None])
if not content:
logging.warning(f"β οΈ No valid text content found in docs for topic: {topic_key}, skipping summarization.")
return [], current_global_id
content = content[:12000] # Truncate to avoid excessive token usage
logging.info(f"π§ Summarizing topic via OpenAI: '{topic_key}' ({len(docs)} documents)")
try:
client = OpenAI(api_key=OPENAI_API_KEY)
response = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": BASE_PROMPT},
{"role": "user", "content": content},
],
max_tokens=512,
temperature=0.7,
)
llm_output = response.choices[0].message.content.strip()
logging.info(f"Raw LLM output for topic '{topic_key}':\n---\n{llm_output}\n---")
parsed_summaries = [] # Renamed for clarity
for line in llm_output.splitlines():
line = line.strip()
if not line:
continue
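            # Expected line shape per BASE_PROMPT: "- Headline -- Explanation".
            # The optional leading group strips list markers ("-", ">", "•", digits, ".")
            # before the text is split on the "--" separator.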
            match = re.match(r'^(?:[->•\d\.]+\s*)?(.*?)\s*--\s*(.*)$', line)
if match:
headline_text = match.group(1).strip()
explanation_text = match.group(2).strip()
explanation_text = re.sub(r'^(?:this is important because|why this matters because|this matters because|reason:|significance:)\s*', '', explanation_text, flags=re.IGNORECASE).strip()
if len(headline_text.split()) >= 2 and len(explanation_text.split()) >= 3:
parsed_summaries.append({"summary": headline_text, "explanation": explanation_text})
else:
logging.warning(f"Skipping line due to short/empty headline or explanation after parsing: '{line}' for topic '{topic_key}'.")
else:
logging.warning(f"Could not parse line: '{line}' for topic '{topic_key}'. Does it match 'Headline -- Explanation' format?")
result = []
# Assign new sequential IDs here
for h_item in parsed_summaries:
result.append({
"summary": h_item["summary"],
"explanation": h_item["explanation"],
"id": current_global_id, # Assign the new sequential ID
"image_url": "https://source.unsplash.com/800x600/?news",
"article_link": representative_article_link,
"representative_title": representative_title
})
current_global_id += 1 # Increment for the next summary
logging.info(f"β
Successfully generated {len(result)} summaries for topic '{topic_key}'.")
return result, current_global_id # Return the summaries and the updated global ID
except Exception as e:
logging.error(f"β [Summarize topic '{topic_key}' Error]: {e}", exc_info=True)
return [], current_global_id # Return empty and unchanged ID on error
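# Illustrative call (inputs and outputs are hypothetical):
#   summaries, next_id = summarize_topic("tech", docs, current_global_id=1)
#   -> ([{"summary": "...", "explanation": "...", "id": 1, ...}], 2)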
# Generate and cache feed
def generate_and_cache_daily_feed():
try:
logging.info("π Generating daily feed...")
topic_docs = load_docs_by_topic_with_refs()
# This will hold the final structure you requested
final_feed_structured: Dict[str, Dict[int, Dict[str, Any]]] = {}
global_summary_id_counter = 1 # Initialize global counter for all summaries
for topic_display_name, topic_key in zip(TOPICS, TOPIC_KEYS):
summaries_for_topic, updated_global_id = summarize_topic(
topic_key,
topic_docs.get(topic_key, []),
global_summary_id_counter # Pass the current global ID
)
# Update the global counter for the next topic
global_summary_id_counter = updated_global_id
# Store summaries in the desired {1: data, 2: data} format
topic_summary_map: Dict[int, Dict[str, Any]] = {}
for summary_item in summaries_for_topic:
# The 'id' key in summary_item already holds the sequential ID
topic_summary_map[summary_item["id"]] = {
"summary": summary_item["summary"],
"explanation": summary_item["explanation"],
"image_url": summary_item["image_url"],
"article_link": summary_item["article_link"],
"representative_title": summary_item["representative_title"]
}
final_feed_structured[topic_key] = topic_summary_map
# Cache to Redis
try:
cache_key = "daily_news_feed_cache"
# Dump the structured dictionary
redis_client.set(cache_key, json.dumps(final_feed_structured, ensure_ascii=False))
redis_client.expire(cache_key, 86400)
logging.info(f"β
Cached feed under key '{cache_key}' with 24-hour expiry.")
except Exception as e:
logging.error(f"β [Redis cache error]: {e}", exc_info=True)
return final_feed_structured # Return the structured feed
except Exception as e:
logging.critical(f"β [generate_and_cache_daily_feed Overall Error]: {e}", exc_info=True)
return {} # Return empty dict on overall error
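# Illustrative shape of the generated feed (IDs are sequential across all topics):
#   {"india": {1: {...}, 2: {...}, 3: {...}}, "world": {4: {...}}, ...}
# Note: json.dumps() serializes the integer IDs as JSON object keys (strings), so the
# cached copy read back by get_cached_daily_feed() has string keys such as "1", "2".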
# Retrieve from cache
def get_cached_daily_feed():
try:
cache_key = "daily_news_feed_cache"
cached = redis_client.get(cache_key)
if cached:
logging.info(f"β
Retrieved cached daily feed from '{cache_key}'.")
return json.loads(cached)
else:
logging.info(f"βΉοΈ No cached data found under key '{cache_key}'.")
return {} # Return empty dict if no cache
except Exception as e:
logging.error(f"β [get_cached_daily_feed Error]: {e}", exc_info=True)
return {}
# Run if main
if __name__ == "__main__":
feed = generate_and_cache_daily_feed()
print("\n--- Generated Daily Feed (Structured) ---")
print(json.dumps(feed, indent=2, ensure_ascii=False)) |