Spaces:
Running
Running
# Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"). You | |
# may not use this file except in compliance with the License. A copy of | |
# the License is located at | |
# | |
# https://aws.amazon.com/apache2.0/ | |
# | |
# or in the "license" file accompanying this file. This file is | |
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF | |
# ANY KIND, either express or implied. See the License for the specific | |
# language governing permissions and limitations under the License. | |
""" | |
This file contains private functionality for interacting with the AWS | |
Common Runtime library (awscrt) in boto3. | |
All code contained within this file is for internal usage within this | |
project and is not intended for external consumption. All interfaces | |
contained within are subject to abrupt breaking changes. | |
""" | |
import threading | |
import botocore.exceptions | |
from botocore.session import Session | |
from s3transfer.crt import ( | |
BotocoreCRTCredentialsWrapper, | |
BotocoreCRTRequestSerializer, | |
CRTTransferManager, | |
acquire_crt_s3_process_lock, | |
create_s3_crt_client, | |
) | |
# Singletons for CRT-backed transfers | |
CRT_S3_CLIENT = None | |
BOTOCORE_CRT_SERIALIZER = None | |
CLIENT_CREATION_LOCK = threading.Lock() | |
PROCESS_LOCK_NAME = 'boto3' | |
def _create_crt_client(session, config, region_name, cred_provider): | |
"""Create a CRT S3 Client for file transfer. | |
Instantiating many of these may lead to degraded performance or | |
system resource exhaustion. | |
""" | |
create_crt_client_kwargs = { | |
'region': region_name, | |
'use_ssl': True, | |
'crt_credentials_provider': cred_provider, | |
} | |
return create_s3_crt_client(**create_crt_client_kwargs) | |
def _create_crt_request_serializer(session, region_name): | |
return BotocoreCRTRequestSerializer( | |
session, {'region_name': region_name, 'endpoint_url': None} | |
) | |
def _create_crt_s3_client( | |
session, config, region_name, credentials, lock, **kwargs | |
): | |
"""Create boto3 wrapper class to manage crt lock reference and S3 client.""" | |
cred_wrapper = BotocoreCRTCredentialsWrapper(credentials) | |
cred_provider = cred_wrapper.to_crt_credentials_provider() | |
return CRTS3Client( | |
_create_crt_client(session, config, region_name, cred_provider), | |
lock, | |
region_name, | |
cred_wrapper, | |
) | |
def _initialize_crt_transfer_primatives(client, config): | |
lock = acquire_crt_s3_process_lock(PROCESS_LOCK_NAME) | |
if lock is None: | |
# If we're unable to acquire the lock, we cannot | |
# use the CRT in this process and should default to | |
# the classic s3transfer manager. | |
return None, None | |
session = Session() | |
region_name = client.meta.region_name | |
credentials = client._get_credentials() | |
serializer = _create_crt_request_serializer(session, region_name) | |
s3_client = _create_crt_s3_client( | |
session, config, region_name, credentials, lock | |
) | |
return serializer, s3_client | |
def get_crt_s3_client(client, config): | |
global CRT_S3_CLIENT | |
global BOTOCORE_CRT_SERIALIZER | |
with CLIENT_CREATION_LOCK: | |
if CRT_S3_CLIENT is None: | |
serializer, s3_client = _initialize_crt_transfer_primatives( | |
client, config | |
) | |
BOTOCORE_CRT_SERIALIZER = serializer | |
CRT_S3_CLIENT = s3_client | |
return CRT_S3_CLIENT | |
class CRTS3Client: | |
""" | |
This wrapper keeps track of our underlying CRT client, the lock used to | |
acquire it and the region we've used to instantiate the client. | |
Due to limitations in the existing CRT interfaces, we can only make calls | |
in a single region and does not support redirects. We track the region to | |
ensure we don't use the CRT client when a successful request cannot be made. | |
""" | |
def __init__(self, crt_client, process_lock, region, cred_provider): | |
self.crt_client = crt_client | |
self.process_lock = process_lock | |
self.region = region | |
self.cred_provider = cred_provider | |
def is_crt_compatible_request(client, crt_s3_client): | |
""" | |
Boto3 client must use same signing region and credentials | |
as the CRT_S3_CLIENT singleton. Otherwise fallback to classic. | |
""" | |
if crt_s3_client is None: | |
return False | |
boto3_creds = client._get_credentials() | |
if boto3_creds is None: | |
return False | |
is_same_identity = compare_identity( | |
boto3_creds.get_frozen_credentials(), crt_s3_client.cred_provider | |
) | |
is_same_region = client.meta.region_name == crt_s3_client.region | |
return is_same_region and is_same_identity | |
def compare_identity(boto3_creds, crt_s3_creds): | |
try: | |
crt_creds = crt_s3_creds() | |
except botocore.exceptions.NoCredentialsError: | |
return False | |
is_matching_identity = ( | |
boto3_creds.access_key == crt_creds.access_key_id | |
and boto3_creds.secret_key == crt_creds.secret_access_key | |
and boto3_creds.token == crt_creds.session_token | |
) | |
return is_matching_identity | |
def create_crt_transfer_manager(client, config): | |
"""Create a CRTTransferManager for optimized data transfer.""" | |
crt_s3_client = get_crt_s3_client(client, config) | |
if is_crt_compatible_request(client, crt_s3_client): | |
return CRTTransferManager( | |
crt_s3_client.crt_client, BOTOCORE_CRT_SERIALIZER | |
) | |
return None | |