ragV98's picture
sequential headlines
e4a76c1
import os
import json
import redis
import numpy as np
from typing import List, Dict, Any
from openai import OpenAI
from components.indexers.news_indexer import get_upstash_vector_store
from llama_index.core.vector_stores.types import VectorStoreQuery, MetadataFilter, MetadataFilters, FilterOperator
import logging
import re
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# πŸ” Environment variables
REDIS_URL = os.environ.get("UPSTASH_REDIS_URL", "redis://localhost:6379")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
# βœ… Redis client
try:
redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True)
except Exception as e:
logging.error(f"❌ [Redis Init Error]: {e}")
raise # It's critical for caching, so raising is appropriate here
# πŸ“° Topics
TOPICS = ["India news", "World news", "Tech news", "Finance news", "Sports news"]
TOPIC_KEYS = [t.lower().replace(" news", "") for t in TOPICS]
# 🧠 Summarization Prompt
BASE_PROMPT = (
"You are Nuse’s editorial summarizer. Read the excerpts below and extract the most important stories. "
"Generate exactly 3 punchy headlines, each under 20 words. Each headline must be immediately followed by a concise, single-sentence explanation of why the story matters. Do NOT include phrases like 'this is important because', 'why this matters', 'explanation:', etc. Just state the logic directly."
"\n\nFormat your output ONLY as a list of items. Each item must be formatted as: 'Headline -- Explanation'."
"\n\nExample:"
"\n- Global Markets Rally -- Investor confidence surges on positive economic data."
"\n- Tech Giant Faces Antitrust Probe -- Company dominance scrutinized amidst regulatory pressure."
"\n- Climate Summit Yields No Deal -- Disagreement over carbon targets stalls progress."
"\n\nIf you cannot find 3 suitable stories, generate fewer lines following the same format. Do not add any other introductory or concluding text."
)
# πŸ“₯ Load documents and metadata
# This function will now only return 'text', 'title', 'url', 'source'
# We remove 'headline_id' from this output as it will be newly generated for summaries
def load_docs_by_topic_with_refs() -> Dict[str, List[Dict]]:
topic_docs = {key: [] for key in TOPIC_KEYS}
logging.info("Starting to load documents by topic from Upstash Vector Store...")
try:
vector_store = get_upstash_vector_store()
for full_topic_name, topic_key_for_filter in zip(TOPICS, TOPIC_KEYS):
filters = MetadataFilters(
filters=[MetadataFilter(key="topic", value=topic_key_for_filter, operator=FilterOperator.EQ)]
)
dummy_vector = np.random.rand(384).tolist()
query = VectorStoreQuery(query_embedding=dummy_vector, similarity_top_k=50, filters=filters)
logging.info(f"πŸ”Ž Querying for topic '{full_topic_name}' with filter value '{topic_key_for_filter}'.")
result = vector_store.query(query)
logging.info(f"➑️ Found {len(result.nodes)} nodes for topic '{full_topic_name}'.")
for node in result.nodes:
content = node.get_content().strip()
# We no longer need to retrieve headline_id here for the summarizer's purpose
# headline_id = node.metadata.get("headline_id")
title = node.metadata.get("title", "No Title")
url = node.metadata.get("url", "#")
source = node.metadata.get("source", "Unknown Source")
if content: # No longer checking for headline_id here
topic_docs[topic_key_for_filter].append({
"text": content,
# "headline_id": headline_id, # Removed
"title": title,
"url": url,
"source": source
})
# Removed the warning for missing headline_id since we are not relying on it here
except Exception as e:
logging.error(f"❌ [load_docs_by_topic_with_refs Error]: {e}", exc_info=True)
return topic_docs
# πŸ§ͺ Topic summarizer
# Now accepts 'current_global_id' to assign sequential IDs
def summarize_topic(topic_key: str, docs: List[Dict], current_global_id: int) -> List[Dict]:
if not docs:
logging.warning(f"⚠️ No docs for topic: {topic_key}, skipping summarization.")
return [], current_global_id # Return empty list and unchanged ID
# These representative fields are for generic summary context if no specific link
representative_article_link = docs[0].get("url") if docs else f"https://google.com/search?q={topic_key}+news"
representative_title = docs[0].get("title") if docs else f"Summary for {topic_key}"
content = "\n\n---\n\n".join([str(d["text"]) for d in docs if "text" in d and d["text"] is not None])
if not content:
logging.warning(f"⚠️ No valid text content found in docs for topic: {topic_key}, skipping summarization.")
return [], current_global_id
content = content[:12000] # Truncate to avoid excessive token usage
logging.info(f"🧠 Summarizing topic via OpenAI: '{topic_key}' ({len(docs)} documents)")
try:
client = OpenAI(api_key=OPENAI_API_KEY)
response = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": BASE_PROMPT},
{"role": "user", "content": content},
],
max_tokens=512,
temperature=0.7,
)
llm_output = response.choices[0].message.content.strip()
logging.info(f"Raw LLM output for topic '{topic_key}':\n---\n{llm_output}\n---")
parsed_summaries = [] # Renamed for clarity
for line in llm_output.splitlines():
line = line.strip()
if not line:
continue
match = re.match(r'^(?:[->β€’\d\.]+\s*)?(.*?)\s*--\s*(.*)$', line)
if match:
headline_text = match.group(1).strip()
explanation_text = match.group(2).strip()
explanation_text = re.sub(r'^(?:this is important because|why this matters because|this matters because|reason:|significance:)\s*', '', explanation_text, flags=re.IGNORECASE).strip()
if len(headline_text.split()) >= 2 and len(explanation_text.split()) >= 3:
parsed_summaries.append({"summary": headline_text, "explanation": explanation_text})
else:
logging.warning(f"Skipping line due to short/empty headline or explanation after parsing: '{line}' for topic '{topic_key}'.")
else:
logging.warning(f"Could not parse line: '{line}' for topic '{topic_key}'. Does it match 'Headline -- Explanation' format?")
result = []
# Assign new sequential IDs here
for h_item in parsed_summaries:
result.append({
"summary": h_item["summary"],
"explanation": h_item["explanation"],
"id": current_global_id, # Assign the new sequential ID
"image_url": "https://source.unsplash.com/800x600/?news",
"article_link": representative_article_link,
"representative_title": representative_title
})
current_global_id += 1 # Increment for the next summary
logging.info(f"βœ… Successfully generated {len(result)} summaries for topic '{topic_key}'.")
return result, current_global_id # Return the summaries and the updated global ID
except Exception as e:
logging.error(f"❌ [Summarize topic '{topic_key}' Error]: {e}", exc_info=True)
return [], current_global_id # Return empty and unchanged ID on error
# πŸš€ Generate and cache feed
def generate_and_cache_daily_feed():
try:
logging.info("πŸ†• Generating daily feed...")
topic_docs = load_docs_by_topic_with_refs()
# This will hold the final structure you requested
final_feed_structured: Dict[str, Dict[int, Dict[str, Any]]] = {}
global_summary_id_counter = 1 # Initialize global counter for all summaries
for topic_display_name, topic_key in zip(TOPICS, TOPIC_KEYS):
summaries_for_topic, updated_global_id = summarize_topic(
topic_key,
topic_docs.get(topic_key, []),
global_summary_id_counter # Pass the current global ID
)
# Update the global counter for the next topic
global_summary_id_counter = updated_global_id
# Store summaries in the desired {1: data, 2: data} format
topic_summary_map: Dict[int, Dict[str, Any]] = {}
for summary_item in summaries_for_topic:
# The 'id' key in summary_item already holds the sequential ID
topic_summary_map[summary_item["id"]] = {
"summary": summary_item["summary"],
"explanation": summary_item["explanation"],
"image_url": summary_item["image_url"],
"article_link": summary_item["article_link"],
"representative_title": summary_item["representative_title"]
}
final_feed_structured[topic_key] = topic_summary_map
# Cache to Redis
try:
cache_key = "daily_news_feed_cache"
# Dump the structured dictionary
redis_client.set(cache_key, json.dumps(final_feed_structured, ensure_ascii=False))
redis_client.expire(cache_key, 86400)
logging.info(f"βœ… Cached feed under key '{cache_key}' with 24-hour expiry.")
except Exception as e:
logging.error(f"❌ [Redis cache error]: {e}", exc_info=True)
return final_feed_structured # Return the structured feed
except Exception as e:
logging.critical(f"❌ [generate_and_cache_daily_feed Overall Error]: {e}", exc_info=True)
return {} # Return empty dict on overall error
# πŸ“¦ Retrieve from cache
def get_cached_daily_feed():
try:
cache_key = "daily_news_feed_cache"
cached = redis_client.get(cache_key)
if cached:
logging.info(f"βœ… Retrieved cached daily feed from '{cache_key}'.")
return json.loads(cached)
else:
logging.info(f"ℹ️ No cached data found under key '{cache_key}'.")
return {} # Return empty dict if no cache
except Exception as e:
logging.error(f"❌ [get_cached_daily_feed Error]: {e}", exc_info=True)
return {}
# πŸ§ͺ Run if main
if __name__ == "__main__":
feed = generate_and_cache_daily_feed()
print("\n--- Generated Daily Feed (Structured) ---")
print(json.dumps(feed, indent=2, ensure_ascii=False))