fastAPIv2 / pipeline / news_ingest.py
import os
import sys
import json
import asyncio
from typing import List, Dict

# Make the project root importable so the local `components` package resolves
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

from components.indexers.news_indexer import get_or_build_index_from_docs
from components.fetchers.google_search import fetch_google_news
from components.fetchers.scraper import scrape_url
from llama_index.core.settings import Settings
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.schema import Document

# βœ… Use local embedding model
Settings.embed_model = HuggingFaceEmbedding(
    model_name="sentence-transformers/paraphrase-MiniLM-L3-v2"
)

# πŸ” Environment variables
API_KEY = os.environ.get("GOOGLE_API_KEY")
CSE_ID = os.environ.get("GOOGLE_CX_ID")

# πŸ“° Topics
QUERIES = ["India news", "World news", "Tech news", "Finance news", "Sports news"]

# πŸ—‚οΈ Paths
DATA_DIR = "data/news"
RAW_JSON = os.path.join(DATA_DIR, "news.jsonl")
INDEX_DIR = "storage/index"

# πŸ’Ύ Save articles to disk
def write_articles_jsonl(articles: List[Dict], file_path: str):
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    with open(file_path, "w", encoding="utf-8") as f:
        for article in articles:
            f.write(json.dumps(article, ensure_ascii=False) + "\n")

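# Example of one line written to data/news/news.jsonl (illustrative values only;
# real titles, sources, and content come from the scraped pages):
#   {"headline_id": 1, "topic": "India news", "title": "...", "url": "https://example.com/story",
#    "source": "example.com", "content": "Full scraped article text ..."}
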
# πŸ“„ Convert raw scraped data into Document objects
async def build_documents(data: List[Dict]) -> List[Document]:
    # Each entry already carries the sequential 'headline_id' assigned during
    # scraping in main(), so it is reused here as-is.
    return [
        Document(
            text=entry["content"],
            metadata={
                "headline_id": entry["headline_id"],  # pre-assigned sequential ID
                "title": entry["title"],
                "url": entry["url"],
                "topic": entry["topic"].lower().replace(" news", ""),  # normalized topic key
                "source": entry["source"],
            },
        )
        for entry in data
    ]

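# With the current QUERIES list, the normalized topic keys come out as:
#   "India news" -> "india", "World news" -> "world", "Tech news" -> "tech",
#   "Finance news" -> "finance", "Sports news" -> "sports"
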
# πŸš€ Main pipeline runner
async def main():
    if not API_KEY or not CSE_ID:
        raise EnvironmentError("Missing GOOGLE_API_KEY or GOOGLE_CX_ID in environment.")

    print("🌍 Fetching news URLs from Google...")
    all_articles = []
    # Simple sequential headline ID, shared across all topics
    global_headline_id_counter = 1

    for query in QUERIES:
        print(f"πŸ” Searching for: {query}")
        try:
            results = fetch_google_news(query, API_KEY, CSE_ID, num_results=10)
            print(f" β†’ Found {len(results)} links for '{query}'.")
            for item in results:
                url = item.get("link", "").strip()
                title = item.get("title", "").strip()
                source = item.get("displayLink", "").strip()
                if not url or not title:
                    continue
                print(f"🌐 Scraping: {url}")
                article_text = scrape_url(url)
                if article_text:
                    all_articles.append({
                        "headline_id": global_headline_id_counter,  # unique sequential ID assigned at scrape time
                        "topic": query,
                        "title": title,  # keep the title clean; any numbering is applied at display time
                        "url": url,
                        "source": source,
                        "content": article_text,
                    })
                    global_headline_id_counter += 1  # increment for the next article
                else:
                    print(f"⚠️ Skipped: {url}")
        except Exception as e:
            print(f"❌ Error fetching '{query}': {e}")

    if not all_articles:
        print("⚠️ No content scraped. Exiting.")
        return

    print(f"πŸ“ Writing {len(all_articles)} articles to {RAW_JSON}...")
    write_articles_jsonl(all_articles, RAW_JSON)

    print("🧠 Building index...")
    documents = await build_documents(all_articles)
    get_or_build_index_from_docs(documents)
    print("βœ… Indexing complete.")

# 🏁 Entrypoint
if __name__ == "__main__":
    asyncio.run(main())
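
# Example invocation (an assumption, not enforced by the script: run from the repository
# root so the relative data/ and storage/ paths land in the expected place, with the
# Google credentials exported in the shell):
#   GOOGLE_API_KEY=... GOOGLE_CX_ID=... python pipeline/news_ingest.py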