import os
import json
import logging
import re
from typing import Any, Dict, List, Tuple

import numpy as np
import redis
from openai import OpenAI

from components.indexers.news_indexer import get_upstash_vector_store
from llama_index.core.vector_stores.types import (
    VectorStoreQuery,
    MetadataFilter,
    MetadataFilters,
    FilterOperator,
)

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 🔐 Environment variables
REDIS_URL = os.environ.get("UPSTASH_REDIS_URL", "redis://localhost:6379")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")

# ✅ Redis client
try:
    redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True)
except Exception as e:
    logging.error(f"❌ [Redis Init Error]: {e}")
    raise  # Redis is critical for caching, so raising is appropriate here

# 📰 Topics
TOPICS = ["India news", "World news", "Tech news", "Finance news", "Sports news"]
TOPIC_KEYS = [t.lower().replace(" news", "") for t in TOPICS]

# 🧠 Summarization prompt
BASE_PROMPT = (
    "You are Nuse’s editorial summarizer. Read the excerpts below and extract the most important stories. "
    "Generate exactly 3 punchy headlines, each under 20 words. Each headline must be immediately followed by "
    "a concise, single-sentence explanation of why the story matters. Do NOT include phrases like "
    "'this is important because', 'why this matters', 'explanation:', etc. Just state the logic directly."
    "\n\nFormat your output ONLY as a list of items. Each item must be formatted as: 'Headline -- Explanation'."
    "\n\nExample:"
    "\n- Global Markets Rally -- Investor confidence surges on positive economic data."
    "\n- Tech Giant Faces Antitrust Probe -- Company dominance scrutinized amidst regulatory pressure."
    "\n- Climate Summit Yields No Deal -- Disagreement over carbon targets stalls progress."
    "\n\nIf you cannot find 3 suitable stories, generate fewer lines following the same format. "
    "Do not add any other introductory or concluding text."
)

# 📥 Load documents and metadata
# Returns only 'text', 'title', 'url', and 'source' per document. Any 'headline_id'
# stored at index time is intentionally not carried through here: summaries receive
# freshly generated sequential IDs downstream.
def load_docs_by_topic_with_refs() -> Dict[str, List[Dict]]:
    topic_docs = {key: [] for key in TOPIC_KEYS}
    logging.info("Starting to load documents by topic from Upstash Vector Store...")
    try:
        vector_store = get_upstash_vector_store()
        for full_topic_name, topic_key_for_filter in zip(TOPICS, TOPIC_KEYS):
            filters = MetadataFilters(
                filters=[MetadataFilter(key="topic", value=topic_key_for_filter, operator=FilterOperator.EQ)]
            )
            # Random query vector: only the metadata filter matters here, not semantic
            # similarity. Its dimension (384) must match the embedding model used at index time.
            dummy_vector = np.random.rand(384).tolist()
            query = VectorStoreQuery(query_embedding=dummy_vector, similarity_top_k=50, filters=filters)
            logging.info(f"🔎 Querying for topic '{full_topic_name}' with filter value '{topic_key_for_filter}'.")
            result = vector_store.query(query)
            nodes = result.nodes or []
            logging.info(f"➡️ Found {len(nodes)} nodes for topic '{full_topic_name}'.")
            for node in nodes:
                content = node.get_content().strip()
                title = node.metadata.get("title", "No Title")
                url = node.metadata.get("url", "#")
                source = node.metadata.get("source", "Unknown Source")
                if content:
                    topic_docs[topic_key_for_filter].append({
                        "text": content,
                        "title": title,
                        "url": url,
                        "source": source,
                    })
    except Exception as e:
        logging.error(f"❌ [load_docs_by_topic_with_refs Error]: {e}", exc_info=True)
    return topic_docs


# 🧪 Topic summarizer
# Accepts 'current_global_id' so summaries receive sequential IDs across all topics;
# returns the parsed summaries together with the updated counter.
def summarize_topic(topic_key: str, docs: List[Dict], current_global_id: int) -> Tuple[List[Dict], int]:
    if not docs:
        logging.warning(f"⚠️ No docs for topic: {topic_key}, skipping summarization.")
        return [], current_global_id  # Return empty list and unchanged ID

    # Representative fields provide a fallback link/title for the topic's summaries
    representative_article_link = docs[0].get("url") if docs else f"https://google.com/search?q={topic_key}+news"
    representative_title = docs[0].get("title") if docs else f"Summary for {topic_key}"

    content = "\n\n---\n\n".join([str(d["text"]) for d in docs if "text" in d and d["text"] is not None])
    if not content:
        logging.warning(f"⚠️ No valid text content found in docs for topic: {topic_key}, skipping summarization.")
        return [], current_global_id
    content = content[:12000]  # Truncate to avoid excessive token usage

    logging.info(f"🧠 Summarizing topic via OpenAI: '{topic_key}' ({len(docs)} documents)")
    try:
        client = OpenAI(api_key=OPENAI_API_KEY)
        response = client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": BASE_PROMPT},
                {"role": "user", "content": content},
            ],
            max_tokens=512,
            temperature=0.7,
        )
        llm_output = response.choices[0].message.content.strip()
        logging.info(f"Raw LLM output for topic '{topic_key}':\n---\n{llm_output}\n---")

        parsed_summaries = []
        for line in llm_output.splitlines():
            line = line.strip()
            if not line:
                continue
            # Expect 'Headline -- Explanation', optionally prefixed by a bullet or number
            match = re.match(r'^(?:[->•\d\.]+\s*)?(.*?)\s*--\s*(.*)$', line)
            if match:
                headline_text = match.group(1).strip()
                explanation_text = match.group(2).strip()
                # Strip boilerplate lead-ins the model may add despite the prompt
                explanation_text = re.sub(
                    r'^(?:this is important because|why this matters because|this matters because|reason:|significance:)\s*',
                    '',
                    explanation_text,
                    flags=re.IGNORECASE,
                ).strip()
                if len(headline_text.split()) >= 2 and len(explanation_text.split()) >= 3:
                    parsed_summaries.append({"summary": headline_text, "explanation": explanation_text})
                else:
                    logging.warning(
                        f"Skipping line due to short/empty headline or explanation after parsing: '{line}' for topic '{topic_key}'."
                    )
            else:
                logging.warning(
                    f"Could not parse line: '{line}' for topic '{topic_key}'. Does it match 'Headline -- Explanation' format?"
                )

        result = []  # Final summary records with sequential IDs assigned
        for h_item in parsed_summaries:
            result.append({
                "summary": h_item["summary"],
                "explanation": h_item["explanation"],
                "id": current_global_id,  # Sequential ID across all topics
                "image_url": "https://source.unsplash.com/800x600/?news",
                "article_link": representative_article_link,
                "representative_title": representative_title,
            })
            current_global_id += 1  # Increment for the next summary

        logging.info(f"✅ Successfully generated {len(result)} summaries for topic '{topic_key}'.")
        return result, current_global_id  # Return the summaries and the updated global ID
    except Exception as e:
        logging.error(f"❌ [Summarize topic '{topic_key}' Error]: {e}", exc_info=True)
        return [], current_global_id  # Return empty and unchanged ID on error
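
# A minimal, self-contained sketch of the 'Headline -- Explanation' parsing applied in
# summarize_topic() above. Illustrative only: the function name _demo_parse_line and the
# sample line in the usage note are assumptions, not part of the pipeline; the regex is
# the same one used above.
def _demo_parse_line(line: str) -> Dict[str, str]:
    match = re.match(r'^(?:[->•\d\.]+\s*)?(.*?)\s*--\s*(.*)$', line.strip())
    if not match:
        return {}
    return {"summary": match.group(1).strip(), "explanation": match.group(2).strip()}

# Example:
#   _demo_parse_line("- Global Markets Rally -- Investor confidence surges on positive economic data.")
#   -> {"summary": "Global Markets Rally",
#       "explanation": "Investor confidence surges on positive economic data."}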

# 🚀 Generate and cache feed
def generate_and_cache_daily_feed():
    try:
        logging.info("🆕 Generating daily feed...")
        topic_docs = load_docs_by_topic_with_refs()

        # Final structure: {topic_key: {sequential_id: summary_data}}
        final_feed_structured: Dict[str, Dict[int, Dict[str, Any]]] = {}
        global_summary_id_counter = 1  # Global counter across all summaries

        for topic_display_name, topic_key in zip(TOPICS, TOPIC_KEYS):
            summaries_for_topic, updated_global_id = summarize_topic(
                topic_key,
                topic_docs.get(topic_key, []),
                global_summary_id_counter,  # Pass the current global ID
            )
            # Carry the counter forward to the next topic
            global_summary_id_counter = updated_global_id

            # Store summaries keyed by their sequential ID ({1: data, 2: data, ...})
            topic_summary_map: Dict[int, Dict[str, Any]] = {}
            for summary_item in summaries_for_topic:
                # The 'id' key in summary_item already holds the sequential ID
                topic_summary_map[summary_item["id"]] = {
                    "summary": summary_item["summary"],
                    "explanation": summary_item["explanation"],
                    "image_url": summary_item["image_url"],
                    "article_link": summary_item["article_link"],
                    "representative_title": summary_item["representative_title"],
                }
            final_feed_structured[topic_key] = topic_summary_map

        # Cache to Redis
        try:
            cache_key = "daily_news_feed_cache"
            redis_client.set(
                cache_key,
                json.dumps(final_feed_structured, ensure_ascii=False),
                ex=86400,  # 24-hour expiry set atomically with the value
            )
            logging.info(f"✅ Cached feed under key '{cache_key}' with 24-hour expiry.")
        except Exception as e:
            logging.error(f"❌ [Redis cache error]: {e}", exc_info=True)

        return final_feed_structured  # Return the structured feed
    except Exception as e:
        logging.critical(f"❌ [generate_and_cache_daily_feed Overall Error]: {e}", exc_info=True)
        return {}  # Return empty dict on overall error


# 📦 Retrieve from cache
def get_cached_daily_feed():
    try:
        cache_key = "daily_news_feed_cache"
        cached = redis_client.get(cache_key)
        if cached:
            logging.info(f"✅ Retrieved cached daily feed from '{cache_key}'.")
            return json.loads(cached)
        else:
            logging.info(f"ℹ️ No cached data found under key '{cache_key}'.")
            return {}  # Return empty dict if no cache
    except Exception as e:
        logging.error(f"❌ [get_cached_daily_feed Error]: {e}", exc_info=True)
        return {}
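
# Shape of the cached payload written by generate_and_cache_daily_feed(), shown here for
# reference with placeholder values (one topic, one summary). Note that json.dumps()
# converts the integer summary IDs to strings, so get_cached_daily_feed() returns string
# keys such as "1".
#
# {
#   "india": {
#     "1": {
#       "summary": "Global Markets Rally",
#       "explanation": "Investor confidence surges on positive economic data.",
#       "image_url": "https://source.unsplash.com/800x600/?news",
#       "article_link": "https://example.com/article",
#       "representative_title": "Example representative headline"
#     }
#   }
# }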

# 🧪 Run if main
if __name__ == "__main__":
    feed = generate_and_cache_daily_feed()
    print("\n--- Generated Daily Feed (Structured) ---")
    print(json.dumps(feed, indent=2, ensure_ascii=False))
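
# To run this module as a script (the filename below is an assumption; adjust to this
# repo's layout), set the environment variables it reads and invoke it directly:
#
#   export UPSTASH_REDIS_URL="redis://..."
#   export OPENAI_API_KEY="sk-..."
#   python daily_feed.py
#
# The generated feed is printed to stdout and cached in Redis for 24 hours under the
# key "daily_news_feed_cache"; other services can read it back via get_cached_daily_feed().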