Spaces:
Running
Running
from __future__ import annotations | |
import os | |
import inspect | |
import logging | |
import datetime | |
import functools | |
from typing import TYPE_CHECKING, Any, Union, Generic, TypeVar, Callable, Iterator, AsyncIterator, cast, overload | |
from typing_extensions import Awaitable, ParamSpec, override, deprecated, get_origin | |
import anyio | |
import httpx | |
import pydantic | |
from ._types import NoneType | |
from ._utils import is_given, extract_type_arg, is_annotated_type | |
from ._models import BaseModel, is_basemodel | |
from ._constants import RAW_RESPONSE_HEADER | |
from ._streaming import Stream, AsyncStream, is_stream_class_type, extract_stream_chunk_type | |
from ._exceptions import APIResponseValidationError | |
if TYPE_CHECKING: | |
from ._models import FinalRequestOptions | |
from ._base_client import BaseClient | |
P = ParamSpec("P") | |
R = TypeVar("R") | |
_T = TypeVar("_T") | |
log: logging.Logger = logging.getLogger(__name__) | |
class LegacyAPIResponse(Generic[R]): | |
"""This is a legacy class as it will be replaced by `APIResponse` | |
and `AsyncAPIResponse` in the `_response.py` file in the next major | |
release. | |
For the sync client this will mostly be the same with the exception | |
of `content` & `text` will be methods instead of properties. In the | |
async client, all methods will be async. | |
A migration script will be provided & the migration in general should | |
be smooth. | |
""" | |
_cast_to: type[R] | |
_client: BaseClient[Any, Any] | |
_parsed_by_type: dict[type[Any], Any] | |
_stream: bool | |
_stream_cls: type[Stream[Any]] | type[AsyncStream[Any]] | None | |
_options: FinalRequestOptions | |
http_response: httpx.Response | |
def __init__( | |
self, | |
*, | |
raw: httpx.Response, | |
cast_to: type[R], | |
client: BaseClient[Any, Any], | |
stream: bool, | |
stream_cls: type[Stream[Any]] | type[AsyncStream[Any]] | None, | |
options: FinalRequestOptions, | |
) -> None: | |
self._cast_to = cast_to | |
self._client = client | |
self._parsed_by_type = {} | |
self._stream = stream | |
self._stream_cls = stream_cls | |
self._options = options | |
self.http_response = raw | |
def request_id(self) -> str | None: | |
return self.http_response.headers.get("request-id") # type: ignore[no-any-return] | |
def parse(self, *, to: type[_T]) -> _T: | |
... | |
def parse(self) -> R: | |
... | |
def parse(self, *, to: type[_T] | None = None) -> R | _T: | |
"""Returns the rich python representation of this response's data. | |
NOTE: For the async client: this will become a coroutine in the next major version. | |
For lower-level control, see `.read()`, `.json()`, `.iter_bytes()`. | |
You can customise the type that the response is parsed into through | |
the `to` argument, e.g. | |
```py | |
from anthropic import BaseModel | |
class MyModel(BaseModel): | |
foo: str | |
obj = response.parse(to=MyModel) | |
print(obj.foo) | |
``` | |
We support parsing: | |
- `BaseModel` | |
- `dict` | |
- `list` | |
- `Union` | |
- `str` | |
- `int` | |
- `float` | |
- `httpx.Response` | |
""" | |
cache_key = to if to is not None else self._cast_to | |
cached = self._parsed_by_type.get(cache_key) | |
if cached is not None: | |
return cached # type: ignore[no-any-return] | |
parsed = self._parse(to=to) | |
if is_given(self._options.post_parser): | |
parsed = self._options.post_parser(parsed) | |
self._parsed_by_type[cache_key] = parsed | |
return parsed | |
def headers(self) -> httpx.Headers: | |
return self.http_response.headers | |
def http_request(self) -> httpx.Request: | |
return self.http_response.request | |
def status_code(self) -> int: | |
return self.http_response.status_code | |
def url(self) -> httpx.URL: | |
return self.http_response.url | |
def method(self) -> str: | |
return self.http_request.method | |
def content(self) -> bytes: | |
"""Return the binary response content. | |
NOTE: this will be removed in favour of `.read()` in the | |
next major version. | |
""" | |
return self.http_response.content | |
def text(self) -> str: | |
"""Return the decoded response content. | |
NOTE: this will be turned into a method in the next major version. | |
""" | |
return self.http_response.text | |
def http_version(self) -> str: | |
return self.http_response.http_version | |
def is_closed(self) -> bool: | |
return self.http_response.is_closed | |
def elapsed(self) -> datetime.timedelta: | |
"""The time taken for the complete request/response cycle to complete.""" | |
return self.http_response.elapsed | |
def _parse(self, *, to: type[_T] | None = None) -> R | _T: | |
# unwrap `Annotated[T, ...]` -> `T` | |
if to and is_annotated_type(to): | |
to = extract_type_arg(to, 0) | |
if self._stream: | |
if to: | |
if not is_stream_class_type(to): | |
raise TypeError(f"Expected custom parse type to be a subclass of {Stream} or {AsyncStream}") | |
return cast( | |
_T, | |
to( | |
cast_to=extract_stream_chunk_type( | |
to, | |
failure_message="Expected custom stream type to be passed with a type argument, e.g. Stream[ChunkType]", | |
), | |
response=self.http_response, | |
client=cast(Any, self._client), | |
), | |
) | |
if self._stream_cls: | |
return cast( | |
R, | |
self._stream_cls( | |
cast_to=extract_stream_chunk_type(self._stream_cls), | |
response=self.http_response, | |
client=cast(Any, self._client), | |
), | |
) | |
stream_cls = cast("type[Stream[Any]] | type[AsyncStream[Any]] | None", self._client._default_stream_cls) | |
if stream_cls is None: | |
raise MissingStreamClassError() | |
return cast( | |
R, | |
stream_cls( | |
cast_to=self._cast_to, | |
response=self.http_response, | |
client=cast(Any, self._client), | |
), | |
) | |
cast_to = to if to is not None else self._cast_to | |
# unwrap `Annotated[T, ...]` -> `T` | |
if is_annotated_type(cast_to): | |
cast_to = extract_type_arg(cast_to, 0) | |
if cast_to is NoneType: | |
return cast(R, None) | |
response = self.http_response | |
if cast_to == str: | |
return cast(R, response.text) | |
if cast_to == int: | |
return cast(R, int(response.text)) | |
if cast_to == float: | |
return cast(R, float(response.text)) | |
origin = get_origin(cast_to) or cast_to | |
if inspect.isclass(origin) and issubclass(origin, HttpxBinaryResponseContent): | |
return cast(R, cast_to(response)) # type: ignore | |
if origin == LegacyAPIResponse: | |
raise RuntimeError("Unexpected state - cast_to is `APIResponse`") | |
if inspect.isclass(origin) and issubclass(origin, httpx.Response): | |
# Because of the invariance of our ResponseT TypeVar, users can subclass httpx.Response | |
# and pass that class to our request functions. We cannot change the variance to be either | |
# covariant or contravariant as that makes our usage of ResponseT illegal. We could construct | |
# the response class ourselves but that is something that should be supported directly in httpx | |
# as it would be easy to incorrectly construct the Response object due to the multitude of arguments. | |
if cast_to != httpx.Response: | |
raise ValueError(f"Subclasses of httpx.Response cannot be passed to `cast_to`") | |
return cast(R, response) | |
if inspect.isclass(origin) and not issubclass(origin, BaseModel) and issubclass(origin, pydantic.BaseModel): | |
raise TypeError("Pydantic models must subclass our base model type, e.g. `from anthropic import BaseModel`") | |
if ( | |
cast_to is not object | |
and not origin is list | |
and not origin is dict | |
and not origin is Union | |
and not issubclass(origin, BaseModel) | |
): | |
raise RuntimeError( | |
f"Unsupported type, expected {cast_to} to be a subclass of {BaseModel}, {dict}, {list}, {Union}, {NoneType}, {str} or {httpx.Response}." | |
) | |
# split is required to handle cases where additional information is included | |
# in the response, e.g. application/json; charset=utf-8 | |
content_type, *_ = response.headers.get("content-type", "*").split(";") | |
if content_type != "application/json": | |
if is_basemodel(cast_to): | |
try: | |
data = response.json() | |
except Exception as exc: | |
log.debug("Could not read JSON from response data due to %s - %s", type(exc), exc) | |
else: | |
return self._client._process_response_data( | |
data=data, | |
cast_to=cast_to, # type: ignore | |
response=response, | |
) | |
if self._client._strict_response_validation: | |
raise APIResponseValidationError( | |
response=response, | |
message=f"Expected Content-Type response header to be `application/json` but received `{content_type}` instead.", | |
body=response.text, | |
) | |
# If the API responds with content that isn't JSON then we just return | |
# the (decoded) text without performing any parsing so that you can still | |
# handle the response however you need to. | |
return response.text # type: ignore | |
data = response.json() | |
return self._client._process_response_data( | |
data=data, | |
cast_to=cast_to, # type: ignore | |
response=response, | |
) | |
def __repr__(self) -> str: | |
return f"<APIResponse [{self.status_code} {self.http_response.reason_phrase}] type={self._cast_to}>" | |
class MissingStreamClassError(TypeError): | |
def __init__(self) -> None: | |
super().__init__( | |
"The `stream` argument was set to `True` but the `stream_cls` argument was not given. See `anthropic._streaming` for reference", | |
) | |
def to_raw_response_wrapper(func: Callable[P, R]) -> Callable[P, LegacyAPIResponse[R]]: | |
"""Higher order function that takes one of our bound API methods and wraps it | |
to support returning the raw `APIResponse` object directly. | |
""" | |
def wrapped(*args: P.args, **kwargs: P.kwargs) -> LegacyAPIResponse[R]: | |
extra_headers: dict[str, str] = {**(cast(Any, kwargs.get("extra_headers")) or {})} | |
extra_headers[RAW_RESPONSE_HEADER] = "true" | |
kwargs["extra_headers"] = extra_headers | |
return cast(LegacyAPIResponse[R], func(*args, **kwargs)) | |
return wrapped | |
def async_to_raw_response_wrapper(func: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[LegacyAPIResponse[R]]]: | |
"""Higher order function that takes one of our bound API methods and wraps it | |
to support returning the raw `APIResponse` object directly. | |
""" | |
async def wrapped(*args: P.args, **kwargs: P.kwargs) -> LegacyAPIResponse[R]: | |
extra_headers: dict[str, str] = {**(cast(Any, kwargs.get("extra_headers")) or {})} | |
extra_headers[RAW_RESPONSE_HEADER] = "true" | |
kwargs["extra_headers"] = extra_headers | |
return cast(LegacyAPIResponse[R], await func(*args, **kwargs)) | |
return wrapped | |
class HttpxBinaryResponseContent: | |
response: httpx.Response | |
def __init__(self, response: httpx.Response) -> None: | |
self.response = response | |
def content(self) -> bytes: | |
return self.response.content | |
def text(self) -> str: | |
return self.response.text | |
def encoding(self) -> str | None: | |
return self.response.encoding | |
def charset_encoding(self) -> str | None: | |
return self.response.charset_encoding | |
def json(self, **kwargs: Any) -> Any: | |
return self.response.json(**kwargs) | |
def read(self) -> bytes: | |
return self.response.read() | |
def iter_bytes(self, chunk_size: int | None = None) -> Iterator[bytes]: | |
return self.response.iter_bytes(chunk_size) | |
def iter_text(self, chunk_size: int | None = None) -> Iterator[str]: | |
return self.response.iter_text(chunk_size) | |
def iter_lines(self) -> Iterator[str]: | |
return self.response.iter_lines() | |
def iter_raw(self, chunk_size: int | None = None) -> Iterator[bytes]: | |
return self.response.iter_raw(chunk_size) | |
def write_to_file( | |
self, | |
file: str | os.PathLike[str], | |
) -> None: | |
"""Write the output to the given file. | |
Accepts a filename or any path-like object, e.g. pathlib.Path | |
Note: if you want to stream the data to the file instead of writing | |
all at once then you should use `.with_streaming_response` when making | |
the API request, e.g. `client.with_streaming_response.foo().stream_to_file('my_filename.txt')` | |
""" | |
with open(file, mode="wb") as f: | |
for data in self.response.iter_bytes(): | |
f.write(data) | |
def stream_to_file( | |
self, | |
file: str | os.PathLike[str], | |
*, | |
chunk_size: int | None = None, | |
) -> None: | |
with open(file, mode="wb") as f: | |
for data in self.response.iter_bytes(chunk_size): | |
f.write(data) | |
def close(self) -> None: | |
return self.response.close() | |
async def aread(self) -> bytes: | |
return await self.response.aread() | |
async def aiter_bytes(self, chunk_size: int | None = None) -> AsyncIterator[bytes]: | |
return self.response.aiter_bytes(chunk_size) | |
async def aiter_text(self, chunk_size: int | None = None) -> AsyncIterator[str]: | |
return self.response.aiter_text(chunk_size) | |
async def aiter_lines(self) -> AsyncIterator[str]: | |
return self.response.aiter_lines() | |
async def aiter_raw(self, chunk_size: int | None = None) -> AsyncIterator[bytes]: | |
return self.response.aiter_raw(chunk_size) | |
async def astream_to_file( | |
self, | |
file: str | os.PathLike[str], | |
*, | |
chunk_size: int | None = None, | |
) -> None: | |
path = anyio.Path(file) | |
async with await path.open(mode="wb") as f: | |
async for data in self.response.aiter_bytes(chunk_size): | |
await f.write(data) | |
async def aclose(self) -> None: | |
return await self.response.aclose() | |