import os
import sys
import json
import asyncio
from typing import List, Dict

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
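# Project-local imports; the sys.path tweak above lets them resolve when this script is run directly.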
from components.indexers.news_indexer import get_or_build_index_from_docs
from components.fetchers.google_search import fetch_google_news
from components.fetchers.scraper import scrape_url
from llama_index.core.settings import Settings
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.schema import Document

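# Configure LlamaIndex's global embedding model (a small, fast sentence-transformers model).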
Settings.embed_model = HuggingFaceEmbedding(model_name="sentence-transformers/paraphrase-MiniLM-L3-v2")

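# Google Custom Search credentials, read from the environment.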
API_KEY = os.environ.get("GOOGLE_API_KEY")
CSE_ID = os.environ.get("GOOGLE_CX_ID")

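# Topics to fetch; each query is also stored as the topic label on the indexed documents.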
QUERIES = ["India news", "World news", "Tech news", "Finance news", "Sports news"]

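# Output locations for the raw scraped articles (JSONL) and the persisted index.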
DATA_DIR = "data/news"
RAW_JSON = os.path.join(DATA_DIR, "news.jsonl")
INDEX_DIR = "storage/index"


def write_articles_jsonl(articles: List[Dict], file_path: str):
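    """Write the scraped articles to a JSON Lines file, one JSON object per line."""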
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    with open(file_path, "w", encoding="utf-8") as f:
        for article in articles:
            f.write(json.dumps(article, ensure_ascii=False) + "\n")


async def build_documents(data: List[Dict]) -> List[Document]:
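    """Convert scraped article dicts into LlamaIndex Documents, keeping title, URL, topic, and source as metadata."""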
    return [
        Document(
            text=entry["content"],
            metadata={
                "headline_id": entry["headline_id"],
                "title": entry["title"],
                "url": entry["url"],
                "topic": entry["topic"].lower().replace(" news", ""),
                "source": entry["source"]
            }
        )
        for entry in data
    ]


async def main():
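    """Fetch news links per topic, scrape each article, write them to JSONL, and build the index."""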
    if not API_KEY or not CSE_ID:
        raise EnvironmentError("Missing GOOGLE_API_KEY or GOOGLE_CX_ID in environment.")

print("π Fetching news URLs from Google...") |
|
|
|
all_articles = [] |
|
|
|
global_headline_id_counter = 1 |
|
|
|
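    # Run one Google search per topic and scrape each result link.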
    for query in QUERIES:
        print(f"Searching for: {query}")
        try:
            results = fetch_google_news(query, API_KEY, CSE_ID, num_results=10)
            print(f"  Found {len(results)} links for '{query}'.")

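            # Skip results without a usable link or title before scraping the page.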
            for item in results:
                url = item.get("link", "").strip()
                title = item.get("title", "").strip()
                source = item.get("displayLink", "").strip()
                if not url or not title:
                    continue

                print(f"Scraping: {url}")
                article_text = scrape_url(url)

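                # Keep articles that yielded text, tagging each with a globally unique headline_id.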
                if article_text:
                    all_articles.append({
                        "headline_id": global_headline_id_counter,
                        "topic": query,
                        "title": title,
                        "url": url,
                        "source": source,
                        "content": article_text
                    })
                    global_headline_id_counter += 1
                else:
                    print(f"Skipped: {url}")

        except Exception as e:
            print(f"Error fetching '{query}': {e}")

    if not all_articles:
        print("No content scraped. Exiting.")
        return

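    # Persist the raw articles, then build (or reuse) the index from the documents.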
print(f"π Writing {len(all_articles)} articles to {RAW_JSON}...") |
|
write_articles_jsonl(all_articles, RAW_JSON) |
|
|
|
print("π§ Building index...") |
|
documents = await build_documents(all_articles) |
|
get_or_build_index_from_docs(documents) |
|
|
|
print("β
Indexing complete.") |


if __name__ == "__main__":
    asyncio.run(main())