import asyncio
import functools
from datetime import datetime
from typing import TYPE_CHECKING, Any, List, Optional, Union

from litellm._logging import verbose_logger
from litellm.types.utils import (
    ModelResponse,
    ModelResponseStream,
    TextCompletionResponse,
)

if TYPE_CHECKING:
    from litellm import ModelResponse as _ModelResponse
    from litellm.litellm_core_utils.litellm_logging import (
        Logging as LiteLLMLoggingObject,
    )

    LiteLLMModelResponse = _ModelResponse
else:
    LiteLLMModelResponse = Any
    LiteLLMLoggingObject = Any

import litellm

"""
Helper utils used for logging callbacks
"""

def convert_litellm_response_object_to_str(
    response_obj: Union[Any, LiteLLMModelResponse]
) -> Optional[str]:
    """
    Get the string content of the response object from LiteLLM.

    Returns None if `response_obj` is not a `litellm.ModelResponse`.
    """
    if isinstance(response_obj, litellm.ModelResponse):
        response_str = ""
        for choice in response_obj.choices:
            if isinstance(choice, litellm.Choices):
                if choice.message.content and isinstance(choice.message.content, str):
                    response_str += choice.message.content
        return response_str

    return None
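
# Usage sketch (illustrative only; not part of this module's API): for a chat
# completion response, the helper concatenates the message content of every
# `litellm.Choices` entry.
#
#   response = litellm.completion(
#       model="gpt-4o-mini",
#       messages=[{"role": "user", "content": "hi"}],
#   )
#   response_str = convert_litellm_response_object_to_str(response)
#   if response_str is not None:
#       verbose_logger.debug("response text: %s", response_str)
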
def _assemble_complete_response_from_streaming_chunks(
    result: Union[ModelResponse, TextCompletionResponse, ModelResponseStream],
    start_time: datetime,
    end_time: datetime,
    request_kwargs: dict,
    streaming_chunks: List[Any],
    is_async: bool,
):
    """
    Assemble a complete response from streaming chunks.

    - assemble a complete streaming response if result.choices[0].finish_reason is not None
    - else append the chunk to the streaming_chunks

    Args:
        result: ModelResponse
        start_time: datetime
        end_time: datetime
        request_kwargs: dict
        streaming_chunks: List[Any]
        is_async: bool

    Returns:
        Optional[Union[ModelResponse, TextCompletionResponse]]: the complete
        streaming response, or None while the stream is still in progress
    """
    complete_streaming_response: Optional[
        Union[ModelResponse, TextCompletionResponse]
    ] = None

    if isinstance(result, ModelResponse):
        # a full ModelResponse is already complete - return it as-is
        return result

    if result.choices[0].finish_reason is not None:  # if it's the last chunk
        streaming_chunks.append(result)
        try:
            complete_streaming_response = litellm.stream_chunk_builder(
                chunks=streaming_chunks,
                messages=request_kwargs.get("messages", None),
                start_time=start_time,
                end_time=end_time,
            )
        except Exception as e:
            log_message = (
                "Error occurred building stream chunk in {} success logging: {}".format(
                    "async" if is_async else "sync", str(e)
                )
            )
            verbose_logger.exception(log_message)
            complete_streaming_response = None
    else:
        streaming_chunks.append(result)

    return complete_streaming_response
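
# Usage sketch (illustrative; `stream`, `start_time`, and `request_kwargs` are
# hypothetical caller-side variables): the helper is meant to be called once per
# received chunk with the same `streaming_chunks` buffer. A non-None return
# value signals that the final chunk arrived and the response was assembled.
#
#   streaming_chunks: List[Any] = []
#   for chunk in stream:
#       complete_response = _assemble_complete_response_from_streaming_chunks(
#           result=chunk,
#           start_time=start_time,
#           end_time=datetime.now(),
#           request_kwargs=request_kwargs,
#           streaming_chunks=streaming_chunks,
#           is_async=False,
#       )
#       if complete_response is not None:
#           break  # finish_reason was set; response is fully assembled
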
def _set_duration_in_model_call_details(
    logging_obj: Any,  # we're not guaranteed this will be `LiteLLMLoggingObject`
    start_time: datetime,
    end_time: datetime,
):
    """Helper to set duration in model_call_details, with error handling"""
    try:
        duration_ms = (end_time - start_time).total_seconds() * 1000
        if logging_obj and hasattr(logging_obj, "model_call_details"):
            logging_obj.model_call_details["llm_api_duration_ms"] = duration_ms
        else:
            verbose_logger.debug(
                "`logging_obj` not found - unable to track `llm_api_duration_ms`"
            )
    except Exception as e:
        verbose_logger.warning(f"Error setting `llm_api_duration_ms`: {str(e)}")
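
# Illustrative call (assumes `logging_obj` exposes a `model_call_details` dict):
# a 1.5 second window is recorded as 1500.0 under `llm_api_duration_ms`.
#
#   _set_duration_in_model_call_details(
#       logging_obj=logging_obj,
#       start_time=datetime(2024, 1, 1, 0, 0, 0),
#       end_time=datetime(2024, 1, 1, 0, 0, 1, 500000),
#   )
#   assert logging_obj.model_call_details["llm_api_duration_ms"] == 1500.0
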
def track_llm_api_timing():
    """
    Decorator to track LLM API call timing for both sync and async functions.
    The logging_obj is expected to be passed as a keyword argument to the decorated function.
    """

    def decorator(func):
        @functools.wraps(func)  # preserve the wrapped function's metadata
        async def async_wrapper(*args, **kwargs):
            start_time = datetime.now()
            try:
                result = await func(*args, **kwargs)
                return result
            finally:
                end_time = datetime.now()
                _set_duration_in_model_call_details(
                    logging_obj=kwargs.get("logging_obj", None),
                    start_time=start_time,
                    end_time=end_time,
                )

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            start_time = datetime.now()
            try:
                result = func(*args, **kwargs)
                return result
            finally:
                end_time = datetime.now()
                _set_duration_in_model_call_details(
                    logging_obj=kwargs.get("logging_obj", None),
                    start_time=start_time,
                    end_time=end_time,
                )

        # Check if the function is async or sync
        if asyncio.iscoroutinefunction(func):
            return async_wrapper
        return sync_wrapper

    return decorator
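
# Usage sketch (hypothetical function and client names): `logging_obj` must be
# passed by keyword, since the wrappers read it via `kwargs.get("logging_obj")`;
# positional callers get no timing recorded (the helper logs a debug message).
#
#   @track_llm_api_timing()
#   async def call_llm_api(prompt: str, logging_obj: Any = None):
#       return await some_async_client.generate(prompt)  # hypothetical client
#
#   # result = await call_llm_api("hello", logging_obj=logging_obj)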