"""Cost tracking utilities for Azure OpenAI token usage and pricing."""
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Optional
# Module-level logger, named after the module per stdlib logging convention.
logger = logging.getLogger(__name__)
@dataclass
class ModelPricing:
    """Pricing information for Azure OpenAI models.

    Costs are expressed in USD per 1,000 tokens, matching Azure's
    published pricing table.
    """

    model_name: str  # canonical model or deployment name this entry applies to
    input_cost_per_1k_tokens: float  # USD per 1000 input (prompt) tokens
    output_cost_per_1k_tokens: float  # USD per 1000 output (completion) tokens
    description: str  # human-readable label for reports/logs
@dataclass
class TokenUsage:
    """Token usage statistics for a single API call."""

    prompt_tokens: int  # tokens in the request prompt
    completion_tokens: int  # tokens generated in the response
    total_tokens: int  # prompt_tokens + completion_tokens
    model: str  # model/deployment name reported for the call
    timestamp: datetime  # when the usage was recorded
@dataclass
class CostAnalysis:
    """Cost analysis for document processing."""

    total_input_tokens: int  # sum of prompt tokens across all calls
    total_output_tokens: int  # sum of completion tokens across all calls
    total_cost: float  # total USD cost
    # Per-model costs: {model: {"input_cost": x, "output_cost": y, "total_cost": z}}
    model_breakdown: Dict[str, Dict[str, float]]
    processing_time: float  # wall-clock seconds spent processing
    timestamp: datetime  # when the analysis was produced
