import time import logging from typing import List, Dict, Optional, Union from core.providers.base import LLMProvider from utils.config import config logger = logging.getLogger(__name__) try: from openai import OpenAI OPENAI_SDK_AVAILABLE = True except ImportError: OPENAI_SDK_AVAILABLE = False OpenAI = None class OpenAIProvider(LLMProvider): """OpenAI LLM provider implementation""" def __init__(self, model_name: str, timeout: int = 30, max_retries: int = 3): super().__init__(model_name, timeout, max_retries) if not OPENAI_SDK_AVAILABLE: raise ImportError("OpenAI provider requires 'openai' package") if not config.openai_api_key: raise ValueError("OPENAI_API_KEY not set - required for OpenAI provider") self.client = OpenAI(api_key=config.openai_api_key) def generate(self, prompt: str, conversation_history: List[Dict]) -> Optional[str]: """Generate a response synchronously""" try: return self._retry_with_backoff(self._generate_impl, prompt, conversation_history) except Exception as e: logger.error(f"OpenAI generation failed: {e}") return None def stream_generate(self, prompt: str, conversation_history: List[Dict]) -> Optional[Union[str, List[str]]]: """Generate a response with streaming support""" try: return self._retry_with_backoff(self._stream_generate_impl, prompt, conversation_history) except Exception as e: logger.error(f"OpenAI stream generation failed: {e}") return None def validate_model(self) -> bool: """Validate if the model is available""" try: models = self.client.models.list() model_ids = [model.id for model in models.data] return self.model_name in model_ids except Exception as e: logger.warning(f"OpenAI model validation failed: {e}") return False def _generate_impl(self, prompt: str, conversation_history: List[Dict]) -> str: """Implementation of synchronous generation""" response = self.client.chat.completions.create( model=self.model_name, messages=conversation_history, max_tokens=500, temperature=0.7 ) return response.choices[0].message.content def _stream_generate_impl(self, prompt: str, conversation_history: List[Dict]) -> List[str]: """Implementation of streaming generation""" response = self.client.chat.completions.create( model=self.model_name, messages=conversation_history, max_tokens=500, temperature=0.7, stream=True ) chunks = [] for chunk in response: content = chunk.choices[0].delta.content if content: chunks.append(content) return chunks