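# NOTE: "Spaces: Running / Running" was hosting-page status residue captured
# together with this file; it is not part of the module.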
from __future__ import annotations | |
import json | |
import time | |
import uuid | |
import email | |
import asyncio | |
import inspect | |
import logging | |
import platform | |
import warnings | |
import email.utils | |
from types import TracebackType | |
from random import random | |
from typing import ( | |
TYPE_CHECKING, | |
Any, | |
Dict, | |
Type, | |
Union, | |
Generic, | |
Mapping, | |
TypeVar, | |
Iterable, | |
Iterator, | |
Optional, | |
Generator, | |
AsyncIterator, | |
cast, | |
overload, | |
) | |
from typing_extensions import Literal, override, get_origin | |
import anyio | |
import httpx | |
import distro | |
import pydantic | |
from httpx import URL, Limits | |
from pydantic import PrivateAttr | |
from . import _exceptions | |
from ._qs import Querystring | |
from ._files import to_httpx_files, async_to_httpx_files | |
from ._types import ( | |
NOT_GIVEN, | |
Body, | |
Omit, | |
Query, | |
Headers, | |
Timeout, | |
NotGiven, | |
ResponseT, | |
Transport, | |
AnyMapping, | |
PostParser, | |
ProxiesTypes, | |
RequestFiles, | |
HttpxSendArgs, | |
AsyncTransport, | |
RequestOptions, | |
HttpxRequestFiles, | |
ModelBuilderProtocol, | |
) | |
from ._utils import is_dict, is_list, asyncify, is_given, lru_cache, is_mapping | |
from ._compat import model_copy, model_dump | |
from ._models import GenericModel, FinalRequestOptions, validate_type, construct_type | |
from ._response import ( | |
APIResponse, | |
BaseAPIResponse, | |
AsyncAPIResponse, | |
extract_response_type, | |
) | |
from ._constants import ( | |
DEFAULT_TIMEOUT, | |
MAX_RETRY_DELAY, | |
DEFAULT_MAX_RETRIES, | |
INITIAL_RETRY_DELAY, | |
RAW_RESPONSE_HEADER, | |
OVERRIDE_CAST_TO_HEADER, | |
DEFAULT_CONNECTION_LIMITS, | |
) | |
from ._streaming import Stream, SSEDecoder, AsyncStream, SSEBytesDecoder | |
from ._exceptions import ( | |
APIStatusError, | |
APITimeoutError, | |
APIConnectionError, | |
APIResponseValidationError, | |
) | |
from ._legacy_response import LegacyAPIResponse | |
# Module-level logger, named after this module.
log: logging.Logger = logging.getLogger(__name__)

# TODO: make base page type vars covariant
# Page type variables, bound to the sync / async page base classes.
SyncPageT = TypeVar("SyncPageT", bound="BaseSyncPage[Any]")
AsyncPageT = TypeVar("AsyncPageT", bound="BaseAsyncPage[Any]")

# Generic item type variables (invariant and covariant flavours).
_T = TypeVar("_T")
_T_co = TypeVar("_T_co", covariant=True)

# Stream type variables, bound to the sync / async stream classes.
_StreamT = TypeVar("_StreamT", bound=Stream[Any])
_AsyncStreamT = TypeVar("_AsyncStreamT", bound=AsyncStream[Any])

# httpx's default timeout lives in a private module, so the runtime import is
# guarded and falls back to a local value if the import fails.
if TYPE_CHECKING:
    from httpx._config import DEFAULT_TIMEOUT_CONFIG as HTTPX_DEFAULT_TIMEOUT
else:
    try:
        from httpx._config import DEFAULT_TIMEOUT_CONFIG as HTTPX_DEFAULT_TIMEOUT
    except ImportError:
        # taken from https://github.com/encode/httpx/blob/3ba5fe0d7ac70222590e759c31442b1cab263791/httpx/_config.py#L366
        HTTPX_DEFAULT_TIMEOUT = Timeout(5.0)
class PageInfo:
    """Stores the necessary information to build the request to retrieve the next page.

    Either `url` or `params` must be set.

    The two `@overload` signatures document the supported call shapes (exactly
    one of `url` / `params`); the final `__init__` is the runtime implementation
    and stores whatever was passed, leaving the other attribute as `NOT_GIVEN`.
    """

    url: URL | NotGiven
    params: Query | NotGiven

    @overload
    def __init__(
        self,
        *,
        url: URL,
    ) -> None:
        ...

    @overload
    def __init__(
        self,
        *,
        params: Query,
    ) -> None:
        ...

    def __init__(
        self,
        *,
        url: URL | NotGiven = NOT_GIVEN,
        params: Query | NotGiven = NOT_GIVEN,
    ) -> None:
        self.url = url
        self.params = params
class BasePage(GenericModel, Generic[_T]):
    """
    Core pagination interface shared by the sync and async page classes.

    Type Args:
        ModelT: The pydantic model that represents an item in the response.

    Methods:
        has_next_page(): Check if there is another page available
        next_page_info(): Get the necessary information to make a request for the next page
    """

    _options: FinalRequestOptions = PrivateAttr()
    _model: Type[_T] = PrivateAttr()

    def has_next_page(self) -> bool:
        """Return True when this page has items and `next_page_info()` is not None."""
        if not self._get_page_items():
            return False
        return self.next_page_info() is not None

    def next_page_info(self) -> Optional[PageInfo]:
        ...

    def _get_page_items(self) -> Iterable[_T]:  # type: ignore[empty-body]
        ...

    def _params_from_url(self, url: URL) -> httpx.QueryParams:
        """Merge the query params of `url` on top of the stored request params."""
        # TODO: do we have to preprocess params here?
        stored_params = httpx.QueryParams(cast(Any, self._options.params))
        return stored_params.merge(url.params)

    def _info_to_options(self, info: PageInfo) -> FinalRequestOptions:
        """Build the request options for the next page from `info`.

        `info.params` takes precedence over `info.url`; a `PageInfo` with
        neither set raises `ValueError`.
        """
        next_options = model_copy(self._options)
        next_options._strip_raw_response_header()

        page_params = info.params
        if not isinstance(page_params, NotGiven):
            next_options.params = {**next_options.params, **page_params}
            return next_options

        page_url = info.url
        if not isinstance(page_url, NotGiven):
            merged_url = page_url.copy_with(params=self._params_from_url(page_url))
            next_options.params = dict(merged_url.params)
            next_options.url = str(merged_url)
            return next_options

        raise ValueError("Unexpected PageInfo state")
class BaseSyncPage(BasePage[_T], Generic[_T]):
    """Synchronous page: iterating it yields items across this and all following pages."""

    _client: SyncAPIClient = pydantic.PrivateAttr()

    def _set_private_attributes(
        self,
        client: SyncAPIClient,
        model: Type[_T],
        options: FinalRequestOptions,
    ) -> None:
        """Attach the client, item model and request options used to fetch later pages."""
        self._model = model
        self._client = client
        self._options = options

    # Pydantic uses a custom `__iter__` method to support casting BaseModels
    # to dictionaries. e.g. dict(model).
    # As we want to support `for item in page`, this is inherently incompatible
    # with the default pydantic behaviour. It is not possible to support both
    # use cases at once. Fortunately, this is not a big deal as all other pydantic
    # methods should continue to work as expected as there is an alternative method
    # to cast a model to a dictionary, model.dict(), which is used internally
    # by pydantic.
    def __iter__(self) -> Iterator[_T]:  # type: ignore
        for current_page in self.iter_pages():
            yield from current_page._get_page_items()

    def iter_pages(self: SyncPageT) -> Iterator[SyncPageT]:
        """Yield this page, then each following page for as long as one is available."""
        current = self
        yield current
        while current.has_next_page():
            current = current.get_next_page()
            yield current

    def get_next_page(self: SyncPageT) -> SyncPageT:
        """Request and return the next page; raises `RuntimeError` when there is none."""
        next_info = self.next_page_info()
        if not next_info:
            raise RuntimeError(
                "No next page expected; please check `.has_next_page()` before calling `.get_next_page()`."
            )

        next_options = self._info_to_options(next_info)
        return self._client._request_api_list(self._model, page=self.__class__, options=next_options)
class AsyncPaginator(Generic[_T, AsyncPageT]):
    """Awaitable / async-iterable handle for a paginated request.

    Awaiting it performs the request and returns the first page; iterating it
    with `async for` awaits the first page and then yields its items.
    """

    def __init__(
        self,
        client: AsyncAPIClient,
        options: FinalRequestOptions,
        page_cls: Type[AsyncPageT],
        model: Type[_T],
    ) -> None:
        self._client = client
        self._options = options
        self._page_cls = page_cls
        self._model = model

    def __await__(self) -> Generator[Any, None, AsyncPageT]:
        return self._get_page().__await__()

    async def _get_page(self) -> AsyncPageT:
        """Issue the request, attaching client/model/options to the parsed page."""

        def _attach_private_state(parsed_page: AsyncPageT) -> AsyncPageT:
            parsed_page._set_private_attributes(
                model=self._model,
                options=self._options,
                client=self._client,
            )
            return parsed_page

        # note: this stores the hook on the shared options object
        self._options.post_parser = _attach_private_state

        return await self._client.request(self._page_cls, self._options)

    async def __aiter__(self) -> AsyncIterator[_T]:
        # https://github.com/microsoft/pyright/issues/3464
        first_page = cast(
            AsyncPageT,
            await self,  # type: ignore
        )
        async for element in first_page:
            yield element
class BaseAsyncPage(BasePage[_T], Generic[_T]):
    """Asynchronous page: `async for` over it yields items across this and all following pages."""

    _client: AsyncAPIClient = pydantic.PrivateAttr()

    def _set_private_attributes(
        self,
        model: Type[_T],
        client: AsyncAPIClient,
        options: FinalRequestOptions,
    ) -> None:
        """Attach the client, item model and request options used to fetch later pages."""
        self._model = model
        self._client = client
        self._options = options

    async def __aiter__(self) -> AsyncIterator[_T]:
        async for current_page in self.iter_pages():
            for element in current_page._get_page_items():
                yield element

    async def iter_pages(self: AsyncPageT) -> AsyncIterator[AsyncPageT]:
        """Yield this page, then each following page for as long as one is available."""
        current = self
        yield current
        while current.has_next_page():
            current = await current.get_next_page()
            yield current

    async def get_next_page(self: AsyncPageT) -> AsyncPageT:
        """Request and return the next page; raises `RuntimeError` when there is none."""
        next_info = self.next_page_info()
        if not next_info:
            raise RuntimeError(
                "No next page expected; please check `.has_next_page()` before calling `.get_next_page()`."
            )

        next_options = self._info_to_options(next_info)
        return await self._client._request_api_list(self._model, page=self.__class__, options=next_options)
# Type variable for the underlying httpx client (sync or async).
_HttpxClientT = TypeVar("_HttpxClientT", bound=Union[httpx.Client, httpx.AsyncClient])
# Type variable for the client's default stream class (sync or async).
_DefaultStreamT = TypeVar("_DefaultStreamT", bound=Union[Stream[Any], AsyncStream[Any]])
class BaseClient(Generic[_HttpxClientT, _DefaultStreamT]):
    # Shared base for the API clients, generic over the httpx client type and
    # the default stream class.

    # underlying httpx client used to build requests
    _client: _HttpxClientT
    # version string, reported in the User-Agent and platform headers
    _version: str
    # base URL; always stored with a trailing slash
    _base_url: URL
    # default retry budget, used when request options do not override it
    max_retries: int
    # default timeout, used when request options do not specify one
    timeout: Union[float, Timeout, None]
    _limits: httpx.Limits
    _proxies: ProxiesTypes | None
    _transport: Transport | AsyncTransport | None
    # selects `validate_type` over `construct_type` when parsing response data
    _strict_response_validation: bool
    # name of the header that carries the idempotency key on non-GET requests
    _idempotency_header: str | None
    _default_stream_cls: type[_DefaultStreamT] | None = None
def __init__(
    self,
    *,
    version: str,
    base_url: str | URL,
    _strict_response_validation: bool,
    max_retries: int = DEFAULT_MAX_RETRIES,
    timeout: float | Timeout | None = DEFAULT_TIMEOUT,
    limits: httpx.Limits,
    transport: Transport | AsyncTransport | None,
    proxies: ProxiesTypes | None,
    custom_headers: Mapping[str, str] | None = None,
    custom_query: Mapping[str, object] | None = None,
) -> None:
    """Store the client configuration.

    The base URL is normalised to end with a trailing slash. `custom_headers`
    and `custom_query` default to empty mappings.

    Raises:
        TypeError: if `max_retries` is None.
    """
    # identity and endpoint
    self._version = version
    self._base_url = self._enforce_trailing_slash(URL(base_url))

    # retry / timeout policy
    self.max_retries = max_retries
    self.timeout = timeout

    # connection-level configuration
    self._limits = limits
    self._proxies = proxies
    self._transport = transport

    # per-client defaults merged into every request
    self._custom_headers = custom_headers or {}
    self._custom_query = custom_query or {}

    self._strict_response_validation = _strict_response_validation
    self._idempotency_header = None
    self._platform: Platform | None = None

    # the annotation forbids None, but callers can still pass it at runtime
    if max_retries is None:  # pyright: ignore[reportUnnecessaryComparison]
        raise TypeError(
            "max_retries cannot be None. If you want to disable retries, pass `0`; if you want unlimited retries, pass `math.inf` or a very high number; if you want the default behavior, pass `anthropic.DEFAULT_MAX_RETRIES`"
        )
def _enforce_trailing_slash(self, url: URL) -> URL: | |
if url.raw_path.endswith(b"/"): | |
return url | |
return url.copy_with(raw_path=url.raw_path + b"/") | |
def _make_status_error_from_response( | |
self, | |
response: httpx.Response, | |
) -> APIStatusError: | |
if response.is_closed and not response.is_stream_consumed: | |
# We can't read the response body as it has been closed | |
# before it was read. This can happen if an event hook | |
# raises a status error. | |
body = None | |
err_msg = f"Error code: {response.status_code}" | |
else: | |
err_text = response.text.strip() | |
body = err_text | |
try: | |
body = json.loads(err_text) | |
err_msg = f"Error code: {response.status_code} - {body}" | |
except Exception: | |
err_msg = err_text or f"Error code: {response.status_code}" | |
return self._make_status_error(err_msg, body=body, response=response) | |
def _make_status_error( | |
self, | |
err_msg: str, | |
*, | |
body: object, | |
response: httpx.Response, | |
) -> _exceptions.APIStatusError: | |
raise NotImplementedError() | |
def _remaining_retries( | |
self, | |
remaining_retries: Optional[int], | |
options: FinalRequestOptions, | |
) -> int: | |
return remaining_retries if remaining_retries is not None else options.get_max_retries(self.max_retries) | |
def _build_headers(self, options: FinalRequestOptions) -> httpx.Headers:
    """Merge default and per-request headers and add an idempotency key when applicable.

    The idempotency header is only added when one is configured, the request
    method is not GET, and the header is not already present.
    """
    request_headers = options.headers or {}
    merged = _merge_mappings(self.default_headers, request_headers)
    self._validate_headers(merged, request_headers)

    # headers are case-insensitive while dictionaries are not.
    headers = httpx.Headers(merged)

    header_name = self._idempotency_header
    if not header_name:
        return headers
    if options.method.lower() == "get":
        return headers
    if header_name in headers:
        return headers

    headers[header_name] = options.idempotency_key or self._idempotency_key()
    return headers
def _prepare_url(self, url: str) -> URL:
    """
    Resolve `url` against the client's base URL.

    Relative URLs are appended to the base URL's path (leading slashes on the
    relative part are dropped); absolute URLs are returned as-is.
    """
    # Copied from httpx's `_merge_url` method.
    target = URL(url)
    if not target.is_relative_url:
        return target

    joined_path = self.base_url.raw_path + target.raw_path.lstrip(b"/")
    return self.base_url.copy_with(raw_path=joined_path)
def _make_sse_decoder(self) -> SSEDecoder | SSEBytesDecoder:
    """Create a new `SSEDecoder` instance."""
    decoder = SSEDecoder()
    return decoder
def _build_request(
    self,
    options: FinalRequestOptions,
) -> httpx.Request:
    """Translate `options` into an `httpx.Request` via the underlying client.

    Merges `extra_json` into the JSON body, merges default and per-request
    headers / query params, and switches to form encoding when the
    Content-Type header is `multipart/form-data`.

    Raises:
        RuntimeError: if `extra_json` is given but the JSON body is not a mapping.
        TypeError: if a multipart request has a non-dict JSON body.
    """
    if log.isEnabledFor(logging.DEBUG):
        log.debug("Request options: %s", model_dump(options, exclude_unset=True))

    extra_request_args: dict[str, Any] = {}

    body = options.json_data
    extra_json = options.extra_json
    if extra_json is not None:
        if body is None:
            body = cast(Body, extra_json)
        elif is_mapping(body):
            body = _merge_mappings(body, extra_json)
        else:
            raise RuntimeError(f"Unexpected JSON data type, {type(body)}, cannot merge with `extra_body`")

    headers = self._build_headers(options)
    query = _merge_mappings(self.default_query, options.params)
    content_type = headers.get("Content-Type")
    files = options.files

    # If the given Content-Type header is multipart/form-data then it
    # has to be removed so that httpx can generate the header with
    # additional information for us as it has to be in this form
    # for the server to be able to correctly parse the request:
    # multipart/form-data; boundary=---abc--
    if content_type is not None and content_type.startswith("multipart/form-data"):
        if "boundary" not in content_type:
            # only remove the header if the boundary hasn't been explicitly set
            # as the caller doesn't want httpx to come up with their own boundary
            headers.pop("Content-Type")

        # As we are now sending multipart/form-data instead of application/json
        # we need to tell httpx to use it, https://www.python-httpx.org/advanced/clients/#multipart-file-encoding
        if body:
            if not is_dict(body):
                raise TypeError(
                    f"Expected query input to be a dictionary for multipart requests but got {type(body)} instead."
                )
            extra_request_args["data"] = self._serialize_multipartform(body)

        # httpx determines whether or not to send a "multipart/form-data"
        # request based on the truthiness of the "files" argument.
        # This gets around that issue by generating a dict value that
        # evaluates to true.
        #
        # https://github.com/encode/httpx/discussions/2399#discussioncomment-3814186
        if not files:
            files = cast(HttpxRequestFiles, ForceMultipartDict())

    # a per-request timeout wins over the client default
    timeout = options.timeout
    if isinstance(timeout, NotGiven):
        timeout = self.timeout

    request_url = self._prepare_url(options.url)

    # the `Query` type that we use is incompatible with qs'
    # `Params` type as it needs to be typed as `Mapping[str, object]`
    # so that passing a `TypedDict` doesn't cause an error.
    # https://github.com/microsoft/pyright/issues/3526#event-6715453066
    encoded_query = self.qs.stringify(cast(Mapping[str, Any], query)) if query else None

    # TODO: report this error to httpx
    return self._client.build_request(  # pyright: ignore[reportUnknownMemberType]
        headers=headers,
        timeout=timeout,
        method=options.method,
        url=request_url,
        params=encoded_query,
        json=body,
        files=files,
        **extra_request_args,
    )
def _serialize_multipartform(self, data: Mapping[object, object]) -> dict[str, object]: | |
items = self.qs.stringify_items( | |
# TODO: type ignore is required as stringify_items is well typed but we can't be | |
# well typed without heavy validation. | |
data, # type: ignore | |
array_format="brackets", | |
) | |
serialized: dict[str, object] = {} | |
for key, value in items: | |
existing = serialized.get(key) | |
if not existing: | |
serialized[key] = value | |
continue | |
# If a value has already been set for this key then that | |
# means we're sending data like `array[]=[1, 2, 3]` and we | |
# need to tell httpx that we want to send multiple values with | |
# the same key which is done by using a list or a tuple. | |
# | |
# Note: 2d arrays should never result in the same key at both | |
# levels so it's safe to assume that if the value is a list, | |
# it was because we changed it to be a list. | |
if is_list(existing): | |
existing.append(value) | |
else: | |
serialized[key] = [existing, value] | |
return serialized | |
def _maybe_override_cast_to(self, cast_to: type[ResponseT], options: FinalRequestOptions) -> type[ResponseT]:
    """Return the `cast_to` override carried in the request headers, if any.

    When the override header is present it is removed from a copy of the
    headers, the copy is stored back on `options`, and the override is
    returned. Otherwise `cast_to` is returned and `options` is left untouched.
    """
    if not is_given(options.headers):
        return cast_to

    # make a copy of the headers so we don't mutate user-input
    headers_copy = dict(options.headers)

    # we internally support defining a temporary header to override the
    # default `cast_to` type for use with `.with_raw_response` and `.with_streaming_response`
    # see _response.py for implementation details
    override = headers_copy.pop(OVERRIDE_CAST_TO_HEADER, NOT_GIVEN)
    if not is_given(override):
        return cast_to

    options.headers = headers_copy
    return cast(Type[ResponseT], override)
def _should_stream_response_body(self, request: httpx.Request) -> bool:
    """Return True when the request's raw-response header equals "stream"."""
    raw_response_mode = request.headers.get(RAW_RESPONSE_HEADER)
    return raw_response_mode == "stream"  # type: ignore[no-any-return]
def _process_response_data(
    self,
    *,
    data: object,
    cast_to: type[ResponseT],
    response: httpx.Response,
) -> ResponseT:
    """Convert decoded response `data` into `cast_to`.

    `None` data and `cast_to is object` are passed through. Classes
    implementing `ModelBuilderProtocol` build themselves; everything else is
    validated (strict mode) or constructed without validation.

    Raises:
        APIResponseValidationError: when pydantic validation fails.
    """
    if data is None:
        return cast(ResponseT, None)

    if cast_to is object:
        return cast(ResponseT, data)

    try:
        if inspect.isclass(cast_to) and issubclass(cast_to, ModelBuilderProtocol):
            parsed = cast_to.build(response=response, data=data)
        elif self._strict_response_validation:
            parsed = validate_type(type_=cast_to, value=data)
        else:
            parsed = construct_type(type_=cast_to, value=data)
        return cast(ResponseT, parsed)
    except pydantic.ValidationError as err:
        raise APIResponseValidationError(response=response, body=data) from err
@property
def qs(self) -> Querystring:
    """Query-string serializer; a new `Querystring` is returned on each access.

    Exposed as a property because callers use it as an attribute
    (`self.qs.stringify(...)`, `self.qs.stringify_items(...)`).
    """
    return Querystring()
@property
def custom_auth(self) -> httpx.Auth | None:
    """Optional httpx auth handler for outgoing requests; None by default.

    Exposed as a property because the request path checks
    `self.custom_auth is not None`, which would always be true for a bound method.
    """
    return None
@property
def auth_headers(self) -> dict[str, str]:
    """Authentication headers merged into the default headers; empty by default.

    Exposed as a property because `default_headers` unpacks it with
    `**self.auth_headers`.
    """
    return {}
@property
def default_headers(self) -> dict[str, str | Omit]:
    """Headers sent with every request.

    Later entries win: platform headers, then auth headers, then the client's
    custom headers can each override the entries before them.

    Exposed as a property because `_build_headers` passes it to
    `_merge_mappings` as a mapping.
    """
    return {
        "Accept": "application/json",
        "Content-Type": "application/json",
        "User-Agent": self.user_agent,
        **self.platform_headers(),
        **self.auth_headers,
        **self._custom_headers,
    }
@property
def default_query(self) -> dict[str, object]:
    """Query parameters sent with every request: a copy of the client's custom query.

    Exposed as a property because `_build_request` passes it to
    `_merge_mappings` as a mapping.
    """
    return {
        **self._custom_query,
    }
def _validate_headers( | |
self, | |
headers: Headers, # noqa: ARG002 | |
custom_headers: Headers, # noqa: ARG002 | |
) -> None: | |
"""Validate the given default headers and custom headers. | |
Does nothing by default. | |
""" | |
return | |
@property
def user_agent(self) -> str:
    """User-Agent string: `<ClassName>/Python <version>`.

    Exposed as a property because `default_headers` uses it directly as the
    header value.
    """
    return f"{self.__class__.__name__}/Python {self._version}"
@property
def base_url(self) -> URL:
    """The client's base URL (stored with a trailing slash)."""
    return self._base_url


@base_url.setter
def base_url(self, url: URL | str) -> None:
    """Set the base URL from a `URL` or string, enforcing a trailing slash."""
    self._base_url = self._enforce_trailing_slash(url if isinstance(url, URL) else URL(url))
def platform_headers(self) -> Dict[str, str]:
    """Return the platform headers for this client's version and platform."""
    # the actual implementation is in a separate `lru_cache` decorated
    # function because adding `lru_cache` to methods will leak memory
    # https://github.com/python/cpython/issues/88476
    headers = platform_headers(self._version, platform=self._platform)
    return headers
def _parse_retry_after_header(self, response_headers: Optional[httpx.Headers] = None) -> float | None: | |
"""Returns a float of the number of seconds (not milliseconds) to wait after retrying, or None if unspecified. | |
About the Retry-After header: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After | |
See also https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After#syntax | |
""" | |
if response_headers is None: | |
return None | |
# First, try the non-standard `retry-after-ms` header for milliseconds, | |
# which is more precise than integer-seconds `retry-after` | |
try: | |
retry_ms_header = response_headers.get("retry-after-ms", None) | |
return float(retry_ms_header) / 1000 | |
except (TypeError, ValueError): | |
pass | |
# Next, try parsing `retry-after` header as seconds (allowing nonstandard floats). | |
retry_header = response_headers.get("retry-after") | |
try: | |
# note: the spec indicates that this should only ever be an integer | |
# but if someone sends a float there's no reason for us to not respect it | |
return float(retry_header) | |
except (TypeError, ValueError): | |
pass | |
# Last, try parsing `retry-after` as a date. | |
retry_date_tuple = email.utils.parsedate_tz(retry_header) | |
if retry_date_tuple is None: | |
return None | |
retry_date = email.utils.mktime_tz(retry_date_tuple) | |
return float(retry_date - time.time()) | |
def _calculate_retry_timeout( | |
self, | |
remaining_retries: int, | |
options: FinalRequestOptions, | |
response_headers: Optional[httpx.Headers] = None, | |
) -> float: | |
max_retries = options.get_max_retries(self.max_retries) | |
# If the API asks us to wait a certain amount of time (and it's a reasonable amount), just do what it says. | |
retry_after = self._parse_retry_after_header(response_headers) | |
if retry_after is not None and 0 < retry_after <= 60: | |
return retry_after | |
nb_retries = max_retries - remaining_retries | |
# Apply exponential backoff, but not more than the max. | |
sleep_seconds = min(INITIAL_RETRY_DELAY * pow(2.0, nb_retries), MAX_RETRY_DELAY) | |
# Apply some jitter, plus-or-minus half a second. | |
jitter = 1 - 0.25 * random() | |
timeout = sleep_seconds * jitter | |
return timeout if timeout >= 0 else 0 | |
def _should_retry(self, response: httpx.Response) -> bool:
    """Decide whether a failed response should be retried.

    An explicit `x-should-retry: true|false` header always wins. Otherwise
    retry on 408, 409, 429 and any status of 500 or above.
    """
    # Note: this is not a standard header
    server_hint = response.headers.get("x-should-retry")

    # If the server explicitly says whether or not to retry, obey.
    if server_hint == "true":
        log.debug("Retrying as header `x-should-retry` is set to `true`")
        return True
    if server_hint == "false":
        log.debug("Not retrying as header `x-should-retry` is set to `false`")
        return False

    status = response.status_code

    # 408: request timeouts, 409: lock timeouts, 429: rate limits,
    # >= 500: internal errors.
    if status in (408, 409, 429) or status >= 500:
        log.debug("Retrying due to status code %i", status)
        return True

    log.debug("Not retrying")
    return False
def _idempotency_key(self) -> str: | |
return f"stainless-python-retry-{uuid.uuid4()}" | |
class _DefaultHttpxClient(httpx.Client):
    """`httpx.Client` that fills in the SDK's timeout, limits and redirect defaults.

    Values passed explicitly by the caller are kept.
    """

    def __init__(self, **kwargs: Any) -> None:
        sdk_defaults = (
            ("timeout", DEFAULT_TIMEOUT),
            ("limits", DEFAULT_CONNECTION_LIMITS),
            ("follow_redirects", True),
        )
        for option, value in sdk_defaults:
            kwargs.setdefault(option, value)
        super().__init__(**kwargs)
# Type checkers see the public name as plain `httpx.Client`; at runtime it is
# bound to `_DefaultHttpxClient`, which applies the SDK defaults.
if TYPE_CHECKING:
    DefaultHttpxClient = httpx.Client
    """An alias to `httpx.Client` that provides the same defaults that this SDK
    uses internally.
    This is useful because overriding the `http_client` with your own instance of
    `httpx.Client` will result in httpx's defaults being used, not ours.
    """
else:
    DefaultHttpxClient = _DefaultHttpxClient
class SyncHttpxClientWrapper(DefaultHttpxClient):
    """Default httpx client that closes itself when it is garbage collected."""

    def __del__(self) -> None:
        # Best-effort cleanup: a failure to close during finalisation is
        # deliberately ignored.
        try:
            self.close()
        except Exception:
            return
class SyncAPIClient(BaseClient[httpx.Client, Stream[Any]]):
    # Synchronous API client built on `httpx.Client`.

    # underlying synchronous httpx client
    _client: httpx.Client
    _default_stream_cls: type[Stream[Any]] | None = None
def __init__(
    self,
    *,
    version: str,
    base_url: str | URL,
    max_retries: int = DEFAULT_MAX_RETRIES,
    timeout: float | Timeout | None | NotGiven = NOT_GIVEN,
    transport: Transport | None = None,
    proxies: ProxiesTypes | None = None,
    limits: Limits | None = None,
    http_client: httpx.Client | None = None,
    custom_headers: Mapping[str, str] | None = None,
    custom_query: Mapping[str, object] | None = None,
    _strict_response_validation: bool,
) -> None:
    """Configure the synchronous client and its underlying httpx client.

    `limits`, `transport` and `proxies` are deprecated: each emits a
    `DeprecationWarning` and is mutually exclusive with `http_client`.
    When `http_client` is not given, a `SyncHttpxClientWrapper` is created
    from the remaining arguments.

    Raises:
        ValueError: if `http_client` is combined with `limits`, `transport` or `proxies`.
        TypeError: if `http_client` is not an `httpx.Client` instance.
    """
    if limits is not None:
        warnings.warn(
            "The `connection_pool_limits` argument is deprecated. The `http_client` argument should be passed instead",
            category=DeprecationWarning,
            stacklevel=3,
        )
        if http_client is not None:
            raise ValueError("The `http_client` argument is mutually exclusive with `connection_pool_limits`")
    else:
        # no explicit limits: use the SDK's default connection limits
        limits = DEFAULT_CONNECTION_LIMITS

    if transport is not None:
        warnings.warn(
            "The `transport` argument is deprecated. The `http_client` argument should be passed instead",
            category=DeprecationWarning,
            stacklevel=3,
        )
        if http_client is not None:
            raise ValueError("The `http_client` argument is mutually exclusive with `transport`")

    if proxies is not None:
        warnings.warn(
            "The `proxies` argument is deprecated. The `http_client` argument should be passed instead",
            category=DeprecationWarning,
            stacklevel=3,
        )
        if http_client is not None:
            raise ValueError("The `http_client` argument is mutually exclusive with `proxies`")

    if not is_given(timeout):
        # if the user passed in a custom http client with a non-default
        # timeout set then we use that timeout.
        #
        # note: there is an edge case here where the user passes in a client
        # where they've explicitly set the timeout to match the default timeout
        # as this check is structural, meaning that we'll think they didn't
        # pass in a timeout and will ignore it
        if http_client and http_client.timeout != HTTPX_DEFAULT_TIMEOUT:
            timeout = http_client.timeout
        else:
            timeout = DEFAULT_TIMEOUT

    # the annotation already restricts the type; this guards against misuse at runtime
    if http_client is not None and not isinstance(http_client, httpx.Client):  # pyright: ignore[reportUnnecessaryIsInstance]
        raise TypeError(
            f"Invalid `http_client` argument; Expected an instance of `httpx.Client` but got {type(http_client)}"
        )

    super().__init__(
        version=version,
        limits=limits,
        # cast to a valid type because mypy doesn't understand our type narrowing
        timeout=cast(Timeout, timeout),
        proxies=proxies,
        base_url=base_url,
        transport=transport,
        max_retries=max_retries,
        custom_query=custom_query,
        custom_headers=custom_headers,
        _strict_response_validation=_strict_response_validation,
    )
    # use the caller's client when given, otherwise build our own wrapper
    self._client = http_client or SyncHttpxClientWrapper(
        base_url=base_url,
        # cast to a valid type because mypy doesn't understand our type narrowing
        timeout=cast(Timeout, timeout),
        proxies=proxies,
        transport=transport,
        limits=limits,
        follow_redirects=True,
    )
def is_closed(self) -> bool:
    """Report whether the underlying httpx client has been closed."""
    http_client = self._client
    return http_client.is_closed
def close(self) -> None:
    """Close the underlying HTTPX client.

    The client will *not* be usable after this.
    """
    # If an error is thrown while constructing a client, self._client
    # may not be present
    if not hasattr(self, "_client"):
        return
    self._client.close()
def __enter__(self: _T) -> _T: | |
return self | |
def __exit__( | |
self, | |
exc_type: type[BaseException] | None, | |
exc: BaseException | None, | |
exc_tb: TracebackType | None, | |
) -> None: | |
self.close() | |
def _prepare_options( | |
self, | |
options: FinalRequestOptions, # noqa: ARG002 | |
) -> FinalRequestOptions: | |
"""Hook for mutating the given options""" | |
return options | |
def _prepare_request( | |
self, | |
request: httpx.Request, # noqa: ARG002 | |
) -> None: | |
"""This method is used as a callback for mutating the `Request` object | |
after it has been constructed. | |
This is useful for cases where you want to add certain headers based off of | |
the request properties, e.g. `url`, `method` etc. | |
""" | |
return None | |
@overload
def request(
    self,
    cast_to: Type[ResponseT],
    options: FinalRequestOptions,
    remaining_retries: Optional[int] = None,
    *,
    stream: Literal[True],
    stream_cls: Type[_StreamT],
) -> _StreamT:
    ...


@overload
def request(
    self,
    cast_to: Type[ResponseT],
    options: FinalRequestOptions,
    remaining_retries: Optional[int] = None,
    *,
    stream: Literal[False] = False,
) -> ResponseT:
    ...


@overload
def request(
    self,
    cast_to: Type[ResponseT],
    options: FinalRequestOptions,
    remaining_retries: Optional[int] = None,
    *,
    stream: bool = False,
    stream_cls: Type[_StreamT] | None = None,
) -> ResponseT | _StreamT:
    ...


def request(
    self,
    cast_to: Type[ResponseT],
    options: FinalRequestOptions,
    remaining_retries: Optional[int] = None,
    *,
    stream: bool = False,
    stream_cls: type[_StreamT] | None = None,
) -> ResponseT | _StreamT:
    """Send a request described by `options` and return the parsed result.

    The three `@overload` signatures only inform type checkers about the
    return type for streaming vs. non-streaming calls; this implementation
    forwards every argument to `_request`.
    """
    return self._request(
        cast_to=cast_to,
        options=options,
        stream=stream,
        stream_cls=stream_cls,
        remaining_retries=remaining_retries,
    )
def _request(
    self,
    *,
    cast_to: Type[ResponseT],
    options: FinalRequestOptions,
    remaining_retries: int | None,
    stream: bool,
    stream_cls: type[_StreamT] | None,
) -> ResponseT | _StreamT:
    """Build and send one request, retrying through `_retry_request` on failure.

    Raises:
        APITimeoutError: on an httpx timeout with no retries left.
        APIConnectionError: on any other exception while sending, with no retries left.
        APIStatusError: on a 4xx/5xx response that is not retried (built by
            `_make_status_error_from_response`).
    """
    # create a copy of the options we were given so that if the
    # options are mutated later & we then retry, the retries are
    # given the original options
    input_options = model_copy(options)

    cast_to = self._maybe_override_cast_to(cast_to, options)
    options = self._prepare_options(options)

    retries = self._remaining_retries(remaining_retries, options)
    request = self._build_request(options)
    self._prepare_request(request)

    kwargs: HttpxSendArgs = {}
    if self.custom_auth is not None:
        kwargs["auth"] = self.custom_auth

    log.debug("Sending HTTP Request: %s %s", request.method, request.url)

    try:
        response = self._client.send(
            request,
            # stream when the caller asked for it or the raw-response header requests it
            stream=stream or self._should_stream_response_body(request=request),
            **kwargs,
        )
    except httpx.TimeoutException as err:
        log.debug("Encountered httpx.TimeoutException", exc_info=True)

        if retries > 0:
            return self._retry_request(
                input_options,
                cast_to,
                retries,
                stream=stream,
                stream_cls=stream_cls,
                response_headers=None,
            )

        log.debug("Raising timeout error")
        raise APITimeoutError(request=request) from err
    except Exception as err:
        log.debug("Encountered Exception", exc_info=True)

        if retries > 0:
            return self._retry_request(
                input_options,
                cast_to,
                retries,
                stream=stream,
                stream_cls=stream_cls,
                response_headers=None,
            )

        log.debug("Raising connection error")
        raise APIConnectionError(request=request) from err

    log.debug(
        'HTTP Response: %s %s "%i %s" %s',
        request.method,
        request.url,
        response.status_code,
        response.reason_phrase,
        response.headers,
    )
    log.debug("request_id: %s", response.headers.get("request-id"))

    try:
        response.raise_for_status()
    except httpx.HTTPStatusError as err:  # thrown on 4xx and 5xx status code
        log.debug("Encountered httpx.HTTPStatusError", exc_info=True)

        if retries > 0 and self._should_retry(err.response):
            # close the failed response before issuing the retry
            err.response.close()
            return self._retry_request(
                input_options,
                cast_to,
                retries,
                err.response.headers,
                stream=stream,
                stream_cls=stream_cls,
            )

        # If the response is streamed then we need to explicitly read the response
        # to completion before attempting to access the response text.
        if not err.response.is_closed:
            err.response.read()

        log.debug("Re-raising status error")
        raise self._make_status_error_from_response(err.response) from None

    return self._process_response(
        cast_to=cast_to,
        options=options,
        response=response,
        stream=stream,
        stream_cls=stream_cls,
    )
def _retry_request(
    self,
    options: FinalRequestOptions,
    cast_to: Type[ResponseT],
    remaining_retries: int,
    response_headers: httpx.Headers | None,
    *,
    stream: bool,
    stream_cls: type[_StreamT] | None,
) -> ResponseT | _StreamT:
    """Sleep for the computed back-off delay, then send the request again.

    The retry budget is reduced by one, the delay comes from
    `self._calculate_retry_timeout(...)`, and the request is re-issued through
    `self._request(...)` with the reduced budget.
    """
    retries_left = remaining_retries - 1
    if retries_left != 1:
        log.debug("%i retries left", retries_left)
    else:
        log.debug("1 retry left")

    delay = self._calculate_retry_timeout(retries_left, options, response_headers)
    log.info("Retrying request to %s in %f seconds", options.url, delay)

    # This is the synchronous code path, so the sleep blocks the calling thread.
    # Library users who cannot afford that must run the client in another thread.
    time.sleep(delay)

    return self._request(
        cast_to=cast_to,
        options=options,
        stream=stream,
        stream_cls=stream_cls,
        remaining_retries=retries_left,
    )
def _process_response(
    self,
    *,
    cast_to: Type[ResponseT],
    options: FinalRequestOptions,
    response: httpx.Response,
    stream: bool,
    stream_cls: type[Stream[Any]] | type[AsyncStream[Any]] | None,
) -> ResponseT:
    """Turn a raw `httpx.Response` into the value the caller asked for.

    Resolution order:
      1. raw-response header equal to "true" -> `LegacyAPIResponse` wrapper
      2. `cast_to` is a `BaseAPIResponse` subclass -> instance of that class
      3. `cast_to` is `httpx.Response` -> the response itself
      4. any other truthy raw-response header -> unparsed `APIResponse`
      5. otherwise -> the result of `APIResponse.parse()`

    Raises:
        TypeError: if `cast_to` is a `BaseAPIResponse` subclass that does not
            derive from `APIResponse`.
    """
    raw_header = response.request.headers.get(RAW_RESPONSE_HEADER)

    if raw_header == "true":
        legacy_response = LegacyAPIResponse(
            raw=response,
            client=self,
            cast_to=cast_to,
            stream=stream,
            stream_cls=stream_cls,
            options=options,
        )
        return cast(ResponseT, legacy_response)

    # For a parametrised generic, check the unsubscripted class.
    origin = get_origin(cast_to) or cast_to
    if inspect.isclass(origin) and issubclass(origin, BaseAPIResponse):
        if not issubclass(origin, APIResponse):
            raise TypeError(f"API Response types must subclass {APIResponse}; Received {origin}")

        response_cls = cast("type[BaseAPIResponse[Any]]", cast_to)
        custom_response = response_cls(
            raw=response,
            client=self,
            cast_to=extract_response_type(response_cls),
            stream=stream,
            stream_cls=stream_cls,
            options=options,
        )
        return cast(ResponseT, custom_response)

    if cast_to == httpx.Response:
        return cast(ResponseT, response)

    api_response = APIResponse(
        raw=response,
        client=self,
        cast_to=cast("type[ResponseT]", cast_to),  # pyright: ignore[reportUnnecessaryCast]
        stream=stream,
        stream_cls=stream_cls,
        options=options,
    )
    if bool(raw_header):
        return cast(ResponseT, api_response)
    return api_response.parse()
def _request_api_list(
    self,
    model: Type[object],
    page: Type[SyncPageT],
    options: FinalRequestOptions,
) -> SyncPageT:
    """Request a page of results, parsed as `page`.

    A post-parser is installed on `options` that hands this client, the item
    `model` and the `options` themselves to the parsed page through
    `_set_private_attributes`.
    """

    def _attach_page_context(resp: SyncPageT) -> SyncPageT:
        resp._set_private_attributes(client=self, model=model, options=options)
        return resp

    options.post_parser = _attach_page_context

    return self.request(page, options, stream=False)
@overload
def get(
    self,
    path: str,
    *,
    cast_to: Type[ResponseT],
    options: RequestOptions = {},
    stream: Literal[False] = False,
) -> ResponseT:
    ...

@overload
def get(
    self,
    path: str,
    *,
    cast_to: Type[ResponseT],
    options: RequestOptions = {},
    stream: Literal[True],
    stream_cls: type[_StreamT],
) -> _StreamT:
    ...

@overload
def get(
    self,
    path: str,
    *,
    cast_to: Type[ResponseT],
    options: RequestOptions = {},
    stream: bool,
    stream_cls: type[_StreamT] | None = None,
) -> ResponseT | _StreamT:
    ...

def get(
    self,
    path: str,
    *,
    cast_to: Type[ResponseT],
    options: RequestOptions = {},
    stream: bool = False,
    stream_cls: type[_StreamT] | None = None,
) -> ResponseT | _StreamT:
    """Issue a GET request to `path`.

    The three signatures above are typing-only overloads; this final definition
    is the one that runs. `options` is only unpacked, never mutated, so the
    shared `{}` default is not modified here.

    Args:
        path: URL (or path) placed in the request options as `url`.
        cast_to: type passed to `self.request` as the response type.
        options: extra `FinalRequestOptions` fields, unpacked as keyword arguments.
        stream: forwarded to `self.request`.
        stream_cls: forwarded to `self.request`.
    """
    opts = FinalRequestOptions.construct(method="get", url=path, **options)
    # cast is required because mypy complains about returning Any even though
    # it understands the type variables
    return cast(ResponseT, self.request(cast_to, opts, stream=stream, stream_cls=stream_cls))
@overload
def post(
    self,
    path: str,
    *,
    cast_to: Type[ResponseT],
    body: Body | None = None,
    options: RequestOptions = {},
    files: RequestFiles | None = None,
    stream: Literal[False] = False,
) -> ResponseT:
    ...

@overload
def post(
    self,
    path: str,
    *,
    cast_to: Type[ResponseT],
    body: Body | None = None,
    options: RequestOptions = {},
    files: RequestFiles | None = None,
    stream: Literal[True],
    stream_cls: type[_StreamT],
) -> _StreamT:
    ...

@overload
def post(
    self,
    path: str,
    *,
    cast_to: Type[ResponseT],
    body: Body | None = None,
    options: RequestOptions = {},
    files: RequestFiles | None = None,
    stream: bool,
    stream_cls: type[_StreamT] | None = None,
) -> ResponseT | _StreamT:
    ...

def post(
    self,
    path: str,
    *,
    cast_to: Type[ResponseT],
    body: Body | None = None,
    options: RequestOptions = {},
    files: RequestFiles | None = None,
    stream: bool = False,
    stream_cls: type[_StreamT] | None = None,
) -> ResponseT | _StreamT:
    """Issue a POST request to `path`.

    The three signatures above are typing-only overloads; this final definition
    is the one that runs.

    Args:
        path: URL (or path) placed in the request options as `url`.
        cast_to: type passed to `self.request` as the response type.
        body: stored in the request options as `json_data`.
        options: extra `FinalRequestOptions` fields, unpacked as keyword arguments.
        files: converted with `to_httpx_files` before being stored in the options.
        stream: forwarded to `self.request`.
        stream_cls: forwarded to `self.request`.
    """
    opts = FinalRequestOptions.construct(
        method="post", url=path, json_data=body, files=to_httpx_files(files), **options
    )
    return cast(ResponseT, self.request(cast_to, opts, stream=stream, stream_cls=stream_cls))
def patch(
    self,
    path: str,
    *,
    cast_to: Type[ResponseT],
    body: Body | None = None,
    options: RequestOptions = {},
) -> ResponseT:
    """Issue a PATCH request to `path`, with `body` stored as the options' `json_data`."""
    request_options = FinalRequestOptions.construct(
        method="patch",
        url=path,
        json_data=body,
        **options,
    )
    return self.request(cast_to, request_options)
def put(
    self,
    path: str,
    *,
    cast_to: Type[ResponseT],
    body: Body | None = None,
    files: RequestFiles | None = None,
    options: RequestOptions = {},
) -> ResponseT:
    """Issue a PUT request to `path`.

    `body` is stored as the options' `json_data`; `files` is converted with
    `to_httpx_files` first.
    """
    request_options = FinalRequestOptions.construct(
        method="put",
        url=path,
        json_data=body,
        files=to_httpx_files(files),
        **options,
    )
    return self.request(cast_to, request_options)
def delete(
    self,
    path: str,
    *,
    cast_to: Type[ResponseT],
    body: Body | None = None,
    options: RequestOptions = {},
) -> ResponseT:
    """Issue a DELETE request to `path`, with `body` stored as the options' `json_data`."""
    request_options = FinalRequestOptions.construct(
        method="delete",
        url=path,
        json_data=body,
        **options,
    )
    return self.request(cast_to, request_options)
def get_api_list(
    self,
    path: str,
    *,
    model: Type[object],
    page: Type[SyncPageT],
    body: Body | None = None,
    options: RequestOptions = {},
    method: str = "get",
) -> SyncPageT:
    """Build the request options for a list endpoint and delegate to `_request_api_list`.

    `method` defaults to "get"; `body` is stored as the options' `json_data`.
    """
    request_options = FinalRequestOptions.construct(
        method=method,
        url=path,
        json_data=body,
        **options,
    )
    return self._request_api_list(model, page, request_options)
class _DefaultAsyncHttpxClient(httpx.AsyncClient):
    """`httpx.AsyncClient` subclass that fills in default timeout, limits and redirect handling."""

    def __init__(self, **kwargs: Any) -> None:
        # Only fill in values the caller did not supply explicitly.
        sdk_defaults = (
            ("timeout", DEFAULT_TIMEOUT),
            ("limits", DEFAULT_CONNECTION_LIMITS),
            ("follow_redirects", True),
        )
        for option_name, default_value in sdk_defaults:
            kwargs.setdefault(option_name, default_value)
        super().__init__(**kwargs)
if not TYPE_CHECKING:
    # At runtime the public name refers to the subclass that applies the defaults.
    DefaultAsyncHttpxClient = _DefaultAsyncHttpxClient
else:
    # Type checkers see the plain httpx class.
    DefaultAsyncHttpxClient = httpx.AsyncClient
    """An alias to `httpx.AsyncClient` that provides the same defaults that this SDK
    uses internally.

    This is useful because overriding the `http_client` with your own instance of
    `httpx.AsyncClient` will result in httpx's defaults being used, not ours.
    """
class AsyncHttpxClientWrapper(DefaultAsyncHttpxClient):
    """Default async HTTP client that tries to close itself when garbage collected."""

    def __del__(self) -> None:
        """Best-effort cleanup: schedule `aclose()` on the running asyncio loop.

        Nothing is scheduled when the client is already closed. Any failure
        (no running event loop, a partially initialised instance, ...) is
        deliberately swallowed, because raising from `__del__` is never useful.
        """
        try:
            # Already closed: scheduling another `aclose()` task would be wasted work.
            if self.is_closed:
                return

            # TODO(someday): support non asyncio runtimes here
            asyncio.get_running_loop().create_task(self.aclose())
        except Exception:
            pass
class AsyncAPIClient(BaseClient[httpx.AsyncClient, AsyncStream[Any]]):
    """Asynchronous API client that sends its requests through an `httpx.AsyncClient`."""

    _client: httpx.AsyncClient
    _default_stream_cls: type[AsyncStream[Any]] | None = None

    def __init__(
        self,
        *,
        version: str,
        base_url: str | URL,
        _strict_response_validation: bool,
        max_retries: int = DEFAULT_MAX_RETRIES,
        timeout: float | Timeout | None | NotGiven = NOT_GIVEN,
        transport: AsyncTransport | None = None,
        proxies: ProxiesTypes | None = None,
        limits: Limits | None = None,
        http_client: httpx.AsyncClient | None = None,
        custom_headers: Mapping[str, str] | None = None,
        custom_query: Mapping[str, object] | None = None,
    ) -> None:
        """Validate the arguments and create (or adopt) the underlying `httpx.AsyncClient`.

        `limits`, `transport` and `proxies` are deprecated in favour of
        `http_client`; each one emits a `DeprecationWarning` when supplied.

        Raises:
            ValueError: if `http_client` is combined with `limits`, `transport`
                or `proxies`.
            TypeError: if `http_client` is given but is not an `httpx.AsyncClient`.
        """
        if limits is not None:
            warnings.warn(
                "The `connection_pool_limits` argument is deprecated. The `http_client` argument should be passed instead",
                category=DeprecationWarning,
                stacklevel=3,
            )
            if http_client is not None:
                raise ValueError("The `http_client` argument is mutually exclusive with `connection_pool_limits`")
        else:
            limits = DEFAULT_CONNECTION_LIMITS

        if transport is not None:
            warnings.warn(
                "The `transport` argument is deprecated. The `http_client` argument should be passed instead",
                category=DeprecationWarning,
                stacklevel=3,
            )
            if http_client is not None:
                raise ValueError("The `http_client` argument is mutually exclusive with `transport`")

        if proxies is not None:
            warnings.warn(
                "The `proxies` argument is deprecated. The `http_client` argument should be passed instead",
                category=DeprecationWarning,
                stacklevel=3,
            )
            if http_client is not None:
                raise ValueError("The `http_client` argument is mutually exclusive with `proxies`")

        if not is_given(timeout):
            # if the user passed in a custom http client with a non-default
            # timeout set then we use that timeout.
            #
            # note: there is an edge case here where the user passes in a client
            # where they've explicitly set the timeout to match the default timeout
            # as this check is structural, meaning that we'll think they didn't
            # pass in a timeout and will ignore it
            if http_client and http_client.timeout != HTTPX_DEFAULT_TIMEOUT:
                timeout = http_client.timeout
            else:
                timeout = DEFAULT_TIMEOUT

        if http_client is not None and not isinstance(http_client, httpx.AsyncClient):  # pyright: ignore[reportUnnecessaryIsInstance]
            raise TypeError(
                f"Invalid `http_client` argument; Expected an instance of `httpx.AsyncClient` but got {type(http_client)}"
            )

        super().__init__(
            version=version,
            base_url=base_url,
            limits=limits,
            # cast to a valid type because mypy doesn't understand our type narrowing
            timeout=cast(Timeout, timeout),
            proxies=proxies,
            transport=transport,
            max_retries=max_retries,
            custom_query=custom_query,
            custom_headers=custom_headers,
            _strict_response_validation=_strict_response_validation,
        )
        self._client = http_client or AsyncHttpxClientWrapper(
            base_url=base_url,
            # cast to a valid type because mypy doesn't understand our type narrowing
            timeout=cast(Timeout, timeout),
            proxies=proxies,
            transport=transport,
            limits=limits,
            follow_redirects=True,
        )

    def is_closed(self) -> bool:
        """Report whether the underlying HTTPX client has been closed."""
        return self._client.is_closed

    async def close(self) -> None:
        """Close the underlying HTTPX client.

        The client will *not* be usable after this.
        """
        await self._client.aclose()

    async def __aenter__(self: _T) -> _T:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await self.close()

    async def _prepare_options(
        self,
        options: FinalRequestOptions,  # noqa: ARG002
    ) -> FinalRequestOptions:
        """Hook for mutating the given options"""
        return options

    async def _prepare_request(
        self,
        request: httpx.Request,  # noqa: ARG002
    ) -> None:
        """This method is used as a callback for mutating the `Request` object
        after it has been constructed.

        This is useful for cases where you want to add certain headers based off of
        the request properties, e.g. `url`, `method` etc.
        """
        return None

    @overload
    async def request(
        self,
        cast_to: Type[ResponseT],
        options: FinalRequestOptions,
        *,
        stream: Literal[False] = False,
        remaining_retries: Optional[int] = None,
    ) -> ResponseT:
        ...

    @overload
    async def request(
        self,
        cast_to: Type[ResponseT],
        options: FinalRequestOptions,
        *,
        stream: Literal[True],
        stream_cls: type[_AsyncStreamT],
        remaining_retries: Optional[int] = None,
    ) -> _AsyncStreamT:
        ...

    @overload
    async def request(
        self,
        cast_to: Type[ResponseT],
        options: FinalRequestOptions,
        *,
        stream: bool,
        stream_cls: type[_AsyncStreamT] | None = None,
        remaining_retries: Optional[int] = None,
    ) -> ResponseT | _AsyncStreamT:
        ...

    async def request(
        self,
        cast_to: Type[ResponseT],
        options: FinalRequestOptions,
        *,
        stream: bool = False,
        stream_cls: type[_AsyncStreamT] | None = None,
        remaining_retries: Optional[int] = None,
    ) -> ResponseT | _AsyncStreamT:
        """Public entry point for sending a request; forwards everything to `_request`."""
        return await self._request(
            cast_to=cast_to,
            options=options,
            stream=stream,
            stream_cls=stream_cls,
            remaining_retries=remaining_retries,
        )

    async def _request(
        self,
        cast_to: Type[ResponseT],
        options: FinalRequestOptions,
        *,
        stream: bool,
        stream_cls: type[_AsyncStreamT] | None,
        remaining_retries: int | None,
    ) -> ResponseT | _AsyncStreamT:
        """Build and send one request, retrying while retries remain.

        Raises:
            APITimeoutError: the send timed out and no retries are left.
            APIConnectionError: the send failed with any other exception and no
                retries are left.
            Exception: the status error built by `_make_status_error_from_response`
                for a 4xx/5xx response that is not retried.
        """
        if self._platform is None:
            # `get_platform` can make blocking IO calls so we
            # execute it earlier while we are in an async context
            self._platform = await asyncify(get_platform)()

        # create a copy of the options we were given so that if the
        # options are mutated later & we then retry, the retries are
        # given the original options
        input_options = model_copy(options)

        cast_to = self._maybe_override_cast_to(cast_to, options)
        options = await self._prepare_options(options)

        retries = self._remaining_retries(remaining_retries, options)
        request = self._build_request(options)
        await self._prepare_request(request)

        kwargs: HttpxSendArgs = {}
        if self.custom_auth is not None:
            kwargs["auth"] = self.custom_auth

        try:
            response = await self._client.send(
                request,
                stream=stream or self._should_stream_response_body(request=request),
                **kwargs,
            )
        except httpx.TimeoutException as err:
            log.debug("Encountered httpx.TimeoutException", exc_info=True)

            if retries > 0:
                return await self._retry_request(
                    input_options,
                    cast_to,
                    retries,
                    stream=stream,
                    stream_cls=stream_cls,
                    response_headers=None,
                )

            log.debug("Raising timeout error")
            raise APITimeoutError(request=request) from err
        except Exception as err:
            log.debug("Encountered Exception", exc_info=True)

            if retries > 0:
                return await self._retry_request(
                    input_options,
                    cast_to,
                    retries,
                    stream=stream,
                    stream_cls=stream_cls,
                    response_headers=None,
                )

            log.debug("Raising connection error")
            raise APIConnectionError(request=request) from err

        # Same wording and detail as the synchronous client: this line describes
        # the response, and the headers / request id are needed when debugging.
        log.debug(
            'HTTP Response: %s %s "%i %s" %s',
            request.method,
            request.url,
            response.status_code,
            response.reason_phrase,
            response.headers,
        )
        log.debug("request_id: %s", response.headers.get("request-id"))

        try:
            response.raise_for_status()
        except httpx.HTTPStatusError as err:  # thrown on 4xx and 5xx status code
            log.debug("Encountered httpx.HTTPStatusError", exc_info=True)

            if retries > 0 and self._should_retry(err.response):
                await err.response.aclose()
                return await self._retry_request(
                    input_options,
                    cast_to,
                    retries,
                    err.response.headers,
                    stream=stream,
                    stream_cls=stream_cls,
                )

            # If the response is streamed then we need to explicitly read the response
            # to completion before attempting to access the response text.
            if not err.response.is_closed:
                await err.response.aread()

            log.debug("Re-raising status error")
            raise self._make_status_error_from_response(err.response) from None

        return await self._process_response(
            cast_to=cast_to,
            options=options,
            response=response,
            stream=stream,
            stream_cls=stream_cls,
        )

    async def _retry_request(
        self,
        options: FinalRequestOptions,
        cast_to: Type[ResponseT],
        remaining_retries: int,
        response_headers: httpx.Headers | None,
        *,
        stream: bool,
        stream_cls: type[_AsyncStreamT] | None,
    ) -> ResponseT | _AsyncStreamT:
        """Sleep for the computed back-off delay, then send the request again."""
        remaining = remaining_retries - 1
        if remaining == 1:
            log.debug("1 retry left")
        else:
            log.debug("%i retries left", remaining)

        timeout = self._calculate_retry_timeout(remaining, options, response_headers)
        log.info("Retrying request to %s in %f seconds", options.url, timeout)

        await anyio.sleep(timeout)

        return await self._request(
            options=options,
            cast_to=cast_to,
            remaining_retries=remaining,
            stream=stream,
            stream_cls=stream_cls,
        )

    async def _process_response(
        self,
        *,
        cast_to: Type[ResponseT],
        options: FinalRequestOptions,
        response: httpx.Response,
        stream: bool,
        stream_cls: type[Stream[Any]] | type[AsyncStream[Any]] | None,
    ) -> ResponseT:
        """Turn a raw `httpx.Response` into the value the caller asked for.

        Raises:
            TypeError: if `cast_to` is a `BaseAPIResponse` subclass that does not
                derive from `AsyncAPIResponse`.
        """
        if response.request.headers.get(RAW_RESPONSE_HEADER) == "true":
            return cast(
                ResponseT,
                LegacyAPIResponse(
                    raw=response,
                    client=self,
                    cast_to=cast_to,
                    stream=stream,
                    stream_cls=stream_cls,
                    options=options,
                ),
            )

        origin = get_origin(cast_to) or cast_to

        if inspect.isclass(origin) and issubclass(origin, BaseAPIResponse):
            if not issubclass(origin, AsyncAPIResponse):
                raise TypeError(f"API Response types must subclass {AsyncAPIResponse}; Received {origin}")

            response_cls = cast("type[BaseAPIResponse[Any]]", cast_to)
            return cast(
                "ResponseT",
                response_cls(
                    raw=response,
                    client=self,
                    cast_to=extract_response_type(response_cls),
                    stream=stream,
                    stream_cls=stream_cls,
                    options=options,
                ),
            )

        if cast_to == httpx.Response:
            return cast(ResponseT, response)

        api_response = AsyncAPIResponse(
            raw=response,
            client=self,
            cast_to=cast("type[ResponseT]", cast_to),  # pyright: ignore[reportUnnecessaryCast]
            stream=stream,
            stream_cls=stream_cls,
            options=options,
        )
        if bool(response.request.headers.get(RAW_RESPONSE_HEADER)):
            return cast(ResponseT, api_response)

        return await api_response.parse()

    def _request_api_list(
        self,
        model: Type[_T],
        page: Type[AsyncPageT],
        options: FinalRequestOptions,
    ) -> AsyncPaginator[_T, AsyncPageT]:
        """Wrap the list request in an `AsyncPaginator`; nothing is awaited here."""
        return AsyncPaginator(client=self, options=options, page_cls=page, model=model)

    @overload
    async def get(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        options: RequestOptions = {},
        stream: Literal[False] = False,
    ) -> ResponseT:
        ...

    @overload
    async def get(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        options: RequestOptions = {},
        stream: Literal[True],
        stream_cls: type[_AsyncStreamT],
    ) -> _AsyncStreamT:
        ...

    @overload
    async def get(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        options: RequestOptions = {},
        stream: bool,
        stream_cls: type[_AsyncStreamT] | None = None,
    ) -> ResponseT | _AsyncStreamT:
        ...

    async def get(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        options: RequestOptions = {},
        stream: bool = False,
        stream_cls: type[_AsyncStreamT] | None = None,
    ) -> ResponseT | _AsyncStreamT:
        """Issue a GET request to `path`; `options` is unpacked into the request options."""
        opts = FinalRequestOptions.construct(method="get", url=path, **options)
        return await self.request(cast_to, opts, stream=stream, stream_cls=stream_cls)

    @overload
    async def post(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        body: Body | None = None,
        files: RequestFiles | None = None,
        options: RequestOptions = {},
        stream: Literal[False] = False,
    ) -> ResponseT:
        ...

    @overload
    async def post(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        body: Body | None = None,
        files: RequestFiles | None = None,
        options: RequestOptions = {},
        stream: Literal[True],
        stream_cls: type[_AsyncStreamT],
    ) -> _AsyncStreamT:
        ...

    @overload
    async def post(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        body: Body | None = None,
        files: RequestFiles | None = None,
        options: RequestOptions = {},
        stream: bool,
        stream_cls: type[_AsyncStreamT] | None = None,
    ) -> ResponseT | _AsyncStreamT:
        ...

    async def post(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        body: Body | None = None,
        files: RequestFiles | None = None,
        options: RequestOptions = {},
        stream: bool = False,
        stream_cls: type[_AsyncStreamT] | None = None,
    ) -> ResponseT | _AsyncStreamT:
        """Issue a POST request to `path`.

        `body` is stored as the options' `json_data`; `files` is converted with
        `async_to_httpx_files` first.
        """
        opts = FinalRequestOptions.construct(
            method="post", url=path, json_data=body, files=await async_to_httpx_files(files), **options
        )
        return await self.request(cast_to, opts, stream=stream, stream_cls=stream_cls)

    async def patch(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        body: Body | None = None,
        options: RequestOptions = {},
    ) -> ResponseT:
        """Issue a PATCH request to `path`, with `body` stored as the options' `json_data`."""
        opts = FinalRequestOptions.construct(method="patch", url=path, json_data=body, **options)
        return await self.request(cast_to, opts)

    async def put(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        body: Body | None = None,
        files: RequestFiles | None = None,
        options: RequestOptions = {},
    ) -> ResponseT:
        """Issue a PUT request to `path`.

        `body` is stored as the options' `json_data`; `files` is converted with
        `async_to_httpx_files` first.
        """
        opts = FinalRequestOptions.construct(
            method="put", url=path, json_data=body, files=await async_to_httpx_files(files), **options
        )
        return await self.request(cast_to, opts)

    async def delete(
        self,
        path: str,
        *,
        cast_to: Type[ResponseT],
        body: Body | None = None,
        options: RequestOptions = {},
    ) -> ResponseT:
        """Issue a DELETE request to `path`, with `body` stored as the options' `json_data`."""
        opts = FinalRequestOptions.construct(method="delete", url=path, json_data=body, **options)
        return await self.request(cast_to, opts)

    def get_api_list(
        self,
        path: str,
        *,
        model: Type[_T],
        page: Type[AsyncPageT],
        body: Body | None = None,
        options: RequestOptions = {},
        method: str = "get",
    ) -> AsyncPaginator[_T, AsyncPageT]:
        """Build the request options for a list endpoint and delegate to `_request_api_list`."""
        opts = FinalRequestOptions.construct(method=method, url=path, json_data=body, **options)
        return self._request_api_list(model, page, opts)
def make_request_options(
    *,
    query: Query | None = None,
    extra_headers: Headers | None = None,
    extra_query: Query | None = None,
    extra_body: Body | None = None,
    idempotency_key: str | None = None,
    timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
    post_parser: PostParser | NotGiven = NOT_GIVEN,
) -> RequestOptions:
    """Assemble a `RequestOptions` dict that contains only the values actually supplied.

    `None` (or `NOT_GIVEN` for `timeout` / `post_parser`) means "leave the key
    out". When both `query` and `extra_query` are given they are merged, with
    `extra_query` winning on duplicate keys.
    """
    request_options: RequestOptions = {}

    if extra_headers is not None:
        request_options["headers"] = extra_headers

    if extra_body is not None:
        request_options["extra_json"] = cast(AnyMapping, extra_body)

    if query is not None:
        request_options["params"] = query

    if extra_query is not None:
        base_params = request_options.get("params", {})
        request_options["params"] = {**base_params, **extra_query}

    # `None` is a meaningful timeout value, so only the sentinel counts as "not supplied".
    if not isinstance(timeout, NotGiven):
        request_options["timeout"] = timeout

    if idempotency_key is not None:
        request_options["idempotency_key"] = idempotency_key

    if is_given(post_parser):
        # internal
        request_options["post_parser"] = post_parser  # type: ignore

    return request_options
class ForceMultipartDict(Dict[str, None]):
    """A `dict` subclass whose instances are truthy even when they are empty."""

    def __bool__(self) -> bool:
        # A plain empty dict is falsy; this type deliberately is not.
        return True
class OtherPlatform:
    """Holder for a platform name that is not one of the recognised literals."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __str__(self) -> str:
        # Rendered with an "Other:" prefix followed by the stored name.
        return "Other:{}".format(self.name)
# A platform is either one of the recognised names below or an `OtherPlatform`
# carrying an unrecognised name.
Platform = Union[
    OtherPlatform,
    Literal["MacOS", "Linux", "Windows", "FreeBSD", "OpenBSD", "iOS", "Android", "Unknown"],
]
def get_platform() -> Platform:
    """Classify the current operating system.

    Returns one of the `Platform` literals, an `OtherPlatform` wrapping the
    lower-cased `platform.platform()` string when nothing matches, or
    "Unknown" when the platform cannot be queried or that string is empty.
    """
    try:
        os_name = platform.system().lower()
        full_name = platform.platform().lower()
    except Exception:
        return "Unknown"

    # Tested using Python3IDE on an iPhone 11 and Pythonista on an iPad 7
    # system is Darwin and platform_name is a string like:
    # - Darwin-21.6.0-iPhone12,1-64bit
    # - Darwin-21.6.0-iPad7,11-64bit
    if any(device in full_name for device in ("iphone", "ipad")):
        return "iOS"

    if os_name == "darwin":
        return "MacOS"

    if os_name == "windows":
        return "Windows"

    # Tested using Pydroid 3
    # system is Linux and platform_name is a string like 'Linux-5.10.81-android12-9-00001-geba40aecb3b7-ab8534902-aarch64-with-libc'
    if "android" in full_name:
        return "Android"

    if os_name == "linux":
        # https://distro.readthedocs.io/en/latest/#distro.id
        distribution = distro.id()
        if distribution == "freebsd":
            return "FreeBSD"
        if distribution == "openbsd":
            return "OpenBSD"
        return "Linux"

    if not full_name:
        return "Unknown"
    return OtherPlatform(full_name)
def platform_headers(version: str, *, platform: Platform | None) -> Dict[str, str]:
    """Build the `X-Stainless-*` headers describing this package and its runtime.

    `platform` is used for the OS header when truthy; otherwise `get_platform()`
    is called to detect it.
    """
    os_value = str(platform or get_platform())
    arch_value = str(get_architecture())
    runtime_name = get_python_runtime()
    runtime_version = get_python_version()

    headers: Dict[str, str] = {}
    headers["X-Stainless-Lang"] = "python"
    headers["X-Stainless-Package-Version"] = version
    headers["X-Stainless-OS"] = os_value
    headers["X-Stainless-Arch"] = arch_value
    headers["X-Stainless-Runtime"] = runtime_name
    headers["X-Stainless-Runtime-Version"] = runtime_version
    return headers
class OtherArch:
    """Holder for a CPU architecture name that is not one of the recognised literals."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __str__(self) -> str:
        # Rendered with a lower-case "other:" prefix followed by the stored name.
        return "other:{}".format(self.name)
# An architecture is either one of the recognised names below or an `OtherArch`
# carrying an unrecognised machine name.
Arch = Union[
    OtherArch,
    Literal["x32", "x64", "arm", "arm64", "unknown"],
]
def get_python_runtime() -> str:
    """Return `platform.python_implementation()`, or "unknown" if that call raises."""
    try:
        implementation = platform.python_implementation()
    except Exception:
        return "unknown"
    return implementation
def get_python_version() -> str:
    """Return `platform.python_version()`, or "unknown" if that call raises."""
    try:
        version_string = platform.python_version()
    except Exception:
        return "unknown"
    return version_string
def get_architecture() -> Arch:
    """Classify the CPU architecture of the running interpreter.

    Returns one of the `Arch` literals, an `OtherArch` wrapping the lower-cased
    `platform.machine()` string when nothing matches, or "unknown" when the
    platform cannot be queried or that string is empty.
    """
    try:
        bitness, _linkage = platform.architecture()
        cpu = platform.machine().lower()
    except Exception:
        return "unknown"

    if cpu == "arm64" or cpu == "aarch64":
        return "arm64"

    # TODO: untested
    if cpu == "arm":
        return "arm"

    if cpu == "x86_64":
        return "x64"

    # TODO: untested
    if bitness == "32bit":
        return "x32"

    return OtherArch(cpu) if cpu else "unknown"
def _merge_mappings(
    obj1: Mapping[_T_co, Union[_T, Omit]],
    obj2: Mapping[_T_co, Union[_T, Omit]],
) -> Dict[_T_co, _T]:
    """Combine two mappings into a new dict, leaving out every `Omit` value.

    When a key is present in both mappings the value from `obj2` is used, so an
    `Omit` in `obj2` removes a key that `obj1` supplied.
    """
    combined = dict(obj1)
    combined.update(obj2)

    result = {}
    for key, value in combined.items():
        if isinstance(value, Omit):
            continue
        result[key] = value
    return result