# coding: utf-8
"""
oss.py

Utility class for OSS (Object Storage Service) operations.
Provides simple methods for data operations: upload, read, delete, update.
"""

import os
import json
import tempfile
from typing import Optional, Dict, List, Any, Tuple, Union, BinaryIO, TextIO, IO, AnyStr

from aworld.utils import import_package
from aworld.logs.util import logger


class OSSClient:
    """
    A utility class for OSS (Object Storage Service) operations.
    Provides methods for data operations: upload, read, delete, update.

    The underlying ``oss2`` client is created lazily on first use via
    :meth:`initialize`; every public method is a no-op (returning a falsy
    value) when export is disabled or credentials are missing.
    """

    def __init__(self,
                 access_key_id: Optional[str] = None,
                 access_key_secret: Optional[str] = None,
                 endpoint: Optional[str] = None,
                 bucket_name: Optional[str] = None,
                 enable_export: Optional[bool] = None):
        """
        Initialize OSSClient with credentials.

        Args:
            access_key_id: OSS access key ID. If None, will try to get from
                environment variable OSS_ACCESS_KEY_ID
            access_key_secret: OSS access key secret. If None, will try to get from
                environment variable OSS_ACCESS_KEY_SECRET
            endpoint: OSS endpoint. If None, will try to get from
                environment variable OSS_ENDPOINT
            bucket_name: OSS bucket name. If None, will try to get from
                environment variable OSS_BUCKET_NAME
            enable_export: Whether to enable OSS export. If None, will try to get from
                environment variable EXPORT_REPLAY_TRACE_TO_OSS ("true"/"false",
                case-insensitive; defaults to disabled)
        """
        self.access_key_id = access_key_id or os.getenv('OSS_ACCESS_KEY_ID')
        self.access_key_secret = access_key_secret or os.getenv('OSS_ACCESS_KEY_SECRET')
        self.endpoint = endpoint or os.getenv('OSS_ENDPOINT')
        self.bucket_name = bucket_name or os.getenv('OSS_BUCKET_NAME')
        self.enable_export = (enable_export if enable_export is not None
                              else os.getenv("EXPORT_REPLAY_TRACE_TO_OSS", "false").lower() == "true")
        self.bucket = None          # oss2.Bucket handle, created lazily in initialize()
        self._initialized = False   # guard so initialization runs only once

    def initialize(self) -> bool:
        """
        Initialize the OSS client with the provided or environment credentials.

        Idempotent: subsequent calls after a successful initialization return
        True immediately without re-creating the bucket handle.

        Returns:
            bool: True if initialization is successful, False otherwise
        """
        if self._initialized:
            return True

        if not self.enable_export:
            logger.info("OSS export is disabled. Set EXPORT_REPLAY_TRACE_TO_OSS=true to enable.")
            return False

        if not all([self.access_key_id, self.access_key_secret, self.endpoint, self.bucket_name]):
            logger.warn(
                "Missing required OSS credentials. Please provide all required parameters or set environment variables.")
            return False

        try:
            import_package("oss2")
            import oss2

            auth = oss2.Auth(self.access_key_id, self.access_key_secret)
            self.bucket = oss2.Bucket(auth, self.endpoint, self.bucket_name)
            self._initialized = True
            return True
        except ImportError:
            logger.warn("Failed to import oss2 module. Please install it with 'pip install oss2'.")
            return False
        except Exception as e:
            logger.warn(f"Failed to initialize OSS client. Error: {str(e)}")
            return False

    # ---- Basic Data Operation Methods ----

    def upload_data(self, data: Union[IO[AnyStr], str, bytes, dict], oss_key: str) -> Optional[str]:
        """
        Upload data to OSS.

        Supports various types of data:
        - In-memory file objects (IO[AnyStr])
        - Strings (str)
        - Bytes (bytes)
        - Dictionaries (dict), will be automatically converted to JSON
        - File paths (str)

        Args:
            data: Data to upload, can be a file object or other supported types
            oss_key: The key (path) in OSS where the data will be stored

        Returns:
            str: The OSS key if successful, None otherwise
        """
        if not self.initialize():
            logger.warn("OSS client not initialized or export is disabled")
            return None

        try:
            # Handle file objects (anything with a read() method)
            if hasattr(data, 'read'):
                content = data.read()
                # Text-mode streams yield str; OSS needs bytes
                if isinstance(content, str):
                    content = content.encode('utf-8')
                self.bucket.put_object(oss_key, content)
                logger.info(f"Successfully uploaded memory file to OSS: {oss_key}")
                return oss_key

            # Handle dictionaries: serialize as UTF-8 JSON
            if isinstance(data, dict):
                content = json.dumps(data, ensure_ascii=False).encode('utf-8')
                self.bucket.put_object(oss_key, content)
                logger.info(f"Successfully uploaded dict as JSON to OSS: {oss_key}")
                return oss_key

            # Handle strings
            if isinstance(data, str):
                # NOTE: a string that happens to name an existing file is
                # uploaded as that file, not as literal string content.
                if os.path.isfile(data):
                    self.bucket.put_object_from_file(oss_key, data)
                    logger.info(f"Successfully uploaded file {data} to OSS: {oss_key}")
                    return oss_key

                # Otherwise treat as string content
                content = data.encode('utf-8')
                self.bucket.put_object(oss_key, content)
                return oss_key

            # Handle bytes (and other raw payloads accepted by oss2)
            self.bucket.put_object(oss_key, data)
            logger.info(f"Successfully uploaded data to OSS: {oss_key}")
            return oss_key
        except Exception as e:
            logger.warn(f"Failed to upload data to OSS: {str(e)}")
            return None

    def read_data(self, oss_key: str, as_json: bool = False) -> Union[bytes, dict, str, None]:
        """
        Read data from OSS.

        Args:
            oss_key: The key (path) in OSS of the data to read
            as_json: If True, parse the data as JSON and return a dict

        Returns:
            The data as bytes, dict (if as_json=True), or None if failed
        """
        if not self.initialize():
            logger.warn("OSS client not initialized or export is disabled")
            return None

        try:
            # Read the full object body into memory
            result = self.bucket.get_object(oss_key)
            data = result.read()

            # Parse as JSON if requested
            if as_json:
                return json.loads(data)
            return data
        except Exception as e:
            logger.warn(f"Failed to read data from OSS: {str(e)}")
            return None

    def read_text(self, oss_key: str) -> Optional[str]:
        """
        Read text data from OSS.

        Args:
            oss_key: The key (path) in OSS of the text to read

        Returns:
            str: The text data decoded as UTF-8, or None if the read or
                decode failed
        """
        data = self.read_data(oss_key)
        if data is not None:
            try:
                return data.decode('utf-8')
            except Exception as e:
                logger.warn(f"Failed to decode data as UTF-8: {str(e)}")
        return None

    def delete_data(self, oss_key: str) -> bool:
        """
        Delete data from OSS.

        Args:
            oss_key: The key (path) in OSS of the data to delete

        Returns:
            bool: True if successful, False otherwise
        """
        if not self.initialize():
            logger.warn("OSS client not initialized or export is disabled")
            return False

        try:
            self.bucket.delete_object(oss_key)
            logger.info(f"Successfully deleted data from OSS: {oss_key}")
            return True
        except Exception as e:
            logger.warn(f"Failed to delete data from OSS: {str(e)}")
            return False

    def update_data(self, oss_key: str, data: Union[IO[AnyStr], str, bytes, dict]) -> Optional[str]:
        """
        Update data in OSS by overwriting the existing object.

        Args:
            oss_key: The key (path) in OSS of the data to update
            data: New data to upload, can be a file object or other supported types

        Returns:
            str: The OSS key if successful, None otherwise
        """
        # For OSS, update is the same as upload (it overwrites)
        return self.upload_data(data, oss_key)

    def update_json(self, oss_key: str, update_dict: dict) -> Optional[str]:
        """
        Update JSON data in OSS by merging with existing data.

        If no object exists at ``oss_key`` yet, the update is written as a
        fresh JSON document.

        Args:
            oss_key: The key (path) in OSS of the JSON data to update
            update_dict: Dictionary with fields to update

        Returns:
            str: The OSS key if successful, None otherwise
        """
        if not self.initialize():
            logger.warn("OSS client not initialized or export is disabled")
            return None

        try:
            # Read existing data; treat a missing object as an empty document
            existing_data = self.read_data(oss_key, as_json=True)
            if existing_data is None:
                existing_data = {}

            # Merge the update into the existing document
            if isinstance(existing_data, dict):
                existing_data.update(update_dict)
            else:
                # e.g. stored JSON was a list/scalar — cannot merge
                logger.warn(f"Existing data is not a dictionary: {oss_key}")
                return None

            # Upload updated data
            return self.upload_data(existing_data, oss_key)
        except Exception as e:
            logger.warn(f"Failed to update JSON data in OSS: {str(e)}")
            return None

    # ---- File Operation Methods ----

    def upload_file(self, local_file: str, oss_key: Optional[str] = None) -> Optional[str]:
        """
        Upload a local file to OSS.

        Args:
            local_file: Path to the local file
            oss_key: The key (path) in OSS where the file will be stored.
                If None, will use "uploads/" + the basename of the local file

        Returns:
            str: The OSS key if successful, None otherwise
        """
        if not self.initialize():
            logger.warn("OSS client not initialized or export is disabled")
            return None

        try:
            if not os.path.exists(local_file):
                logger.warn(f"Local file {local_file} does not exist")
                return None

            if oss_key is None:
                oss_key = f"uploads/{os.path.basename(local_file)}"

            self.bucket.put_object_from_file(oss_key, local_file)
            logger.info(f"Successfully uploaded {local_file} to OSS: {oss_key}")
            return oss_key
        except Exception as e:
            logger.warn(f"Failed to upload {local_file} to OSS: {str(e)}")
            return None

    def download_file(self, oss_key: str, local_file: str) -> bool:
        """
        Download a file from OSS to local.

        Args:
            oss_key: The key (path) in OSS of the file to download
            local_file: Path where the downloaded file will be saved

        Returns:
            bool: True if successful, False otherwise
        """
        if not self.initialize():
            logger.warn("OSS client not initialized or export is disabled")
            return False

        try:
            # Ensure the target directory exists before writing
            os.makedirs(os.path.dirname(os.path.abspath(local_file)), exist_ok=True)

            # Download the file
            self.bucket.get_object_to_file(oss_key, local_file)
            logger.info(f"Successfully downloaded {oss_key} to {local_file}")
            return True
        except Exception as e:
            logger.warn(f"Failed to download {oss_key} from OSS: {str(e)}")
            return False

    def list_objects(self, prefix: str = "", delimiter: str = "") -> List[Dict[str, Any]]:
        """
        List objects in the OSS bucket with the given prefix.

        Pages through the complete listing: the OSS API caps the number of
        keys returned per request, so a single list_objects call would only
        return the first page. This method follows ``next_marker`` until
        ``is_truncated`` is False.

        Args:
            prefix: Prefix to filter objects
            delimiter: Delimiter for hierarchical listing

        Returns:
            List of objects with their properties (key, size, last_modified)
        """
        if not self.initialize():
            logger.warn("OSS client not initialized or export is disabled")
            return []

        try:
            result = []
            marker = ""
            while True:
                resp = self.bucket.list_objects(prefix=prefix, delimiter=delimiter, marker=marker)
                for obj in resp.object_list:
                    result.append({
                        'key': obj.key,
                        'size': obj.size,
                        'last_modified': obj.last_modified
                    })
                if not resp.is_truncated:
                    break
                marker = resp.next_marker
            return result
        except Exception as e:
            logger.warn(f"Failed to list objects with prefix {prefix}: {str(e)}")
            return []

    # ---- Advanced Operation Methods ----

    def exists(self, oss_key: str) -> bool:
        """
        Check if an object exists in OSS.

        Args:
            oss_key: The key (path) in OSS to check

        Returns:
            bool: True if the object exists, False otherwise
        """
        if not self.initialize():
            logger.warn("OSS client not initialized or export is disabled")
            return False

        try:
            # head_object raises (e.g. NoSuchKey) when the object is absent
            self.bucket.head_object(oss_key)
            return True
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
            # are not swallowed.
            return False

    def copy_object(self, source_key: str, target_key: str) -> bool:
        """
        Copy an object within the same bucket.

        Args:
            source_key: The source object key
            target_key: The target object key

        Returns:
            bool: True if successful, False otherwise
        """
        if not self.initialize():
            logger.warn("OSS client not initialized or export is disabled")
            return False

        try:
            # Source bucket is this bucket: copy within the same bucket
            self.bucket.copy_object(self.bucket_name, source_key, target_key)
            logger.info(f"Successfully copied {source_key} to {target_key}")
            return True
        except Exception as e:
            logger.warn(f"Failed to copy {source_key} to {target_key}: {str(e)}")
            return False

    def get_object_url(self, oss_key: str, expires: int = 3600) -> Optional[str]:
        """
        Generate a temporary URL for accessing an object.

        Args:
            oss_key: The key (path) in OSS of the object
            expires: URL expiration time in seconds (default 1 hour)

        Returns:
            str: The signed URL, or None if failed
        """
        if not self.initialize():
            logger.warn("OSS client not initialized or export is disabled")
            return None

        try:
            url = self.bucket.sign_url('GET', oss_key, expires)
            return url
        except Exception as e:
            logger.warn(f"Failed to generate URL for {oss_key}: {str(e)}")
            return None

    def upload_directory(self, local_dir: str, oss_prefix: str = "") -> Tuple[bool, List[str]]:
        """
        Upload an entire directory to OSS.

        Args:
            local_dir: Path to the local directory
            oss_prefix: Prefix to prepend to all uploaded files

        Returns:
            Tuple of (success, list of uploaded files). success is False if
            the client is unavailable, the directory is missing, or any
            individual file failed to upload.
        """
        if not self.initialize():
            logger.warn("OSS client not initialized or export is disabled")
            return False, []

        if not os.path.isdir(local_dir):
            logger.warn(f"Local directory {local_dir} does not exist or is not a directory")
            return False, []

        uploaded_files = []
        errors = []

        for root, _, files in os.walk(local_dir):
            for file in files:
                local_file = os.path.join(root, file)
                # Preserve the directory layout below local_dir in the keys;
                # normalize Windows separators for OSS.
                rel_path = os.path.relpath(local_file, local_dir)
                oss_key = os.path.join(oss_prefix, rel_path).replace("\\", "/")

                result = self.upload_file(local_file, oss_key)
                if result:
                    uploaded_files.append(result)
                else:
                    errors.append(local_file)

        if errors:
            logger.warn(f"Failed to upload {len(errors)} files")
            return False, uploaded_files

        return True, uploaded_files


