"""
Unified Benchmark Pipeline
This module provides a unified interface to run all benchmark pipelines
with a single model name.
"""
import logging
import json
import os
import re
from typing import Dict, Any, List, Optional, Tuple
import traceback
import hashlib
# Import email validator library; fall back to regex-based validation if it is missing
try:
    from email_validator import validate_email, EmailNotValidError
except ImportError as e:
    # If the library is not installed, log a warning and use the fallback path
    validate_email = None
    EmailNotValidError = Exception
    logging.getLogger(__name__).warning(f"email-validator library not installed: {e}")
from api.clients.airflow_client import AirflowClient
from api.config import get_api_config_for_type, get_airflow_config
from pipelines.benchmark_configs import get_unified_benchmark_config
from src.utils import log_model_submission
# Import formatting utilities
from src.display.formatting import styled_error, styled_message, styled_warning
from src.submission.check_model_type import check_model_type
from src.submission.check_validity import determine_model_type
# Set up logger
logger = logging.getLogger(__name__)
# Configure logging to be minimal
logging.getLogger("api.clients.airflow_client").setLevel(logging.ERROR)
logging.getLogger("mezura.http").setLevel(logging.ERROR)
logging.getLogger("api.config").setLevel(logging.ERROR)
# Get DAG IDs from configuration
def get_dag_id(benchmark_type: str) -> str:
"""
Get the DAG ID for a benchmark type from configuration
Args:
benchmark_type: Type of benchmark
Returns:
str: DAG ID for the benchmark
"""
config = get_api_config_for_type(benchmark_type)
return config.get("dag_id", f"{benchmark_type}_benchmark")
# Map benchmark types to their DAG IDs
DAG_IDS = {
"unified": "accept_request_dag",
"hybrid": "evalmix",
# "lmharness": "lmharness_benchmark", # LM Harness removed
"rag": "rag_benchmark",
"snake": "snake_benchmark",
"arena": "arena_evaluation",
"light": "lighteval_benchmark"
}
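# Illustrative examples (not executed): resolving DAG IDs. get_dag_id() reads the
# per-type API configuration and falls back to the "<type>_benchmark" naming
# convention, while DAG_IDS is the hardcoded mapping used by the runner below.
#   get_dag_id("rag")      # -> value of "dag_id" in the rag config, else "rag_benchmark"
#   DAG_IDS["unified"]     # -> "accept_request_dag"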
class BenchmarkRunner:
"""
Runner class for unified benchmark execution
"""
def __init__(self):
"""Initialize the benchmark runner"""
self.client = AirflowClient()
    def run_all_benchmarks(self, hf_repo: str, base_model: Optional[str] = None) -> Dict[str, Any]:
"""
Run the unified benchmark pipeline for a single model
Args:
hf_repo: Name of the model repository to evaluate
base_model: Base model information (optional)
Returns:
Dict[str, Any]: Dictionary with benchmark results
"""
# Log the benchmark start
logger.info("Preparing benchmark task")
# Get the unified benchmark configuration
benchmark_config = get_unified_benchmark_config(hf_repo, base_model)
# Send the benchmark request
try:
logger.info("Submitting benchmark task to execution system")
result = self.client.send_dag_request(
dag_id=DAG_IDS["unified"],
conf=benchmark_config["conf"]
)
return {
"status": "success",
"message": f"Benchmark started successfully",
"results": {"unified": result}
}
except Exception as e:
logger.error("Benchmark submission failed")
return {
"status": "error",
"message": f"Error running benchmark: {str(e)}",
"results": {}
}
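    # Illustrative usage (not executed); "org/model" is a placeholder repo id:
    #   runner = BenchmarkRunner()
    #   outcome = runner.run_all_benchmarks("org/model")
    #   # outcome["status"] is "success" or "error"; on success,
    #   # outcome["results"]["unified"] holds the Airflow DAG-run response.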
def run_all_benchmarks_with_config(self, benchmark_config: Dict[str, Any]) -> Dict[str, Any]:
"""
Run the unified benchmark pipeline for a single model with a given benchmark configuration
Args:
benchmark_config: Dictionary with benchmark configuration
Returns:
Dict[str, Any]: Dictionary with benchmark results
"""
# Get the model name
model_name = benchmark_config.get("conf", {}).get("repo_id", "model")
if "hf_repo" in benchmark_config.get("conf", {}):
model_name = benchmark_config["conf"]["hf_repo"]
logger.info(f"Preparing benchmark with configuration for model: {model_name}")
# SECURITY: Commented out to prevent potential credential exposure
# logger.info(f"Benchmark configuration: {json.dumps(benchmark_config)}")
# SECURITY: Commented out to prevent potential credential exposure
# logger.info(f"POST payload: {json.dumps(benchmark_config['conf'])}")
# Add specific logging for base model and repo ID
repo_id = benchmark_config.get('conf', {}).get('repo_id', 'unknown')
base_model = benchmark_config.get('conf', {}).get('base_model', 'unknown')
# logger.info(f"SENDING TO AIRFLOW - REPO_ID: {repo_id}, BASE_MODEL: {base_model}")
# Log to dedicated submissions log file
log_model_submission(repo_id, base_model)
# Send the benchmark request
try:
logger.info("Submitting benchmark task to execution system")
result = self.client.send_dag_request(
dag_id=DAG_IDS["unified"],
conf=benchmark_config["conf"]
)
return {
"status": "success",
"message": "Benchmark started successfully",
"results": {"unified": result}
}
except Exception as e:
logger.error(f"Benchmark submission failed: {str(e)}")
logger.error(f"Exception details: {traceback.format_exc()}")
return {
"status": "error",
"message": f"Error running benchmark: {str(e)}",
"results": {}
}
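    # Illustrative shape of the expected configuration (not executed). The real dict
    # comes from get_unified_benchmark_config(); the keys shown are the ones this
    # module itself reads or writes, the full set is defined by that helper:
    #   runner.run_all_benchmarks_with_config({
    #       "conf": {
    #           "hf_repo": "org/model",
    #           "repo_id": "org/model",
    #           "base_model": None,
    #       }
    #   })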
# Email validation function with email-validator
def is_valid_email(email: str) -> bool:
"""
Validate email using email-validator library
Args:
email: Email string to validate
Returns:
bool: True if email is valid according to email-validator
"""
if not email:
return False
# Use email-validator library if available
if validate_email is not None:
try:
# Validate the email (no deliverability check needed for our case)
            validate_email(email, check_deliverability=False)
            logger.info("Email validation successful")
            return True
except EmailNotValidError as e:
# Log the specific validation error
logger.info(f"Email validation failed")
return False
# If library not installed, fall back to simple regex validation
logger.warning("Using fallback email validation")
basic_pattern = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
if bool(basic_pattern.match(email)):
logger.info(f"Email validation successful")
return True
logger.info(f"Email validation failed")
return False
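# Illustrative behaviour (not executed):
#   is_valid_email("user@example.com")  # -> True
#   is_valid_email("not-an-email")      # -> False (logged as a validation failure)
#   is_valid_email("")                  # -> False (empty input is rejected outright)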
def submit_unified_benchmark(hf_repo: str, base_model: Optional[str] = None, reasoning: bool = False, email: Optional[str] = None, profile=None):
"""
Submit unified benchmark for a single model
Args:
hf_repo: Name of the model repository to evaluate
        base_model: Base model information (no longer used)
reasoning: Whether to enable reasoning capability during evaluation
email: Email address for notification (required)
profile: User profile object from Hugging Face login (required)
Returns:
Tuple[str, Dict]: A tuple containing:
- result_text: A markdown string with the status message
- dag_run_info: A dictionary with the DAG run information
"""
try:
# Verify user is logged in before allowing submission
if profile is None:
return styled_error("Authentication required. Please log in with your Hugging Face account to submit models."), {}
# Get username from profile for logging
username = None
try:
if hasattr(profile, 'username'):
username = profile.username
elif hasattr(profile, 'name'):
username = profile.name
elif isinstance(profile, str) and profile.strip():
username = profile
# Extract username from "Logout (username)" format if present
logout_pattern = re.compile(r'Logout \(([^)]+)\)')
match = logout_pattern.search(username)
if match:
username = match.group(1)
# Also try other common formats
# Handle any string with parentheses containing the username
elif '(' in username and ')' in username:
# Extract text between last opening and first closing parenthesis
start = username.rindex('(') + 1
end = username.find(')', start)
if start < end:
extracted = username[start:end].strip()
if extracted: # Only use if not empty
username = extracted
# If none of the above conditions are met, keep username as None
# If username is not None, ensure it's a string
if username is not None:
if not isinstance(username, str):
username = str(username)
except Exception as e:
username = None
logger.warning(f"Failed to extract username from profile: {str(e)}")
# Log successful auth
logger.info(f"Submission authorized for user: {username}")
logger.info(f"Benchmark process started")
# Validate email if provided
valid_email = None
if email:
try:
# Use the full email validation to get normalized form
if validate_email is not None:
# Validate and normalize the email
emailinfo = validate_email(email, check_deliverability=False)
valid_email = emailinfo.normalized # Use normalized form
logger.info(f"Email validation completed")
else:
# Fallback if library not available
is_valid = is_valid_email(email)
if is_valid:
valid_email = email
logger.info(f"Email validation completed")
else:
logger.warning(f"Email validation failed")
return styled_warning("Invalid email address. Please enter a valid email address."), {}
except EmailNotValidError as e:
logger.warning(f"Email validation failed")
return styled_warning(f"Invalid email address: {str(e)}"), {}
else:
# Email is required
logger.warning(f"Email required but not provided")
return styled_warning("Please provide an email address to receive benchmark results."), {}
# First, analyze the model to get information without displaying details
_, model_data = get_model_information(hf_repo, display_full_info=False)
        # Base model detection and validation steps have been removed entirely
# Determine model type
model_type, type_message = determine_model_type(hf_repo)
logger.info(f"Model type determination completed")
if model_type == "unknown":
return styled_warning(f"Could not determine model type. Benchmark not submitted."), {}
# New check: Don't allow merged models
if model_type == "merge" or model_type == "merged_model":
logger.warning(f"Merged model detected. Currently not supported.")
return styled_warning(f"Merged models are not supported yet. Benchmark not submitted."), {}
# Step 3: Generate a model evaluation name - short, with "eval", under 28 chars
# Extract short name from repo ID
if "/" in hf_repo:
org, model_name = hf_repo.split("/", 1)
short_name = model_name
else:
short_name = hf_repo
# Clean the name and make it shorter if needed
# Remove special characters and replace with hyphens
clean_name = re.sub(r'[^a-zA-Z0-9]', '-', short_name)
# Truncate if too long
if len(clean_name) > 20: # Leave room for eval suffix
clean_name = clean_name[:20]
# Add eval suffix if not already present
if "eval" not in clean_name.lower():
eval_name = f"{clean_name}-eval"
else:
eval_name = clean_name
# Ensure the final name is under 28 characters
if len(eval_name) > 28:
eval_name = eval_name[:28]
logger.info(f"Evaluation name generation completed")
# Create benchmark runner
runner = BenchmarkRunner()
# Get the benchmark configuration and add model type parameter
benchmark_config = get_unified_benchmark_config(hf_repo, base_model)
# Make sure hf_repo is set correctly in the configuration
if "conf" in benchmark_config:
# Ensure hf_repo is set properly
benchmark_config["conf"]["hf_repo"] = hf_repo
# Also set repo_id for backwards compatibility
benchmark_config["conf"]["repo_id"] = hf_repo
# Add model type and model name to the configuration
benchmark_config["conf"]["model_type"] = model_type
benchmark_config["conf"]["unique_model_name"] = eval_name
benchmark_config["conf"]["reasoning"] = reasoning
# Set base_model
benchmark_config["conf"]["base_model"] = base_model
# Add email if valid
if valid_email:
benchmark_config["conf"]["email"] = valid_email
# Create a unique user_id based ONLY on username
if username is not None:
# Ensure username is a simple string
if not isinstance(username, str):
username = str(username)
# Limit username length to avoid issues
if len(username) > 100:
username = username[:100]
# Create a unique hash from username only
user_id = hashlib.md5(username.encode()).hexdigest()
# Add user_id to the configuration
benchmark_config["conf"]["user_id"] = user_id
# Create a separate request_id based on repo_id, base_model and reasoning
request_hash_input = f"{hf_repo}_{base_model}_{reasoning}"
request_id = hashlib.md5(request_hash_input.encode()).hexdigest()
# Add request_id to the configuration
benchmark_config["conf"]["request_id"] = request_id
# Still add username for backward compatibility
benchmark_config["conf"]["username"] = username
else:
# Username is required for the request, so don't proceed
logger.error("Username not available, cannot submit benchmark request")
return styled_error("Authentication error. Username not available."), {}
# Execute the unified benchmark request
logger.info("Submitting benchmark task")
results = runner.run_all_benchmarks_with_config(benchmark_config)
# Format result for UI display
dag_run_info = {}
if results.get("status") == "success" and "unified" in results.get("results", {}):
unified_result = results["results"]["unified"]
if "run_id" in unified_result:
dag_run_info = {
"dag_run_id": unified_result["run_id"],
"dag_id": DAG_IDS["unified"],
"status": "queued"
}
# Create simple success/error message
if results["status"] == "success":
success_msg = f"Benchmark started for {hf_repo} (Type: {model_type})"
if valid_email:
success_msg += f". Results will be sent to {valid_email}."
result_message = styled_message(success_msg)
logger.info("Benchmark successfully submitted")
else:
# Log the error but show simplified message
logger.error(f"Benchmark submission failed")
result_message = styled_error("Failed to start benchmark")
# Return message and run info
return result_message, dag_run_info
except Exception as e:
# Log the full error
logger.error(f"Error during benchmark submission: {str(e)}")
logger.error(f"Exception details: {traceback.format_exc()}")
# Return simplified error message
return styled_error("An error occurred while submitting the benchmark"), {}
def get_model_information(hf_repo: str, display_full_info: bool = True) -> Tuple[str, dict]:
"""
Get model type and information.
Args:
hf_repo: Model repository ID
display_full_info: Whether to include detailed information in the returned message
Returns:
Tuple[str, dict]: A tuple containing:
- message: Formatted message about the model
- model_info: Dictionary with model information
"""
try:
logger.info("Analyzing model information")
model_data = check_model_type(hf_repo)
if "error" in model_data.get("info", {}):
            error_message = model_data["info"]["error"]
            logger.error(f"Model analysis failed: {error_message}")
return styled_error("Failed to analyze model"), {}
# If we don't need to display full info, return minimal message
if not display_full_info:
logger.info("Model analysis completed")
return f"Model analysis completed.", model_data
model_type = model_data.get("model_type", "unknown")
info = model_data.get("info", {})
# Format a nice message with full information
message = f"<div style='text-align: left;'>"
message += f"<h3>Model: {hf_repo}</h3>"
message += f"<p><b>Type:</b> {model_type.capitalize()}</p>"
if "base_model" in info:
message += f"<p><b>Base Model:</b> {info['base_model']}</p>"
message += "</div>"
logger.info("Model analysis completed with full information")
return message, model_data
except Exception as e:
# Log the error but don't show details to user
logger.error("Error during model analysis")
return styled_error("Failed to get model information"), {}
def check_user_login(profile):
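    """
    Check whether the user is logged in via the Hugging Face profile object.

    Args:
        profile: Profile object (or string) provided by the Hugging Face login component

    Returns:
        Tuple[bool, str]: (is_logged_in, status message for display)
    """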
if profile is None:
return False, "Please log in with your Hugging Face account to submit models for benchmarking."
# In some environments, profile may be a string instead of a profile object
if isinstance(profile, str):
if profile == "":
return False, "Please log in with your Hugging Face account to submit models for benchmarking."
return True, f"Logged in as {profile}"
# Normal case where profile is an object with username attribute
return True, f"Logged in as {profile.username}"