RSS_News_1 / app.py
broadfield-dev's picture
Update app.py
19e0629 verified
import os
import threading
import hashlib
import logging
import time
from datetime import datetime
from flask import Flask, render_template, request, jsonify
from rss_processor import fetch_rss_feeds, process_and_store_articles, download_from_hf_hub, upload_to_hf_hub, LOCAL_DB_DIR
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
loading_complete = True
last_update_time = time.time()
last_data_hash = None
def get_embedding_model():
if not hasattr(get_embedding_model, "model"):
get_embedding_model.model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
return get_embedding_model.model
def get_vector_db():
if not os.path.exists(LOCAL_DB_DIR):
return None
try:
if not hasattr(get_vector_db, "db_instance"):
get_vector_db.db_instance = Chroma(
persist_directory=LOCAL_DB_DIR,
embedding_function=get_embedding_model(),
collection_name="news_articles"
)
return get_vector_db.db_instance
except Exception as e:
logger.error(f"Failed to load vector DB: {e}")
if hasattr(get_vector_db, "db_instance"):
delattr(get_vector_db, "db_instance")
return None
def load_feeds_in_background():
global loading_complete, last_update_time
if not loading_complete:
return
loading_complete = False
try:
logger.info("Starting background RSS feed fetch")
articles = fetch_rss_feeds()
logger.info(f"Fetched {len(articles)} articles")
process_and_store_articles(articles)
last_update_time = time.time()
logger.info("Background feed processing complete")
upload_to_hf_hub()
except Exception as e:
logger.error(f"Error in background feed loading: {e}")
finally:
loading_complete = True
def get_all_docs_from_db():
vector_db = get_vector_db()
if not vector_db or vector_db._collection.count() == 0:
return {'documents': [], 'metadatas': []}
return vector_db.get(include=['documents', 'metadatas'])
def format_articles_from_db_results(docs):
enriched_articles = []
seen_keys = set()
items = []
if isinstance(docs, dict) and 'metadatas' in docs:
items = zip(docs.get('documents', []), docs.get('metadatas', []))
elif isinstance(docs, list):
items = [(doc.page_content, doc.metadata) for doc, score in docs]
for doc_content, meta in items:
if not meta: continue
title = meta.get("title", "No Title")
link = meta.get("link", "")
published = meta.get("published", "Unknown Date").strip()
key = f"{title}|{link}|{published}"
if key not in seen_keys:
seen_keys.add(key)
try:
published_iso = datetime.strptime(published, "%Y-%m-%d %H:%M:%S").isoformat()
except (ValueError, TypeError):
published_iso = "1970-01-01T00:00:00"
enriched_articles.append({
"title": title,
"link": link,
"description": meta.get("original_description", "No Description"),
"category": meta.get("category", "Uncategorized"),
"published": published_iso,
"image": meta.get("image", "svg"),
})
enriched_articles.sort(key=lambda x: x["published"], reverse=True)
return enriched_articles
def compute_data_hash(categorized_articles):
if not categorized_articles: return ""
data_str = ""
for cat, articles in sorted(categorized_articles.items()):
for article in sorted(articles, key=lambda x: x["published"]):
data_str += f"{cat}|{article['title']}|{article['link']}|{article['published']}|"
return hashlib.sha256(data_str.encode('utf-8')).hexdigest()
@app.route('/')
def index():
global loading_complete, last_update_time, last_data_hash
if not os.path.exists(LOCAL_DB_DIR):
logger.info(f"No Chroma DB found at '{LOCAL_DB_DIR}', downloading from Hugging Face Hub...")
download_from_hf_hub()
threading.Thread(target=load_feeds_in_background, daemon=True).start()
try:
all_docs = get_all_docs_from_db()
if not all_docs['metadatas']:
return render_template("index.html", categorized_articles={}, has_articles=False, loading=True)
enriched_articles = format_articles_from_db_results(all_docs)
categorized_articles = {}
for article in enriched_articles:
cat = article["category"]
categorized_articles.setdefault(cat, []).append(article)
categorized_articles = dict(sorted(categorized_articles.items()))
for cat in categorized_articles:
categorized_articles[cat] = categorized_articles[cat][:10]
last_data_hash = compute_data_hash(categorized_articles)
return render_template("index.html", categorized_articles=categorized_articles, has_articles=True, loading=not loading_complete)
except Exception as e:
logger.error(f"Error retrieving articles at startup: {e}", exc_info=True)
return render_template("index.html", categorized_articles={}, has_articles=False, loading=True)
@app.route('/search', methods=['POST'])
def search():
query = request.form.get('search')
if not query:
return jsonify({"categorized_articles": {}, "has_articles": False, "loading": False})
vector_db = get_vector_db()
if not vector_db:
return jsonify({"categorized_articles": {}, "has_articles": False, "loading": False})
try:
results = vector_db.similarity_search_with_relevance_scores(query, k=50, score_threshold=0.5)
enriched_articles = format_articles_from_db_results(results)
categorized_articles = {}
for article in enriched_articles:
cat = article["category"]
categorized_articles.setdefault(cat, []).append(article)
return jsonify({
"categorized_articles": categorized_articles,
"has_articles": bool(enriched_articles),
"loading": False
})
except Exception as e:
logger.error(f"Semantic search error: {e}", exc_info=True)
return jsonify({"categorized_articles": {}, "has_articles": False, "loading": False}), 500
@app.route('/get_all_articles/<category>')
def get_all_articles(category):
try:
all_docs = get_all_docs_from_db()
enriched_articles = format_articles_from_db_results(all_docs)
category_articles = [article for article in enriched_articles if article["category"] == category]
return jsonify({"articles": category_articles, "category": category})
except Exception as e:
logger.error(f"Error fetching all articles for category {category}: {e}")
return jsonify({"articles": [], "category": category}), 500
@app.route('/check_loading')
def check_loading():
return jsonify({"status": "complete" if loading_complete else "loading", "last_update": last_update_time})
@app.route('/get_updates')
def get_updates():
global last_update_time, last_data_hash
try:
all_docs = get_all_docs_from_db()
if not all_docs['metadatas']:
return jsonify({"articles": {}, "last_update": last_update_time, "has_updates": False})
enriched_articles = format_articles_from_db_results(all_docs)
categorized_articles = {}
for article in enriched_articles:
cat = article["category"]
categorized_articles.setdefault(cat, []).append(article)
for cat in categorized_articles:
categorized_articles[cat] = categorized_articles[cat][:10]
current_data_hash = compute_data_hash(categorized_articles)
has_updates = last_data_hash != current_data_hash
if has_updates:
last_data_hash = current_data_hash
return jsonify({"articles": categorized_articles, "last_update": last_update_time, "has_updates": True})
else:
return jsonify({"articles": {}, "last_update": last_update_time, "has_updates": False})
except Exception as e:
logger.error(f"Error fetching updates: {e}")
return jsonify({"articles": {}, "last_update": last_update_time, "has_updates": False}), 500
@app.route('/card')
def card_load():
return render_template("card.html")
@app.route('/api/v1/search', methods=['GET'])
def api_search():
query = request.args.get('q')
limit = request.args.get('limit', default=20, type=int)
if not query:
return jsonify({"error": "Query parameter 'q' is required."}), 400
vector_db = get_vector_db()
if not vector_db:
return jsonify({"error": "Database not available."}), 503
try:
results = vector_db.similarity_search_with_relevance_scores(query, k=limit)
formatted_articles = format_articles_from_db_results(results)
return jsonify(formatted_articles)
except Exception as e:
logger.error(f"API Search error: {e}", exc_info=True)
return jsonify({"error": "An internal error occurred during search."}), 500
@app.route('/api/v1/articles/category/<string:category_name>', methods=['GET'])
def api_get_articles_by_category(category_name):
limit = request.args.get('limit', default=20, type=int)
offset = request.args.get('offset', default=0, type=int)
vector_db = get_vector_db()
if not vector_db:
return jsonify({"error": "Database not available."}), 503
try:
results = vector_db.get(where={"category": category_name}, include=['documents', 'metadatas'])
formatted_articles = format_articles_from_db_results(results)
paginated_results = formatted_articles[offset : offset + limit]
return jsonify({
"category": category_name,
"total_articles": len(formatted_articles),
"articles": paginated_results
})
except Exception as e:
logger.error(f"API Category fetch error: {e}", exc_info=True)
return jsonify({"error": "An internal error occurred."}), 500
@app.route('/api/v1/categories', methods=['GET'])
def api_get_categories():
vector_db = get_vector_db()
if not vector_db:
return jsonify({"error": "Database not available."}), 503
try:
all_metadata = vector_db.get(include=['metadatas'])['metadatas']
if not all_metadata:
return jsonify([])
unique_categories = sorted(list({meta['category'] for meta in all_metadata if 'category' in meta}))
return jsonify(unique_categories)
except Exception as e:
logger.error(f"API Categories fetch error: {e}", exc_info=True)
return jsonify({"error": "An internal error occurred."}), 500
@app.route('/api/v1/status', methods=['GET'])
def api_get_status():
return jsonify({
"status": "complete" if loading_complete else "loading",
"last_update_time": last_update_time
})
if __name__ == "__main__":
app.run(host="0.0.0.0", port=7860)