"""Flask front end for an RSS news aggregator backed by a Chroma vector store.

Articles are fetched and embedded in a background thread, persisted under
LOCAL_DB_DIR, and synced to/from the Hugging Face Hub via rss_processor.
"""
import hashlib
import logging
import os
import threading
import time
from datetime import datetime

from flask import Flask, render_template, request, jsonify

from rss_processor import (
    fetch_rss_feeds,
    process_and_store_articles,
    download_from_hf_hub,
    upload_to_hf_hub,
    LOCAL_DB_DIR,
)
# Note: newer LangChain releases moved these classes to
# langchain_community.vectorstores / langchain_community.embeddings.
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings

app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Shared state for the background feed-refresh thread.
loading_complete = True
last_update_time = time.time()
last_data_hash = None
loading_lock = threading.Lock()  # guards the check-and-set on loading_complete


def get_embedding_model():
    """Lazily create and cache a single HuggingFace embedding model."""
    if not hasattr(get_embedding_model, "model"):
        get_embedding_model.model = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2"
        )
    return get_embedding_model.model


def get_vector_db():
    """Lazily open the persisted Chroma collection; return None if unavailable."""
    if not os.path.exists(LOCAL_DB_DIR):
        return None
    try:
        if not hasattr(get_vector_db, "db_instance"):
            get_vector_db.db_instance = Chroma(
                persist_directory=LOCAL_DB_DIR,
                embedding_function=get_embedding_model(),
                collection_name="news_articles",
            )
        return get_vector_db.db_instance
    except Exception as e:
        logger.error(f"Failed to load vector DB: {e}")
        # Drop the cached instance so the next call retries from scratch.
        if hasattr(get_vector_db, "db_instance"):
            delattr(get_vector_db, "db_instance")
        return None


def load_feeds_in_background():
    """Fetch, embed, and store RSS articles, then sync the DB to the Hub."""
    global loading_complete, last_update_time
    # Claim the refresh atomically so concurrent requests can't start it twice.
    with loading_lock:
        if not loading_complete:
            return
        loading_complete = False
    try:
        logger.info("Starting background RSS feed fetch")
        articles = fetch_rss_feeds()
        logger.info(f"Fetched {len(articles)} articles")
        process_and_store_articles(articles)
        last_update_time = time.time()
        logger.info("Background feed processing complete")
        upload_to_hf_hub()
    except Exception as e:
        logger.error(f"Error in background feed loading: {e}")
    finally:
        loading_complete = True


def get_all_docs_from_db():
    """Return every stored document and its metadata, or empty lists."""
    vector_db = get_vector_db()
    if not vector_db or vector_db._collection.count() == 0:
        return {'documents': [], 'metadatas': []}
    return vector_db.get(include=['documents', 'metadatas'])


def format_articles_from_db_results(docs):
    """Normalize Chroma results into deduplicated article dicts, newest first.

    Accepts either the dict returned by Chroma.get() or a list of
    (Document, score) tuples from a similarity search.
    """
    enriched_articles = []
    seen_keys = set()
    items = []
    if isinstance(docs, dict) and 'metadatas' in docs:
        items = zip(docs.get('documents', []), docs.get('metadatas', []))
    elif isinstance(docs, list):
        items = [(doc.page_content, doc.metadata) for doc, score in docs]

    for doc_content, meta in items:
        if not meta:
            continue
        title = meta.get("title", "No Title")
        link = meta.get("link", "")
        published = meta.get("published", "Unknown Date").strip()
        # Deduplicate on the (title, link, published) triple.
        key = f"{title}|{link}|{published}"
        if key not in seen_keys:
            seen_keys.add(key)
            try:
                published_iso = datetime.strptime(published, "%Y-%m-%d %H:%M:%S").isoformat()
            except (ValueError, TypeError):
                # Unparseable dates sort to the bottom.
                published_iso = "1970-01-01T00:00:00"
            enriched_articles.append({
                "title": title,
                "link": link,
                "description": meta.get("original_description", "No Description"),
                "category": meta.get("category", "Uncategorized"),
                "published": published_iso,
                "image": meta.get("image", "svg"),
            })
    enriched_articles.sort(key=lambda x: x["published"], reverse=True)
    return enriched_articles


def compute_data_hash(categorized_articles):
    """Stable SHA-256 fingerprint of the categorized article set, used by
    /get_updates to detect whether anything changed since the last poll."""
    if not categorized_articles:
        return ""
    data_str = ""
    for cat, articles in sorted(categorized_articles.items()):
        for article in sorted(articles, key=lambda x: x["published"]):
            data_str += f"{cat}|{article['title']}|{article['link']}|{article['published']}|"
    return hashlib.sha256(data_str.encode('utf-8')).hexdigest()
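
# Example (a sketch, not executed by the app): format_articles_from_db_results()
# accepts either the dict returned by Chroma.get() or the (Document, score)
# tuples returned by similarity_search_with_relevance_scores(). The metadata
# keys below mirror the ones used elsewhere in this file; the values are
# illustrative only.
#
#   docs = {"documents": ["body text"],
#           "metadatas": [{"title": "Example", "link": "https://example.com",
#                          "published": "2024-01-01 00:00:00",
#                          "category": "Technology"}]}
#   format_articles_from_db_results(docs)
#   # -> [{"title": "Example", "published": "2024-01-01T00:00:00", ...}]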


@app.route('/')
def index():
    global loading_complete, last_update_time, last_data_hash
    # On first run, pull a previously persisted DB from the Hub before serving.
    if not os.path.exists(LOCAL_DB_DIR):
        logger.info(f"No Chroma DB found at '{LOCAL_DB_DIR}', downloading from Hugging Face Hub...")
        download_from_hf_hub()

    # Kick off a refresh; load_feeds_in_background() is a no-op if one is running.
    threading.Thread(target=load_feeds_in_background, daemon=True).start()

    try:
        all_docs = get_all_docs_from_db()
        if not all_docs['metadatas']:
            return render_template("index.html", categorized_articles={}, has_articles=False, loading=True)

        enriched_articles = format_articles_from_db_results(all_docs)

        # Group by category, sort categories alphabetically, cap each at 10.
        categorized_articles = {}
        for article in enriched_articles:
            cat = article["category"]
            categorized_articles.setdefault(cat, []).append(article)
        categorized_articles = dict(sorted(categorized_articles.items()))
        for cat in categorized_articles:
            categorized_articles[cat] = categorized_articles[cat][:10]

        last_data_hash = compute_data_hash(categorized_articles)
        return render_template("index.html",
                               categorized_articles=categorized_articles,
                               has_articles=True,
                               loading=not loading_complete)
    except Exception as e:
        logger.error(f"Error retrieving articles at startup: {e}", exc_info=True)
        return render_template("index.html", categorized_articles={}, has_articles=False, loading=True)


@app.route('/search', methods=['POST'])
def search():
    query = request.form.get('search')
    if not query:
        return jsonify({"categorized_articles": {}, "has_articles": False, "loading": False})
    vector_db = get_vector_db()
    if not vector_db:
        return jsonify({"categorized_articles": {}, "has_articles": False, "loading": False})
    try:
        # Semantic search; keep only reasonably relevant hits.
        results = vector_db.similarity_search_with_relevance_scores(query, k=50, score_threshold=0.5)
        enriched_articles = format_articles_from_db_results(results)
        categorized_articles = {}
        for article in enriched_articles:
            cat = article["category"]
            categorized_articles.setdefault(cat, []).append(article)
        return jsonify({
            "categorized_articles": categorized_articles,
            "has_articles": bool(enriched_articles),
            "loading": False
        })
    except Exception as e:
        logger.error(f"Semantic search error: {e}", exc_info=True)
        return jsonify({"categorized_articles": {}, "has_articles": False, "loading": False}), 500


@app.route('/get_all_articles/<category>')
def get_all_articles(category):
    try:
        all_docs = get_all_docs_from_db()
        enriched_articles = format_articles_from_db_results(all_docs)
        category_articles = [article for article in enriched_articles if article["category"] == category]
        return jsonify({"articles": category_articles, "category": category})
    except Exception as e:
        logger.error(f"Error fetching all articles for category {category}: {e}")
        return jsonify({"articles": [], "category": category}), 500


@app.route('/check_loading')
def check_loading():
    return jsonify({"status": "complete" if loading_complete else "loading",
                    "last_update": last_update_time})


@app.route('/get_updates')
def get_updates():
    global last_update_time, last_data_hash
    try:
        all_docs = get_all_docs_from_db()
        if not all_docs['metadatas']:
            return jsonify({"articles": {}, "last_update": last_update_time, "has_updates": False})

        enriched_articles = format_articles_from_db_results(all_docs)
        categorized_articles = {}
        for article in enriched_articles:
            cat = article["category"]
            categorized_articles.setdefault(cat, []).append(article)
        for cat in categorized_articles:
            categorized_articles[cat] = categorized_articles[cat][:10]

        # Only send a payload when the fingerprint changed since the last poll.
        current_data_hash = compute_data_hash(categorized_articles)
        has_updates = last_data_hash != current_data_hash
        if has_updates:
            last_data_hash = current_data_hash
            return jsonify({"articles": categorized_articles, "last_update": last_update_time, "has_updates": True})
        else:
            return jsonify({"articles": {}, "last_update": last_update_time, "has_updates": False})
    except Exception as e:
        logger.error(f"Error fetching updates: {e}")
        return jsonify({"articles": {}, "last_update": last_update_time, "has_updates": False}), 500
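
# Example (a sketch): the front end is expected to poll /check_loading until
# the background refresh reports "complete", then call /get_updates, which
# returns a non-empty "articles" payload only when the data hash has changed.
# With the app on its default port (see app.run below):
#
#   curl 'http://localhost:7860/check_loading'
#   curl 'http://localhost:7860/get_updates'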


@app.route('/card')
def card_load():
    return render_template("card.html")


@app.route('/api/v1/search', methods=['GET'])
def api_search():
    query = request.args.get('q')
    limit = request.args.get('limit', default=20, type=int)
    if not query:
        return jsonify({"error": "Query parameter 'q' is required."}), 400
    vector_db = get_vector_db()
    if not vector_db:
        return jsonify({"error": "Database not available."}), 503
    try:
        results = vector_db.similarity_search_with_relevance_scores(query, k=limit)
        formatted_articles = format_articles_from_db_results(results)
        return jsonify(formatted_articles)
    except Exception as e:
        logger.error(f"API Search error: {e}", exc_info=True)
        return jsonify({"error": "An internal error occurred during search."}), 500


@app.route('/api/v1/articles/category/<category_name>', methods=['GET'])
def api_get_articles_by_category(category_name):
    limit = request.args.get('limit', default=20, type=int)
    offset = request.args.get('offset', default=0, type=int)
    vector_db = get_vector_db()
    if not vector_db:
        return jsonify({"error": "Database not available."}), 503
    try:
        # Filter in Chroma, then paginate in Python.
        results = vector_db.get(where={"category": category_name}, include=['documents', 'metadatas'])
        formatted_articles = format_articles_from_db_results(results)
        paginated_results = formatted_articles[offset:offset + limit]
        return jsonify({
            "category": category_name,
            "total_articles": len(formatted_articles),
            "articles": paginated_results
        })
    except Exception as e:
        logger.error(f"API Category fetch error: {e}", exc_info=True)
        return jsonify({"error": "An internal error occurred."}), 500


@app.route('/api/v1/categories', methods=['GET'])
def api_get_categories():
    vector_db = get_vector_db()
    if not vector_db:
        return jsonify({"error": "Database not available."}), 503
    try:
        all_metadata = vector_db.get(include=['metadatas'])['metadatas']
        if not all_metadata:
            return jsonify([])
        unique_categories = sorted({meta['category'] for meta in all_metadata if 'category' in meta})
        return jsonify(unique_categories)
    except Exception as e:
        logger.error(f"API Categories fetch error: {e}", exc_info=True)
        return jsonify({"error": "An internal error occurred."}), 500


@app.route('/api/v1/status', methods=['GET'])
def api_get_status():
    return jsonify({
        "status": "complete" if loading_complete else "loading",
        "last_update_time": last_update_time
    })


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860)
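
# Example usage of the JSON API (a sketch; the query and category values are
# illustrative only, while the endpoints and parameters are defined above):
#
#   curl 'http://localhost:7860/api/v1/status'
#   curl 'http://localhost:7860/api/v1/categories'
#   curl 'http://localhost:7860/api/v1/search?q=renewable+energy&limit=5'
#   curl 'http://localhost:7860/api/v1/articles/category/Technology?limit=10&offset=0'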