Spaces:
Sleeping
Sleeping
# coding: utf-8
"""
oss.py
Utility class for OSS (Object Storage Service) operations.
Provides simple methods for data operations (upload, read, delete, update)
as well as file transfer, object listing, copying and signed-URL generation.
"""
import os | |
import json | |
import tempfile | |
from typing import Optional, Dict, List, Any, Tuple, Union, BinaryIO, TextIO, IO, AnyStr | |
from aworld.utils import import_package | |
from aworld.logs.util import logger | |
class OSSClient:
    """
    A utility class for OSS (Object Storage Service) operations.

    Provides methods for data operations (upload, read, delete, update),
    file transfer, listing, copying and signed-URL generation. The underlying
    ``oss2`` bucket is created lazily by :meth:`initialize`; every public
    operation calls it first and degrades to a "failed" return value
    (``None`` / ``False`` / ``[]``) instead of raising when the client is
    disabled or misconfigured.
    """

    def __init__(self,
                 access_key_id: Optional[str] = None,
                 access_key_secret: Optional[str] = None,
                 endpoint: Optional[str] = None,
                 bucket_name: Optional[str] = None,
                 enable_export: Optional[bool] = None):
        """
        Initialize OSSClient with credentials.

        Args:
            access_key_id: OSS access key ID. If None (or empty), falls back to the
                environment variable OSS_ACCESS_KEY_ID
            access_key_secret: OSS access key secret. If None (or empty), falls back to
                the environment variable OSS_ACCESS_KEY_SECRET
            endpoint: OSS endpoint. If None (or empty), falls back to the environment
                variable OSS_ENDPOINT
            bucket_name: OSS bucket name. If None (or empty), falls back to the
                environment variable OSS_BUCKET_NAME
            enable_export: Whether to enable OSS export. If None, it is enabled only
                when the environment variable EXPORT_REPLAY_TRACE_TO_OSS equals
                "true" (case-insensitive)
        """
        self.access_key_id = access_key_id or os.getenv('OSS_ACCESS_KEY_ID')
        self.access_key_secret = access_key_secret or os.getenv('OSS_ACCESS_KEY_SECRET')
        self.endpoint = endpoint or os.getenv('OSS_ENDPOINT')
        self.bucket_name = bucket_name or os.getenv('OSS_BUCKET_NAME')
        if enable_export is None:
            enable_export = os.getenv("EXPORT_REPLAY_TRACE_TO_OSS", "false").lower() == "true"
        self.enable_export = enable_export
        # Populated by initialize(); None until the first successful initialization.
        self.bucket = None
        self._initialized = False

    def initialize(self) -> bool:
        """
        Initialize the OSS client with the provided or environment credentials.

        Safe to call repeatedly: once initialization has succeeded, later calls
        return True immediately.

        Returns:
            bool: True if initialization is successful, False otherwise
        """
        if self._initialized:
            return True
        if not self.enable_export:
            logger.info("OSS export is disabled. Set EXPORT_REPLAY_TRACE_TO_OSS=true to enable.")
            return False
        if not all([self.access_key_id, self.access_key_secret, self.endpoint, self.bucket_name]):
            logger.warn(
                "Missing required OSS credentials. Please provide all required parameters or set environment variables.")
            return False
        try:
            # import_package is a project helper; presumably it makes sure oss2 is
            # importable before the real import below -- TODO confirm.
            import_package("oss2")
            import oss2
            auth = oss2.Auth(self.access_key_id, self.access_key_secret)
            self.bucket = oss2.Bucket(auth, self.endpoint, self.bucket_name)
            self._initialized = True
            return True
        except ImportError:
            logger.warn("Failed to import oss2 module. Please install it with 'pip install oss2'.")
            return False
        except Exception as e:
            logger.warn(f"Failed to initialize OSS client. Error: {str(e)}")
            return False

    def _ensure_ready(self) -> bool:
        """Initialize on demand and log a warning when the client is unusable."""
        if self.initialize():
            return True
        logger.warn("OSS client not initialized or export is disabled")
        return False

    # ---- Basic Data Operation Methods ----
    def upload_data(self, data: Union[IO[AnyStr], str, bytes, dict], oss_key: str) -> Optional[str]:
        """
        Upload data to OSS. Supports various types of data:
        - In-memory file objects (anything with a ``read`` method); text content
          is encoded as UTF-8
        - Dictionaries (dict), automatically serialized to UTF-8 JSON
        - File paths (str): a string naming an existing local file is uploaded
          as that file's content
        - Strings (str): any other string is uploaded as UTF-8 text
        - Bytes (bytes), passed through unchanged

        Args:
            data: Data to upload, can be a file object or other supported types
            oss_key: The key (path) in OSS where the data will be stored

        Returns:
            str: The OSS key if successful, None otherwise
        """
        if not self._ensure_ready():
            return None
        try:
            # File-like objects: read everything, normalizing text to bytes.
            if hasattr(data, 'read'):
                content = data.read()
                if isinstance(content, str):
                    content = content.encode('utf-8')
                self.bucket.put_object(oss_key, content)
                logger.info(f"Successfully uploaded memory file to OSS: {oss_key}")
                return oss_key

            # A string that names an existing file is treated as a path, not content.
            if isinstance(data, str) and os.path.isfile(data):
                self.bucket.put_object_from_file(oss_key, data)
                logger.info(f"Successfully uploaded file {data} to OSS: {oss_key}")
                return oss_key

            if isinstance(data, dict):
                payload = json.dumps(data, ensure_ascii=False).encode('utf-8')
            elif isinstance(data, str):
                payload = data.encode('utf-8')
            else:
                # Bytes (or anything else put_object accepts) go through untouched.
                payload = data
            self.bucket.put_object(oss_key, payload)
            # Logged for every payload type so successful uploads are always traceable.
            logger.info(f"Successfully uploaded data to OSS: {oss_key}")
            return oss_key
        except Exception as e:
            logger.warn(f"Failed to upload data to OSS: {str(e)}")
            return None

    def read_data(self, oss_key: str, as_json: bool = False) -> Union[bytes, dict, str, None]:
        """
        Read data from OSS.

        Args:
            oss_key: The key (path) in OSS of the data to read
            as_json: If True, parse the data as JSON and return the parsed value

        Returns:
            The raw data as returned by the bucket's object ``read()``, the parsed
            JSON value (if as_json=True), or None if the read or the parsing failed
        """
        if not self._ensure_ready():
            return None
        try:
            raw = self.bucket.get_object(oss_key).read()
            return json.loads(raw) if as_json else raw
        except Exception as e:
            logger.warn(f"Failed to read data from OSS: {str(e)}")
            return None

    def read_text(self, oss_key: str) -> Optional[str]:
        """
        Read text data from OSS.

        Args:
            oss_key: The key (path) in OSS of the text to read

        Returns:
            str: The data decoded as UTF-8, or None if reading or decoding failed
        """
        raw = self.read_data(oss_key)
        if raw is None:
            return None
        try:
            return raw.decode('utf-8')
        except Exception as e:
            logger.warn(f"Failed to decode data as UTF-8: {str(e)}")
            return None

    def delete_data(self, oss_key: str) -> bool:
        """
        Delete data from OSS.

        Args:
            oss_key: The key (path) in OSS of the data to delete

        Returns:
            bool: True if successful, False otherwise
        """
        if not self._ensure_ready():
            return False
        try:
            self.bucket.delete_object(oss_key)
            logger.info(f"Successfully deleted data from OSS: {oss_key}")
            return True
        except Exception as e:
            logger.warn(f"Failed to delete data from OSS: {str(e)}")
            return False

    def update_data(self, oss_key: str, data: Union[IO[AnyStr], str, bytes, dict]) -> Optional[str]:
        """
        Update data in OSS by overwriting the object.

        Args:
            oss_key: The key (path) in OSS of the data to update
            data: New data to upload, can be a file object or other supported types

        Returns:
            str: The OSS key if successful, None otherwise
        """
        # For OSS, update is the same as upload (it overwrites)
        return self.upload_data(data, oss_key)

    def update_json(self, oss_key: str, update_dict: dict) -> Optional[str]:
        """
        Update JSON data in OSS by merging with existing data.

        The stored object is read, shallow-merged with ``update_dict`` (keys in
        ``update_dict`` win) and written back. A missing object is created from
        ``update_dict`` alone. An existing object that cannot be read as a JSON
        dictionary is left untouched.

        Args:
            oss_key: The key (path) in OSS of the JSON data to update
            update_dict: Dictionary with fields to update

        Returns:
            str: The OSS key if successful, None otherwise
        """
        if not self._ensure_ready():
            return None
        try:
            merged = self.read_data(oss_key, as_json=True)
            if merged is None:
                # read_data returns None both for "no such object" and for "object
                # exists but could not be read/parsed". Only the former may start
                # from an empty dict; otherwise we would clobber existing content.
                if self.exists(oss_key):
                    logger.warn(f"Existing data could not be read as JSON, refusing to overwrite: {oss_key}")
                    return None
                merged = {}
            if not isinstance(merged, dict):
                logger.warn(f"Existing data is not a dictionary: {oss_key}")
                return None
            merged.update(update_dict)
            return self.upload_data(merged, oss_key)
        except Exception as e:
            logger.warn(f"Failed to update JSON data in OSS: {str(e)}")
            return None

    # ---- File Operation Methods ----
    def upload_file(self, local_file: str, oss_key: Optional[str] = None) -> Optional[str]:
        """
        Upload a local file to OSS.

        Args:
            local_file: Path to the local file
            oss_key: The key (path) in OSS where the file will be stored.
                If None, ``uploads/<basename of local_file>`` is used

        Returns:
            str: The OSS key if successful, None otherwise
        """
        if not self._ensure_ready():
            return None
        try:
            if not os.path.exists(local_file):
                logger.warn(f"Local file {local_file} does not exist")
                return None
            if oss_key is None:
                oss_key = f"uploads/{os.path.basename(local_file)}"
            self.bucket.put_object_from_file(oss_key, local_file)
            logger.info(f"Successfully uploaded {local_file} to OSS: {oss_key}")
            return oss_key
        except Exception as e:
            logger.warn(f"Failed to upload {local_file} to OSS: {str(e)}")
            return None

    def download_file(self, oss_key: str, local_file: str) -> bool:
        """
        Download a file from OSS to local.

        Args:
            oss_key: The key (path) in OSS of the file to download
            local_file: Path where the downloaded file will be saved; missing
                parent directories are created

        Returns:
            bool: True if successful, False otherwise
        """
        if not self._ensure_ready():
            return False
        try:
            # abspath first so a bare filename still yields a non-empty directory.
            os.makedirs(os.path.dirname(os.path.abspath(local_file)), exist_ok=True)
            self.bucket.get_object_to_file(oss_key, local_file)
            logger.info(f"Successfully downloaded {oss_key} to {local_file}")
            return True
        except Exception as e:
            logger.warn(f"Failed to download {oss_key} from OSS: {str(e)}")
            return False

    def list_objects(self, prefix: str = "", delimiter: str = "") -> List[Dict[str, Any]]:
        """
        List objects in the OSS bucket with the given prefix.

        All result pages are followed, so the returned list is complete even
        when the service truncates a single response.

        Args:
            prefix: Prefix to filter objects
            delimiter: Delimiter for hierarchical listing

        Returns:
            List of dicts with the keys 'key', 'size' and 'last_modified';
            an empty list if the listing failed
        """
        if not self._ensure_ready():
            return []
        try:
            entries: List[Dict[str, Any]] = []
            marker = ""
            while True:
                page = self.bucket.list_objects(prefix=prefix, delimiter=delimiter, marker=marker)
                entries.extend(
                    {
                        'key': obj.key,
                        'size': obj.size,
                        'last_modified': obj.last_modified
                    }
                    for obj in page.object_list
                )
                # Stop on the last page; the next_marker check also guards against
                # looping forever if a truncated page carries no continuation marker.
                if not page.is_truncated or not page.next_marker:
                    break
                marker = page.next_marker
            return entries
        except Exception as e:
            logger.warn(f"Failed to list objects with prefix {prefix}: {str(e)}")
            return []

    # ---- Advanced Operation Methods ----
    def exists(self, oss_key: str) -> bool:
        """
        Check if an object exists in OSS.

        Args:
            oss_key: The key (path) in OSS to check

        Returns:
            bool: True if the object's metadata could be fetched, False otherwise
            (including when the request itself failed)
        """
        if not self._ensure_ready():
            return False
        try:
            # Use head_object to check if the object exists
            self.bucket.head_object(oss_key)
            return True
        except Exception:
            # Deliberately not a bare except: KeyboardInterrupt/SystemExit must propagate.
            return False

    def copy_object(self, source_key: str, target_key: str) -> bool:
        """
        Copy an object within the same bucket.

        Args:
            source_key: The source object key
            target_key: The target object key

        Returns:
            bool: True if successful, False otherwise
        """
        if not self._ensure_ready():
            return False
        try:
            self.bucket.copy_object(self.bucket_name, source_key, target_key)
            logger.info(f"Successfully copied {source_key} to {target_key}")
            return True
        except Exception as e:
            logger.warn(f"Failed to copy {source_key} to {target_key}: {str(e)}")
            return False

    def get_object_url(self, oss_key: str, expires: int = 3600) -> Optional[str]:
        """
        Generate a temporary URL for accessing an object.

        Args:
            oss_key: The key (path) in OSS of the object
            expires: URL expiration time in seconds

        Returns:
            str: The signed GET URL, or None if failed
        """
        if not self._ensure_ready():
            return None
        try:
            return self.bucket.sign_url('GET', oss_key, expires)
        except Exception as e:
            logger.warn(f"Failed to generate URL for {oss_key}: {str(e)}")
            return None

    def upload_directory(self, local_dir: str, oss_prefix: str = "") -> Tuple[bool, List[str]]:
        """
        Upload an entire directory to OSS.

        The directory is walked recursively; each file is stored under
        ``oss_prefix`` joined with its path relative to ``local_dir``.
        A failed file does not stop the remaining uploads.

        Args:
            local_dir: Path to the local directory
            oss_prefix: Prefix to prepend to all uploaded files

        Returns:
            Tuple of (success, list of uploaded OSS keys); success is False if
            any file failed to upload
        """
        if not self._ensure_ready():
            return False, []
        if not os.path.isdir(local_dir):
            logger.warn(f"Local directory {local_dir} does not exist or is not a directory")
            return False, []
        uploaded_files = []
        errors = []
        for root, _, files in os.walk(local_dir):
            for file in files:
                local_file = os.path.join(root, file)
                rel_path = os.path.relpath(local_file, local_dir)
                # OSS keys always use forward slashes, regardless of the local OS.
                oss_key = os.path.join(oss_prefix, rel_path).replace("\\", "/")
                result = self.upload_file(local_file, oss_key)
                if result:
                    uploaded_files.append(result)
                else:
                    errors.append(local_file)
        if errors:
            logger.warn(f"Failed to upload {len(errors)} files")
            return False, uploaded_files
        return True, uploaded_files
