import os
import sys
import json
import asyncio
from typing import List, Dict
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from components.indexers.news_indexer import get_or_build_index_from_docs
from components.fetchers.google_search import fetch_google_news
from components.fetchers.scraper import scrape_url
from llama_index.core.settings import Settings
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.schema import Document
# Use local embedding model
Settings.embed_model = HuggingFaceEmbedding(model_name="sentence-transformers/paraphrase-MiniLM-L3-v2")
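# (paraphrase-MiniLM-L3-v2 is a small sentence-transformers model, so embeddings
# are computed locally rather than via an external API)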
# Environment variables
API_KEY = os.environ.get("GOOGLE_API_KEY")
CSE_ID = os.environ.get("GOOGLE_CX_ID")
# News topics to query
QUERIES = ["India news", "World news", "Tech news", "Finance news", "Sports news"]
# Paths
DATA_DIR = "data/news"
RAW_JSON = os.path.join(DATA_DIR, "news.jsonl")
INDEX_DIR = "storage/index"
# Save articles to disk
def write_articles_jsonl(articles: List[Dict], file_path: str):
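    """Write scraped articles to a JSON Lines file, one JSON object per line."""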
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    with open(file_path, "w", encoding="utf-8") as f:
        for article in articles:
            f.write(json.dumps(article, ensure_ascii=False) + "\n")
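# Each line of the JSONL file is one self-contained JSON object, e.g. (values illustrative):
#   {"headline_id": 1, "topic": "India news", "title": "...", "url": "https://...",
#    "source": "example.com", "content": "..."}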
# Convert raw scraped data into Document objects
async def build_documents(data: List[Dict]) -> List[Document]:
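    """Wrap each scraped article dict in a llama_index Document, carrying its
    pre-assigned headline_id and a normalized topic key in the metadata."""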
    # Each entry in 'data' already carries the sequential 'headline_id'
    # assigned during scraping, so it is reused here as-is.
    return [
        Document(
            text=entry["content"],
            metadata={
                "headline_id": entry["headline_id"],  # pre-assigned sequential ID
                "title": entry["title"],
                "url": entry["url"],
                "topic": entry["topic"].lower().replace(" news", ""),  # normalized topic key, e.g. "india"
                "source": entry["source"],
            },
        )
        for entry in data
    ]
# Main pipeline runner
async def main():
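    """Fetch news URLs per topic, scrape each article, persist the results
    as JSONL, and build (or load) the vector index from the documents."""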
    if not API_KEY or not CSE_ID:
        raise EnvironmentError("Missing GOOGLE_API_KEY or GOOGLE_CX_ID in environment.")

    print("Fetching news URLs from Google...")
    all_articles = []
    # Sequential counter used as a simple unique ID across all topics
    global_headline_id_counter = 1

    for query in QUERIES:
        print(f"Searching for: {query}")
        try:
            results = fetch_google_news(query, API_KEY, CSE_ID, num_results=10)
            print(f"Found {len(results)} links for '{query}'.")
            for item in results:
                url = item.get("link", "").strip()
                title = item.get("title", "").strip()
                source = item.get("displayLink", "").strip()
                if not url or not title:
                    continue
                print(f"Scraping: {url}")
                article_text = scrape_url(url)
                if article_text:
                    all_articles.append({
                        "headline_id": global_headline_id_counter,  # unique sequential ID
                        "topic": query,
                        "title": title,  # keep the title clean; numbering is for display later
                        "url": url,
                        "source": source,
                        "content": article_text,
                    })
                    global_headline_id_counter += 1  # increment for the next article
                else:
                    print(f"Skipped (no content): {url}")
        except Exception as e:
            print(f"Error fetching '{query}': {e}")

    if not all_articles:
        print("No content scraped. Exiting.")
        return

    print(f"Writing {len(all_articles)} articles to {RAW_JSON}...")
    write_articles_jsonl(all_articles, RAW_JSON)

    print("Building index...")
    documents = await build_documents(all_articles)
    get_or_build_index_from_docs(documents)
    print("Indexing complete.")
# Entrypoint
if __name__ == "__main__":
    asyncio.run(main())
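
# Usage (the env var names match those read above; the script path is wherever
# this file lives in your repo):
#   export GOOGLE_API_KEY="..."
#   export GOOGLE_CX_ID="..."
#   python path/to/this_script.py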