| | import os |
| | import shutil |
| | from typing import Dict, Any, List |
| | from pathlib import Path |
| | from datetime import datetime |
| | from abc import abstractmethod |
| |
|
| | from .storage_base import StorageBase |
| | from ..core.logging import logger |
| |
|
| |
|
class FileStorageHandler(StorageBase):
    """
    Template handler that maps the public storage API onto ``_xxx_raw`` hooks.

    The class has three layers:

    * public operations (``create``, ``read``, ``list``, ...) that forward
      unchanged to ``StorageBase``;
    * ``*_file`` aliases that forward to those public operations;
    * ``_xxx_raw`` hooks, declared with ``@abstractmethod``, that concrete
      handlers implement to perform the actual I/O.
    """

    def __init__(self, base_path: str = ".", **kwargs):
        """
        Initialize the storage handler.

        Args:
            base_path (str): Base directory for storage operations (default: current directory)
            **kwargs: Additional keyword arguments forwarded to the parent initializer
        """
        super().__init__(base_path=base_path, **kwargs)

    # ------------------------------------------------------------------
    # Public operations: each one forwards its arguments to StorageBase.
    # ------------------------------------------------------------------

    def create(self, file_path: str, content: Any, **kwargs) -> Dict[str, Any]:
        """Store ``content`` at ``file_path`` via the parent's ``save``."""
        return super().save(file_path, content, **kwargs)

    def read(self, file_path: str, **kwargs) -> Dict[str, Any]:
        """Read ``file_path`` via the parent's ``read``."""
        return super().read(file_path, **kwargs)

    def list(self, path: str = None, max_depth: int = 3, include_hidden: bool = False) -> Dict[str, Any]:
        """List entries under ``path`` via the parent's ``list`` (arguments passed positionally)."""
        return super().list(path, max_depth, include_hidden)

    def delete(self, file_path: str, **kwargs) -> Dict[str, Any]:
        """Delete ``file_path`` via the parent's ``delete``."""
        return super().delete(file_path, **kwargs)

    def move(self, source: str, destination: str, **kwargs) -> Dict[str, Any]:
        """Move ``source`` to ``destination`` via the parent's ``move``."""
        return super().move(source, destination, **kwargs)

    def copy(self, source: str, destination: str, **kwargs) -> Dict[str, Any]:
        """Copy ``source`` to ``destination`` via the parent's ``copy``."""
        return super().copy(source, destination, **kwargs)

    def create_directory(self, path: str, **kwargs) -> Dict[str, Any]:
        """Create the directory ``path`` via the parent's ``create_directory``."""
        return super().create_directory(path, **kwargs)

    # ------------------------------------------------------------------
    # Backend hooks: declared abstract here, implemented by subclasses.
    # A docstring alone is a valid body, so no ``pass`` is needed.
    # ------------------------------------------------------------------

    @abstractmethod
    def _initialize_storage(self):
        """Initialize storage - must be implemented by subclasses"""

    @abstractmethod
    def _read_raw(self, path: str, **kwargs) -> bytes:
        """Read raw file content - must be implemented by subclasses"""

    @abstractmethod
    def _write_raw(self, path: str, content: bytes, **kwargs) -> bool:
        """Write raw file content - must be implemented by subclasses"""

    @abstractmethod
    def _delete_raw(self, path: str) -> bool:
        """Delete file or directory - must be implemented by subclasses"""

    @abstractmethod
    def _exists_raw(self, path: str) -> bool:
        """Check if path exists - must be implemented by subclasses"""

    @abstractmethod
    def _create_directory_raw(self, path: str) -> bool:
        """Create directory - must be implemented by subclasses"""

    @abstractmethod
    def _list_raw(self, path: str = None, **kwargs) -> List[Dict[str, Any]]:
        """List files and directories - must be implemented by subclasses"""

    # ------------------------------------------------------------------
    # ``*_file`` aliases: dispatch through ``self`` so that overrides of the
    # public operations in subclasses are honoured.
    # ------------------------------------------------------------------

    def create_file(self, file_path: str, content: Any, **kwargs) -> Dict[str, Any]:
        """Alias that forwards to ``self.save``."""
        return self.save(file_path, content, **kwargs)

    def read_file(self, file_path: str, **kwargs) -> Dict[str, Any]:
        """Alias that forwards to ``self.read``."""
        return self.read(file_path, **kwargs)

    def list_files(self, path: str = None, max_depth: int = 3, include_hidden: bool = False) -> Dict[str, Any]:
        """Alias that forwards to ``self.list``."""
        return self.list(path, max_depth, include_hidden)

    def delete_file(self, file_path: str, **kwargs) -> Dict[str, Any]:
        """Alias that forwards to ``self.delete``."""
        return self.delete(file_path, **kwargs)

    def move_file(self, source: str, destination: str, **kwargs) -> Dict[str, Any]:
        """Alias that forwards to ``self.move``."""
        return self.move(source, destination, **kwargs)

    def copy_file(self, source: str, destination: str, **kwargs) -> Dict[str, Any]:
        """Alias that forwards to ``self.copy``."""
        return self.copy(source, destination, **kwargs)
