import os
import sys
import json
import asyncio
from typing import List, Dict

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

from components.indexers.news_indexer import get_or_build_index_from_docs
from components.fetchers.google_search import fetch_google_news
from components.fetchers.scraper import scrape_url

from llama_index.core.settings import Settings
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.schema import Document

# ✅ Use local embedding model
Settings.embed_model = HuggingFaceEmbedding(model_name="sentence-transformers/paraphrase-MiniLM-L3-v2")

# 🔐 Environment variables
API_KEY = os.environ.get("GOOGLE_API_KEY")
CSE_ID = os.environ.get("GOOGLE_CX_ID")

# 📰 Topics
QUERIES = ["India news", "World news", "Tech news", "Finance news", "Sports news"]

# 🗂️ Paths
DATA_DIR = "data/news"
RAW_JSON = os.path.join(DATA_DIR, "news.jsonl")
INDEX_DIR = "storage/index"


# 💾 Save articles to disk as JSON Lines (one article per line)
def write_articles_jsonl(articles: List[Dict], file_path: str):
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    with open(file_path, "w", encoding="utf-8") as f:
        for article in articles:
            f.write(json.dumps(article, ensure_ascii=False) + "\n")


# 📄 Convert raw scraped data into Document objects
async def build_documents(data: List[Dict]) -> List[Document]:
    # Each entry already carries the sequential 'headline_id' assigned during scraping,
    # so it is reused here as-is.
    return [
        Document(
            text=entry["content"],
            metadata={
                "headline_id": entry["headline_id"],
                "title": entry["title"],
                "url": entry["url"],
                "topic": entry["topic"].lower().replace(" news", ""),  # normalized topic key
                "source": entry["source"],
            },
        )
        for entry in data
    ]


# 🚀 Main pipeline runner
async def main():
    if not API_KEY or not CSE_ID:
        raise EnvironmentError("Missing GOOGLE_API_KEY or GOOGLE_CX_ID in environment.")

    print("🌍 Fetching news URLs from Google...")
    all_articles = []
    # Simple sequential ID shared across all topics
    global_headline_id_counter = 1

    for query in QUERIES:
        print(f"🔍 Searching for: {query}")
        try:
            results = fetch_google_news(query, API_KEY, CSE_ID, num_results=10)
            print(f" → Found {len(results)} links for '{query}'.")

            for item in results:
                url = item.get("link", "").strip()
                title = item.get("title", "").strip()
                source = item.get("displayLink", "").strip()

                if not url or not title:
                    continue

                print(f"🌐 Scraping: {url}")
                article_text = scrape_url(url)

                if article_text:
                    all_articles.append({
                        "headline_id": global_headline_id_counter,  # unique sequential ID
                        "topic": query,
                        "title": title,  # keep the title clean; display numbering can be added later
                        "url": url,
                        "source": source,
                        "content": article_text,
                    })
                    global_headline_id_counter += 1
                else:
                    print(f"⚠️ Skipped: {url}")
        except Exception as e:
            print(f"❌ Error fetching '{query}': {e}")

    if not all_articles:
        print("⚠️ No content scraped. Exiting.")
        return

    print(f"📝 Writing {len(all_articles)} articles to {RAW_JSON}...")
    write_articles_jsonl(all_articles, RAW_JSON)

    print("🧠 Building index...")
    documents = await build_documents(all_articles)
    get_or_build_index_from_docs(documents)
    print("✅ Indexing complete.")


# 🏁 Entrypoint
if __name__ == "__main__":
    asyncio.run(main())
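

# 🔎 Illustrative sketch only (never called): one way to query the freshly built index.
# This assumes get_or_build_index_from_docs returns a LlamaIndex VectorStoreIndex;
# main() above discards the return value, so capture it there before using this helper.
# A retriever is used instead of a query engine so no LLM needs to be configured in Settings.
def example_retrieve(index, question: str = "What happened in tech today?"):
    """Hypothetical helper: print the top-matching news chunks for a question."""
    retriever = index.as_retriever(similarity_top_k=3)  # 3 nearest chunks by embedding similarity
    for node_with_score in retriever.retrieve(question):
        meta = node_with_score.node.metadata
        print(f"[{meta.get('topic')}] {meta.get('title')} ({meta.get('url')}) score={node_with_score.score}")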