def get_oss_client(access_key_id: Optional[str] = None,
                   access_key_secret: Optional[str] = None,
                   endpoint: Optional[str] = None,
                   bucket_name: Optional[str] = None,
                   enable_export: Optional[bool] = None) -> OSSClient:
    """
    Factory function to create and initialize an OSSClient.

    Args:
        access_key_id: OSS access key ID
        access_key_secret: OSS access key secret
        endpoint: OSS endpoint
        bucket_name: OSS bucket name
        enable_export: Whether to enable OSS export

    Returns:
        OSSClient: An initialized OSSClient instance (initialization may have
        failed; check client.initialize() or the method return values)
    """
    client = OSSClient(
        access_key_id=access_key_id,
        access_key_secret=access_key_secret,
        endpoint=endpoint,
        bucket_name=bucket_name,
        enable_export=enable_export
    )
    client.initialize()
    return client


# ---- Test Cases ----

if __name__ == "__main__":
    """
    OSS tool class test cases
    Note: Before running the tests, you need to set the following environment variables,
    or provide the parameters directly in the test code:
    - OSS_ACCESS_KEY_ID
    - OSS_ACCESS_KEY_SECRET
    - OSS_ENDPOINT
    - OSS_BUCKET_NAME
    - EXPORT_REPLAY_TRACE_TO_OSS=true
    """
    import io
    import time

    # Test configuration
    TEST_PREFIX = f"test/oss_utils_{int(time.time())}"  # Use timestamp to avoid conflicts

    # Initialize client
    # Method 1: Using environment variables
    # oss_client = get_oss_client(enable_export=True)

    # Method 2: Provide parameters directly
    oss_client = get_oss_client(
        access_key_id="",  # Replace with your actual access key ID
        access_key_secret="",  # Replace with your actual access key secret
        endpoint="",  # Replace with your actual OSS endpoint
        bucket_name="",  # Replace with your actual bucket name
        enable_export=True
    )

    # Test 1: Upload string data
    print("\nTest 1: Upload string data")
    text_key = f"{TEST_PREFIX}/text.txt"
    result = oss_client.upload_data("This is a test text", text_key)
    print(f"Upload string data: {'Success: ' + result if result else 'Failed'}")

    # Test 2: Upload dictionary data (automatically converted to JSON)
    print("\nTest 2: Upload dictionary data")
    json_key = f"{TEST_PREFIX}/data.json"
