"""
Unified Benchmark Pipeline

This module provides a unified interface to run all benchmark pipelines
with a single model name.
"""

import logging
import json
import os
import re
from typing import Dict, Any, List, Optional, Tuple
import traceback
import hashlib

logger = logging.getLogger(__name__)

try:
    from email_validator import validate_email, EmailNotValidError
except ImportError as e:
    # Fall back to the basic regex check in is_valid_email() below.
    validate_email = None
    EmailNotValidError = Exception
    logger.warning(f"email-validator library not installed: {e}")

from api.clients.airflow_client import AirflowClient
from api.config import get_api_config_for_type, get_airflow_config
from pipelines.benchmark_configs import get_unified_benchmark_config
from src.utils import log_model_submission

from src.display.formatting import styled_error, styled_message, styled_warning
from src.submission.check_model_type import check_model_type
from src.submission.check_validity import determine_model_type

# Reduce log noise from internal HTTP and configuration modules.
logging.getLogger("api.clients.airflow_client").setLevel(logging.ERROR)
logging.getLogger("mezura.http").setLevel(logging.ERROR)
logging.getLogger("api.config").setLevel(logging.ERROR)


def get_dag_id(benchmark_type: str) -> str:
    """
    Get the DAG ID for a benchmark type from configuration.

    Args:
        benchmark_type: Type of benchmark

    Returns:
        str: DAG ID for the benchmark
    """
    config = get_api_config_for_type(benchmark_type)
    return config.get("dag_id", f"{benchmark_type}_benchmark")
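
# Example (illustrative; the returned value depends on the API configuration):
#   get_dag_id("rag") -> the configured "dag_id", falling back to "rag_benchmark"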


DAG_IDS = {
    "unified": "accept_request_dag",
    "hybrid": "evalmix",
    "rag": "rag_benchmark",
    "snake": "snake_benchmark",
    "arena": "arena_evaluation",
    "light": "lighteval_benchmark",
}
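
# NOTE: only the "unified" entry is used in this module; the other keys are kept
# for reference by the individual benchmark pipelines (assumption based on
# current usage in this file).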


class BenchmarkRunner:
    """
    Runner class for unified benchmark execution
    """

    def __init__(self):
        """Initialize the benchmark runner"""
        self.client = AirflowClient()

    def run_all_benchmarks(self, hf_repo: str, base_model: Optional[str] = None) -> Dict[str, Any]:
        """
        Run the unified benchmark pipeline for a single model.

        Args:
            hf_repo: Name of the model repository to evaluate
            base_model: Base model information (optional)

        Returns:
            Dict[str, Any]: Dictionary with benchmark results
        """
        logger.info("Preparing benchmark task")

        benchmark_config = get_unified_benchmark_config(hf_repo, base_model)

        try:
            logger.info("Submitting benchmark task to execution system")
            result = self.client.send_dag_request(
                dag_id=DAG_IDS["unified"],
                conf=benchmark_config["conf"]
            )

            return {
                "status": "success",
                "message": "Benchmark started successfully",
                "results": {"unified": result}
            }

        except Exception as e:
            logger.error(f"Benchmark submission failed: {str(e)}")
            return {
                "status": "error",
                "message": f"Error running benchmark: {str(e)}",
                "results": {}
            }

    def run_all_benchmarks_with_config(self, benchmark_config: Dict[str, Any]) -> Dict[str, Any]:
        """
        Run the unified benchmark pipeline for a single model using a prebuilt configuration.

        Args:
            benchmark_config: Dictionary with benchmark configuration

        Returns:
            Dict[str, Any]: Dictionary with benchmark results
        """
        conf = benchmark_config.get("conf", {})
        model_name = conf.get("hf_repo", conf.get("repo_id", "model"))
        logger.info(f"Preparing benchmark with configuration for model: {model_name}")

        repo_id = conf.get("repo_id", "unknown")
        base_model = conf.get("base_model", "unknown")
        log_model_submission(repo_id, base_model)

        try:
            logger.info("Submitting benchmark task to execution system")
            result = self.client.send_dag_request(
                dag_id=DAG_IDS["unified"],
                conf=benchmark_config["conf"]
            )

            return {
                "status": "success",
                "message": "Benchmark started successfully",
                "results": {"unified": result}
            }

        except Exception as e:
            logger.error(f"Benchmark submission failed: {str(e)}")
            logger.error(f"Exception details: {traceback.format_exc()}")
            return {
                "status": "error",
                "message": f"Error running benchmark: {str(e)}",
                "results": {}
            }
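
# Usage sketch (assumes a reachable Airflow deployment configured via api.config;
# "some-org/some-model" is a placeholder):
#
#   runner = BenchmarkRunner()
#   outcome = runner.run_all_benchmarks("some-org/some-model")
#   if outcome["status"] == "success":
#       run_id = outcome["results"]["unified"].get("run_id")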


def is_valid_email(email: str) -> bool:
    """
    Validate an email address using the email-validator library,
    falling back to a basic regex check if the library is unavailable.

    Args:
        email: Email string to validate

    Returns:
        bool: True if the email is valid
    """
    if not email:
        return False

    if validate_email is not None:
        try:
            validate_email(email, check_deliverability=False)
            logger.info("Email validation successful")
            return True
        except EmailNotValidError:
            logger.info("Email validation failed")
            return False

    # email-validator is not installed; use a basic pattern check instead.
    logger.warning("Using fallback email validation")
    basic_pattern = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
    if basic_pattern.match(email):
        logger.info("Email validation successful")
        return True

    logger.info("Email validation failed")
    return False
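
# Examples:
#   is_valid_email("user@example.com")  -> True
#   is_valid_email("not-an-email")      -> False
#   is_valid_email("")                  -> False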


def submit_unified_benchmark(hf_repo: str, base_model: Optional[str] = None, reasoning: bool = False, email: Optional[str] = None, profile=None):
    """
    Submit the unified benchmark for a single model.

    Args:
        hf_repo: Name of the model repository to evaluate
        base_model: Base model information (no longer used)
        reasoning: Whether to enable reasoning capability during evaluation
        email: Email address for notifications (required)
        profile: User profile object from Hugging Face login (required)

    Returns:
        Tuple[str, Dict]: A tuple containing:
            - result_text: A markdown string with the status message
            - dag_run_info: A dictionary with the DAG run information
    """
    try:
        if profile is None:
            return styled_error("Authentication required. Please log in with your Hugging Face account to submit models."), {}

        # Extract a username from the profile, which may be an object with a
        # `username`/`name` attribute or a plain string from the login widget.
        username = None
        try:
            if hasattr(profile, 'username'):
                username = profile.username
            elif hasattr(profile, 'name'):
                username = profile.name
            elif isinstance(profile, str) and profile.strip():
                username = profile

                # Strings such as "Logout (username)" come from the login button;
                # pull out the part in parentheses.
                logout_pattern = re.compile(r'Logout \(([^)]+)\)')
                match = logout_pattern.search(username)
                if match:
                    username = match.group(1)
                elif '(' in username and ')' in username:
                    start = username.rindex('(') + 1
                    end = username.find(')', start)
                    if start < end:
                        extracted = username[start:end].strip()
                        if extracted:
                            username = extracted

            if username is not None and not isinstance(username, str):
                username = str(username)
        except Exception as e:
            username = None
            logger.warning(f"Failed to extract username from profile: {str(e)}")

        logger.info(f"Submission authorized for user: {username}")
        logger.info("Benchmark process started")

        # Validate the notification email before doing any heavier work.
        valid_email = None
        if email:
            try:
                if validate_email is not None:
                    emailinfo = validate_email(email, check_deliverability=False)
                    valid_email = emailinfo.normalized
                    logger.info("Email validation completed")
                else:
                    # Library unavailable; fall back to the regex-based helper.
                    if is_valid_email(email):
                        valid_email = email
                        logger.info("Email validation completed")
                    else:
                        logger.warning("Email validation failed")
                        return styled_warning("Invalid email address. Please enter a valid email address."), {}
            except EmailNotValidError as e:
                logger.warning("Email validation failed")
                return styled_warning(f"Invalid email address: {str(e)}"), {}
        else:
            logger.warning("Email required but not provided")
            return styled_warning("Please provide an email address to receive benchmark results."), {}

        # Analyze the model up front; the returned details are currently not used further here.
        _, model_data = get_model_information(hf_repo, display_full_info=False)

        # Determine the model type; unknown and merged models are rejected below.
        model_type, type_message = determine_model_type(hf_repo)
        logger.info("Model type determination completed")

        if model_type == "unknown":
            return styled_warning("Could not determine model type. Benchmark not submitted."), {}

        if model_type in ("merge", "merged_model"):
            logger.warning("Merged model detected. Currently not supported.")
            return styled_warning("Merged models are not supported yet. Benchmark not submitted."), {}

        # Derive a short evaluation name from the repository name.
        if "/" in hf_repo:
            _, model_name = hf_repo.split("/", 1)
            short_name = model_name
        else:
            short_name = hf_repo

        # Keep only alphanumeric characters, replacing everything else with dashes.
        clean_name = re.sub(r'[^a-zA-Z0-9]', '-', short_name)
        if len(clean_name) > 20:
            clean_name = clean_name[:20]

        # Make sure the name is recognisable as an evaluation run.
        if "eval" not in clean_name.lower():
            eval_name = f"{clean_name}-eval"
        else:
            eval_name = clean_name

        if len(eval_name) > 28:
            eval_name = eval_name[:28]

        logger.info("Evaluation name generation completed")

        # Build the benchmark configuration and enrich it with submission metadata.
        runner = BenchmarkRunner()
        benchmark_config = get_unified_benchmark_config(hf_repo, base_model)

        if "conf" in benchmark_config:
            benchmark_config["conf"]["hf_repo"] = hf_repo
            benchmark_config["conf"]["repo_id"] = hf_repo
            benchmark_config["conf"]["model_type"] = model_type
            benchmark_config["conf"]["unique_model_name"] = eval_name
            benchmark_config["conf"]["reasoning"] = reasoning
            benchmark_config["conf"]["base_model"] = base_model

            if valid_email:
                benchmark_config["conf"]["email"] = valid_email

            if username is not None:
                if not isinstance(username, str):
                    username = str(username)
                if len(username) > 100:
                    username = username[:100]

                # Hash the username and the request parameters to get deterministic identifiers.
                user_id = hashlib.md5(username.encode()).hexdigest()
                benchmark_config["conf"]["user_id"] = user_id

                request_hash_input = f"{hf_repo}_{base_model}_{reasoning}"
                request_id = hashlib.md5(request_hash_input.encode()).hexdigest()
                benchmark_config["conf"]["request_id"] = request_id

                benchmark_config["conf"]["username"] = username
            else:
                logger.error("Username not available, cannot submit benchmark request")
                return styled_error("Authentication error. Username not available."), {}

        logger.info("Submitting benchmark task")
        results = runner.run_all_benchmarks_with_config(benchmark_config)

        # Extract DAG run information to return to the caller.
        dag_run_info = {}
        if results.get("status") == "success" and "unified" in results.get("results", {}):
            unified_result = results["results"]["unified"]
            if "run_id" in unified_result:
                dag_run_info = {
                    "dag_run_id": unified_result["run_id"],
                    "dag_id": DAG_IDS["unified"],
                    "status": "queued"
                }

        if results["status"] == "success":
            success_msg = f"Benchmark started for {hf_repo} (Type: {model_type})"
            if valid_email:
                success_msg += f". Results will be sent to {valid_email}."
            result_message = styled_message(success_msg)
            logger.info("Benchmark successfully submitted")
        else:
            logger.error(f"Benchmark submission failed: {results.get('message', 'unknown error')}")
            result_message = styled_error("Failed to start benchmark")

        return result_message, dag_run_info

    except Exception as e:
        logger.error(f"Error during benchmark submission: {str(e)}")
        logger.error(f"Exception details: {traceback.format_exc()}")
        return styled_error("An error occurred while submitting the benchmark"), {}
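
# Usage sketch (hypothetical values; `profile` normally comes from the
# Hugging Face login component in the UI):
#
#   message_html, dag_run_info = submit_unified_benchmark(
#       hf_repo="some-org/some-model",
#       reasoning=False,
#       email="user@example.com",
#       profile=profile,
#   )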


def get_model_information(hf_repo: str, display_full_info: bool = True) -> Tuple[str, dict]:
    """
    Get model type and information.

    Args:
        hf_repo: Model repository ID
        display_full_info: Whether to include detailed information in the returned message

    Returns:
        Tuple[str, dict]: A tuple containing:
            - message: Formatted message about the model
            - model_info: Dictionary with model information
    """
    try:
        logger.info("Analyzing model information")
        model_data = check_model_type(hf_repo)

        if "error" in model_data.get("info", {}):
            error_message = model_data["info"]["error"]
            logger.error(f"Model analysis failed: {error_message}")
            return styled_error("Failed to analyze model"), {}

        if not display_full_info:
            logger.info("Model analysis completed")
            return "Model analysis completed.", model_data

        model_type = model_data.get("model_type", "unknown")
        info = model_data.get("info", {})

        # Build a short HTML summary of the model.
        message = "<div style='text-align: left;'>"
        message += f"<h3>Model: {hf_repo}</h3>"
        message += f"<p><b>Type:</b> {model_type.capitalize()}</p>"
        if "base_model" in info:
            message += f"<p><b>Base Model:</b> {info['base_model']}</p>"
        message += "</div>"

        logger.info("Model analysis completed with full information")
        return message, model_data

    except Exception as e:
        logger.error(f"Error during model analysis: {str(e)}")
        return styled_error("Failed to get model information"), {}
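
# Example (sketch; "some-org/some-model" is a placeholder):
#   message_html, model_data = get_model_information("some-org/some-model")
#   model_type = model_data.get("model_type", "unknown")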


def check_user_login(profile):
    """Return (is_logged_in, message) for the given login profile."""
    if profile is None:
        return False, "Please log in with your Hugging Face account to submit models for benchmarking."

    if isinstance(profile, str):
        if profile == "":
            return False, "Please log in with your Hugging Face account to submit models for benchmarking."
        return True, f"Logged in as {profile}"

    # Profile objects may expose the handle as `username` or `name`.
    username = getattr(profile, "username", None) or getattr(profile, "name", profile)
    return True, f"Logged in as {username}"
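

# Optional smoke test (added sketch): run this module directly to exercise the
# email-validation helper without touching Airflow or Hugging Face.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    print(is_valid_email("user@example.com"))  # expected: True
    print(is_valid_email("not-an-email"))      # expected: False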