class CostTracker:
    """Tracks token usage and calculates costs for Azure OpenAI API calls.

    Per-1K-token pricing is hardcoded in MODEL_PRICING. Model names with no
    pricing entry fall back to a name-based heuristic (see guess_model_type),
    defaulting to the cheapest mini tier.
    """

    # Hardcoded pricing for Azure OpenAI models (current as of 2024)
    # Source: https://azure.microsoft.com/en-us/pricing/details/cognitive-services/openai-service/
    MODEL_PRICING = {
        # Standard model names
        "gpt-4o-mini": ModelPricing(
            model_name="gpt-4o-mini",
            input_cost_per_1k_tokens=0.00015,  # $0.00015 per 1K input tokens
            output_cost_per_1k_tokens=0.0006,  # $0.0006 per 1K output tokens
            description="GPT-4o Mini (O3 Mini)"
        ),
        "gpt-4o": ModelPricing(
            model_name="gpt-4o",
            input_cost_per_1k_tokens=0.0025,  # $0.0025 per 1K input tokens
            output_cost_per_1k_tokens=0.01,  # $0.01 per 1K output tokens
            description="GPT-4o (O4)"
        ),
        "gpt-35-turbo": ModelPricing(
            model_name="gpt-35-turbo",
            input_cost_per_1k_tokens=0.0005,  # $0.0005 per 1K input tokens
            output_cost_per_1k_tokens=0.0015,  # $0.0015 per 1K output tokens
            description="GPT-3.5 Turbo (O3)"
        ),
        # Azure deployment names (custom names set in Azure)
        "o3-mini": ModelPricing(
            model_name="o3-mini",
            input_cost_per_1k_tokens=0.00015,  # $0.00015 per 1K input tokens
            output_cost_per_1k_tokens=0.0006,  # $0.0006 per 1K output tokens
            description="O3 Mini (GPT-4o Mini)"
        ),
        "o4-mini": ModelPricing(
            model_name="o4-mini",
            input_cost_per_1k_tokens=0.00015,  # $0.00015 per 1K input tokens
            output_cost_per_1k_tokens=0.0006,  # $0.0006 per 1K output tokens
            description="O4 Mini (GPT-4o Mini)"
        ),
        "o3": ModelPricing(
            model_name="o3",
            input_cost_per_1k_tokens=0.0005,  # $0.0005 per 1K input tokens
            output_cost_per_1k_tokens=0.0015,  # $0.0015 per 1K output tokens
            description="O3 (GPT-3.5 Turbo)"
        ),
        "o4": ModelPricing(
            model_name="o4",
            input_cost_per_1k_tokens=0.0025,  # $0.0025 per 1K input tokens
            output_cost_per_1k_tokens=0.01,  # $0.01 per 1K output tokens
            description="O4 (GPT-4o)"
        ),
        # Alternative model names that might be used in Azure deployments
        "gpt-4o-mini-2024-07-18": ModelPricing(
            model_name="gpt-4o-mini-2024-07-18",
            input_cost_per_1k_tokens=0.00015,  # $0.00015 per 1K input tokens
            output_cost_per_1k_tokens=0.0006,  # $0.0006 per 1K output tokens
            description="GPT-4o Mini (O3 Mini) - Latest"
        ),
        "gpt-4o-2024-05-13": ModelPricing(
            model_name="gpt-4o-2024-05-13",
            input_cost_per_1k_tokens=0.0025,  # $0.0025 per 1K input tokens
            output_cost_per_1k_tokens=0.01,  # $0.01 per 1K output tokens
            description="GPT-4o (O4) - Latest"
        ),
        "gpt-35-turbo-0125": ModelPricing(
            model_name="gpt-35-turbo-0125",
            input_cost_per_1k_tokens=0.0005,  # $0.0005 per 1K input tokens
            output_cost_per_1k_tokens=0.0015,  # $0.0015 per 1K output tokens
            description="GPT-3.5 Turbo (O3) - Latest"
        ),
    }

    def __init__(self):
        # Chronological record of every call in the current session.
        self.usage_history: list[TokenUsage] = []
        self.current_session_tokens = 0
        self.current_session_cost = 0.0

    def record_usage(self, prompt_tokens: int, completion_tokens: int, model: str) -> TokenUsage:
        """Record token usage from an API call and update session totals.

        Args:
            prompt_tokens: Input token count reported by the API.
            completion_tokens: Output token count reported by the API.
            model: Model or Azure deployment name used for the call.

        Returns:
            The TokenUsage entry appended to the session history.
        """
        total_tokens = prompt_tokens + completion_tokens
        usage = TokenUsage(
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=total_tokens,
            model=model,
            timestamp=datetime.now(),
        )
        self.usage_history.append(usage)
        self.current_session_tokens += total_tokens

        # Calculate cost for this usage
        cost = self._calculate_cost(prompt_tokens, completion_tokens, model)
        self.current_session_cost += cost

        # Lazy %-style args so formatting is skipped when INFO is disabled.
        logger.info(
            "Recorded usage: %d input + %d output = %d total tokens for model %s, cost: $%.6f",
            prompt_tokens, completion_tokens, total_tokens, model, cost,
        )
        return usage

    def _calculate_cost(self, input_tokens: int, output_tokens: int, model: str) -> float:
        """Calculate USD cost for the given token counts and model.

        Unknown models are mapped to a known pricing entry via
        guess_model_type() (cheapest mini tier by default).
        """
        if model not in self.MODEL_PRICING:
            logger.warning("Unknown model pricing for %s, using default pricing", model)
            # Reuse the shared name-based heuristic instead of duplicating it here.
            model = self.guess_model_type(model)

        pricing = self.MODEL_PRICING[model]
        input_cost = (input_tokens / 1000) * pricing.input_cost_per_1k_tokens
        output_cost = (output_tokens / 1000) * pricing.output_cost_per_1k_tokens
        return input_cost + output_cost

    def get_session_summary(self) -> Dict[str, Any]:
        """Get summary of current session usage.

        Returns:
            Dict with "total_tokens", "total_cost", per-model
            "model_breakdown", and "usage_count"; zeros/empty when nothing
            has been recorded yet.
        """
        if not self.usage_history:
            return {
                "total_tokens": 0,
                "total_cost": 0.0,
                "model_breakdown": {},
                "usage_count": 0,
            }

        model_breakdown: Dict[str, Dict[str, Any]] = {}
        for usage in self.usage_history:
            # setdefault creates the per-model accumulator on first sight.
            entry = model_breakdown.setdefault(usage.model, {
                "input_tokens": 0,
                "output_tokens": 0,
                "total_tokens": 0,
                "cost": 0.0,
                "usage_count": 0,
            })
            entry["input_tokens"] += usage.prompt_tokens
            entry["output_tokens"] += usage.completion_tokens
            entry["total_tokens"] += usage.total_tokens
            entry["usage_count"] += 1
            entry["cost"] += self._calculate_cost(
                usage.prompt_tokens, usage.completion_tokens, usage.model
            )

        return {
            "total_tokens": self.current_session_tokens,
            "total_cost": self.current_session_cost,
            "model_breakdown": model_breakdown,
            "usage_count": len(self.usage_history),
        }

    def reset_session(self):
        """Reset current session statistics."""
        self.usage_history = []
        self.current_session_tokens = 0
        self.current_session_cost = 0.0
        logger.info("Cost tracker session reset")

    def get_available_models(self) -> list[str]:
        """Get list of model names with known pricing."""
        return list(self.MODEL_PRICING.keys())

    def get_model_info(self, model: str) -> Optional[ModelPricing]:
        """Get pricing information for a specific model, or None if unknown."""
        return self.MODEL_PRICING.get(model)

    def add_deployment_pricing(self, deployment_name: str, model_type: str = "o3-mini"):
        """Add pricing for a custom deployment name by mapping it to an existing model type.

        No-op when the deployment already has a pricing entry; logs a warning
        when model_type is unknown. NOTE: mutates the class-level
        MODEL_PRICING table, so the mapping is shared by all instances.
        """
        if deployment_name in self.MODEL_PRICING:
            return  # Already exists

        # Map deployment name to existing model pricing
        if model_type in self.MODEL_PRICING:
            base_pricing = self.MODEL_PRICING[model_type]
            self.MODEL_PRICING[deployment_name] = ModelPricing(
                model_name=deployment_name,
                input_cost_per_1k_tokens=base_pricing.input_cost_per_1k_tokens,
                output_cost_per_1k_tokens=base_pricing.output_cost_per_1k_tokens,
                description=f"{deployment_name} ({base_pricing.description})",
            )
            logger.info("Added pricing for deployment %s based on %s", deployment_name, model_type)
        else:
            logger.warning("Unknown model type %s for deployment %s", model_type, deployment_name)

    def guess_model_type(self, deployment_name: str) -> str:
        """Guess the pricing tier for a deployment name.

        Heuristic (order matters): "mini" -> o3-mini; then "o4" -> o4;
        then "o3" -> o3; anything else defaults to cheapest (o3-mini).
        """
        deployment_lower = deployment_name.lower()
        if "mini" in deployment_lower:
            return "o3-mini"
        elif "o4" in deployment_lower:
            return "o4"
        elif "o3" in deployment_lower:
            return "o3"
        else:
            return "o3-mini"  # Default to cheapest
# Global cost tracker instance — module-level singleton so all importers
# share one session's usage history and totals.
cost_tracker = CostTracker()