# RSS_News_1 / rss_processor_og.py
# broadfield-dev's picture
# Rename rss_processor.py to rss_processor_og.py
# 66bca53 verified
import os
import feedparser
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.docstore.document import Document
import logging
from huggingface_hub import HfApi, login, snapshot_download
import shutil
import rss_feeds
from datetime import datetime
import dateutil.parser
import hashlib
import re
# Root logging is configured at import time so every function below can log.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Directory used as Chroma's persist_directory and mirrored to/from the Hub repo.
LOCAL_DB_DIR = "chroma_db"
# Iterable of feed URLs, taken from the sibling rss_feeds module.
RSS_FEEDS = rss_feeds.RSS_FEEDS
# Name of the Chroma collection that stores the articles.
COLLECTION_NAME = "news_articles"
# Hugging Face access token; None when the HF_TOKEN environment variable is unset.
HF_API_TOKEN = os.getenv("HF_TOKEN")
# Hub dataset repository that holds the persisted database folder.
REPO_ID = "broadfield-dev/news-rag-db"

# Only log in when a token is actually available. Per the huggingface_hub
# documentation, login() without a token falls back to an interactive prompt,
# which would stall or crash an unattended run at import time.
if HF_API_TOKEN:
    login(token=HF_API_TOKEN)
else:
    logger.warning("HF_TOKEN is not set; skipping Hugging Face login. Hub uploads will likely fail.")
hf_api = HfApi()
def get_embedding_model():
    """Return the shared sentence-embedding model, creating it on first use.

    The instance is cached as a ``model`` attribute on this function object,
    so the HuggingFaceEmbeddings constructor runs at most once per process.
    """
    try:
        return get_embedding_model.model
    except AttributeError:
        # First call: nothing cached yet, so build the model and remember it.
        get_embedding_model.model = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2"
        )
        return get_embedding_model.model
def clean_text(text):
    """Normalize text for indexing and for building de-duplication keys.

    Removes anything that looks like an HTML/XML tag, collapses every run of
    whitespace to a single space, trims the ends, and lower-cases the result.

    Args:
        text: Value to normalize.

    Returns:
        The normalized string, or ``""`` when ``text`` is empty or not a str.
    """
    if not text or not isinstance(text, str):
        return ""
    # DOTALL lets "." span newlines, so a tag whose attributes wrap onto the
    # next line is still removed instead of leaking markup into the output.
    without_tags = re.sub(r"<.*?>", "", text, flags=re.DOTALL)
    # split()/join collapses internal whitespace and trims both ends in one go.
    return " ".join(without_tags.split()).lower()
def _parse_published_date(entry):
    """Return the entry's first parseable date as 'YYYY-MM-DD HH:MM:SS', else 'Unknown Date'."""
    for date_field in ("published", "updated", "created", "pubDate"):
        if date_field not in entry:
            continue
        try:
            parsed_date = dateutil.parser.parse(entry[date_field])
        except (ValueError, TypeError):
            # Unparseable value: fall through to the next candidate field.
            continue
        return parsed_date.strftime("%Y-%m-%d %H:%M:%S")
    return "Unknown Date"


def _extract_image_url(entry):
    """Return the entry's first non-empty media URL, or the "svg" placeholder."""
    for media_field in ("media_content", "media_thumbnail"):
        try:
            media_items = entry.get(media_field)
            if not media_items:
                continue
            url = media_items[0].get("url")
        except (IndexError, AttributeError, TypeError):
            continue
        # Only trim the URL: lower-casing it (as clean_text does) would break
        # links whose path or query string is case-sensitive.
        if isinstance(url, str) and url.strip():
            return url.strip()
    # Placeholder stored when the entry has no usable image URL.
    # NOTE(review): presumably the consumer maps "svg" to a default graphic - confirm.
    return "svg"


