File size: 13,176 Bytes
9145e48
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
import logging
import asyncio
import functools
from typing import Any, Callable, Dict, List, Optional
import time
import json
from pathlib import Path

logger = logging.getLogger(__name__)

def async_timer(func: Callable) -> Callable:
    """Decorator that logs how long an async function takes to run.

    Successful calls are logged at DEBUG level and failing calls at ERROR
    level; in the failing case the original exception is re-raised unchanged.

    Args:
        func: The coroutine function to wrap.

    Returns:
        An async wrapper with the same call signature and metadata as ``func``.
    """
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so the measured duration cannot go
        # negative or jump when the system clock is adjusted mid-call.
        started = time.perf_counter()
        try:
            result = await func(*args, **kwargs)
        except Exception as e:
            elapsed = time.perf_counter() - started
            logger.error(f"{func.__name__} failed after {elapsed:.3f}s: {str(e)}")
            raise
        elapsed = time.perf_counter() - started
        logger.debug(f"{func.__name__} completed in {elapsed:.3f}s")
        return result
    return wrapper

def retry_async(max_attempts: int = 3, delay: float = 1.0, backoff: float = 2.0):
    """Decorator factory that retries an async function with exponential backoff.

    Args:
        max_attempts: Total number of calls to make before giving up. Values
            below 1 are treated as 1 so the wrapped function always runs at
            least once.
        delay: Seconds to sleep before the first retry.
        backoff: Factor the delay is multiplied by after each failed attempt.

    Returns:
        A decorator producing an async wrapper. The wrapper returns the
        wrapped function's result, or re-raises the exception of the final
        failed attempt.
    """
    # A non-positive count used to skip the loop entirely, so the wrapped
    # function was never called and the wrapper silently returned None.
    attempts = max(1, max_attempts)

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            current_delay = delay

            for attempt in range(1, attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    if attempt == attempts:
                        logger.error(f"{func.__name__} failed after {attempts} attempts: {str(e)}")
                        raise

                    logger.warning(f"{func.__name__} attempt {attempt} failed: {str(e)}")
                    logger.info(f"Retrying in {current_delay}s...")

                    await asyncio.sleep(current_delay)
                    current_delay *= backoff

        return wrapper
    return decorator

class MCPToolResponse:
    """Uniform success/error envelope returned by MCP tools.

    Each instance records whether the call succeeded, the payload or error
    message, optional metadata, and the creation time from ``time.time()``.
    """

    def __init__(self, success: bool, data: Any = None, error: str = None,
                 metadata: Dict[str, Any] = None):
        self.timestamp = time.time()
        self.success = success
        self.data = data
        self.error = error
        # Falsy metadata (None or an empty dict) is normalised to a new dict.
        self.metadata = metadata if metadata else {}

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the response into a plain dictionary.

        The result always has ``success`` and ``timestamp``. It carries
        ``data`` for successful responses and ``error`` otherwise, and
        includes ``metadata`` only when it is non-empty.
        """
        payload_key = "data" if self.success else "error"
        serialized = {
            "success": self.success,
            "timestamp": self.timestamp,
            payload_key: getattr(self, payload_key),
        }
        if self.metadata:
            serialized["metadata"] = self.metadata
        return serialized

    @classmethod
    def success_response(cls, data: Any, metadata: Dict[str, Any] = None):
        """Build a response flagged as successful that carries ``data``."""
        return cls(True, data=data, metadata=metadata)

    @classmethod
    def error_response(cls, error: str, metadata: Dict[str, Any] = None):
        """Build a response flagged as failed that carries ``error``."""
        return cls(False, error=error, metadata=metadata)

def validate_required_params(params: Dict[str, Any], required: List[str]) -> Optional[str]:
    """Check that every required parameter is present and not ``None``.

    Args:
        params: Mapping of supplied parameter names to values.
        required: Names that must be present with a non-``None`` value.

    Returns:
        ``None`` when nothing is missing, otherwise a message listing the
        missing names in the order they appear in ``required``.
    """
    absent = [name for name in required if params.get(name) is None]
    if not absent:
        return None
    return f"Missing required parameters: {', '.join(absent)}"

def sanitize_filename(filename: str) -> str:
    """Sanitize a filename for safe storage.

    Reserved punctuation (``<>:"/\\|?*``) and ASCII control characters
    (including NUL) are replaced with underscores. Leading and trailing dots
    and spaces are stripped, and the result is capped at 255 characters while
    keeping the extension when it fits.

    Args:
        filename: The raw, possibly unsafe, file name.

    Returns:
        The sanitized name, or ``"unnamed_file"`` if nothing usable remains.
    """
    import re

    max_len = 255

    # Control characters are included because NUL makes open() raise and the
    # others are rejected by common filesystems.
    filename = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '_', filename)

    # Remove leading/trailing dots and spaces
    filename = filename.strip('. ')

    if len(filename) > max_len:
        as_path = Path(filename)
        stem, ext = as_path.stem, as_path.suffix
        if len(ext) < max_len:
            filename = stem[:max_len - len(ext)] + ext
        else:
            # The extension alone exceeds the limit; a negative slice on the
            # stem would not shorten anything, so hard-truncate instead.
            filename = filename[:max_len]
        # Truncation can expose a trailing dot or space again.
        filename = filename.rstrip('. ')

    return filename or "unnamed_file"

def truncate_text(text: str, max_length: int, add_ellipsis: bool = True) -> str:
    """Truncate text to at most ``max_length`` characters.

    Args:
        text: The text to shorten.
        max_length: Maximum length of the result. Negative values are treated
            as 0.
        add_ellipsis: When true and ``max_length`` is greater than 3, the last
            three characters of the result are ``"..."``.

    Returns:
        ``text`` unchanged if it already fits, otherwise the truncated text.
    """
    # A negative limit used as a slice bound counts from the end of the
    # string and would return almost the whole text, so clamp it.
    limit = max(max_length, 0)

    if len(text) <= limit:
        return text

    if add_ellipsis and limit > 3:
        return text[:limit - 3] + "..."
    return text[:limit]

def extract_file_info(file_path: str) -> Dict[str, Any]:
    """Collect basic metadata about a filesystem path.

    Args:
        file_path: Path to inspect.

    Returns:
        A dict with the name, lower-cased extension, size in bytes and MB,
        the ``st_ctime`` and ``st_mtime`` values from ``stat()``, and
        existence/type flags. If anything raises (for example the path does
        not exist), the dict contains a single ``"error"`` key with the
        exception text.
    """
    try:
        target = Path(file_path)
        details = target.stat()
        byte_count = details.st_size
        megabytes = round(byte_count / (1024 * 1024), 2)

        info = {
            "filename": target.name,
            "extension": target.suffix.lower(),
            "size_bytes": byte_count,
            "size_mb": megabytes,
            "created_time": details.st_ctime,
            "modified_time": details.st_mtime,
            "exists": target.exists(),
            "is_file": target.is_file(),
            "is_dir": target.is_dir()
        }
    except Exception as exc:
        return {"error": str(exc)}
    return info

async def batch_process(items: List[Any], processor: Callable, batch_size: int = 10, 
                       max_concurrent: int = 5) -> List[Any]:
    """Run an async processor over items, one batch at a time.

    Each batch of ``batch_size`` items is awaited in full before the next one
    starts. Within a batch, a semaphore allows at most ``max_concurrent``
    processor calls to run at once.

    Args:
        items: The inputs to process.
        processor: Async callable invoked with a single item.
        batch_size: Number of items gathered together per batch.
        max_concurrent: Upper bound on simultaneously running processor calls.

    Returns:
        Results in input order. An item whose processor call raised is
        represented by the exception object instead of a result.
    """
    gate = asyncio.Semaphore(max_concurrent)

    async def guarded(entry):
        async with gate:
            return await processor(entry)

    collected = []
    for start in range(0, len(items), batch_size):
        chunk = items[start:start + batch_size]
        outcomes = await asyncio.gather(
            *(guarded(entry) for entry in chunk),
            return_exceptions=True,
        )
        collected += outcomes

    return collected

def format_file_size(size_bytes: int) -> str:
    """Format a byte count as a human-readable string.

    The value is divided by 1024 until its magnitude drops below 1024, then
    rendered with one decimal place and the matching unit (B through TB, with
    PB as the final fallback).

    Args:
        size_bytes: Size in bytes. Negative values (such as deltas) are scaled
            by magnitude and keep their sign.

    Returns:
        A string such as ``"1.5 KB"``.
    """
    size = float(size_bytes)
    for unit in ('B', 'KB', 'MB', 'GB', 'TB'):
        # Compare on magnitude: a signed comparison treats every negative
        # value as "< 1024" and reports it in bytes regardless of size.
        if abs(size) < 1024.0:
            return f"{size:.1f} {unit}"
        size /= 1024.0
    return f"{size:.1f} PB"

def calculate_reading_time(text: str, words_per_minute: int = 200) -> int:
    """Estimate reading time in whole minutes, never less than one.

    Args:
        text: The text to measure; words are whitespace-separated tokens.
        words_per_minute: Assumed reading speed.

    Returns:
        The rounded number of minutes, with a floor of 1.
    """
    words = text.split()
    minutes = round(len(words) / words_per_minute)
    return minutes if minutes > 1 else 1

class ProgressTracker:
    """Bookkeeping for the progress of a long-running operation.

    Tracks how many of ``total_items`` are done, the error messages reported
    so far, and the start time used for elapsed and remaining-time estimates.
    """

    def __init__(self, total_items: int, description: str = "Processing"):
        self.description = description
        self.total_items = total_items
        self.completed_items = 0
        self.errors = []
        self.start_time = time.time()

    def update(self, completed: int = 1, error: str = None):
        """Add ``completed`` to the finished count and record ``error`` if given."""
        self.completed_items += completed
        if not error:
            return
        self.errors.append(error)

    def get_progress(self) -> Dict[str, Any]:
        """Return a snapshot of the current progress.

        The remaining-time estimate extrapolates the average time per
        completed item; it is 0 until at least one item has completed.
        Only the five most recent errors are included.
        """
        elapsed = time.time() - self.start_time
        done, total = self.completed_items, self.total_items

        percent = 0
        if total > 0:
            percent = (done / total) * 100

        remaining_seconds = 0
        if done > 0:
            per_item = elapsed / done
            remaining_seconds = per_item * (total - done)

        snapshot = {
            "description": self.description,
            "total_items": total,
            "completed_items": done,
            "progress_percent": round(percent, 1),
            "elapsed_time_seconds": round(elapsed, 1),
            "estimated_remaining_seconds": round(remaining_seconds, 1),
            "errors_count": len(self.errors),
            "errors": self.errors[-5:],  # Last 5 errors
        }
        return snapshot

    def is_complete(self) -> bool:
        """Return True once the completed count has reached the total."""
        return not self.completed_items < self.total_items

def load_json_config(config_path: str, default_config: Dict[str, Any] = None) -> Dict[str, Any]:
    """Load configuration from a JSON file, falling back to defaults.

    Args:
        config_path: Path of the JSON file to read.
        default_config: Value returned when the file is missing or cannot be
            parsed; an empty dict is used when this is ``None`` or empty.

    Returns:
        The parsed JSON content, or the defaults when the file does not
        exist, is not valid JSON, or is not valid UTF-8.
    """
    try:
        # Read as UTF-8 explicitly; otherwise the result depends on the
        # platform's locale encoding.
        with open(config_path, 'r', encoding='utf-8') as f:
            config = json.load(f)
    except FileNotFoundError:
        logger.warning(f"Configuration file {config_path} not found, using defaults")
        return default_config or {}
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        # Undecodable bytes are treated like malformed JSON: report and fall back.
        logger.error(f"Invalid JSON in configuration file {config_path}: {str(e)}")
        return default_config or {}

    logger.info(f"Loaded configuration from {config_path}")
    return config

def save_json_config(config: Dict[str, Any], config_path: str) -> bool:
    """Save configuration to a JSON file, creating parent directories as needed.

    Args:
        config: The configuration to serialize (written with 2-space indent).
        config_path: Destination file path.

    Returns:
        ``True`` on success. ``False`` if serialization or writing failed;
        the failure is logged rather than raised.
    """
    try:
        # Serialize before touching the filesystem: opening the file in 'w'
        # mode truncates it, so a non-serializable value would otherwise
        # destroy an existing, valid configuration file.
        serialized = json.dumps(config, indent=2)

        target = Path(config_path)
        # Create directory if it doesn't exist
        target.parent.mkdir(parents=True, exist_ok=True)

        with open(target, 'w', encoding='utf-8') as f:
            f.write(serialized)

        logger.info(f"Saved configuration to {config_path}")
        return True
    except Exception as e:
        logger.error(f"Failed to save configuration to {config_path}: {str(e)}")
        return False

class RateLimiter:
    """Sliding-window rate limiter for API calls.

    Allows at most ``max_calls`` recorded calls within any ``time_window``
    seconds, using ``time.time()`` timestamps kept in ``self.calls``.
    """

    def __init__(self, max_calls: int, time_window: float):
        self.calls = []
        self.max_calls = max_calls
        self.time_window = time_window

    async def acquire(self):
        """Wait until a call is permitted, then record it.

        On each pass, timestamps older than the window are dropped. If the
        window is still full, the coroutine sleeps until the oldest recorded
        call expires and then checks again.
        """
        while True:
            now = time.time()

            # Keep only the calls that still fall inside the window.
            self.calls = [stamp for stamp in self.calls
                          if now - stamp < self.time_window]

            if len(self.calls) < self.max_calls:
                break

            pause = self.time_window - (now - min(self.calls))
            if pause <= 0:
                break
            await asyncio.sleep(pause)

        # Record this call
        self.calls.append(now)

def escape_markdown(text: str) -> str:
    """Backslash-escape markdown special characters in ``text``.

    The escaped characters are ``* _ ` [ ] ( ) # + - !`` and the backslash
    itself; every other character is left untouched.
    """
    # Map each special character to its backslash-prefixed form.
    specials = "*_`[]()#+-!\\"
    escape_table = {ord(symbol): "\\" + symbol for symbol in specials}
    return text.translate(escape_table)

def create_error_summary(errors: List[Exception]) -> str:
    """Summarize a list of exceptions by exception type.

    Args:
        errors: The exceptions to summarize.

    Returns:
        ``"No errors"`` for an empty list. Otherwise a sentence giving the
        total count followed by per-type counts, in order of first
        appearance. Type names with a count above one get an ``s`` suffix.
    """
    if not errors:
        return "No errors"

    # Tally occurrences per exception class name, keeping first-seen order.
    tally = {}
    for exc in errors:
        label = type(exc).__name__
        try:
            tally[label] += 1
        except KeyError:
            tally[label] = 1

    pieces = [
        f"1 {label}" if seen == 1 else f"{seen} {label}s"
        for label, seen in tally.items()
    ]
    header = f"Encountered {len(errors)} total errors: "
    return header + ", ".join(pieces)

async def safe_execute(func: Callable, *args, default_return=None, **kwargs):
    """Call a sync or async function, returning a default if it raises.

    Args:
        func: The callable to run. Coroutine functions are awaited; anything
            else is called directly.
        *args: Positional arguments forwarded to ``func``.
        default_return: Value returned when ``func`` raises an ``Exception``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        The result of ``func``, or ``default_return`` after logging the error.
    """
    try:
        if asyncio.iscoroutinefunction(func):
            return await func(*args, **kwargs)
        return func(*args, **kwargs)
    except Exception as e:
        # Not every callable has __name__ (functools.partial objects and
        # callable instances do not). Reading it unguarded would raise
        # AttributeError from inside this handler and defeat the wrapper.
        func_name = getattr(func, "__name__", None) or repr(func)
        logger.error(f"Error executing {func_name}: {str(e)}")
        return default_return

def get_content_preview(content: str, max_length: int = 200) -> str:
    """Build a short single-line preview of ``content`` for display.

    Whitespace runs are collapsed to single spaces. Content that fits in
    ``max_length`` is returned as-is. Longer content is cut at the last
    sentence terminator (``.``, ``!`` or ``?``) if one lies in the final 30%
    of the window; otherwise at the last space in that region with ``...``
    appended; otherwise at ``max_length`` with ``...`` appended.

    Returns:
        The preview text, or ``"No content"`` when ``content`` is empty.
    """
    if not content:
        return "No content"

    collapsed = ' '.join(content.split())
    if len(collapsed) <= max_length:
        return collapsed

    head = collapsed[:max_length]
    # A break point is only accepted if it keeps at least 70% of the window.
    cutoff = max_length * 0.7

    sentence_end = max(head.rfind(mark) for mark in '.!?')
    if sentence_end > cutoff:
        return head[:sentence_end + 1]

    space_at = head.rfind(' ')
    if space_at > cutoff:
        return head[:space_at] + "..."

    return head + "..."

class MemoryUsageTracker:
    """Measure how much the process's memory usage changes over time.

    The baseline is sampled when the tracker is constructed. All readings
    are 0.0 when ``psutil`` cannot be imported.
    """

    def __init__(self):
        self.start_memory = self._get_memory_usage()

    def _get_memory_usage(self) -> float:
        """Return the current process RSS in MB, or 0.0 without psutil."""
        try:
            import psutil
            rss_bytes = psutil.Process().memory_info().rss
            return rss_bytes / 1024 / 1024  # bytes -> MB
        except ImportError:
            return 0.0

    def get_usage_delta(self) -> float:
        """Return the change in memory usage (MB) since construction."""
        return self._get_memory_usage() - self.start_memory

    def log_usage(self, operation_name: str):
        """Log the memory delta at INFO level, labelled with ``operation_name``."""
        change = self.get_usage_delta()
        logger.info(f"{operation_name} memory delta: {change:.1f} MB")