""" Unified Benchmark Pipeline This module provides a unified interface to run all benchmark pipelines with a single model name. """ import logging import json import os import re from typing import Dict, Any, List, Optional, Tuple import traceback import hashlib # Import email validator library try: from email_validator import validate_email, EmailNotValidError logger = logging.getLogger(__name__) except ImportError as e: # If library is not installed, log warning validate_email = None EmailNotValidError = Exception logger = logging.getLogger(__name__) logger.warning(f"email-validator library not installed: {e}") from api.clients.airflow_client import AirflowClient from api.config import get_api_config_for_type, get_airflow_config from pipelines.benchmark_configs import get_unified_benchmark_config from src.utils import log_model_submission # Import formatting utilities from src.display.formatting import styled_error, styled_message, styled_warning from src.submission.check_model_type import check_model_type from src.submission.check_validity import determine_model_type # Set up logger logger = logging.getLogger(__name__) # Configure logging to be minimal logging.getLogger("api.clients.airflow_client").setLevel(logging.ERROR) logging.getLogger("mezura.http").setLevel(logging.ERROR) logging.getLogger("api.config").setLevel(logging.ERROR) # Get DAG IDs from configuration def get_dag_id(benchmark_type: str) -> str: """ Get the DAG ID for a benchmark type from configuration Args: benchmark_type: Type of benchmark Returns: str: DAG ID for the benchmark """ config = get_api_config_for_type(benchmark_type) return config.get("dag_id", f"{benchmark_type}_benchmark") # Map benchmark types to their DAG IDs DAG_IDS = { "unified": "accept_request_dag", "hybrid": "evalmix", # "lmharness": "lmharness_benchmark", # LM Harness removed "rag": "rag_benchmark", "snake": "snake_benchmark", "arena": "arena_evaluation", "light": "lighteval_benchmark" } class BenchmarkRunner: """ Runner class for unified benchmark execution """ def __init__(self): """Initialize the benchmark runner""" self.client = AirflowClient() def run_all_benchmarks(self, hf_repo: str, base_model: str = None) -> Dict[str, Any]: """ Run the unified benchmark pipeline for a single model Args: hf_repo: Name of the model repository to evaluate base_model: Base model information (optional) Returns: Dict[str, Any]: Dictionary with benchmark results """ # Log the benchmark start logger.info("Preparing benchmark task") # Get the unified benchmark configuration benchmark_config = get_unified_benchmark_config(hf_repo, base_model) # Send the benchmark request try: logger.info("Submitting benchmark task to execution system") result = self.client.send_dag_request( dag_id=DAG_IDS["unified"], conf=benchmark_config["conf"] ) return { "status": "success", "message": f"Benchmark started successfully", "results": {"unified": result} } except Exception as e: logger.error("Benchmark submission failed") return { "status": "error", "message": f"Error running benchmark: {str(e)}", "results": {} } def run_all_benchmarks_with_config(self, benchmark_config: Dict[str, Any]) -> Dict[str, Any]: """ Run the unified benchmark pipeline for a single model with a given benchmark configuration Args: benchmark_config: Dictionary with benchmark configuration Returns: Dict[str, Any]: Dictionary with benchmark results """ # Get the model name model_name = benchmark_config.get("conf", {}).get("repo_id", "model") if "hf_repo" in benchmark_config.get("conf", {}): model_name = 
benchmark_config["conf"]["hf_repo"] logger.info(f"Preparing benchmark with configuration for model: {model_name}") # SECURITY: Commented out to prevent potential credential exposure # logger.info(f"Benchmark configuration: {json.dumps(benchmark_config)}") # SECURITY: Commented out to prevent potential credential exposure # logger.info(f"POST payload: {json.dumps(benchmark_config['conf'])}") # Add specific logging for base model and repo ID repo_id = benchmark_config.get('conf', {}).get('repo_id', 'unknown') base_model = benchmark_config.get('conf', {}).get('base_model', 'unknown') # logger.info(f"SENDING TO AIRFLOW - REPO_ID: {repo_id}, BASE_MODEL: {base_model}") # Log to dedicated submissions log file log_model_submission(repo_id, base_model) # Send the benchmark request try: logger.info("Submitting benchmark task to execution system") result = self.client.send_dag_request( dag_id=DAG_IDS["unified"], conf=benchmark_config["conf"] ) return { "status": "success", "message": "Benchmark started successfully", "results": {"unified": result} } except Exception as e: logger.error(f"Benchmark submission failed: {str(e)}") logger.error(f"Exception details: {traceback.format_exc()}") return { "status": "error", "message": f"Error running benchmark: {str(e)}", "results": {} } # Email validation function with email-validator def is_valid_email(email: str) -> bool: """ Validate email using email-validator library Args: email: Email string to validate Returns: bool: True if email is valid according to email-validator """ if not email: return False # Use email-validator library if available if validate_email is not None: try: # Validate the email (no deliverability check needed for our case) emailinfo = validate_email(email, check_deliverability=False) logger.info(f"Email validation successful") # Store the normalized form of the email address return True except EmailNotValidError as e: # Log the specific validation error logger.info(f"Email validation failed") return False # If library not installed, fall back to simple regex validation logger.warning("Using fallback email validation") basic_pattern = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$') if bool(basic_pattern.match(email)): logger.info(f"Email validation successful") return True logger.info(f"Email validation failed") return False def submit_unified_benchmark(hf_repo: str, base_model: str = None, reasoning: bool = False, email: str = None, profile=None): """ Submit unified benchmark for a single model Args: hf_repo: Name of the model repository to evaluate base_model: Base model information (artık kullanılmıyor) reasoning: Whether to enable reasoning capability during evaluation email: Email address for notification (required) profile: User profile object from Hugging Face login (required) Returns: Tuple[str, Dict]: A tuple containing: - result_text: A markdown string with the status message - dag_run_info: A dictionary with the DAG run information """ try: # Verify user is logged in before allowing submission if profile is None: return styled_error("Authentication required. 
def submit_unified_benchmark(hf_repo: str, base_model: Optional[str] = None, reasoning: bool = False,
                             email: Optional[str] = None, profile=None):
    """
    Submit unified benchmark for a single model

    Args:
        hf_repo: Name of the model repository to evaluate
        base_model: Base model information (no longer used)
        reasoning: Whether to enable reasoning capability during evaluation
        email: Email address for notification (required)
        profile: User profile object from Hugging Face login (required)

    Returns:
        Tuple[str, Dict]: A tuple containing:
            - result_text: A markdown string with the status message
            - dag_run_info: A dictionary with the DAG run information
    """
    try:
        # Verify user is logged in before allowing submission
        if profile is None:
            return styled_error("Authentication required. Please log in with your Hugging Face account to submit models."), {}

        # Get username from profile for logging
        username = None
        try:
            if hasattr(profile, 'username'):
                username = profile.username
            elif hasattr(profile, 'name'):
                username = profile.name
            elif isinstance(profile, str) and profile.strip():
                username = profile
                # Extract username from "Logout (username)" format if present
                logout_pattern = re.compile(r'Logout \(([^)]+)\)')
                match = logout_pattern.search(username)
                if match:
                    username = match.group(1)
                # Also try other common formats:
                # handle any string with parentheses containing the username
                elif '(' in username and ')' in username:
                    # Extract text between last opening and first closing parenthesis
                    start = username.rindex('(') + 1
                    end = username.find(')', start)
                    if start < end:
                        extracted = username[start:end].strip()
                        if extracted:  # Only use if not empty
                            username = extracted
            # If none of the above conditions are met, keep username as None

            # If username is not None, ensure it's a string
            if username is not None:
                if not isinstance(username, str):
                    username = str(username)
        except Exception as e:
            username = None
            logger.warning(f"Failed to extract username from profile: {str(e)}")

        # Log successful auth
        logger.info(f"Submission authorized for user: {username}")
        logger.info("Benchmark process started")

        # Validate email if provided
        valid_email = None
        if email:
            try:
                # Use the full email validation to get the normalized form
                if validate_email is not None:
                    # Validate and normalize the email
                    emailinfo = validate_email(email, check_deliverability=False)
                    valid_email = emailinfo.normalized  # Use normalized form
                    logger.info("Email validation completed")
                else:
                    # Fallback if library not available
                    is_valid = is_valid_email(email)
                    if is_valid:
                        valid_email = email
                        logger.info("Email validation completed")
                    else:
                        logger.warning("Email validation failed")
                        return styled_warning("Invalid email address. Please enter a valid email address."), {}
            except EmailNotValidError as e:
                logger.warning("Email validation failed")
                return styled_warning(f"Invalid email address: {str(e)}"), {}
        else:
            # Email is required
            logger.warning("Email required but not provided")
            return styled_warning("Please provide an email address to receive benchmark results."), {}

        # First, analyze the model to get information without displaying details
        _, model_data = get_model_information(hf_repo, display_full_info=False)

        # Base model detection and validation steps have been removed entirely

        # Determine model type
        model_type, type_message = determine_model_type(hf_repo)
        logger.info("Model type determination completed")

        if model_type == "unknown":
            return styled_warning("Could not determine model type. Benchmark not submitted."), {}

        # New check: don't allow merged models
        if model_type == "merge" or model_type == "merged_model":
            logger.warning("Merged model detected. Currently not supported.")
            return styled_warning("Merged models are not supported yet. Benchmark not submitted."), {}

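        # Worked example of the name generation below (hypothetical repo ID):
        #   "my-org/Mistral-7B-Instruct-v0.2"
        #   -> short name  "Mistral-7B-Instruct-v0.2"
        #   -> cleaned     "Mistral-7B-Instruct-v0-2" (non-alphanumerics become "-")
        #   -> truncated   "Mistral-7B-Instruct-" (20 chars)
        #   -> final name  "Mistral-7B-Instruct--eval" (25 chars, under the 28 limit)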
        # Step 3: Generate a model evaluation name - short, with "eval", under 28 chars
        # Extract short name from repo ID
        if "/" in hf_repo:
            org, model_name = hf_repo.split("/", 1)
            short_name = model_name
        else:
            short_name = hf_repo

        # Clean the name and make it shorter if needed
        # Remove special characters and replace with hyphens
        clean_name = re.sub(r'[^a-zA-Z0-9]', '-', short_name)

        # Truncate if too long
        if len(clean_name) > 20:  # Leave room for eval suffix
            clean_name = clean_name[:20]

        # Add eval suffix if not already present
        if "eval" not in clean_name.lower():
            eval_name = f"{clean_name}-eval"
        else:
            eval_name = clean_name

        # Ensure the final name is under 28 characters
        if len(eval_name) > 28:
            eval_name = eval_name[:28]

        logger.info("Evaluation name generation completed")

        # Create benchmark runner
        runner = BenchmarkRunner()

        # Get the benchmark configuration and add model type parameter
        benchmark_config = get_unified_benchmark_config(hf_repo, base_model)

        # Make sure hf_repo is set correctly in the configuration
        if "conf" in benchmark_config:
            # Ensure hf_repo is set properly
            benchmark_config["conf"]["hf_repo"] = hf_repo
            # Also set repo_id for backwards compatibility
            benchmark_config["conf"]["repo_id"] = hf_repo

            # Add model type and model name to the configuration
            benchmark_config["conf"]["model_type"] = model_type
            benchmark_config["conf"]["unique_model_name"] = eval_name
            benchmark_config["conf"]["reasoning"] = reasoning

            # Set base_model
            benchmark_config["conf"]["base_model"] = base_model

            # Add email if valid
            if valid_email:
                benchmark_config["conf"]["email"] = valid_email

            # Create a unique user_id based ONLY on username
            if username is not None:
                # Ensure username is a simple string
                if not isinstance(username, str):
                    username = str(username)

                # Limit username length to avoid issues
                if len(username) > 100:
                    username = username[:100]

                # Create a unique hash from username only
                user_id = hashlib.md5(username.encode()).hexdigest()

                # Add user_id to the configuration
                benchmark_config["conf"]["user_id"] = user_id

                # Create a separate request_id based on repo_id, base_model and reasoning
                request_hash_input = f"{hf_repo}_{base_model}_{reasoning}"
                request_id = hashlib.md5(request_hash_input.encode()).hexdigest()

                # Add request_id to the configuration
                benchmark_config["conf"]["request_id"] = request_id

                # Still add username for backward compatibility
                benchmark_config["conf"]["username"] = username
            else:
                # Username is required for the request, so don't proceed
                logger.error("Username not available, cannot submit benchmark request")
                return styled_error("Authentication error. Username not available."), {}

        # Execute the unified benchmark request
        logger.info("Submitting benchmark task")
        results = runner.run_all_benchmarks_with_config(benchmark_config)

        # Format result for UI display
        dag_run_info = {}
        if results.get("status") == "success" and "unified" in results.get("results", {}):
            unified_result = results["results"]["unified"]
            if "run_id" in unified_result:
                dag_run_info = {
                    "dag_run_id": unified_result["run_id"],
                    "dag_id": DAG_IDS["unified"],
                    "status": "queued"
                }

        # Create simple success/error message
        if results["status"] == "success":
            success_msg = f"Benchmark started for {hf_repo} (Type: {model_type})"
            if valid_email:
                success_msg += f". Results will be sent to {valid_email}."
            result_message = styled_message(success_msg)
            logger.info("Benchmark successfully submitted")
        else:
            # Log the error but show simplified message
            logger.error("Benchmark submission failed")
            result_message = styled_error("Failed to start benchmark")

        # Return message and run info
        return result_message, dag_run_info

    except Exception as e:
        # Log the full error
        logger.error(f"Error during benchmark submission: {str(e)}")
        logger.error(f"Exception details: {traceback.format_exc()}")

        # Return simplified error message
        return styled_error("An error occurred while submitting the benchmark"), {}

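# Illustrative note on the identifiers built in submit_unified_benchmark() above
# ("my-org/my-model" is a hypothetical repo ID): both user_id and request_id are
# deterministic MD5 hex digests, so resubmitting the same model with the same
# settings reproduces the same request_id, e.g.
#
#     hashlib.md5("my-org/my-model_None_False".encode()).hexdigest()
#
# matches the request_id for hf_repo="my-org/my-model", base_model=None,
# reasoning=False, while user_id depends only on the authenticated username.
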
def get_model_information(hf_repo: str, display_full_info: bool = True) -> Tuple[str, dict]:
    """
    Get model type and information.

    Args:
        hf_repo: Model repository ID
        display_full_info: Whether to include detailed information in the returned message

    Returns:
        Tuple[str, dict]: A tuple containing:
            - message: Formatted message about the model
            - model_info: Dictionary with model information
    """
    try:
        logger.info("Analyzing model information")
        model_data = check_model_type(hf_repo)

        if "error" in model_data.get("info", {}):
            error_message = model_data["info"]["error"]
            logger.error("Model analysis failed")
            return styled_error("Failed to analyze model"), {}

        # If we don't need to display full info, return minimal message
        if not display_full_info:
            logger.info("Model analysis completed")
            return "Model analysis completed.", model_data

        model_type = model_data.get("model_type", "unknown")
        info = model_data.get("info", {})

        # Format a nice message with full information
        message = f"\nType: {model_type.capitalize()}\n"
        if "base_model" in info:
            message += f"Base Model: {info['base_model']}\n"
        message += "