| |
|
| |
|
class LocalStorageHandler(FileStorageHandler):
    """
    Local filesystem storage implementation.

    Implements the ``_xxx_raw`` hooks declared by ``FileStorageHandler`` with
    ``pathlib`` and ``shutil`` against the local disk.
    """

    def __init__(self, base_path: str = ".", **kwargs):
        """
        Initialize local storage handler.

        Args:
            base_path (str): Base directory for storage operations (default: current directory)
            **kwargs: Additional keyword arguments for parent class initialization
        """
        super().__init__(base_path=base_path, **kwargs)

    def _initialize_storage(self):
        """
        Ensure the base directory exists, creating missing parents as needed.

        Raises:
            Exception: Any failure while creating the directory is logged and re-raised.
        """
        try:
            Path(self.base_path).mkdir(parents=True, exist_ok=True)
            logger.info(f"Local storage initialized with base path: {self.base_path}")
        except Exception as e:
            logger.error(f"Error initializing local storage: {str(e)}")
            raise

    def _read_raw(self, path: str, **kwargs) -> bytes:
        """
        Read a file from the local filesystem in binary mode.

        Args:
            path (str): Path of the file to read
            **kwargs: Accepted for interface compatibility; not used here

        Returns:
            bytes: The complete file content

        Raises:
            Exception: Any failure while opening or reading is logged and re-raised.
        """
        try:
            with open(path, 'rb') as f:
                return f.read()
        except Exception as e:
            logger.error(f"Error reading file {path}: {str(e)}")
            raise

    def _write_raw(self, path: str, content: bytes, **kwargs) -> bool:
        """
        Write bytes to a local file, creating parent directories first.

        Args:
            path (str): Destination file path
            content (bytes): Data to write
            **kwargs: Accepted for interface compatibility; not used here

        Returns:
            bool: True on success, False if any error occurred (the error is logged)
        """
        try:
            Path(path).parent.mkdir(parents=True, exist_ok=True)
            with open(path, 'wb') as f:
                f.write(content)
            return True
        except Exception as e:
            logger.error(f"Error writing file {path}: {str(e)}")
            return False

    def _delete_raw(self, path: str) -> bool:
        """
        Delete a file, symbolic link, or directory tree from the local filesystem.

        Args:
            path (str): Path to remove

        Returns:
            bool: True if something was removed; False if the path does not
            exist or removal failed (failures are logged)
        """
        try:
            target = Path(path)
            # Test for a symlink first: is_file()/is_dir() follow links, and
            # shutil.rmtree refuses a symlink to a directory. Unlinking removes
            # only the link itself and also handles dangling links.
            if target.is_symlink() or target.is_file():
                target.unlink()
            elif target.is_dir():
                shutil.rmtree(target)
            else:
                return False
            return True
        except Exception as e:
            logger.error(f"Error deleting {path}: {str(e)}")
            return False

    def _list_raw(self, path: str = None, max_depth: int = 3, include_hidden: bool = False) -> List[Dict[str, Any]]:
        """
        Recursively list files and directories on the local filesystem.

        Args:
            path (str): Directory to list; defaults to ``self.base_path`` when None
            max_depth (int): The listed directory is depth 0; sub-directories are
                descended only while the current depth is below this value
            include_hidden (bool): When False, entries whose name starts with '.'
                are skipped (and hidden directories are not descended)

        Returns:
            List[Dict[str, Any]]: One dict per entry with the keys ``name``,
            ``path``, ``type``, ``size_bytes``, ``size_mb``, ``modified_time``,
            ``extension`` and ``is_hidden``. An empty list is returned when
            ``path`` is not an existing directory or on unexpected errors.
        """
        try:
            if path is None:
                path = str(self.base_path)

            root = Path(path)
            # is_dir() is already False for a missing path.
            if not root.is_dir():
                return []

            items: List[Dict[str, Any]] = []

            def scan_directory(current_path: Path, current_depth: int) -> None:
                if current_depth > max_depth:
                    return

                try:
                    for item in current_path.iterdir():
                        is_hidden = item.name.startswith('.')
                        if is_hidden and not include_hidden:
                            continue

                        try:
                            stat = item.stat()
                            # Query the entry type once instead of once per field.
                            is_dir = item.is_dir()
                            is_file = item.is_file()

                            items.append({
                                "name": item.name,
                                "path": str(item),
                                "type": "directory" if is_dir else "file",
                                "size_bytes": stat.st_size if is_file else 0,
                                "size_mb": round(stat.st_size / (1024 * 1024), 2) if is_file else 0,
                                "modified_time": datetime.fromtimestamp(stat.st_mtime).isoformat(),
                                "extension": item.suffix.lower() if is_file else "",
                                "is_hidden": is_hidden,
                            })

                            if is_dir and current_depth < max_depth:
                                scan_directory(item, current_depth + 1)

                        except OSError:
                            # Unreadable entry (permissions, dangling link, ...):
                            # skip it and keep listing its siblings.
                            continue

                except OSError as e:
                    logger.warning(f"Error scanning directory {current_path}: {str(e)}")

            scan_directory(root, 0)
            return items

        except Exception as e:
            logger.error(f"Error listing directory {path}: {str(e)}")
            return []

    def _exists_raw(self, path: str) -> bool:
        """
        Check whether a path exists on the local filesystem.

        Args:
            path (str): Path to test

        Returns:
            bool: Result of ``Path(path).exists()``
        """
        return Path(path).exists()

    def _create_directory_raw(self, path: str) -> bool:
        """
        Create a directory (and any missing parents) on the local filesystem.

        Args:
            path (str): Directory to create; an already existing directory is not an error

        Returns:
            bool: True on success, False if creation failed (the error is logged)
        """
        try:
            Path(path).mkdir(parents=True, exist_ok=True)
            return True
        except Exception as e:
            logger.error(f"Error creating directory {path}: {str(e)}")
            return False
