import os
import json
import logging
import re
from typing import Any, Dict, List, Tuple

import numpy as np
import redis
from openai import OpenAI

from components.indexers.news_indexer import get_upstash_vector_store
from llama_index.core.vector_stores.types import (
    FilterOperator,
    MetadataFilter,
    MetadataFilters,
    VectorStoreQuery,
)

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

REDIS_URL = os.environ.get("UPSTASH_REDIS_URL", "redis://localhost:6379")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")

try:
    redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True)
except Exception as e:
    logging.error(f"[Redis Init Error]: {e}")
    raise

TOPICS = ["India news", "World news", "Tech news", "Finance news", "Sports news"]
TOPIC_KEYS = [t.lower().replace(" news", "") for t in TOPICS]
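# TOPIC_KEYS works out to ["india", "world", "tech", "finance", "sports"]; these
# values are assumed to match the "topic" metadata written by the news indexer,
# since they are used verbatim as metadata filter values below.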

BASE_PROMPT = (
    "You are Nuse's editorial summarizer. Read the excerpts below and extract the most important stories. "
    "Generate exactly 3 punchy headlines, each under 20 words. Each headline must be immediately followed by a concise, single-sentence explanation of why the story matters. Do NOT include phrases like 'this is important because', 'why this matters', 'explanation:', etc. Just state the logic directly."
    "\n\nFormat your output ONLY as a list of items. Each item must be formatted as: 'Headline -- Explanation'."
    "\n\nExample:"
    "\n- Global Markets Rally -- Investor confidence surges on positive economic data."
    "\n- Tech Giant Faces Antitrust Probe -- Company dominance scrutinized amidst regulatory pressure."
    "\n- Climate Summit Yields No Deal -- Disagreement over carbon targets stalls progress."
    "\n\nIf you cannot find 3 suitable stories, generate fewer lines following the same format. Do not add any other introductory or concluding text."
)


def load_docs_by_topic_with_refs() -> Dict[str, List[Dict]]:
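    """Fetch up to 50 documents per topic from the Upstash vector store.

    Each topic is queried with a random placeholder embedding plus a metadata
    filter on the "topic" field; results are returned as
    {topic_key: [{"text", "title", "url", "source"}, ...]}.
    """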
    topic_docs = {key: [] for key in TOPIC_KEYS}
    logging.info("Starting to load documents by topic from Upstash Vector Store...")
    try:
        vector_store = get_upstash_vector_store()
        for full_topic_name, topic_key_for_filter in zip(TOPICS, TOPIC_KEYS):
            filters = MetadataFilters(
                filters=[MetadataFilter(key="topic", value=topic_key_for_filter, operator=FilterOperator.EQ)]
            )
            # Random placeholder embedding: only the metadata filter matters here,
            # not similarity ranking. The 384 dimensions must match the index's
            # embedding dimension.
            dummy_vector = np.random.rand(384).tolist()
            query = VectorStoreQuery(query_embedding=dummy_vector, similarity_top_k=50, filters=filters)

            logging.info(f"Querying for topic '{full_topic_name}' with filter value '{topic_key_for_filter}'.")
            result = vector_store.query(query)
            logging.info(f"Found {len(result.nodes)} nodes for topic '{full_topic_name}'.")

            for node in result.nodes:
                content = node.get_content().strip()
                title = node.metadata.get("title", "No Title")
                url = node.metadata.get("url", "#")
                source = node.metadata.get("source", "Unknown Source")

                if content:
                    topic_docs[topic_key_for_filter].append({
                        "text": content,
                        "title": title,
                        "url": url,
                        "source": source,
                    })
    except Exception as e:
        logging.error(f"[load_docs_by_topic_with_refs Error]: {e}", exc_info=True)
    return topic_docs


def summarize_topic(topic_key: str, docs: List[Dict], current_global_id: int) -> Tuple[List[Dict], int]:
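    """Summarize one topic's documents into 'Headline -- Explanation' items via OpenAI.

    Returns (summaries, next_global_id): each summary dict carries the headline,
    its explanation, a globally unique id, an image URL, and a representative
    article link/title taken from the first document of the topic.
    """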
    if not docs:
        logging.warning(f"No docs for topic: {topic_key}, skipping summarization.")
        return [], current_global_id

    representative_article_link = docs[0].get("url") if docs else f"https://google.com/search?q={topic_key}+news"
    representative_title = docs[0].get("title") if docs else f"Summary for {topic_key}"

    content = "\n\n---\n\n".join([str(d["text"]) for d in docs if "text" in d and d["text"] is not None])

    if not content:
        logging.warning(f"No valid text content found in docs for topic: {topic_key}, skipping summarization.")
        return [], current_global_id

    # Keep the prompt within a safe context size.
    content = content[:12000]

    logging.info(f"Summarizing topic via OpenAI: '{topic_key}' ({len(docs)} documents)")
    try:
        client = OpenAI(api_key=OPENAI_API_KEY)
        response = client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": BASE_PROMPT},
                {"role": "user", "content": content},
            ],
            max_tokens=512,
            temperature=0.7,
        )
        llm_output = response.choices[0].message.content.strip()

        logging.info(f"Raw LLM output for topic '{topic_key}':\n---\n{llm_output}\n---")

        parsed_summaries = []
        for line in llm_output.splitlines():
            line = line.strip()
            if not line:
                continue

            # Strip any leading list marker ("-", ">", "•", "1.") and split on "--".
            match = re.match(r'^(?:[->•\d\.]+\s*)?(.*?)\s*--\s*(.*)$', line)

            if match:
                headline_text = match.group(1).strip()
                explanation_text = match.group(2).strip()

                explanation_text = re.sub(
                    r'^(?:this is important because|why this matters because|this matters because|reason:|significance:)\s*',
                    '',
                    explanation_text,
                    flags=re.IGNORECASE,
                ).strip()

                if len(headline_text.split()) >= 2 and len(explanation_text.split()) >= 3:
                    parsed_summaries.append({"summary": headline_text, "explanation": explanation_text})
                else:
                    logging.warning(f"Skipping line due to short/empty headline or explanation after parsing: '{line}' for topic '{topic_key}'.")
            else:
                logging.warning(f"Could not parse line: '{line}' for topic '{topic_key}'. Does it match 'Headline -- Explanation' format?")

        result = []
        for h_item in parsed_summaries:
            result.append({
                "summary": h_item["summary"],
                "explanation": h_item["explanation"],
                "id": current_global_id,
                "image_url": "https://source.unsplash.com/800x600/?news",
                "article_link": representative_article_link,
                "representative_title": representative_title,
            })
            current_global_id += 1

        logging.info(f"Successfully generated {len(result)} summaries for topic '{topic_key}'.")
        return result, current_global_id
    except Exception as e:
        logging.error(f"[Summarize topic '{topic_key}' Error]: {e}", exc_info=True)
        return [], current_global_id


def generate_and_cache_daily_feed():
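    """Build the daily feed for all topics and cache it in Redis for 24 hours.

    Returns a mapping of topic_key -> {summary_id: summary_payload}.
    """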
    try:
        logging.info("Generating daily feed...")
        topic_docs = load_docs_by_topic_with_refs()

        final_feed_structured: Dict[str, Dict[int, Dict[str, Any]]] = {}
        global_summary_id_counter = 1

        for topic_display_name, topic_key in zip(TOPICS, TOPIC_KEYS):
            summaries_for_topic, updated_global_id = summarize_topic(
                topic_key,
                topic_docs.get(topic_key, []),
                global_summary_id_counter,
            )
            global_summary_id_counter = updated_global_id

            topic_summary_map: Dict[int, Dict[str, Any]] = {}
            for summary_item in summaries_for_topic:
                topic_summary_map[summary_item["id"]] = {
                    "summary": summary_item["summary"],
                    "explanation": summary_item["explanation"],
                    "image_url": summary_item["image_url"],
                    "article_link": summary_item["article_link"],
                    "representative_title": summary_item["representative_title"],
                }

            final_feed_structured[topic_key] = topic_summary_map

        try:
            cache_key = "daily_news_feed_cache"
            redis_client.set(cache_key, json.dumps(final_feed_structured, ensure_ascii=False))
            redis_client.expire(cache_key, 86400)
            logging.info(f"Cached feed under key '{cache_key}' with 24-hour expiry.")
        except Exception as e:
            logging.error(f"[Redis cache error]: {e}", exc_info=True)

        return final_feed_structured

    except Exception as e:
        logging.critical(f"[generate_and_cache_daily_feed Overall Error]: {e}", exc_info=True)
        return {}


def get_cached_daily_feed():
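    """Return the cached daily feed from Redis, or an empty dict if nothing is cached."""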
    try:
        cache_key = "daily_news_feed_cache"
        cached = redis_client.get(cache_key)
        if cached:
            logging.info(f"Retrieved cached daily feed from '{cache_key}'.")
            return json.loads(cached)
        else:
            logging.info(f"No cached data found under key '{cache_key}'.")
            return {}
    except Exception as e:
        logging.error(f"[get_cached_daily_feed Error]: {e}", exc_info=True)
        return {}
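

# Note: the json.dumps/json.loads round trip turns the integer summary IDs used as
# dict keys into strings, so get_cached_daily_feed() yields keys like "1", "2", ...
# Illustrative shape of the cached payload (field values abridged):
# {
#   "india": {
#     "1": {"summary": "...", "explanation": "...", "image_url": "...",
#           "article_link": "...", "representative_title": "..."}
#   },
#   ...
# }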


if __name__ == "__main__":
    feed = generate_and_cache_daily_feed()
    print("\n--- Generated Daily Feed (Structured) ---")
    print(json.dumps(feed, indent=2, ensure_ascii=False))