"""
Data clearing service for both local and Hugging Face Space environments.

Provides functionality to clear vector store and chat history data.
"""
import os
import shutil
from pathlib import Path
from typing import Dict, Any, Tuple, List

from src.core.config import config
from src.core.logging_config import get_logger
from src.rag.vector_store import vector_store_manager
# Module-level logger, obtained from the project's get_logger helper using
# this module's name; every method of DataClearingService logs through it.
logger = get_logger(__name__)
class DataClearingService:
    """Service for clearing all RAG-related data across different environments.

    Two data sources are handled: the vector store (through the module-level
    ``vector_store_manager``) and the chat history directory on disk (located
    via ``config.rag.chat_history_path``).

    The clearing methods do not raise; they report the outcome through their
    return value, whose first element is always a success flag.
    """

    def __init__(self):
        """Initialize the data clearing service."""
        # A non-empty SPACE_ID environment variable is treated as "running
        # inside a Hugging Face Space"; the flag is only used for reporting.
        self.is_hf_space = bool(os.getenv("SPACE_ID"))
        logger.info(f"DataClearingService initialized (HF Space: {self.is_hf_space})")

    def _environment_label(self) -> str:
        """Return the environment name used in stats/status dictionaries."""
        return "hf_space" if self.is_hf_space else "local"

    @staticmethod
    def _count_files(directory: Path) -> int:
        """Count the regular files found anywhere beneath ``directory``."""
        return sum(1 for entry in directory.rglob("*") if entry.is_file())

    @staticmethod
    def _remove_entry(entry: Path) -> str:
        """
        Delete a single directory entry.

        Real directories are removed recursively. Everything else - regular
        files, symlinks (including dangling ones and links that point at a
        directory) and other special entries - is unlinked. Symlinks must
        not be handed to ``shutil.rmtree``: it refuses to operate on them,
        and only the link itself should disappear, never its target.

        Args:
            entry: Path of the entry to delete

        Returns:
            "directory" if a directory tree was removed, otherwise "file"

        Raises:
            OSError: If the entry cannot be removed
        """
        if entry.is_dir() and not entry.is_symlink():
            shutil.rmtree(entry)
            return "directory"
        entry.unlink()
        return "file"

    def get_data_paths(self) -> Tuple[str, str]:
        """
        Get the correct data paths for current environment.

        Both values are read from ``config.rag`` on every call and logged.

        Returns:
            Tuple of (vector_store_path, chat_history_path)
        """
        vector_store_path = config.rag.vector_store_path
        chat_history_path = config.rag.chat_history_path
        logger.info(f"Data paths - Vector store: {vector_store_path}, Chat history: {chat_history_path}")
        return vector_store_path, chat_history_path

    def clear_vector_store(self) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Clear all documents from the vector store.

        Returns:
            Tuple of (success, message, stats). On success ``stats`` holds
            "cleared_documents" (and "collection_name" when something was
            actually cleared); on failure it holds "error".
        """
        try:
            # Read the document count first so it can be reported afterwards.
            collection_info = vector_store_manager.get_collection_info()
            initial_count = collection_info.get("document_count", 0)

            if initial_count == 0:
                return True, "Vector store is already empty", {"cleared_documents": 0}

            # Clear the collection using the vector store manager's method
            if not vector_store_manager.clear_all_documents():
                return False, "Failed to clear vector store", {"error": "clear_all_documents returned False"}

            logger.info(f"Cleared {initial_count} documents from vector store")
            return True, f"Successfully cleared {initial_count} documents from vector store", {
                "cleared_documents": initial_count,
                "collection_name": collection_info.get("collection_name", "unknown"),
            }
        except Exception as e:
            error_msg = f"Error clearing vector store: {str(e)}"
            logger.error(error_msg)
            return False, error_msg, {"error": str(e)}

    def clear_chat_history(self) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Clear all chat history files.

        Every top-level entry of the chat history directory is removed; the
        directory itself is kept. The first removal error aborts the
        operation and is reported as a failure.

        Returns:
            Tuple of (success, message, stats). On success ``stats`` holds
            "cleared_files" (and "chat_history_path" when something was
            actually cleared); on failure it holds "error".
        """
        try:
            _, chat_history_path = self.get_data_paths()
            chat_dir = Path(chat_history_path)

            if not chat_dir.exists():
                return True, "Chat history directory doesn't exist", {"cleared_files": 0}

            # Count files before deletion; the reported number is the
            # recursive file count, not the number of top-level entries.
            file_count = self._count_files(chat_dir)
            if file_count == 0:
                return True, "Chat history is already empty", {"cleared_files": 0}

            # Clear all contents of the chat history directory
            for item in chat_dir.iterdir():
                kind = self._remove_entry(item)
                logger.debug(f"Removed {kind}: {item}")

            logger.info(f"Cleared {file_count} files from chat history")
            return True, f"Successfully cleared {file_count} files from chat history", {
                "cleared_files": file_count,
                "chat_history_path": str(chat_dir),
            }
        except Exception as e:
            error_msg = f"Error clearing chat history: {str(e)}"
            logger.error(error_msg)
            return False, error_msg, {"error": str(e)}

    def clear_directory_contents(self, directory_path: str) -> Tuple[bool, str, int]:
        """
        Clear all contents of a specific directory.

        Removal is best-effort: an entry that cannot be removed is logged as
        a warning and the remaining entries are still processed. If any
        entry could not be removed, the call reports ``success=False`` and
        the message states how many removals failed.

        Args:
            directory_path: Path to directory to clear

        Returns:
            Tuple of (success, message, items_cleared), where
            ``items_cleared`` counts the top-level entries actually removed
        """
        try:
            dir_path = Path(directory_path)
            if not dir_path.exists():
                return True, f"Directory doesn't exist: {directory_path}", 0

            items_cleared = 0
            items_failed = 0
            for item in dir_path.iterdir():
                try:
                    kind = self._remove_entry(item)
                except OSError as e:
                    items_failed += 1
                    logger.warning(f"Failed to remove {item}: {e}")
                else:
                    items_cleared += 1
                    logger.debug(f"Removed {kind}: {item}")

            if items_failed:
                # Leftover entries mean the directory is not actually clear,
                # so do not report the operation as successful.
                return (
                    False,
                    f"Cleared {items_cleared} items from {directory_path}; "
                    f"failed to remove {items_failed} items",
                    items_cleared,
                )
            return True, f"Cleared {items_cleared} items from {directory_path}", items_cleared
        except Exception as e:
            error_msg = f"Error clearing directory {directory_path}: {str(e)}"
            logger.error(error_msg)
            return False, error_msg, 0

    def clear_all_data(self) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Clear all RAG-related data (vector store + chat history).

        Both steps are always attempted; a failure of the vector store step
        does not prevent the chat history step from running.

        Returns:
            Tuple of (success, message, combined_stats). ``success`` is True
            only if both steps succeeded; ``combined_stats`` contains the
            per-step results, totals, the environment label and a list of
            error messages.
        """
        logger.info("Starting complete data clearing operation")

        combined_stats = {
            "vector_store": {},
            "chat_history": {},
            "total_cleared_documents": 0,
            "total_cleared_files": 0,
            "environment": self._environment_label(),
            "errors": [],
        }

        # Clear vector store
        vs_success, vs_message, vs_stats = self.clear_vector_store()
        combined_stats["vector_store"] = {
            "success": vs_success,
            "message": vs_message,
            **vs_stats,
        }
        if vs_success:
            combined_stats["total_cleared_documents"] = vs_stats.get("cleared_documents", 0)
        else:
            combined_stats["errors"].append(f"Vector store: {vs_message}")

        # Clear chat history
        ch_success, ch_message, ch_stats = self.clear_chat_history()
        combined_stats["chat_history"] = {
            "success": ch_success,
            "message": ch_message,
            **ch_stats,
        }
        if ch_success:
            combined_stats["total_cleared_files"] = ch_stats.get("cleared_files", 0)
        else:
            combined_stats["errors"].append(f"Chat history: {ch_message}")

        # Overall success
        overall_success = vs_success and ch_success
        if not overall_success:
            overall_message = f"Data clearing completed with errors: {'; '.join(combined_stats['errors'])}"
        else:
            total_documents = combined_stats["total_cleared_documents"]
            total_files = combined_stats["total_cleared_files"]
            if total_documents + total_files == 0:
                overall_message = "All data was already clear"
            else:
                overall_message = (
                    f"Successfully cleared all data: {total_documents} documents, {total_files} files"
                )

        logger.info(f"Data clearing operation completed: {overall_message}")
        return overall_success, overall_message, combined_stats

    def get_data_status(self) -> Dict[str, Any]:
        """
        Get current status of data directories and vector store.

        Returns:
            Dictionary with data status information. If anything raises
            while collecting it, the dictionary contains only "error" and
            "environment".
        """
        try:
            vector_store_path, chat_history_path = self.get_data_paths()

            # Vector store status
            collection_info = vector_store_manager.get_collection_info()
            vs_document_count = collection_info.get("document_count", 0)

            # Chat history status
            chat_dir = Path(chat_history_path)
            ch_exists = chat_dir.exists()
            ch_file_count = self._count_files(chat_dir) if ch_exists else 0

            # Directory status
            vs_exists = Path(vector_store_path).exists()

            return {
                "environment": self._environment_label(),
                "vector_store": {
                    "path": vector_store_path,
                    "exists": vs_exists,
                    "document_count": vs_document_count,
                    "collection_name": collection_info.get("collection_name", "unknown"),
                },
                "chat_history": {
                    "path": chat_history_path,
                    "exists": ch_exists,
                    "file_count": ch_file_count,
                },
                "total_data_items": vs_document_count + ch_file_count,
                "has_data": vs_document_count > 0 or ch_file_count > 0,
            }
        except Exception as e:
            logger.error(f"Error getting data status: {e}")
            return {
                "error": str(e),
                "environment": self._environment_label(),
            }
# Global data clearing service instance.
# Created at import time, so the constructor's SPACE_ID check and its log
# line run as soon as this module is imported.
data_clearing_service = DataClearingService()