| | |
| |
|
| |
|
class SupabaseStorageHandler(FileStorageHandler):
    """
    Supabase remote storage implementation.

    Implements the ``_xxx_raw`` hooks declared by ``FileStorageHandler`` through
    the Supabase Storage client. Connection settings are read from the
    ``SUPABASE_URL_STORAGE``, ``SUPABASE_KEY_STORAGE`` and
    ``SUPABASE_BUCKET_STORAGE`` environment variables.
    """

    def __init__(self, bucket_name: str = None, base_path: str = "/", **kwargs):
        """
        Initialize Supabase storage handler.

        Args:
            bucket_name: Supabase storage bucket name; falls back to the
                ``SUPABASE_BUCKET_STORAGE`` environment variable, then "default"
            base_path: Base path for storage operations (default: "/")
            **kwargs: Additional keyword arguments for parent class initialization

        Raises:
            ValueError: If ``SUPABASE_URL_STORAGE`` or ``SUPABASE_KEY_STORAGE`` is unset or empty
            ImportError: If the ``supabase`` package is not installed
            Exception: If the Supabase client cannot be created
        """
        super().__init__(base_path=base_path, **kwargs)

        self.bucket_name = bucket_name or os.getenv("SUPABASE_BUCKET_STORAGE") or "default"
        self.supabase_url = os.getenv("SUPABASE_URL_STORAGE")
        self.supabase_key = os.getenv("SUPABASE_KEY_STORAGE")

        if not self.supabase_url or not self.supabase_key:
            # Name the variables that are actually read above.
            raise ValueError(
                "Supabase configuration not found in environment variables. "
                "Please set SUPABASE_URL_STORAGE/SUPABASE_KEY_STORAGE environment variables."
            )

        try:
            # Imported lazily so the module can be loaded without the optional dependency.
            from supabase import create_client, Client
            logger.info(f"Creating Supabase client with URL: {self.supabase_url[:30]}...")
            self.supabase: Client = create_client(self.supabase_url, self.supabase_key)
            # Log the resolved bucket, not the raw argument (which may be None).
            logger.info(f"Successfully initialized Supabase client for bucket: {self.bucket_name}")
        except ImportError as e:
            raise ImportError(
                "Supabase Python client not installed. "
                "Please install it with: pip install supabase"
            ) from e
        except Exception as e:
            logger.error(f"Failed to initialize Supabase client: {str(e)}")
            raise Exception(f"Failed to initialize Supabase client: {str(e)}") from e

        # Run the bucket check now that the client attributes exist.
        self._initialize_storage()

    def _initialize_storage(self):
        """
        Verify that the configured bucket can be listed.

        Does nothing when ``bucket_name`` or ``supabase`` is not set yet
        (presumably because the parent initializer invokes this hook before
        ``__init__`` here has assigned them - confirm against StorageBase).
        A failed check is only logged as a warning; it does not raise.
        """
        if not hasattr(self, 'bucket_name') or not hasattr(self, 'supabase'):
            return

        try:
            logger.info(f"Testing bucket access for: {self.bucket_name}")
            self.supabase.storage.from_(self.bucket_name).list()
            logger.info(f"Successfully connected to Supabase bucket: {self.bucket_name}")
        except Exception as e:
            logger.warning(f"Could not verify bucket access: {str(e)}")

    def translate_in(self, file_path: str) -> str:
        """
        Resolve a caller-supplied path to a storage key.

        Args:
            file_path: Path as given by the caller

        Returns:
            str: ``file_path`` without leading slashes when ``base_path`` is "/";
            otherwise whatever the parent's ``translate_in`` returns
        """
        if self.base_path == "/":
            return file_path.lstrip('/')
        return super().translate_in(file_path)

    def _read_raw(self, path: str, **kwargs) -> bytes:
        """
        Download a file from the bucket.

        Args:
            path: Object path; leading slashes are stripped
            **kwargs: Accepted for interface compatibility; not used here

        Returns:
            bytes: The downloaded content; a non-bytes response is converted
            with ``bytes()``, and a falsy response yields ``b""``

        Raises:
            Exception: Any download failure is logged and re-raised.
        """
        try:
            file_path = path.lstrip('/')
            response = self.supabase.storage.from_(self.bucket_name).download(file_path)

            if isinstance(response, bytes):
                return response
            return bytes(response) if response else b""

        except Exception as e:
            logger.error(f"Error reading file {path} from Supabase: {str(e)}")
            raise

    def _write_raw(self, path: str, content: bytes, **kwargs) -> bool:
        """
        Write a file to the bucket, updating it when it already exists.

        Args:
            path: Object path; leading slashes are stripped
            content: Data to store
            **kwargs: ``content_type`` overrides the default
                "application/octet-stream"

        Returns:
            bool: True when the client returned a truthy response without an
            ``error`` entry; False otherwise or on any exception (logged)
        """
        try:
            file_path = path.lstrip('/')
            content_type = kwargs.get("content_type", "application/octet-stream")
            bucket = self.supabase.storage.from_(self.bucket_name)

            file_exists = self._exists_raw(file_path)

            if file_exists:
                logger.info(f"File {file_path} exists, using update method")
                response = bucket.update(
                    path=file_path,
                    file=content,
                    file_options={
                        "content-type": content_type,
                        "upsert": "true"
                    }
                )
            else:
                logger.info(f"File {file_path} doesn't exist, using upload method")
                response = bucket.upload(
                    path=file_path,
                    file=content,
                    file_options={"content-type": content_type}
                )

            if response and (not isinstance(response, dict) or response.get("error") is None):
                operation = "updated" if file_exists else "uploaded"
                logger.info(f"Successfully {operation} file to Supabase: {file_path}")
                return True

            logger.error(f"Operation failed: {response}")
            return False

        except Exception as e:
            logger.error(f"Error writing file {path} to Supabase: {str(e)}")
            return False

    def _delete_raw(self, path: str) -> bool:
        """
        Remove a file from the bucket.

        Args:
            path: Object path; leading slashes are stripped

        Returns:
            bool: True when the client returned a list, or a dict without an
            ``error`` entry; False for any other response (including None)
            or on any exception (logged)
        """
        try:
            file_path = path.lstrip('/')
            response = self.supabase.storage.from_(self.bucket_name).remove([file_path])

            succeeded = isinstance(response, list) or (
                isinstance(response, dict) and response.get("error") is None
            )
            if succeeded:
                logger.info(f"Successfully deleted file from Supabase: {file_path}")
                return True

            logger.error(f"Deletion failed: {response}")
            return False

        except Exception as e:
            logger.error(f"Error deleting {path} from Supabase: {str(e)}")
            return False

    def _list_raw(self, path: str = None, max_depth: int = 3, include_hidden: bool = False) -> List[Dict[str, Any]]:
        """
        List the entries directly under a path in the bucket.

        Args:
            path: Path to list; defaults to ``self.base_path``. Leading slashes
                are stripped
            max_depth: Accepted for interface compatibility; this implementation
                does not recurse, so the value is not used
            include_hidden: When False, entries whose name starts with '.' are skipped

        Returns:
            List[Dict[str, Any]]: One dict per entry with the keys ``name``,
            ``path``, ``type``, ``size_bytes``, ``size_mb``, ``modified_time``,
            ``extension``, ``is_hidden`` and ``mime_type``. An empty list is
            returned on any error (logged).
        """
        try:
            list_path = (path or self.base_path).lstrip('/')
            response = self.supabase.storage.from_(self.bucket_name).list(list_path)

            items = []
            if response and isinstance(response, list):
                for item in response:
                    name = item.get('name', '')
                    is_hidden = name.startswith('.')
                    if is_hidden and not include_hidden:
                        continue

                    # 'metadata' may be present but null; .get(key, {}) would
                    # then return None and break the lookups below.
                    # NOTE(review): entries without metadata look like folder
                    # entries but are still reported as "file" - confirm
                    # against the Supabase API before changing.
                    metadata = item.get('metadata') or {}
                    size_bytes = metadata.get('size') or 0
                    mime_type = metadata.get('mimetype', '')

                    items.append({
                        "name": name,
                        "path": f"{list_path}/{name}" if list_path else name,
                        "type": "directory" if mime_type == 'application/x-directory' else "file",
                        "size_bytes": size_bytes,
                        "size_mb": round(size_bytes / (1024 * 1024), 2),
                        "modified_time": item.get('updated_at', ''),
                        "extension": Path(name).suffix.lower(),
                        "is_hidden": is_hidden,
                        "mime_type": mime_type
                    })

            return items

        except Exception as e:
            logger.error(f"Error listing directory {path} from Supabase: {str(e)}")
            return []

    def _exists_raw(self, path: str) -> bool:
        """
        Check whether an entry exists by listing its parent path.

        Args:
            path: Object path; leading slashes are stripped

        Returns:
            bool: True when the parent listing contains an entry whose name
            equals the last path component; False otherwise or on any error
            (logged as a warning)
        """
        try:
            file_path = path.lstrip('/')
            # dirname() is already "" for a top-level name, i.e. the bucket root.
            parent_dir = os.path.dirname(file_path)
            file_name = os.path.basename(file_path)

            try:
                response = self.supabase.storage.from_(self.bucket_name).list(parent_dir)

                if response and isinstance(response, list):
                    return any(item.get('name') == file_name for item in response)
                return False

            except Exception as e:
                logger.warning(f"Error listing directory {parent_dir}: {str(e)}")
                return False

        except Exception as e:
            logger.warning(f"Error checking if file {path} exists: {str(e)}")
            return False

    def _create_directory_raw(self, path: str) -> bool:
        """
        Create a "directory" by uploading a ``.placeholder`` file beneath it.

        Args:
            path: Directory path; leading slashes are stripped

        Returns:
            bool: True when the client returned a truthy response without an
            ``error`` entry; False otherwise or on any exception (logged)
        """
        try:
            dir_path = path.lstrip('/')

            placeholder_content = b"# Directory placeholder"
            placeholder_path = f"{dir_path}/.placeholder"

            response = self.supabase.storage.from_(self.bucket_name).upload(
                path=placeholder_path,
                file=placeholder_content,
                file_options={"content-type": "text/plain"}
            )

            # Parenthesised so a None/empty response is reported as a failure
            # instead of reaching .get(); same check as in _write_raw.
            if response and (not isinstance(response, dict) or response.get("error") is None):
                return True

            logger.error(f"Directory creation failed: {response}")
            return False

        except Exception as e:
            logger.error(f"Error creating directory {path} in Supabase: {str(e)}")
            return False
| | |
| | |