# DEPENDENCIES
import sys
import json
import time
import requests
from enum import Enum
from typing import Any
from typing import Dict
from typing import List
from pathlib import Path
from typing import Literal
from typing import Optional
from dataclasses import dataclass

# Add parent directory to path for imports (must happen before the project-local imports below)
sys.path.append(str(Path(__file__).parent.parent))

from config.settings import settings
from utils.logger import log_info
from utils.logger import log_error
from config.model_config import ModelConfig
from utils.logger import ContractAnalyzerLogger

# Optional imports for API providers
try:
    import openai
    OPENAI_AVAILABLE = True
except ImportError:
    OPENAI_AVAILABLE = False

try:
    import anthropic
    ANTHROPIC_AVAILABLE = True
except ImportError:
    ANTHROPIC_AVAILABLE = False


class LLMProvider(Enum):
    """
    Supported LLM providers
    """
    OLLAMA = "ollama"
    OPENAI = "openai"
    ANTHROPIC = "anthropic"


@dataclass
class LLMResponse:
    """
    Standardized LLM response
    """
    text : str
    provider : str
    model : str
    tokens_used : int
    latency_seconds : float
    success : bool
    error_message : Optional[str] = None
    raw_response : Optional[Dict[str, Any]] = None

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert to dictionary
        """
        return {"text" : self.text,
                "provider" : self.provider,
                "model" : self.model,
                "tokens_used" : self.tokens_used,
                "latency_seconds" : round(self.latency_seconds, 3),
                "success" : self.success,
                "error_message" : self.error_message,
                }


class LLMManager:
    """
    Unified LLM manager for multiple providers : handles Ollama (local), OpenAI API, and Anthropic API
    """
    def __init__(self, default_provider: LLMProvider = LLMProvider.OLLAMA, ollama_base_url: Optional[str] = None,
                 openai_api_key: Optional[str] = None, anthropic_api_key: Optional[str] = None):
        """
        Initialize LLM Manager

        Arguments:
        ----------
        default_provider  : Default LLM provider to use
        ollama_base_url   : Ollama server URL (default: from settings)
        openai_api_key    : OpenAI API key (or set OPENAI_API_KEY env var)
        anthropic_api_key : Anthropic API key (or set ANTHROPIC_API_KEY env var)
        """
        self.default_provider = default_provider
        self.logger = ContractAnalyzerLogger.get_logger()

        # Configuration Variables Initialization
        self.config = ModelConfig()

        # Ollama configuration
        self.ollama_base_url = ollama_base_url or settings.OLLAMA_BASE_URL
        self.ollama_model = settings.OLLAMA_MODEL
        self.ollama_timeout = settings.OLLAMA_TIMEOUT
        self.ollama_temperature = settings.OLLAMA_TEMPERATURE

        # OpenAI configuration
        self.openai_api_key = openai_api_key or settings.OPENAI_API_KEY
        if (OPENAI_AVAILABLE and self.openai_api_key):
            openai.api_key = self.openai_api_key

        # Anthropic configuration
        self.anthropic_api_key = anthropic_api_key or settings.ANTHROPIC_API_KEY
        if (ANTHROPIC_AVAILABLE and self.anthropic_api_key):
            self.anthropic_client = anthropic.Anthropic(api_key = self.anthropic_api_key)
        else:
            self.anthropic_client = None

        # Rate limiting (simple token bucket)
        self._rate_limit_tokens = settings.RATE_LIMIT_REQUESTS
        self._rate_limit_last_refill = time.time()
        self._rate_limit_refill_rate = settings.RATE_LIMIT_REQUESTS / settings.RATE_LIMIT_PERIOD

        # Generation settings
        self.generation_config = self.config.LLM_GENERATION

        log_info("LLMManager initialized",
                 default_provider = default_provider.value,
                 ollama_base_url = self.ollama_base_url,
                 ollama_model = self.ollama_model,
                 ollama_timeout = self.ollama_timeout,
                 ollama_temperature = self.ollama_temperature,
                 openai_available = OPENAI_AVAILABLE and bool(self.openai_api_key),
                 anthropic_available = ANTHROPIC_AVAILABLE and bool(self.anthropic_api_key),
                 rate_limit_requests = settings.RATE_LIMIT_REQUESTS,
                 rate_limit_period = settings.RATE_LIMIT_PERIOD,
                 )

    # Provider Availability Check
    def _check_ollama_available(self) -> bool:
        """
        Check if Ollama server is available
        """
        try:
            response = requests.get(f"{self.ollama_base_url}/api/tags", timeout = 30)
            available = (response.status_code == 200)
            if available:
                log_info("Ollama server is available", base_url = self.ollama_base_url)
            return available
        except Exception as e:
            log_error(e, context = {"component" : "LLMManager", "operation" : "check_ollama"})
            return False

    def get_available_providers(self) -> List[LLMProvider]:
        """
        Get list of available providers
        """
        available = list()
        if self._check_ollama_available():
            available.append(LLMProvider.OLLAMA)
        if OPENAI_AVAILABLE and self.openai_api_key:
            available.append(LLMProvider.OPENAI)
        if ANTHROPIC_AVAILABLE and self.anthropic_api_key:
            available.append(LLMProvider.ANTHROPIC)
        log_info("Available LLM providers", providers = [p.value for p in available])
        return available

    # Rate Limiting
    def _check_rate_limit(self) -> bool:
        """
        Check if rate limit allows request (simple token bucket)
        """
        now = time.time()
        time_passed = now - self._rate_limit_last_refill
        # Refill tokens
        self._rate_limit_tokens = min(settings.RATE_LIMIT_REQUESTS, self._rate_limit_tokens + time_passed * self._rate_limit_refill_rate)
        self._rate_limit_last_refill = now
        if (self._rate_limit_tokens >= 1):
            self._rate_limit_tokens -= 1
            return True
        log_info("Rate limit hit, waiting...", tokens_remaining = self._rate_limit_tokens)
        return False

    def _wait_for_rate_limit(self):
        """
        Wait until rate limit allows request
        """
        while not self._check_rate_limit():
            time.sleep(0.5)
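
    # Worked example (hypothetical settings values, not project defaults): with
    # RATE_LIMIT_REQUESTS = 60 and RATE_LIMIT_PERIOD = 60.0 seconds, the bucket
    # refills at 1 token/second; once the 60-token burst is spent, each further
    # request waits roughly one second (two 0.5 s sleeps) in _wait_for_rate_limit().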

    # UNIFIED COMPLETION METHOD
    def complete(self, prompt: str, provider: Optional[LLMProvider] = None, model: Optional[str] = None, temperature: Optional[float] = None,
                 max_tokens: Optional[int] = None, system_prompt: Optional[str] = None, json_mode: bool = False, retry_on_error: bool = True,
                 fallback_providers: Optional[List[LLMProvider]] = None) -> LLMResponse:
        """
        Unified completion method for all providers

        Arguments:
        ----------
        prompt             : User prompt
        provider           : LLM provider (default: self.default_provider)
        model              : Model name (provider-specific)
        temperature        : Sampling temperature (0.0-1.0, default from settings/config)
        max_tokens         : Maximum tokens to generate (default from config)
        system_prompt      : System prompt (if supported)
        json_mode          : Force JSON output (if supported)
        retry_on_error     : Retry with fallback providers on error
        fallback_providers : List of fallback providers to try

        Returns:
        --------
        { LLMResponse } : LLMResponse object
        """
        provider = provider or self.default_provider
        # Use explicit None checks so a caller-supplied 0.0 temperature is honoured
        temperature = temperature if temperature is not None else self.ollama_temperature
        max_tokens = max_tokens if max_tokens is not None else self.generation_config["max_tokens"]
| log_info("LLM completion request", | |
| provider = provider.value, | |
| prompt_length = len(prompt), | |
| temperature = temperature, | |
| max_tokens = max_tokens, | |
| json_mode = json_mode, | |
| ) | |
| # Rate limiting | |
| self._wait_for_rate_limit() | |
| # Try primary provider | |
| try: | |
| if (provider == LLMProvider.OLLAMA): | |
| return self._complete_ollama(prompt = prompt, | |
| model = model, | |
| temperature = temperature, | |
| max_tokens = max_tokens, | |
| system_prompt = system_prompt, | |
| json_mode = json_mode, | |
| ) | |
| elif (provider == LLMProvider.OPENAI): | |
| return self._complete_openai(prompt = prompt, | |
| model = model, | |
| temperature = temperature, | |
| max_tokens = max_tokens, | |
| system_prompt = system_prompt, | |
| json_mode = json_mode, | |
| ) | |
| elif (provider == LLMProvider.ANTHROPIC): | |
| return self._complete_anthropic(prompt = prompt, | |
| model = model, | |
| temperature = temperature, | |
| max_tokens = max_tokens, | |
| system_prompt = system_prompt, | |
| ) | |
| else: | |
| raise ValueError(f"Unsupported provider: {provider}") | |
| except Exception as e: | |
| log_error(e, context = {"component" : "LLMManager", "operation" : "complete", "provider" : provider.value}) | |
| # Try fallback providers | |
| if (retry_on_error and fallback_providers): | |
| log_info("Trying fallback providers", fallbacks = [p.value for p in fallback_providers]) | |
| for fallback_provider in fallback_providers: | |
| if (fallback_provider == provider): | |
| continue | |
| try: | |
| log_info(f"Attempting fallback to {fallback_provider.value}") | |
| # Prevent infinite recursion | |
| return self.complete(prompt = prompt, | |
| provider = fallback_provider, | |
| model = model, | |
| temperature = temperature, | |
| max_tokens = max_tokens, | |
| system_prompt = system_prompt, | |
| json_mode = json_mode, | |
| retry_on_error = False, | |
| ) | |
| except Exception as fallback_error: | |
| log_error(fallback_error, context = {"component" : "LLMManager", "operation" : "fallback_complete", "provider" : fallback_provider.value}) | |
| continue | |
| # All attempts failed | |
| return LLMResponse(text = "", | |
| provider = provider.value, | |
| model = model or "unknown", | |
| tokens_used = 0, | |
| latency_seconds = 0.0, | |
| success = False, | |
| error_message = str(e), | |
| ) | |
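
    # Illustrative behaviour note (hypothetical call, not part of the original API surface):
    #     manager.complete(prompt = "...",
    #                      provider = LLMProvider.OLLAMA,
    #                      fallback_providers = [LLMProvider.OPENAI, LLMProvider.ANTHROPIC])
    # first tries Ollama; on any exception each fallback is attempted once with
    # retry_on_error = False, and if every provider fails the caller receives an
    # LLMResponse with success = False carrying the primary provider's error message.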

    # OLLAMA Provider
    def _complete_ollama(self, prompt: str, model: Optional[str], temperature: float, max_tokens: int, system_prompt: Optional[str], json_mode: bool) -> LLMResponse:
        """
        Complete using local Ollama
        """
        start_time = time.time()
        model = model or self.ollama_model

        # Construct full prompt with system prompt
        full_prompt = prompt
        if system_prompt:
            full_prompt = f"<|system|>\n{system_prompt}\n<|user|>\n{prompt}\n<|assistant|>"

        payload = {"model" : model,
                   "prompt" : full_prompt,
                   "stream" : False,
                   "options" : {"temperature": temperature, "num_predict": max_tokens},
                   }
        if json_mode:
            payload["format"] = "json"

        log_info("Calling Ollama API",
                 model = model,
                 base_url = self.ollama_base_url,
                 json_mode = json_mode,
                 )
        response = requests.post(f"{self.ollama_base_url}/api/generate", json = payload, timeout = self.ollama_timeout)
        response.raise_for_status()
        result = response.json()
        generated_text = result.get('response', '')
        latency = time.time() - start_time

        # Estimate tokens (rough approximation)
        tokens_used = len(prompt.split()) + len(generated_text.split())

        log_info("Ollama completion successful",
                 model = model,
                 tokens_used = tokens_used,
                 latency_seconds = round(latency, 3),
                 )
        return LLMResponse(text = generated_text,
                           provider = "ollama",
                           model = model,
                           tokens_used = tokens_used,
                           latency_seconds = latency,
                           success = True,
                           raw_response = result,
                           )

    # OpenAI Provider
    def _complete_openai(self, prompt: str, model: Optional[str], temperature: float, max_tokens: int, system_prompt: Optional[str], json_mode: bool) -> LLMResponse:
        """
        Complete using OpenAI API
        """
        if not OPENAI_AVAILABLE or not self.openai_api_key:
            raise ValueError("OpenAI not available. Install with: pip install openai")
        start_time = time.time()
        model = model or "gpt-3.5-turbo"

        # Construct messages
        messages = list()
        if system_prompt:
            messages.append({"role" : "system",
                             "content" : system_prompt,
                             })
        messages.append({"role" : "user",
                         "content" : prompt,
                         })
        log_info("Calling OpenAI API", model = model, json_mode = json_mode)

        # API call parameters
        api_params = {"model" : model,
                      "messages" : messages,
                      "temperature" : temperature,
                      "max_tokens" : max_tokens,
                      }
        if json_mode:
            api_params["response_format"] = {"type": "json_object"}

        # Legacy chat-completions interface (openai.ChatCompletion was removed in openai>=1.0)
        response = openai.ChatCompletion.create(**api_params)
        generated_text = response.choices[0].message.content
        tokens_used = response.usage.total_tokens
        latency = time.time() - start_time

        log_info("OpenAI completion successful", model = model, tokens_used = tokens_used, latency_seconds = round(latency, 3))
        return LLMResponse(text = generated_text,
                           provider = "openai",
                           model = model,
                           tokens_used = tokens_used,
                           latency_seconds = latency,
                           success = True,
                           raw_response = response.to_dict(),
                           )

    # Anthropic Provider
    def _complete_anthropic(self, prompt: str, model: Optional[str], temperature: float, max_tokens: int, system_prompt: Optional[str]) -> LLMResponse:
        """
        Complete using Anthropic (Claude) API
        """
        if not ANTHROPIC_AVAILABLE or not self.anthropic_client:
            raise ValueError("Anthropic not available. Install with: pip install anthropic")
        start_time = time.time()
        model = model or "claude-3-sonnet-20240229"
        log_info("Calling Anthropic API", model = model)

        # API call
        message = self.anthropic_client.messages.create(model = model,
                                                        max_tokens = max_tokens,
                                                        temperature = temperature,
                                                        system = system_prompt or "",
                                                        messages = [{"role": "user", "content": prompt}],
                                                        )
        generated_text = message.content[0].text
        tokens_used = message.usage.input_tokens + message.usage.output_tokens
        latency = time.time() - start_time

        log_info("Anthropic completion successful", model = model, tokens_used = tokens_used, latency_seconds = round(latency, 3))
        return LLMResponse(text = generated_text,
                           provider = "anthropic",
                           model = model,
                           tokens_used = tokens_used,
                           latency_seconds = latency,
                           success = True,
                           raw_response = message.dict(),
                           )

    # Specialized Methods
    def generate_structured_json(self, prompt: str, schema_description: str, provider: Optional[LLMProvider] = None, **kwargs) -> Dict[str, Any]:
        """
        Generate structured JSON output

        Arguments:
        ----------
        prompt             : User prompt
        schema_description : Description of expected JSON schema
        provider           : LLM provider
        **kwargs           : Additional arguments for complete()

        Returns:
        --------
        { dict } : Parsed JSON dictionary
        """
        system_prompt = (f"You are a helpful assistant that returns valid JSON.\n"
                         f"Expected schema:\n{schema_description}\n\n"
                         f"Return ONLY valid JSON, no markdown, no explanation."
                         )
        response = self.complete(prompt = prompt,
                                 provider = provider,
                                 system_prompt = system_prompt,
                                 json_mode = True,
                                 **kwargs,
                                 )
        if not response.success:
            raise ValueError(f"LLM completion failed: {response.error_message}")

        # Parse JSON
        try:
            # Clean response (remove markdown code blocks if present)
            text = response.text.strip()
            text = text.replace("```json", "").replace("```", "").strip()
            parsed = json.loads(text)
            log_info("JSON parsing successful", keys = list(parsed.keys()))
            return parsed
        except json.JSONDecodeError as e:
            log_error(e, context = {"component" : "LLMManager", "operation" : "parse_json", "response_text" : response.text})
            raise ValueError(f"Failed to parse JSON response: {e}")

    def batch_complete(self, prompts: List[str], provider: Optional[LLMProvider] = None, **kwargs) -> List[LLMResponse]:
        """
        Complete multiple prompts (sequential for now)

        Arguments:
        ----------
        prompts  : List of prompts
        provider : LLM provider
        **kwargs : Additional arguments for complete()

        Returns:
        --------
        { list } : List of LLMResponse objects
        """
        log_info("Batch completion started", batch_size = len(prompts))
        responses = list()
        for i, prompt in enumerate(prompts):
            log_info(f"Processing prompt {i+1}/{len(prompts)}")
            response = self.complete(prompt = prompt,
                                     provider = provider,
                                     **kwargs,
                                     )
            responses.append(response)
        successful = sum(1 for r in responses if r.success)
        log_info("Batch completion finished",
                 total = len(prompts),
                 successful = successful,
                 failed = len(prompts) - successful,
                 )
        return responses

    # OLLAMA-Specific Methods
    def list_ollama_models(self) -> List[str]:
        """
        List available local Ollama models
        """
        try:
            response = requests.get(f"{self.ollama_base_url}/api/tags", timeout = 30)
            response.raise_for_status()
            models = [model['name'] for model in response.json().get('models', [])]
            log_info("Ollama models listed", count = len(models), models = models)
            return models
        except Exception as e:
            log_error(e, context = {"component" : "LLMManager", "operation" : "list_ollama_models"})
            return []

    def pull_ollama_model(self, model_name: str) -> bool:
        """
        Pull/download an Ollama model
        """
        try:
            log_info(f"Pulling Ollama model: {model_name}")
            response = requests.post(f"{self.ollama_base_url}/api/pull",
                                     json = {"name": model_name},
                                     stream = True,
                                     timeout = 600,  # 10 minutes for download
                                     )
            response.raise_for_status()
            # Stream response to track progress
            for line in response.iter_lines():
                if line:
                    data = json.loads(line)
                    if ('status' in data):
                        log_info(f"Pull status: {data['status']}")
            log_info(f"Model pulled successfully: {model_name}")
            return True
        except Exception as e:
            log_error(e, context = {"component" : "LLMManager", "operation" : "pull_ollama_model", "model" : model_name})
            return False

    # Utility Methods
    def get_provider_info(self, provider: LLMProvider) -> Dict[str, Any]:
        """
        Get information about a provider
        """
        info = {"provider" : provider.value,
                "available" : False,
                "models" : [],
                }
        if (provider == LLMProvider.OLLAMA):
            info["available"] = self._check_ollama_available()
            if info["available"]:
                info["models"] = self.list_ollama_models()
            info["base_url"] = self.ollama_base_url
        elif (provider == LLMProvider.OPENAI):
            info["available"] = OPENAI_AVAILABLE and bool(self.openai_api_key)
            if info["available"]:
                info["models"] = ["gpt-3.5-turbo",
                                  "gpt-4",
                                  "gpt-4-turbo-preview",
                                  ]
        elif (provider == LLMProvider.ANTHROPIC):
            info["available"] = ANTHROPIC_AVAILABLE and bool(self.anthropic_client)
            if info["available"]:
                info["models"] = ["claude-3-opus-20240229",
                                  "claude-3-sonnet-20240229",
                                  "claude-3-haiku-20240307",
                                  ]
        return info

    def estimate_cost(self, prompt_tokens: int, completion_tokens: int, provider: LLMProvider, model: str) -> float:
        """
        Estimate API cost in USD

        Arguments:
        ----------
        prompt_tokens     : Number of prompt tokens
        completion_tokens : Number of completion tokens
        provider          : LLM provider
        model             : Model name

        Returns:
        --------
        { float } : Estimated cost in USD
        """
        # Pricing per 1K tokens (as of 2025)
        pricing = {"openai" : {"gpt-3.5-turbo" : {"prompt": 0.0015, "completion": 0.002},
                               "gpt-4" : {"prompt": 0.03, "completion": 0.06},
                               "gpt-4-turbo-preview" : {"prompt": 0.01, "completion": 0.03},
                               },
                   "anthropic" : {"claude-3-opus-20240229" : {"prompt": 0.015, "completion": 0.075},
                                  "claude-3-sonnet-20240229" : {"prompt": 0.003, "completion": 0.015},
                                  "claude-3-haiku-20240307" : {"prompt": 0.00025, "completion": 0.00125},
                                  },
                   }
        if (provider == LLMProvider.OLLAMA):
            # Local models are free
            return 0.0
        provider_pricing = pricing.get(provider.value, {}).get(model)
        if not provider_pricing:
            return 0.0
        cost = ((prompt_tokens / 1000) * provider_pricing["prompt"] + (completion_tokens / 1000) * provider_pricing["completion"])
        return round(cost, 6)
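

# ------------------------------------------------------------------------------
# Usage sketch (illustrative only): assumes a local Ollama server is reachable at
# the configured OLLAMA_BASE_URL and that config.settings / config.model_config
# expose the attributes referenced above. The prompts, schema, and token counts
# below are hypothetical examples, not project defaults.
if __name__ == "__main__":
    manager = LLMManager(default_provider = LLMProvider.OLLAMA)
    print("Available providers:", [p.value for p in manager.get_available_providers()])

    # Plain completion with an OpenAI fallback (used only if an API key is configured)
    response = manager.complete(prompt = "Summarize the termination clause in two sentences.",
                                fallback_providers = [LLMProvider.OPENAI],
                                )
    print(response.to_dict())

    # Structured JSON output against a described (hypothetical) schema
    try:
        parsed = manager.generate_structured_json(prompt = "List the parties named in the contract excerpt: ...",
                                                  schema_description = '{"parties": ["string", ...]}',
                                                  )
        print(parsed)
    except ValueError as e:
        print(f"Structured generation failed: {e}")

    # Worked cost estimate: 1,000 prompt + 500 completion tokens on gpt-3.5-turbo
    # -> (1000 / 1000) * 0.0015 + (500 / 1000) * 0.002 = 0.0025 USD
    print(manager.estimate_cost(1000, 500, LLMProvider.OPENAI, "gpt-3.5-turbo"))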