File size: 10,091 Bytes
f46dfbd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
"""
Data clearing service for both local and Hugging Face Space environments.
Provides functionality to clear vector store and chat history data.
"""

import os
import shutil
from pathlib import Path
from typing import Dict, Any, Tuple, List
from src.core.config import config
from src.core.logging_config import get_logger
from src.rag.vector_store import vector_store_manager

# Module-level logger, named after this module and obtained through the
# project's get_logger helper (src.core.logging_config).
logger = get_logger(__name__)


class DataClearingService:
    """Service for clearing all RAG-related data across different environments.

    The service clears two data sources: the vector store (through
    ``vector_store_manager``) and the chat history directory configured in
    ``config.rag``. Methods report their outcome as return values instead of
    raising, so callers can surface the message directly.
    """

    def __init__(self):
        """Initialize the data clearing service.

        The environment is treated as a Hugging Face Space when the
        ``SPACE_ID`` environment variable is set to a non-empty value.
        """
        self.is_hf_space = bool(os.getenv("SPACE_ID"))
        logger.info(f"DataClearingService initialized (HF Space: {self.is_hf_space})")

    @staticmethod
    def _remove_path(item: Path) -> str:
        """Remove a single directory entry and return the kind that was removed.

        Real directories are removed recursively. Everything else (regular
        files, symlinks of any kind, dangling links, special files) is
        unlinked. Symlinks are unlinked rather than followed because
        ``shutil.rmtree`` refuses to operate on a symbolic link, and following
        one would delete data outside the directory being cleared.

        Args:
            item: Entry to remove.

        Returns:
            ``"directory"`` or ``"file"``, used for log messages.

        Raises:
            OSError: If the entry cannot be removed.
        """
        if item.is_dir() and not item.is_symlink():
            shutil.rmtree(item)
            return "directory"
        item.unlink()
        return "file"

    def get_data_paths(self) -> Tuple[str, str]:
        """
        Get the correct data paths for current environment.

        Both paths are read from ``config.rag`` and logged at info level.

        Returns:
            Tuple of (vector_store_path, chat_history_path)
        """
        vector_store_path = config.rag.vector_store_path
        chat_history_path = config.rag.chat_history_path

        logger.info(f"Data paths - Vector store: {vector_store_path}, Chat history: {chat_history_path}")
        return vector_store_path, chat_history_path

    def clear_vector_store(self) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Clear all documents from the vector store.

        Any exception raised by the vector store manager is caught, logged and
        reported through the return value.

        Returns:
            Tuple of (success, message, stats). On success ``stats`` contains
            ``cleared_documents`` (plus ``collection_name`` when documents were
            actually removed); on failure it contains ``error``.
        """
        try:
            # Get initial document count so the result can report how many were removed
            collection_info = vector_store_manager.get_collection_info()
            initial_count = collection_info.get("document_count", 0)

            if initial_count == 0:
                return True, "Vector store is already empty", {"cleared_documents": 0}

            # Clear the collection using the vector store manager's method
            success = vector_store_manager.clear_all_documents()

            if not success:
                return False, "Failed to clear vector store", {"error": "clear_all_documents returned False"}

            logger.info(f"Cleared {initial_count} documents from vector store")

            return True, f"Successfully cleared {initial_count} documents from vector store", {
                "cleared_documents": initial_count,
                "collection_name": collection_info.get("collection_name", "unknown")
            }

        except Exception as e:
            error_msg = f"Error clearing vector store: {str(e)}"
            logger.error(error_msg)
            return False, error_msg, {"error": str(e)}

    def clear_chat_history(self) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Clear all chat history files.

        Every top-level entry of the chat history directory is removed; the
        directory itself is kept. The first removal error aborts the operation
        and is reported through the return value.

        Returns:
            Tuple of (success, message, stats). On success ``stats`` contains
            ``cleared_files`` (plus ``chat_history_path`` when files were
            actually removed); on failure it contains ``error``.
        """
        try:
            _, chat_history_path = self.get_data_paths()
            chat_dir = Path(chat_history_path)

            if not chat_dir.exists():
                return True, "Chat history directory doesn't exist", {"cleared_files": 0}

            # Count files (recursively) before deletion so the result can report them
            file_count = sum(1 for f in chat_dir.rglob("*") if f.is_file())

            if file_count == 0:
                return True, "Chat history is already empty", {"cleared_files": 0}

            # Clear all contents of the chat history directory
            for item in chat_dir.iterdir():
                kind = self._remove_path(item)
                logger.debug(f"Removed {kind}: {item}")

            logger.info(f"Cleared {file_count} files from chat history")

            return True, f"Successfully cleared {file_count} files from chat history", {
                "cleared_files": file_count,
                "chat_history_path": str(chat_dir)
            }

        except Exception as e:
            error_msg = f"Error clearing chat history: {str(e)}"
            logger.error(error_msg)
            return False, error_msg, {"error": str(e)}

    def clear_directory_contents(self, directory_path: str) -> Tuple[bool, str, int]:
        """
        Clear all contents of a specific directory.

        Removal is best-effort: an entry that cannot be removed is logged as a
        warning and the remaining entries are still processed. The directory
        itself is kept. Symlinks inside the directory are unlinked, never
        followed.

        Args:
            directory_path: Path to directory to clear

        Returns:
            Tuple of (success, message, items_cleared). ``success`` is False
            when at least one entry could not be removed or when the directory
            could not be listed; ``items_cleared`` counts the top-level
            entries that were removed.
        """
        try:
            dir_path = Path(directory_path)

            if not dir_path.exists():
                return True, f"Directory doesn't exist: {directory_path}", 0

            items_cleared = 0
            failed_items = 0
            for item in dir_path.iterdir():
                try:
                    kind = self._remove_path(item)
                except Exception as e:
                    failed_items += 1
                    logger.warning(f"Failed to remove {item}: {e}")
                else:
                    items_cleared += 1
                    logger.debug(f"Removed {kind}: {item}")

            if failed_items:
                # Do not report success while entries are still left behind
                return (
                    False,
                    f"Cleared {items_cleared} items from {directory_path}; "
                    f"failed to remove {failed_items} items",
                    items_cleared,
                )

            return True, f"Cleared {items_cleared} items from {directory_path}", items_cleared

        except Exception as e:
            error_msg = f"Error clearing directory {directory_path}: {str(e)}"
            logger.error(error_msg)
            return False, error_msg, 0

    def clear_all_data(self) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Clear all RAG-related data (vector store + chat history).

        Both clearing steps always run; a failure in the vector store step
        does not prevent the chat history step.

        Returns:
            Tuple of (success, message, combined_stats). ``success`` is True
            only when both steps succeeded. ``combined_stats`` holds the
            per-step results under ``vector_store`` and ``chat_history``, the
            totals, the environment name and the list of error messages.
        """
        logger.info("Starting complete data clearing operation")

        combined_stats = {
            "vector_store": {},
            "chat_history": {},
            "total_cleared_documents": 0,
            "total_cleared_files": 0,
            "environment": "hf_space" if self.is_hf_space else "local",
            "errors": []
        }

        # Clear vector store
        vs_success, vs_message, vs_stats = self.clear_vector_store()
        combined_stats["vector_store"] = {
            "success": vs_success,
            "message": vs_message,
            **vs_stats
        }

        if not vs_success:
            combined_stats["errors"].append(f"Vector store: {vs_message}")
        else:
            combined_stats["total_cleared_documents"] = vs_stats.get("cleared_documents", 0)

        # Clear chat history
        ch_success, ch_message, ch_stats = self.clear_chat_history()
        combined_stats["chat_history"] = {
            "success": ch_success,
            "message": ch_message,
            **ch_stats
        }

        if not ch_success:
            combined_stats["errors"].append(f"Chat history: {ch_message}")
        else:
            combined_stats["total_cleared_files"] = ch_stats.get("cleared_files", 0)

        # Overall success requires both steps to have succeeded
        overall_success = vs_success and ch_success

        if overall_success:
            total_items = combined_stats["total_cleared_documents"] + combined_stats["total_cleared_files"]
            if total_items == 0:
                overall_message = "All data was already clear"
            else:
                overall_message = f"Successfully cleared all data: {combined_stats['total_cleared_documents']} documents, {combined_stats['total_cleared_files']} files"
        else:
            overall_message = f"Data clearing completed with errors: {'; '.join(combined_stats['errors'])}"

        logger.info(f"Data clearing operation completed: {overall_message}")

        return overall_success, overall_message, combined_stats

    def get_data_status(self) -> Dict[str, Any]:
        """
        Get current status of data directories and vector store.

        Returns:
            Dictionary with data status information. When anything raises
            while collecting the status, the dictionary contains only
            ``error`` and ``environment``.
        """
        try:
            vector_store_path, chat_history_path = self.get_data_paths()

            # Vector store status
            collection_info = vector_store_manager.get_collection_info()
            vs_document_count = collection_info.get("document_count", 0)

            # Chat history status: files are counted recursively
            chat_dir = Path(chat_history_path)
            ch_exists = chat_dir.exists()
            ch_file_count = 0
            if ch_exists:
                ch_file_count = sum(1 for f in chat_dir.rglob("*") if f.is_file())

            # Directory status
            vs_exists = Path(vector_store_path).exists()

            status = {
                "environment": "hf_space" if self.is_hf_space else "local",
                "vector_store": {
                    "path": vector_store_path,
                    "exists": vs_exists,
                    "document_count": vs_document_count,
                    "collection_name": collection_info.get("collection_name", "unknown")
                },
                "chat_history": {
                    "path": chat_history_path,
                    "exists": ch_exists,
                    "file_count": ch_file_count
                },
                "total_data_items": vs_document_count + ch_file_count,
                "has_data": vs_document_count > 0 or ch_file_count > 0
            }

            return status

        except Exception as e:
            logger.error(f"Error getting data status: {e}")
            return {
                "error": str(e),
                "environment": "hf_space" if self.is_hf_space else "local"
            }


# Global data clearing service instance.
# Created once at import time; constructing it reads the SPACE_ID environment
# variable and emits a single info-level log message.
data_clearing_service = DataClearingService()