def fetch_rss_feeds(max_articles_per_feed=10):
    """Fetch and de-duplicate articles from every URL in RSS_FEEDS.

    Args:
        max_articles_per_feed: Maximum number of new (non-duplicate) articles
            taken from a single feed. Defaults to 10.

    Returns:
        A list of dicts with the keys ``title``, ``link``, ``description``,
        ``published``, ``category`` and ``image``. Duplicates (same cleaned
        title, cleaned link and published timestamp) are dropped across all
        feeds. An exception while handling one feed is logged and ends
        processing of that feed only; articles already collected are kept.
    """
    articles = []
    seen_keys = set()
    for feed_url in RSS_FEEDS:
        try:
            logger.info(f"Fetching {feed_url}")
            feed = feedparser.parse(feed_url)
            if feed.bozo:
                logger.warning(f"Parse error for {feed_url}: {feed.bozo_exception}")
                # feedparser sets bozo for recoverable problems too (see its
                # bozo-detection docs); only give up on the feed when nothing
                # could be extracted from it.
                if not feed.entries:
                    continue
            article_count = 0
            for entry in feed.entries:
                if article_count >= max_articles_per_feed:
                    break
                title = entry.get("title", "No Title")
                link = entry.get("link", "")
                description = entry.get("summary", entry.get("description", ""))
                published = _parse_published_date(entry)
                # Same key layout that is used for the document ID at storage time.
                key = f"{clean_text(title)}|{clean_text(link)}|{published}"
                if key in seen_keys:
                    continue
                seen_keys.add(key)
                articles.append({
                    "title": title,
                    "link": link,
                    "description": description,
                    "published": published,
                    "category": categorize_feed(feed_url),
                    "image": _extract_image_url(entry),
                })
                article_count += 1
        except Exception as e:
            # Top-level boundary per feed: one broken feed must not stop the rest.
            logger.error(f"Error fetching {feed_url}: {e}")
    logger.info(f"Total articles fetched: {len(articles)}")
    return articles
def categorize_feed(url):
    """Map a feed URL to a category name by substring matching.

    The URL is lower-cased and trimmed, then checked against an ordered list
    of keyword groups; the first group with any keyword contained in the URL
    determines the category.

    Args:
        url: Feed URL to classify.

    Returns:
        The matching category name, or ``"Uncategorized"`` when ``url`` is
        empty, not a str, or matches no keyword.
    """
    if not (url and isinstance(url, str)):
        return "Uncategorized"

    haystack = url.lower().strip()

    # Order matters: earlier rules take precedence over later ones.
    rules = (
        ("Academic Papers", ("nature", "science.org", "arxiv.org", "plos.org", "jneurosci.org", "nejm.org", "lancet.com")),
        ("Business", ("ft.com", "marketwatch.com", "cnbc.com", "wsj.com", "economist.com")),
        ("Stocks & Markets", ("investing.com", "fool.co.uk", "seekingalpha.com", "yahoofinance.com")),
        ("Space", ("nasa", "spaceweatherlive", "space.com", "universetoday.com", "esa.int")),
        ("Science", ("sciencedaily", "quantamagazine", "scientificamerican", "newscientist", "livescience")),
        ("Tech", ("wired", "techcrunch", "arstechnica", "gizmodo", "theverge")),
        ("Astrology", ("horoscope", "astrostyle")),
        ("Politics", ("bbci.co.uk/news/politics", "politico.com", "thehill.com")),
        ("Earth Weather", ("weather.com", "weather.gov", "swpc.noaa.gov", "foxweather")),
        ("Physics", ("phys.org", "aps.org")),
    )
    for category, needles in rules:
        for needle in needles:
            if needle in haystack:
                return category
    return "Uncategorized"
