import os
import feedparser
from chromadb import PersistentClient
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_core.documents import Document
import logging
from huggingface_hub import HfApi, login, snapshot_download
from datetime import datetime
import dateutil.parser
import hashlib
import json
import re

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

LOCAL_DB_DIR = "chroma_db"
FEEDS_FILE = "rss_feeds.json"
COLLECTION_NAME = "news_articles"
HF_API_TOKEN = os.getenv("HF_TOKEN")
REPO_ID = "broadfield-dev/news-rag-db"
MAX_ARTICLES_PER_FEED = 1000


def initialize_hf_api():
    if not HF_API_TOKEN:
        logger.error("Hugging Face API token (HF_TOKEN) not set.")
        raise ValueError("HF_TOKEN environment variable is not set.")
    try:
        login(token=HF_API_TOKEN)
        return HfApi()
    except Exception as e:
        logger.error(f"Failed to login to Hugging Face Hub: {e}")
        raise


def get_embedding_model():
    # Lazily create the embedding model once and cache it on the function object.
    if not hasattr(get_embedding_model, "model"):
        get_embedding_model.model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
    return get_embedding_model.model


def clean_text(text):
    # Strip HTML tags and collapse whitespace.
    if not text or not isinstance(text, str):
        return ""
    text = re.sub(r'<.*?>', '', text)
    text = ' '.join(text.split())
    return text.strip()


def fetch_rss_feeds():
    articles = []
    seen_links = set()
    try:
        with open(FEEDS_FILE, 'r') as f:
            feed_categories = json.load(f)
    except FileNotFoundError:
        logger.error(f"{FEEDS_FILE} not found. No feeds to process.")
        return []

    for category, feeds in feed_categories.items():
        for feed_info in feeds:
            feed_url = feed_info.get("url")
            if not feed_url:
                logger.warning(f"Skipping feed with no URL in category '{category}'")
                continue
            try:
                logger.info(f"Fetching {feed_url}")
                feed = feedparser.parse(feed_url)
                if feed.bozo:
                    logger.warning(f"Parse error for {feed_url}: {feed.bozo_exception}")
                    continue
                for entry in feed.entries[:MAX_ARTICLES_PER_FEED]:
                    link = entry.get("link", "")
                    if not link or link in seen_links:
                        continue
                    seen_links.add(link)

                    title = entry.get("title", "No Title")
                    description_raw = entry.get("summary", entry.get("description", ""))
                    description = clean_text(description_raw)
                    if not description:
                        continue

                    # Keep the first date field that parses successfully.
                    published_str = "Unknown Date"
                    for date_field in ["published", "updated", "created", "pubDate"]:
                        if date_field in entry:
                            try:
                                parsed_date = dateutil.parser.parse(entry[date_field])
                                published_str = parsed_date.isoformat()
                                break
                            except (ValueError, TypeError):
                                continue

                    # Check the common places feed entries carry an image URL; keep the first hit.
                    image = "svg"
                    image_sources = [
                        lambda e: e.get("media_content", [{}])[0].get("url") if e.get("media_content") else None,
                        lambda e: e.get("media_thumbnail", [{}])[0].get("url") if e.get("media_thumbnail") else None,
                        lambda e: e.get("enclosure", {}).get("url") if e.get("enclosure") and e.get("enclosure", {}).get('type', '').startswith('image') else None,
                        lambda e: next((lnk.get("href") for lnk in e.get("links", []) if lnk.get("type", "").startswith("image")), None),
                    ]
                    for source_func in image_sources:
                        try:
                            img_url = source_func(entry)
                            if img_url and isinstance(img_url, str) and img_url.strip():
                                image = img_url
                                break
                        except (IndexError, AttributeError, TypeError):
                            continue

                    articles.append({
                        "title": title,
                        "link": link,
                        "description": description,
                        "published": published_str,
                        "category": category,
                        "image": image,
                    })
            except Exception as e:
                logger.error(f"Error fetching or parsing {feed_url}: {e}")

    logger.info(f"Total unique articles fetched: {len(articles)}")
    return articles
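# fetch_rss_feeds() above expects FEEDS_FILE ("rss_feeds.json") to map category
# names to lists of feed objects that each carry a "url" key. A minimal sketch
# of that layout is shown here; the category names and URLs are illustrative
# placeholders, not taken from the original project:
#
# {
#     "technology": [
#         {"url": "https://example.com/tech/rss.xml"}
#     ],
#     "world": [
#         {"url": "https://example.com/world/feed"}
#     ]
# }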
def process_and_store_articles(articles):
    if not os.path.exists(LOCAL_DB_DIR):
        os.makedirs(LOCAL_DB_DIR)
    client = PersistentClient(path=LOCAL_DB_DIR)
    collection = client.get_or_create_collection(name=COLLECTION_NAME)

    try:
        existing_ids = set(collection.get(include=[])["ids"])
        logger.info(f"Loaded {len(existing_ids)} existing document IDs from {LOCAL_DB_DIR}.")
    except Exception:
        logger.info("No existing DB found or it is empty. Starting fresh.")
        existing_ids = set()

    contents_to_add = []
    metadatas_to_add = []
    ids_to_add = []
    for article in articles:
        if not article.get('link'):
            continue
        # Hash the article link to get a stable document ID for deduplication.
        doc_id = hashlib.sha256(article['link'].encode('utf-8')).hexdigest()
        if doc_id in existing_ids:
            continue
        metadata = {
            "title": article["title"],
            "link": article["link"],
            "published": article["published"],
            "category": article["category"],
            "image": article["image"],
        }
        contents_to_add.append(article["description"])
        metadatas_to_add.append(metadata)
        ids_to_add.append(doc_id)

    if ids_to_add:
        logger.info(f"Found {len(ids_to_add)} new articles to add to the database.")
        try:
            embedding_model = get_embedding_model()
            embeddings_to_add = embedding_model.embed_documents(contents_to_add)
            collection.add(
                embeddings=embeddings_to_add,
                documents=contents_to_add,
                metadatas=metadatas_to_add,
                ids=ids_to_add
            )
            logger.info(f"Successfully added {len(ids_to_add)} new articles to DB. Total in DB: {collection.count()}")
        except Exception as e:
            logger.error(f"Error storing articles in ChromaDB: {e}", exc_info=True)
    else:
        logger.info("No new articles to add to the database.")


def download_from_hf_hub():
    if not os.path.exists(os.path.join(LOCAL_DB_DIR, "chroma.sqlite3")):
        try:
            logger.info(f"Downloading Chroma DB from {REPO_ID} to {LOCAL_DB_DIR}...")
            snapshot_download(
                repo_id=REPO_ID,
                repo_type="dataset",
                local_dir=".",
                local_dir_use_symlinks=False,
                allow_patterns=[f"{LOCAL_DB_DIR}/**"],
                token=HF_API_TOKEN
            )
            logger.info("Finished downloading DB.")
        except Exception as e:
            logger.warning(f"Could not download from Hugging Face Hub (this is normal on first run): {e}")
    else:
        logger.info(f"Local Chroma DB found at '{LOCAL_DB_DIR}', skipping download.")


def upload_to_hf_hub(hf_api):
    if os.path.exists(LOCAL_DB_DIR):
        try:
            logger.info(f"Uploading updated Chroma DB '{LOCAL_DB_DIR}' to {REPO_ID}...")
            hf_api.upload_folder(
                folder_path=LOCAL_DB_DIR,
                path_in_repo=LOCAL_DB_DIR,
                repo_id=REPO_ID,
                repo_type="dataset",
                commit_message=f"Update RSS news database {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
                ignore_patterns=["*.bak", "*.tmp"]
            )
            logger.info(f"Database folder '{LOCAL_DB_DIR}' uploaded to: {REPO_ID}")
        except Exception as e:
            logger.error(f"Error uploading to Hugging Face Hub: {e}", exc_info=True)


def main():
    try:
        hf_api = initialize_hf_api()
        download_from_hf_hub()
        articles_to_process = fetch_rss_feeds()
        if articles_to_process:
            process_and_store_articles(articles_to_process)
            upload_to_hf_hub(hf_api)
        else:
            logger.info("No articles fetched, skipping database processing and upload.")
    except Exception as e:
        logger.critical(f"An unhandled error occurred in main execution: {e}", exc_info=True)


if __name__ == "__main__":
    main()
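# A minimal sketch (not part of the original script) of how a downstream
# consumer might query the populated collection, assuming the same local
# directory, collection name, and embedding model used above; the query text
# is an illustrative placeholder:
#
#     from chromadb import PersistentClient
#     from langchain_community.embeddings import HuggingFaceEmbeddings
#
#     client = PersistentClient(path="chroma_db")
#     collection = client.get_or_create_collection(name="news_articles")
#     embedder = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
#
#     query_vector = embedder.embed_query("latest developments in renewable energy")
#     results = collection.query(query_embeddings=[query_vector], n_results=5)
#     for meta in results["metadatas"][0]:
#         print(meta["title"], meta["link"])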