# RSS_News_1 / rss_processor.py
import os
import feedparser
from chromadb import PersistentClient
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_core.documents import Document
import logging
from huggingface_hub import HfApi, login, snapshot_download
from datetime import datetime
import dateutil.parser
import hashlib
import json
import re
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
LOCAL_DB_DIR = "chroma_db"
FEEDS_FILE = "rss_feeds.json"
COLLECTION_NAME = "news_articles"
HF_API_TOKEN = os.getenv("HF_TOKEN")
REPO_ID = "broadfield-dev/news-rag-db"
MAX_ARTICLES_PER_FEED = 1000
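# Expected shape of rss_feeds.json, inferred from fetch_rss_feeds() below: a mapping
# of category name -> list of feed entries, each with at least a "url" key. The URLs
# here are illustrative placeholders, not taken from the original repository:
#
# {
#   "technology": [
#     {"url": "https://example.com/tech/rss.xml"},
#     {"url": "https://example.org/feeds/technology"}
#   ],
#   "science": [
#     {"url": "https://example.net/science/feed"}
#   ]
# }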
def initialize_hf_api():
    """Log in to the Hugging Face Hub with HF_TOKEN and return an HfApi client."""
    if not HF_API_TOKEN:
        logger.error("Hugging Face API token (HF_TOKEN) not set.")
        raise ValueError("HF_TOKEN environment variable is not set.")
    try:
        login(token=HF_API_TOKEN)
        return HfApi()
    except Exception as e:
        logger.error(f"Failed to login to Hugging Face Hub: {e}")
        raise
def get_embedding_model():
    """Lazily create the sentence-transformer embedding model and cache it on the function object."""
    if not hasattr(get_embedding_model, "model"):
        get_embedding_model.model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
    return get_embedding_model.model
def clean_text(text):
    """Strip HTML tags and collapse whitespace, e.g. "<p>Hello  <b>world</b></p>" -> "Hello world"."""
    if not text or not isinstance(text, str):
        return ""
    text = re.sub(r'<.*?>', '', text)
    text = ' '.join(text.split())
    return text.strip()
def fetch_rss_feeds():
    """Read FEEDS_FILE, fetch every feed, and return a de-duplicated list of article dicts."""
    articles = []
    seen_links = set()
    try:
        with open(FEEDS_FILE, 'r') as f:
            feed_categories = json.load(f)
    except FileNotFoundError:
        logger.error(f"{FEEDS_FILE} not found. No feeds to process.")
        return []
    for category, feeds in feed_categories.items():
        for feed_info in feeds:
            feed_url = feed_info.get("url")
            if not feed_url:
                logger.warning(f"Skipping feed with no URL in category '{category}'")
                continue
            try:
                logger.info(f"Fetching {feed_url}")
                feed = feedparser.parse(feed_url)
                if feed.bozo:
                    logger.warning(f"Parse error for {feed_url}: {feed.bozo_exception}")
                    continue
                for entry in feed.entries[:MAX_ARTICLES_PER_FEED]:
                    link = entry.get("link", "")
                    if not link or link in seen_links:
                        continue
                    seen_links.add(link)
                    title = entry.get("title", "No Title")
                    description_raw = entry.get("summary", entry.get("description", ""))
                    description = clean_text(description_raw)
                    if not description:
                        continue
                    # feedparser exposes several possible date fields; use the first one that parses.
                    published_str = "Unknown Date"
                    for date_field in ["published", "updated", "created", "pubDate"]:
                        if date_field in entry:
                            try:
                                parsed_date = dateutil.parser.parse(entry[date_field])
                                published_str = parsed_date.isoformat()
                                break
                            except (ValueError, TypeError):
                                continue
                    # Try several common locations for an article image; "svg" is the placeholder
                    # value kept when no usable image URL is found.
                    image = "svg"
                    image_sources = [
                        lambda e: e.get("media_content", [{}])[0].get("url") if e.get("media_content") else None,
                        lambda e: e.get("media_thumbnail", [{}])[0].get("url") if e.get("media_thumbnail") else None,
                        lambda e: e.get("enclosure", {}).get("url") if e.get("enclosure") and e.get("enclosure", {}).get('type', '').startswith('image') else None,
                        lambda e: next((lnk.get("href") for lnk in e.get("links", []) if lnk.get("type", "").startswith("image")), None),
                    ]
                    for source_func in image_sources:
                        try:
                            img_url = source_func(entry)
                            if img_url and isinstance(img_url, str) and img_url.strip():
                                image = img_url
                                break
                        except (IndexError, AttributeError, TypeError):
                            continue
                    articles.append({
                        "title": title,
                        "link": link,
                        "description": description,
                        "published": published_str,
                        "category": category,
                        "image": image,
                    })
            except Exception as e:
                logger.error(f"Error fetching or parsing {feed_url}: {e}")
    logger.info(f"Total unique articles fetched: {len(articles)}")
    return articles
def process_and_store_articles(articles):
    """Embed new articles and add them to the persistent Chroma collection, skipping links already stored."""
    if not os.path.exists(LOCAL_DB_DIR):
        os.makedirs(LOCAL_DB_DIR)
    client = PersistentClient(path=LOCAL_DB_DIR)
    collection = client.get_or_create_collection(name=COLLECTION_NAME)
    try:
        existing_ids = set(collection.get(include=[])["ids"])
        logger.info(f"Loaded {len(existing_ids)} existing document IDs from {LOCAL_DB_DIR}.")
    except Exception:
        logger.info("No existing DB found or it is empty. Starting fresh.")
        existing_ids = set()
    contents_to_add = []
    metadatas_to_add = []
    ids_to_add = []
    for article in articles:
        if not article.get('link'):
            continue
        # Deterministic ID derived from the link, so re-runs do not insert duplicates.
        doc_id = hashlib.sha256(article['link'].encode('utf-8')).hexdigest()
        if doc_id in existing_ids:
            continue
        metadata = {
            "title": article["title"],
            "link": article["link"],
            "published": article["published"],
            "category": article["category"],
            "image": article["image"],
        }
        contents_to_add.append(article["description"])
        metadatas_to_add.append(metadata)
        ids_to_add.append(doc_id)
    if ids_to_add:
        logger.info(f"Found {len(ids_to_add)} new articles to add to the database.")
        try:
            embedding_model = get_embedding_model()
            embeddings_to_add = embedding_model.embed_documents(contents_to_add)
            collection.add(
                embeddings=embeddings_to_add,
                documents=contents_to_add,
                metadatas=metadatas_to_add,
                ids=ids_to_add
            )
            logger.info(f"Successfully added {len(ids_to_add)} new articles to DB. Total in DB: {collection.count()}")
        except Exception as e:
            logger.error(f"Error storing articles in ChromaDB: {e}", exc_info=True)
    else:
        logger.info("No new articles to add to the database.")
def download_from_hf_hub():
    if not os.path.exists(os.path.join(LOCAL_DB_DIR, "chroma.sqlite3")):
        try:
            logger.info(f"Downloading Chroma DB from {REPO_ID} to {LOCAL_DB_DIR}...")
            snapshot_download(
                repo_id=REPO_ID,
                repo_type="dataset",
                local_dir=".",
                local_dir_use_symlinks=False,
                allow_patterns=[f"{LOCAL_DB_DIR}/**"],
                token=HF_API_TOKEN
            )
            logger.info("Finished downloading DB.")
        except Exception as e:
            logger.warning(f"Could not download from Hugging Face Hub (this is normal on first run): {e}")
    else:
        logger.info(f"Local Chroma DB found at '{LOCAL_DB_DIR}', skipping download.")
def upload_to_hf_hub(hf_api):
    if os.path.exists(LOCAL_DB_DIR):
        try:
            logger.info(f"Uploading updated Chroma DB '{LOCAL_DB_DIR}' to {REPO_ID}...")
            hf_api.upload_folder(
                folder_path=LOCAL_DB_DIR,
                path_in_repo=LOCAL_DB_DIR,
                repo_id=REPO_ID,
                repo_type="dataset",
                commit_message=f"Update RSS news database {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
                ignore_patterns=["*.bak", "*.tmp"]
            )
            logger.info(f"Database folder '{LOCAL_DB_DIR}' uploaded to: {REPO_ID}")
        except Exception as e:
            logger.error(f"Error uploading to Hugging Face Hub: {e}", exc_info=True)
def main():
    try:
        hf_api = initialize_hf_api()
        download_from_hf_hub()
        articles_to_process = fetch_rss_feeds()
        if articles_to_process:
            process_and_store_articles(articles_to_process)
            upload_to_hf_hub(hf_api)
        else:
            logger.info("No articles fetched, skipping database processing and upload.")
    except Exception as e:
        logger.critical(f"An unhandled error occurred in main execution: {e}", exc_info=True)
if __name__ == "__main__":
    main()
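# A minimal sketch of querying the collection this script builds (the repo name
# "news-rag-db" suggests retrieval is the end use). This is not part of the
# original pipeline; the query text and n_results value are illustrative only.
#
#   from chromadb import PersistentClient
#   from langchain_community.embeddings import HuggingFaceEmbeddings
#
#   client = PersistentClient(path=LOCAL_DB_DIR)
#   collection = client.get_collection(name=COLLECTION_NAME)
#   embedder = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
#   query_embedding = embedder.embed_query("latest AI regulation news")
#   results = collection.query(query_embeddings=[query_embedding], n_results=5)
#   for meta in results["metadatas"][0]:
#       print(meta["title"], meta["link"])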