import os
import threading
import hashlib
import logging
import time
from datetime import datetime
from flask import Flask, render_template, request, jsonify
from rss_processor import fetch_rss_feeds, process_and_store_articles, download_from_hf_hub, upload_to_hf_hub, LOCAL_DB_DIR, main
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings

main()

app = Flask(__name__)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

loading_complete = True
last_update_time = time.time()
last_data_hash = None

def get_embedding_model():
    if not hasattr(get_embedding_model, "model"):
        get_embedding_model.model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
    return get_embedding_model.model

def get_vector_db():
    if not os.path.exists(LOCAL_DB_DIR):
        return None
    try:
        # Lazily open the persisted Chroma collection and cache the handle on the function object.
        if not hasattr(get_vector_db, "db_instance"):
            get_vector_db.db_instance = Chroma(
                persist_directory=LOCAL_DB_DIR,
                embedding_function=get_embedding_model(),
                collection_name="news_articles"
            )
        return get_vector_db.db_instance
    except Exception as e:
        logger.error(f"Failed to load vector DB: {e}")
        # Drop the cached handle so the next call can retry with a fresh instance.
        if hasattr(get_vector_db, "db_instance"):
            delattr(get_vector_db, "db_instance")
        return None

def load_feeds_in_background():
    global loading_complete, last_update_time
    if not loading_complete:
        return
    loading_complete = False
    try:
        logger.info("Starting background RSS feed fetch")
        articles = fetch_rss_feeds()
        logger.info(f"Fetched {len(articles)} articles")
        process_and_store_articles(articles)
        last_update_time = time.time()
        logger.info("Background feed processing complete")
        upload_to_hf_hub()
    except Exception as e:
        logger.error(f"Error in background feed loading: {e}")
    finally:
        loading_complete = True
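
# Note: loading_complete is read and then written without a lock, so two overlapping
# requests could in principle both pass the guard above and start duplicate fetches.
# Wrapping the check-and-set in a threading.Lock (or replacing the flag with a
# threading.Event) would close that race; in practice the window is small because the
# flag is flipped immediately after the check.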

def get_all_docs_from_db():
    vector_db = get_vector_db()
    if not vector_db or vector_db._collection.count() == 0:
        return {'documents': [], 'metadatas': []}
    return vector_db.get(include=['documents', 'metadatas'])

def format_articles_from_db_results(docs):
    enriched_articles = []
    seen_keys = set()
    items = []
    # Accept either a raw Chroma .get() dict or a list of (Document, score) tuples
    # returned by the similarity-search helpers.
    if isinstance(docs, dict) and 'metadatas' in docs:
        items = zip(docs.get('documents', []), docs.get('metadatas', []))
    elif isinstance(docs, list):
        items = [(doc.page_content, doc.metadata) for doc, score in docs]
    for doc_content, meta in items:
        if not meta:
            continue
        title = meta.get("title", "No Title")
        link = meta.get("link", "")
        # The 'published' value stored in the DB is already an ISO-8601 string.
        published_iso = meta.get("published", "1970-01-01T00:00:00").strip()
        # De-duplicate on title + link + timestamp.
        key = f"{title}|{link}|{published_iso}"
        if key not in seen_keys:
            seen_keys.add(key)
            # The description is the document content itself, not part of the metadata.
            description = doc_content if doc_content else "No Description"
            enriched_articles.append({
                "title": title,
                "link": link,
                "description": description,
                "category": meta.get("category", "Uncategorized"),
                "published": published_iso,
                "image": meta.get("image", "svg"),
            })
    # ISO-8601 strings sort chronologically as plain strings, so newest-first is a reverse sort.
    enriched_articles.sort(key=lambda x: x["published"], reverse=True)
    return enriched_articles
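
# Illustrative input shapes for format_articles_from_db_results (values are made up):
#   vector_db.get(...)  -> {"documents": ["article text", ...], "metadatas": [{"title": ...}, ...]}
#   similarity search   -> [(Document(page_content="article text", metadata={"title": ...}), 0.42), ...]
# Both are flattened into one de-duplicated list of article dicts, sorted newest-first.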

def compute_data_hash(categorized_articles):
    if not categorized_articles:
        return ""
    data_str = ""
    for cat, articles in sorted(categorized_articles.items()):
        for article in sorted(articles, key=lambda x: x["published"]):
            data_str += f"{cat}|{article['title']}|{article['link']}|{article['published']}|"
    return hashlib.sha256(data_str.encode('utf-8')).hexdigest()

# Routes: the decorator paths below are inferred from the handler names and the calls the
# front-end templates would need; adjust them if your templates use different endpoints.
@app.route("/")
def index():
    global loading_complete, last_update_time, last_data_hash
    if not os.path.exists(LOCAL_DB_DIR):
        logger.info(f"No Chroma DB found at '{LOCAL_DB_DIR}', downloading from Hugging Face Hub...")
        download_from_hf_hub()
    threading.Thread(target=load_feeds_in_background, daemon=True).start()
    try:
        all_docs = get_all_docs_from_db()
        if not all_docs['metadatas']:
            return render_template("index.html", categorized_articles={}, has_articles=False, loading=True)
        enriched_articles = format_articles_from_db_results(all_docs)
        categorized_articles = {}
        for article in enriched_articles:
            cat = article["category"]
            categorized_articles.setdefault(cat, []).append(article)
        categorized_articles = dict(sorted(categorized_articles.items()))
        # Show at most the 10 most recent articles per category on the landing page.
        for cat in categorized_articles:
            categorized_articles[cat] = categorized_articles[cat][:10]
        last_data_hash = compute_data_hash(categorized_articles)
        return render_template("index.html", categorized_articles=categorized_articles, has_articles=True, loading=not loading_complete)
    except Exception as e:
        logger.error(f"Error retrieving articles at startup: {e}", exc_info=True)
        return render_template("index.html", categorized_articles={}, has_articles=False, loading=True)

@app.route("/search", methods=["POST"])
def search():
    query = request.form.get('search')
    if not query:
        return jsonify({"categorized_articles": {}, "has_articles": False, "loading": False})
    vector_db = get_vector_db()
    if not vector_db:
        return jsonify({"categorized_articles": {}, "has_articles": False, "loading": False})
    try:
        # similarity_search_with_score returns (Document, distance) tuples, where the
        # distance is Chroma's raw score and lower means a closer match.
        results_with_scores = vector_db.similarity_search_with_score(query, k=50)
        # Keep only sufficiently close matches. 1.5 is a fairly permissive cutoff;
        # lower it (e.g. to 0.8-1.0) for stricter matches, raise it for looser ones.
        score_threshold = 1.5
        filtered_results = [(doc, score) for doc, score in results_with_scores if score < score_threshold]
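        # Background on the score scale (assumes Chroma's default "l2" space, which reports
        # squared Euclidean distance): for unit-normalized embeddings, score = 2 * (1 - cosine
        # similarity), so a cutoff of 1.5 admits anything with cosine similarity above 0.25.
        # HuggingFaceEmbeddings does not normalize vectors unless configured to, so treat the
        # threshold as an empirical knob rather than an exact similarity bound.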
        # Format the filtered (Document, score) list into article dicts.
        enriched_articles = format_articles_from_db_results(filtered_results)
        categorized_articles = {}
        for article in enriched_articles:
            cat = article["category"]
            categorized_articles.setdefault(cat, []).append(article)
        return jsonify({
            "categorized_articles": categorized_articles,
            "has_articles": bool(enriched_articles),
            "loading": False
        })
    except Exception as e:
        logger.error(f"Semantic search error: {e}", exc_info=True)
        return jsonify({"categorized_articles": {}, "has_articles": False, "loading": False}), 500

@app.route("/get_all_articles/<category>")
def get_all_articles(category):
    try:
        all_docs = get_all_docs_from_db()
        enriched_articles = format_articles_from_db_results(all_docs)
        category_articles = [article for article in enriched_articles if article["category"] == category]
        return jsonify({"articles": category_articles, "category": category})
    except Exception as e:
        logger.error(f"Error fetching all articles for category {category}: {e}")
        return jsonify({"articles": [], "category": category}), 500

@app.route("/check_loading")
def check_loading():
    return jsonify({"status": "complete" if loading_complete else "loading", "last_update": last_update_time})

@app.route("/get_updates")
def get_updates():
    global last_update_time, last_data_hash
    try:
        all_docs = get_all_docs_from_db()
        if not all_docs['metadatas']:
            return jsonify({"articles": {}, "last_update": last_update_time, "has_updates": False})
        enriched_articles = format_articles_from_db_results(all_docs)
        categorized_articles = {}
        for article in enriched_articles:
            cat = article["category"]
            categorized_articles.setdefault(cat, []).append(article)
        for cat in categorized_articles:
            categorized_articles[cat] = categorized_articles[cat][:10]
        # Only ship a payload when the content hash has changed since the last poll.
        current_data_hash = compute_data_hash(categorized_articles)
        has_updates = last_data_hash != current_data_hash
        if has_updates:
            last_data_hash = current_data_hash
            return jsonify({"articles": categorized_articles, "last_update": last_update_time, "has_updates": True})
        else:
            return jsonify({"articles": {}, "last_update": last_update_time, "has_updates": False})
    except Exception as e:
        logger.error(f"Error fetching updates: {e}")
        return jsonify({"articles": {}, "last_update": last_update_time, "has_updates": False}), 500

@app.route("/card")
def card_load():
    return render_template("card.html")

@app.route("/api/search")
def api_search():
    query = request.args.get('q')
    limit = request.args.get('limit', default=20, type=int)
    if not query:
        return jsonify({"error": "Query parameter 'q' is required."}), 400
    vector_db = get_vector_db()
    if not vector_db:
        return jsonify({"error": "Database not available."}), 503
    try:
        results = vector_db.similarity_search_with_relevance_scores(query, k=limit)
        formatted_articles = format_articles_from_db_results(results)
        return jsonify(formatted_articles)
    except Exception as e:
        logger.error(f"API Search error: {e}", exc_info=True)
        return jsonify({"error": "An internal error occurred during search."}), 500

@app.route("/api/category/<category_name>")
def api_get_articles_by_category(category_name):
    limit = request.args.get('limit', default=20, type=int)
    offset = request.args.get('offset', default=0, type=int)
    vector_db = get_vector_db()
    if not vector_db:
        return jsonify({"error": "Database not available."}), 503
    try:
        results = vector_db.get(where={"category": category_name}, include=['documents', 'metadatas'])
        formatted_articles = format_articles_from_db_results(results)
        paginated_results = formatted_articles[offset : offset + limit]
        return jsonify({
            "category": category_name,
            "total_articles": len(formatted_articles),
            "articles": paginated_results
        })
    except Exception as e:
        logger.error(f"API Category fetch error: {e}", exc_info=True)
        return jsonify({"error": "An internal error occurred."}), 500

@app.route("/api/categories")
def api_get_categories():
    vector_db = get_vector_db()
    if not vector_db:
        return jsonify({"error": "Database not available."}), 503
    try:
        all_metadata = vector_db.get(include=['metadatas'])['metadatas']
        if not all_metadata:
            return jsonify([])
        unique_categories = sorted({meta['category'] for meta in all_metadata if 'category' in meta})
        return jsonify(unique_categories)
    except Exception as e:
        logger.error(f"API Categories fetch error: {e}", exc_info=True)
        return jsonify({"error": "An internal error occurred."}), 500

@app.route("/api/status")
def api_get_status():
    return jsonify({
        "status": "complete" if loading_complete else "loading",
        "last_update_time": last_update_time
    })

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860)
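
# A quick smoke test (sketch; assumes the route paths reconstructed above, a module named
# app, and a populated Chroma DB). Run it from a separate script or shell:
#
#   from app import app
#   with app.test_client() as client:
#       print(client.get("/api/status").get_json())       # {"status": "complete", "last_update_time": ...}
#       print(client.get("/api/categories").get_json())   # e.g. ["Business", "Science", ...]
#       print(client.get("/api/search?q=climate&limit=5").get_json())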