def get_oss_client(access_key_id: Optional[str] = None,
                   access_key_secret: Optional[str] = None,
                   endpoint: Optional[str] = None,
                   bucket_name: Optional[str] = None,
                   enable_export: Optional[bool] = None) -> OSSClient:
    """
    Build an OSSClient from the given settings and attempt to initialize it.

    The result of the initialization attempt is not checked here, so the
    returned client may still be uninitialized (for example when export is
    disabled or credentials are missing).

    Args:
        access_key_id: OSS access key ID
        access_key_secret: OSS access key secret
        endpoint: OSS endpoint
        bucket_name: OSS bucket name
        enable_export: Whether to enable OSS export

    Returns:
        OSSClient: The constructed client, after one initialization attempt
    """
    settings = dict(
        access_key_id=access_key_id,
        access_key_secret=access_key_secret,
        endpoint=endpoint,
        bucket_name=bucket_name,
        enable_export=enable_export,
    )
    oss_client = OSSClient(**settings)
    # Best-effort: the outcome is intentionally ignored.
    oss_client.initialize()
    return oss_client
# ---- Test Cases ----
def _run_self_test() -> None:
    """
    Manual smoke test for the OSS tool class; talks to a real bucket.

    Note: Before running the tests, you need to set the following environment variables,
    or provide the parameters directly in the test code:
    - OSS_ACCESS_KEY_ID
    - OSS_ACCESS_KEY_SECRET
    - OSS_ENDPOINT
    - OSS_BUCKET_NAME
    - EXPORT_REPLAY_TRACE_TO_OSS=true
    """
    import io
    import time

    # Test configuration: timestamped prefix so repeated runs do not collide.
    test_prefix = f"test/oss_utils_{int(time.time())}"

    # Initialize client
    # Method 1: Using environment variables
    # oss_client = get_oss_client(enable_export=True)
    # Method 2: Provide parameters directly (empty strings fall back to the
    # corresponding OSS_* environment variables)
    oss_client = get_oss_client(
        access_key_id="",  # Replace with your actual access key ID
        access_key_secret="",  # Replace with your actual access key secret
        endpoint="",  # Replace with your actual OSS endpoint
        bucket_name="",  # Replace with your actual bucket name
        enable_export=True
    )

    # Test 1: Upload string data
    print("\nTest 1: Upload string data")
    text_key = f"{test_prefix}/text.txt"
    result = oss_client.upload_data("This is a test text", text_key)
    print(f"Upload string data: {'Success: ' + result if result else 'Failed'}")

    # Test 2: Upload dictionary data (automatically converted to JSON)
    print("\nTest 2: Upload dictionary data")
    json_key = f"{test_prefix}/data.json"
    data = {
        "name": "Test data",
        "values": [1, 2, 3],
        "nested": {
            "key": "value"
        }
    }
    result = oss_client.upload_data(data, json_key)
    print(f"Upload dictionary data: {'Success: ' + result if result else 'Failed'}")

    # Test 3: Upload in-memory binary file object
    print("\nTest 3: Upload in-memory binary file object")
    binary_key = f"{test_prefix}/binary.dat"
    binary_data = io.BytesIO(b"\x00\x01\x02\x03\x04")
    result = oss_client.upload_data(binary_data, binary_key)
    print(f"Upload binary file object: {'Success: ' + result if result else 'Failed'}")

    # Test 4: Upload in-memory text file object
    print("\nTest 4: Upload in-memory text file object")
    text_file_key = f"{test_prefix}/text_file.txt"
    text_file = io.StringIO("This is the content of an in-memory text file")
    result = oss_client.upload_data(text_file, text_file_key)
    print(f"Upload text file object: {'Success: ' + result if result else 'Failed'}")

    # Test 5: Create and upload temporary file
    print("\nTest 5: Create and upload temporary file")
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp.write(b"This is the content of a temporary file")
        tmp_path = tmp.name
    try:
        file_key = f"{test_prefix}/temp_file.txt"
        result = oss_client.upload_file(tmp_path, file_key)
        print(f"Upload temporary file: {'Success: ' + result if result else 'Failed'}")
    finally:
        os.unlink(tmp_path)  # Delete temporary file even if the upload blew up

    # Test 6: Read text data
    print("\nTest 6: Read text data")
    content = oss_client.read_text(text_key)
    print(f"Read text data: {content}")

    # Test 7: Read JSON data
    print("\nTest 7: Read JSON data")
    json_content = oss_client.read_data(json_key, as_json=True)
    print(f"Read JSON data: {json_content}")

    # Test 8: Update JSON data (merge method)
    print("\nTest 8: Update JSON data")
    json_patch = {"updated": True, "timestamp": time.time()}
    result = oss_client.update_json(json_key, json_patch)
    print(f"Update JSON data: {'Success: ' + result if result else 'Failed'}")
    # View updated JSON data
    updated_json = oss_client.read_data(json_key, as_json=True)
    print(f"Updated JSON data: {updated_json}")

    # Test 9: Overwrite existing data
    print("\nTest 9: Overwrite existing data")
    result = oss_client.upload_data("This is the overwritten text", text_key)
    print(f"Overwrite existing data: {'Success: ' + result if result else 'Failed'}")
    # View overwritten data
    new_content = oss_client.read_text(text_key)
    print(f"Overwritten text data: {new_content}")

    # Test 10: List objects
    print("\nTest 10: List objects")
    objects = oss_client.list_objects(prefix=test_prefix)
    print(f"Found {len(objects)} objects:")
    for obj in objects:
        print(f" - {obj['key']} (Size: {obj['size']} bytes, Modified: {obj['last_modified']})")

    # Test 11: Generate temporary URL
    print("\nTest 11: Generate temporary URL")
    url = oss_client.get_object_url(text_key, expires=300)  # 5 minutes expiration
    print(f"Temporary URL: {url}")

    # Test 12: Copy object
    print("\nTest 12: Copy object")
    copy_key = f"{test_prefix}/copy_of_text.txt"
    copied = oss_client.copy_object(text_key, copy_key)
    print(f"Copy object: {'Success: ' + copy_key if copied else 'Failed'}")

    # Test 13: Check if object exists
    print("\nTest 13: Check if object exists")
    found = oss_client.exists(text_key)
    print(f"Object {text_key} exists: {found}")
    non_existent_key = f"{test_prefix}/non_existent.txt"
    found = oss_client.exists(non_existent_key)
    print(f"Object {non_existent_key} exists: {found}")

    # Test 14: Delete objects
    print("\nTest 14: Delete objects")
    for obj in objects:
        success = oss_client.delete_data(obj['key'])
        print(f"Delete object {obj['key']}: {'Success' if success else 'Failed'}")
    # Cleanup: Delete copied object (it was created after the listing above)
    oss_client.delete_data(copy_key)
    print("\nTests completed!")


if __name__ == "__main__":
    _run_self_test()