data = { "name": "Test data", "values": [1, 2, 3], "nested": { "key": "value" } } result = oss_client.upload_data(data, json_key) print(f"Upload dictionary data: {'Success: ' + result if result else 'Failed'}") # Test 3: Upload in-memory binary file object print("\nTest 3: Upload in-memory binary file object") binary_key = f"{TEST_PREFIX}/binary.dat" binary_data = io.BytesIO(b"\x00\x01\x02\x03\x04") result = oss_client.upload_data(binary_data, binary_key) print(f"Upload binary file object: {'Success: ' + result if result else 'Failed'}") # Test 4: Upload in-memory text file object print("\nTest 4: Upload in-memory text file object") text_file_key = f"{TEST_PREFIX}/text_file.txt" text_file = io.StringIO("This is the content of an in-memory text file") result = oss_client.upload_data(text_file, text_file_key) print(f"Upload text file object: {'Success: ' + result if result else 'Failed'}") # Test 5: Create and upload temporary file print("\nTest 5: Create and upload temporary file") with tempfile.NamedTemporaryFile(delete=False) as tmp: tmp.write(b"This is the content of a temporary file") tmp_path = tmp.name file_key = f"{TEST_PREFIX}/temp_file.txt" result = oss_client.upload_file(tmp_path, file_key) print(f"Upload temporary file: {'Success: ' + result if result else 'Failed'}") os.unlink(tmp_path) # Delete temporary file # Test 6: Read text data print("\nTest 6: Read text data") content = oss_client.read_text(text_key) print(f"Read text data: {content}") # Test 7: Read JSON data print("\nTest 7: Read JSON data") json_content = oss_client.read_data(json_key, as_json=True) print(f"Read JSON data: {json_content}") # Test 8: Update JSON data (merge method) print("\nTest 8: Update JSON data") update_data = {"updated": True, "timestamp": time.time()} result = oss_client.update_json(json_key, update_data) print(f"Update JSON data: {'Success: ' + result if result else 'Failed'}") # View updated JSON data updated_json = oss_client.read_data(json_key, as_json=True) 
print(f"Updated JSON data: {updated_json}") # Test 9: Overwrite existing data print("\nTest 9: Overwrite existing data") result = oss_client.upload_data("This is the overwritten text", text_key) print(f"Overwrite existing data: {'Success: ' + result if result else 'Failed'}") # View overwritten data new_content = oss_client.read_text(text_key) print(f"Overwritten text data: {new_content}") # Test 10: List objects print("\nTest 10: List objects") objects = oss_client.list_objects(prefix=TEST_PREFIX) print(f"Found {len(objects)} objects:") for obj in objects: print(f" - {obj['key']} (Size: {obj['size']} bytes, Modified: {obj['last_modified']})") # Test 11: Generate temporary URL print("\nTest 11: Generate temporary URL") url = oss_client.get_object_url(text_key, expires=300) # 5 minutes expiration print(f"Temporary URL: {url}") # Test 12: Copy object print("\nTest 12: Copy object") copy_key = f"{TEST_PREFIX}/copy_of_text.txt" result = oss_client.copy_object(text_key, copy_key) print(f"Copy object: {'Success: ' + copy_key if result else 'Failed'}") # Test 13: Check if object exists print("\nTest 13: Check if object exists") exists = oss_client.exists(text_key) print(f"Object {text_key} exists: {exists}") non_existent_key = f"{TEST_PREFIX}/non_existent.txt" exists = oss_client.exists(non_existent_key) print(f"Object {non_existent_key} exists: {exists}") # Test 14: Delete objects print("\nTest 14: Delete objects") for obj in objects: success = oss_client.delete_data(obj['key']) print(f"Delete object {obj['key']}: {'Success' if success else 'Failed'}") # Cleanup: Delete copied object (may not be included in the previous list) oss_client.delete_data(copy_key) print("\nTests completed!")