# DEPENDENCIES import sys import json import time import requests from enum import Enum from typing import Any from typing import Dict from typing import List from pathlib import Path from typing import Literal from typing import Optional from dataclasses import dataclass from config.settings import settings # Add parent directory to path for imports sys.path.append(str(Path(__file__).parent.parent)) from utils.logger import log_info from utils.logger import log_error from config.model_config import ModelConfig from utils.logger import ContractAnalyzerLogger # Optional imports for API providers try: import openai OPENAI_AVAILABLE = True except ImportError: OPENAI_AVAILABLE = False try: import anthropic ANTHROPIC_AVAILABLE = True except ImportError: ANTHROPIC_AVAILABLE = False class LLMProvider(Enum): """ Supported LLM providers """ OLLAMA = "ollama" OPENAI = "openai" ANTHROPIC = "anthropic" @dataclass class LLMResponse: """ Standardized LLM response """ text : str provider : str model : str tokens_used : int latency_seconds : float success : bool error_message : Optional[str] = None raw_response : Optional[Dict[str, Any]] = None def to_dict(self) -> Dict[str, Any]: """ Convert to dictionary """ return {"text" : self.text, "provider" : self.provider, "model" : self.model, "tokens_used" : self.tokens_used, "latency_seconds" : round(self.latency_seconds, 3), "success" : self.success, "error_message" : self.error_message, } class LLMManager: """ Unified LLM manager for multiple providers : handles Ollama (local), OpenAI API, and Anthropic API """ def __init__(self, default_provider: LLMProvider = LLMProvider.OLLAMA, ollama_base_url: Optional[str] = None, openai_api_key: Optional[str] = None, anthropic_api_key: Optional[str] = None): """ Initialize LLM Manager Arguments: ---------- default_provider : Default LLM provider to use ollama_base_url : Ollama server URL (default: from settings) openai_api_key : OpenAI API key (or set OPENAI_API_KEY env var) anthropic_api_key : Anthropic API key (or set ANTHROPIC_API_KEY env var) """ self.default_provider = default_provider self.logger = ContractAnalyzerLogger.get_logger() # Configuration Variables Initialization self.config = ModelConfig() # Ollama configuration self.ollama_base_url = ollama_base_url or settings.OLLAMA_BASE_URL self.ollama_model = settings.OLLAMA_MODEL self.ollama_timeout = settings.OLLAMA_TIMEOUT self.ollama_temperature = settings.OLLAMA_TEMPERATURE # OpenAI configuration self.openai_api_key = openai_api_key or settings.OPENAI_API_KEY if (OPENAI_AVAILABLE and self.openai_api_key): openai.api_key = self.openai_api_key # Anthropic configuration self.anthropic_api_key = anthropic_api_key or settings.ANTHROPIC_API_KEY if (ANTHROPIC_AVAILABLE and self.anthropic_api_key): self.anthropic_client = anthropic.Anthropic(api_key = self.anthropic_api_key) else: self.anthropic_client = None # Rate limiting (simple token bucket) self._rate_limit_tokens = settings.RATE_LIMIT_REQUESTS self._rate_limit_last_refill = time.time() self._rate_limit_refill_rate = settings.RATE_LIMIT_REQUESTS / settings.RATE_LIMIT_PERIOD # Generation settings self.generation_config = self.config.LLM_GENERATION log_info("LLMManager initialized", default_provider = default_provider.value, ollama_base_url = self.ollama_base_url, ollama_model = self.ollama_model, ollama_timeout = self.ollama_timeout, ollama_temperature = self.ollama_temperature, openai_available = OPENAI_AVAILABLE and bool(self.openai_api_key), anthropic_available = ANTHROPIC_AVAILABLE and bool(self.anthropic_api_key), rate_limit_requests = settings.RATE_LIMIT_REQUESTS, rate_limit_period = settings.RATE_LIMIT_PERIOD, ) # Provider Availability Check def _check_ollama_available(self) -> bool: """ Check if Ollama server is available """ try: response = requests.get(f"{self.ollama_base_url}/api/tags", timeout = 30) available = (response.status_code == 200) if available: log_info("Ollama server is available", base_url = self.ollama_base_url) return available except Exception as e: log_error(e, context = {"component" : "LLMManager", "operation" : "check_ollama"}) return False def get_available_providers(self) -> List[LLMProvider]: """ Get list of available providers """ available = list() if self._check_ollama_available(): available.append(LLMProvider.OLLAMA) if OPENAI_AVAILABLE and self.openai_api_key: available.append(LLMProvider.OPENAI) if ANTHROPIC_AVAILABLE and self.anthropic_api_key: available.append(LLMProvider.ANTHROPIC) log_info("Available LLM providers", providers = [p.value for p in available]) return available # Rate Limiting def _check_rate_limit(self) -> bool: """ Check if rate limit allows request (simple token bucket) """ now = time.time() time_passed = now - self._rate_limit_last_refill # Refill tokens self._rate_limit_tokens = min(settings.RATE_LIMIT_REQUESTS, self._rate_limit_tokens + time_passed * self._rate_limit_refill_rate) self._rate_limit_last_refill = now if (self._rate_limit_tokens >= 1): self._rate_limit_tokens -= 1 return True log_info("Rate limit hit, waiting...", tokens_remaining = self._rate_limit_tokens) return False def _wait_for_rate_limit(self): """ Wait until rate limit allows request """ while not self._check_rate_limit(): time.sleep(0.5) # UNIFIED COMPLETION METHOD @ContractAnalyzerLogger.log_execution_time("llm_complete") def complete(self, prompt: str, provider: Optional[LLMProvider] = None, model: Optional[str] = None, temperature: Optional[float] = None, max_tokens: Optional[int] = None, system_prompt: Optional[str] = None, json_mode: bool = False, retry_on_error: bool = True, fallback_providers: Optional[List[LLMProvider]] = None) -> LLMResponse: """ Unified completion method for all providers Arguments: ---------- prompt : User prompt provider : LLM provider (default: self.default_provider) model : Model name (provider-specific) temperature : Sampling temperature (0.0-1.0, default from settings/config) max_tokens : Maximum tokens to generate (default from config) system_prompt : System prompt (if supported) json_mode : Force JSON output (if supported) retry_on_error : Retry with fallback providers on error fallback_providers : List of fallback providers to try Returns: -------- { LLMResponse } : LLMResponse object """ provider = provider or self.default_provider temperature = temperature or self.ollama_temperature max_tokens = max_tokens or self.generation_config["max_tokens"] log_info("LLM completion request", provider = provider.value, prompt_length = len(prompt), temperature = temperature, max_tokens = max_tokens, json_mode = json_mode, ) # Rate limiting self._wait_for_rate_limit() # Try primary provider try: if (provider == LLMProvider.OLLAMA): return self._complete_ollama(prompt = prompt, model = model, temperature = temperature, max_tokens = max_tokens, system_prompt = system_prompt, json_mode = json_mode, ) elif (provider == LLMProvider.OPENAI): return self._complete_openai(prompt = prompt, model = model, temperature = temperature, max_tokens = max_tokens, system_prompt = system_prompt, json_mode = json_mode, ) elif (provider == LLMProvider.ANTHROPIC): return self._complete_anthropic(prompt = prompt, model = model, temperature = temperature, max_tokens = max_tokens, system_prompt = system_prompt, ) else: raise ValueError(f"Unsupported provider: {provider}") except Exception as e: log_error(e, context = {"component" : "LLMManager", "operation" : "complete", "provider" : provider.value}) # Try fallback providers if (retry_on_error and fallback_providers): log_info("Trying fallback providers", fallbacks = [p.value for p in fallback_providers]) for fallback_provider in fallback_providers: if (fallback_provider == provider): continue try: log_info(f"Attempting fallback to {fallback_provider.value}") # Prevent infinite recursion return self.complete(prompt = prompt, provider = fallback_provider, model = model, temperature = temperature, max_tokens = max_tokens, system_prompt = system_prompt, json_mode = json_mode, retry_on_error = False, ) except Exception as fallback_error: log_error(fallback_error, context = {"component" : "LLMManager", "operation" : "fallback_complete", "provider" : fallback_provider.value}) continue # All attempts failed return LLMResponse(text = "", provider = provider.value, model = model or "unknown", tokens_used = 0, latency_seconds = 0.0, success = False, error_message = str(e), ) # OLLAMA Provider def _complete_ollama(self, prompt: str, model: Optional[str], temperature: float, max_tokens: int, system_prompt: Optional[str], json_mode: bool) -> LLMResponse: """ Complete using local Ollama """ start_time = time.time() model = model or self.ollama_model # Construct full prompt with system prompt full_prompt = prompt if system_prompt: full_prompt = f"<|system|>\n{system_prompt}\n<|user|>\n{prompt}\n<|assistant|>" payload = {"model" : model, "prompt" : full_prompt, "stream" : False, "options" : {"temperature": temperature, "num_predict": max_tokens}, } if json_mode: payload["format"] = "json" log_info("Calling Ollama API", model = model, base_url = self.ollama_base_url, json_mode = json_mode, ) response = requests.post(f"{self.ollama_base_url}/api/generate", json = payload, timeout = self.ollama_timeout) response.raise_for_status() result = response.json() generated_text = result.get('response', '') latency = time.time() - start_time # Estimate tokens (rough approximation) tokens_used = len(prompt.split()) + len(generated_text.split()) log_info("Ollama completion successful", model = model, tokens_used = tokens_used, latency_seconds = round(latency, 3), ) return LLMResponse(text = generated_text, provider = "ollama", model = model, tokens_used = tokens_used, latency_seconds = latency, success = True, raw_response = result, ) # Open-AI Provider def _complete_openai(self, prompt: str, model: Optional[str], temperature: float, max_tokens: int, system_prompt: Optional[str], json_mode: bool) -> LLMResponse: """ Complete using OpenAI API """ if not OPENAI_AVAILABLE or not self.openai_api_key: raise ValueError("OpenAI not available. Install with: pip install openai") start_time = time.time() model = model or "gpt-3.5-turbo" # Construct messages messages = list() if system_prompt: messages.append({"role" : "system", "content" : system_prompt, }) messages.append({"role" : "user", "content" : prompt, }) log_info("Calling OpenAI API", model = model, json_mode = json_mode) # API call parameters api_params = {"model" : model, "messages" : messages, "temperature" : temperature, "max_tokens" : max_tokens, } if json_mode: api_params["response_format"] = {"type": "json_object"} response = openai.ChatCompletion.create(**api_params) generated_text = response.choices[0].message.content tokens_used = response.usage.total_tokens latency = time.time() - start_time log_info("OpenAI completion successful", model = model, tokens_used = tokens_used, latency_seconds = round(latency, 3)) return LLMResponse(text = generated_text, provider = "openai", model = model, tokens_used = tokens_used, latency_seconds = latency, success = True, raw_response = response.to_dict(), ) # Anthropic Provider def _complete_anthropic(self, prompt: str, model: Optional[str], temperature: float, max_tokens: int, system_prompt: Optional[str]) -> LLMResponse: """ Complete using Anthropic (Claude) API """ if not ANTHROPIC_AVAILABLE or not self.anthropic_client: raise ValueError("Anthropic not available. Install with: pip install anthropic") start_time = time.time() model = model or "claude-3-sonnet-20240229" log_info("Calling Anthropic API", model = model) # API call message = self.anthropic_client.messages.create(model = model, max_tokens = max_tokens, temperature = temperature, system = system_prompt or "", messages = [{"role": "user", "content": prompt}], ) generated_text = message.content[0].text tokens_used = message.usage.input_tokens + message.usage.output_tokens latency = time.time() - start_time log_info("Anthropic completion successful", model = model, tokens_used = tokens_used, latency_seconds = round(latency, 3)) return LLMResponse(text = generated_text, provider = "anthropic", model = model, tokens_used = tokens_used, latency_seconds = latency, success = True, raw_response = message.dict(), ) # Specialized Methods def generate_structured_json(self, prompt: str, schema_description: str, provider: Optional[LLMProvider] = None, **kwargs) -> Dict[str, Any]: """ Generate structured JSON output Arguments: ---------- prompt : User prompt schema_description : Description of expected JSON schema provider : LLM provider **kwargs : Additional arguments for complete() Returns: -------- { dict } : Parsed JSON dictionary """ system_prompt = (f"You are a helpful assistant that returns valid JSON.\n" f"Expected schema:\n{schema_description}\n\n" f"Return ONLY valid JSON, no markdown, no explanation." ) response = self.complete(prompt = prompt, provider = provider, system_prompt = system_prompt, json_mode = True, **kwargs, ) if not response.success: raise ValueError(f"LLM completion failed: {response.error_message}") # Parse JSON try: # Clean response (remove markdown code blocks if present) text = response.text.strip() text = text.replace("```json", "").replace("```", "").strip() parsed = json.loads(text) log_info("JSON parsing successful", keys = list(parsed.keys())) return parsed except json.JSONDecodeError as e: log_error(e, context = {"component" : "LLMManager", "operation" : "parse_json", "response_text" : response.text}) raise ValueError(f"Failed to parse JSON response: {e}") def batch_complete(self, prompts: List[str], provider: Optional[LLMProvider] = None, **kwargs) -> List[LLMResponse]: """ Complete multiple prompts (sequential for now) Arguments: ---------- prompts : List of prompts provider : LLM provider **kwargs : Additional arguments for complete() Returns: -------- { list } : List of LLMResponse objects """ log_info("Batch completion started", batch_size=len(prompts)) responses = list() for i, prompt in enumerate(prompts): log_info(f"Processing prompt {i+1}/{len(prompts)}") response = self.complete(prompt = prompt, provider = provider, **kwargs, ) responses.append(response) successful = sum(1 for r in responses if r.success) log_info("Batch completion finished", total = len(prompts), successful = successful, failed = len(prompts) - successful, ) return responses # OLLAMA-Specific Methods def list_ollama_models(self) -> List[str]: """ List available local Ollama models """ try: response = requests.get(f"{self.ollama_base_url}/api/tags", timeout = 30) response.raise_for_status() models = [model['name'] for model in response.json().get('models', [])] log_info("Ollama models listed", count = len(models), models = models) return models except Exception as e: log_error(e, context = {"component" : "LLMManager", "operation" : "list_ollama_models"}) return [] def pull_ollama_model(self, model_name: str) -> bool: """ Pull/download an Ollama model """ try: log_info(f"Pulling Ollama model: {model_name}") response = requests.post(f"{self.ollama_base_url}/api/pull", json = {"name": model_name}, stream = True, timeout = 600, # 10 minutes for download ) response.raise_for_status() # Stream response to track progress for line in response.iter_lines(): if line: data = json.loads(line) if ('status' in data): log_info(f"Pull status: {data['status']}") log_info(f"Model pulled successfully: {model_name}") return True except Exception as e: log_error(e, context = {"component" : "LLMManager", "operation" : "pull_ollama_model", "model" : model_name}) return False # Utility Methods def get_provider_info(self, provider: LLMProvider) -> Dict[str, Any]: """ Get information about a provider """ info = {"provider" : provider.value, "available" : False, "models" : [], } if (provider == LLMProvider.OLLAMA): info["available"] = self._check_ollama_available() if info["available"]: info["models"] = self.list_ollama_models() info["base_url"] = self.ollama_base_url elif (provider == LLMProvider.OPENAI): info["available"] = OPENAI_AVAILABLE and bool(self.openai_api_key) if info["available"]: info["models"] = ["gpt-3.5-turbo", "gpt-4", "gpt-4-turbo-preview", ] elif (provider == LLMProvider.ANTHROPIC): info["available"] = ANTHROPIC_AVAILABLE and bool(self.anthropic_client) if info["available"]: info["models"] = ["claude-3-opus-20240229", "claude-3-sonnet-20240229", "claude-3-haiku-20240307", ] return info def estimate_cost(self, prompt_tokens: int, completion_tokens: int, provider: LLMProvider, model: str) -> float: """ Estimate API cost in USD Arguments: ---------- prompt_tokens : Number of prompt tokens completion_tokens : Number of completion tokens provider : LLM provider model : Model name Returns: -------- { float } : Estimated cost in USD """ # Pricing per 1K tokens (as of 2025) pricing = {"openai" : {"gpt-3.5-turbo" : {"prompt": 0.0015, "completion": 0.002}, "gpt-4" : {"prompt": 0.03, "completion": 0.06}, "gpt-4-turbo-preview" : {"prompt": 0.01, "completion": 0.03}, }, "anthropic" : {"claude-3-opus-20240229" : {"prompt": 0.015, "completion": 0.075}, "claude-3-sonnet-20240229" : {"prompt": 0.003, "completion": 0.015}, "claude-3-haiku-20240307" : {"prompt": 0.00025, "completion": 0.00125}, } } if (provider == LLMProvider.OLLAMA): # Local models are free return 0.0 provider_pricing = pricing.get(provider.value, {}).get(model) if not provider_pricing: return 0.0 cost = ((prompt_tokens / 1000) * provider_pricing["prompt"] + (completion_tokens / 1000) * provider_pricing["completion"]) return round(cost, 6)