def process_and_store_articles(articles):
    """Add articles that are not yet in the local Chroma collection.

    Each article's document ID is ``"<cleaned title>|<cleaned link>|<published>"``.
    Articles whose ID is already stored, or that repeat earlier in ``articles``,
    are skipped. The cleaned description becomes the document text; the raw
    fields are kept as metadata.

    Args:
        articles: Iterable of dicts with the keys ``title``, ``link``,
            ``description``, ``published``, ``category`` and ``image``.

    Returns:
        None. Storage errors are logged rather than raised.
    """
    store = Chroma(
        persist_directory=LOCAL_DB_DIR,
        embedding_function=get_embedding_model(),
        collection_name=COLLECTION_NAME,
    )

    try:
        known_ids = set(store.get(include=[])["ids"])
        logger.info(f"Loaded {len(known_ids)} existing document IDs from {LOCAL_DB_DIR}.")
    except Exception:
        known_ids = set()
        logger.info("No existing DB found or it is empty. Starting fresh.")

    # (metadata key, article key) pairs, in the order they are stored.
    metadata_fields = (
        ("title", "title"),
        ("link", "link"),
        ("original_description", "description"),
        ("published", "published"),
        ("category", "category"),
        ("image", "image"),
    )

    pending = []  # (doc_id, Document) pairs awaiting insertion
    for item in articles:
        title_key = clean_text(item["title"])
        link_key = clean_text(item["link"])
        doc_id = f"{title_key}|{link_key}|{item['published']}"
        if doc_id in known_ids:
            continue

        metadata = {meta_key: item[source_key] for meta_key, source_key in metadata_fields}
        document = Document(page_content=clean_text(item["description"]), metadata=metadata)
        pending.append((doc_id, document))
        # Remember the ID so a repeat later in this same batch is skipped too.
        known_ids.add(doc_id)

    if not pending:
        return

    new_ids = [doc_id for doc_id, _ in pending]
    new_docs = [document for _, document in pending]
    try:
        store.add_documents(documents=new_docs, ids=new_ids)
        store.persist()
        logger.info(f"Added {len(new_docs)} new articles to DB. Total in DB: {store._collection.count()}")
    except Exception as exc:
        logger.error(f"Error storing articles: {exc}")
def download_from_hf_hub():
    """Fetch the persisted Chroma folder from the Hub when none exists locally.

    If LOCAL_DB_DIR is already present nothing is downloaded. Otherwise only
    the files under that folder are pulled from the dataset repo into the
    current directory. Download failures are logged as warnings, not raised.
    """
    if os.path.exists(LOCAL_DB_DIR):
        logger.info("Local Chroma DB exists, loading existing data.")
        return

    try:
        logger.info(f"Downloading Chroma DB from {REPO_ID} to {LOCAL_DB_DIR}...")
        download_options = {
            "repo_id": REPO_ID,
            "repo_type": "dataset",
            "local_dir": ".",
            "local_dir_use_symlinks": False,
            # Restrict the snapshot to the database folder only.
            "allow_patterns": f"{LOCAL_DB_DIR}/**",
            "token": HF_API_TOKEN,
        }
        snapshot_download(**download_options)
        logger.info("Finished downloading DB.")
    except Exception as exc:
        logger.warning(f"Could not download from Hugging Face Hub (this is normal on first run): {exc}")
def upload_to_hf_hub():
    """Push the local Chroma folder to the Hub dataset repo.

    Does nothing when LOCAL_DB_DIR does not exist. Upload failures are logged
    as errors, not raised.
    """
    if not os.path.exists(LOCAL_DB_DIR):
        return

    try:
        logger.info(f"Uploading updated Chroma DB '{LOCAL_DB_DIR}' to {REPO_ID}...")
        upload_options = {
            "folder_path": LOCAL_DB_DIR,
            # Keep the same folder name inside the repo as on disk.
            "path_in_repo": LOCAL_DB_DIR,
            "repo_id": REPO_ID,
            "repo_type": "dataset",
            "token": HF_API_TOKEN,
            "commit_message": "Update RSS news database",
        }
        hf_api.upload_folder(**upload_options)
        logger.info(f"Database folder '{LOCAL_DB_DIR}' uploaded to: {REPO_ID}")
    except Exception as exc:
        logger.error(f"Error uploading to Hugging Face Hub: {exc}")
if __name__ == "__main__":
    # Pipeline: restore the DB from the Hub if missing, ingest the current
    # feed contents, then publish the updated DB back to the Hub.
    download_from_hf_hub()
    process_and_store_articles(fetch_rss_feeds())
    upload_to_hf_hub()