"""
Unified Benchmark Pipeline

This module provides a unified interface to run all benchmark pipelines
with a single model name.
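
Illustrative usage of the runner (a sketch only; assumes a reachable Airflow
instance and that the configuration comes from get_unified_benchmark_config):

    runner = BenchmarkRunner()
    result = runner.run_all_benchmarks("my-org/my-model")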
"""

import logging
import json
import os
import re
from typing import Dict, Any, List, Optional, Tuple
import traceback
import hashlib

# Set up module logger early so the optional-import fallback below can log
logger = logging.getLogger(__name__)

# Import email validator library; fall back to regex-based validation if missing
try:
    from email_validator import validate_email, EmailNotValidError
except ImportError as e:
    # If library is not installed, log warning
    validate_email = None
    EmailNotValidError = Exception
    logger.warning(f"email-validator library not installed: {e}")

from api.clients.airflow_client import AirflowClient
from api.config import get_api_config_for_type, get_airflow_config
from pipelines.benchmark_configs import get_unified_benchmark_config
from src.utils import log_model_submission

# Import formatting utilities
from src.display.formatting import styled_error, styled_message, styled_warning
from src.submission.check_model_type import check_model_type
from src.submission.check_validity import determine_model_type

# Configure logging to be minimal
logging.getLogger("api.clients.airflow_client").setLevel(logging.ERROR)
logging.getLogger("mezura.http").setLevel(logging.ERROR)
logging.getLogger("api.config").setLevel(logging.ERROR)

# Get DAG IDs from configuration
def get_dag_id(benchmark_type: str) -> str:
    """
    Get the DAG ID for a benchmark type from configuration
    
    Args:
        benchmark_type: Type of benchmark
        
    Returns:
        str: DAG ID for the benchmark
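
    Example (illustrative; the real value comes from the API configuration):
        get_dag_id("rag")  # -> configured dag_id, or "rag_benchmark" as fallback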
    """
    config = get_api_config_for_type(benchmark_type)
    return config.get("dag_id", f"{benchmark_type}_benchmark")

# Map benchmark types to their DAG IDs
DAG_IDS = {
    "unified": "accept_request_dag",
    "hybrid": "evalmix",
    # "lmharness": "lmharness_benchmark",  # LM Harness removed
    "rag": "rag_benchmark",
    "snake": "snake_benchmark",
    "arena": "arena_evaluation",
    "light": "lighteval_benchmark"
}
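# Note: only the "unified" entry is submitted by this module; the remaining
# entries are kept for reference.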

class BenchmarkRunner:
    """
    Runner class for unified benchmark execution
    """
    
    def __init__(self):
        """Initialize the benchmark runner"""
        self.client = AirflowClient()
        
    def run_all_benchmarks(self, hf_repo: str, base_model: Optional[str] = None) -> Dict[str, Any]:
        """
        Run the unified benchmark pipeline for a single model
        
        Args:
            hf_repo: Name of the model repository to evaluate
            base_model: Base model information (optional)
            
        Returns:
            Dict[str, Any]: Dictionary with benchmark results
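
        Illustrative return shape (values shown are examples only):
            {"status": "success",
             "message": "Benchmark started successfully",
             "results": {"unified": {...}}}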
        """
        # Log the benchmark start
        logger.info("Preparing benchmark task")
        
        # Get the unified benchmark configuration
        benchmark_config = get_unified_benchmark_config(hf_repo, base_model)
        
        # Send the benchmark request
        try:
            logger.info("Submitting benchmark task to execution system")
            result = self.client.send_dag_request(
                dag_id=DAG_IDS["unified"],
                conf=benchmark_config["conf"]
            )
            
            return {
                "status": "success",
                "message": f"Benchmark started successfully",
                "results": {"unified": result}
            }
            
        except Exception as e:
            logger.error("Benchmark submission failed")
            return {
                "status": "error",
                "message": f"Error running benchmark: {str(e)}",
                "results": {}
            }

    def run_all_benchmarks_with_config(self, benchmark_config: Dict[str, Any]) -> Dict[str, Any]:
        """
        Run the unified benchmark pipeline for a single model with a given benchmark configuration
        
        Args:
            benchmark_config: Dictionary with benchmark configuration
            
        Returns:
            Dict[str, Any]: Dictionary with benchmark results
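
        Expected configuration shape (illustrative, not exhaustive):
            {"conf": {"hf_repo": "org/model", "repo_id": "org/model",
                      "base_model": None, "reasoning": False}}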
        """
        # Get the model name (prefer hf_repo; fall back to repo_id for older configs)
        conf = benchmark_config.get("conf", {})
        model_name = conf.get("hf_repo", conf.get("repo_id", "model"))
        logger.info(f"Preparing benchmark with configuration for model: {model_name}")
        
        # SECURITY: Commented out to prevent potential credential exposure
        # logger.info(f"Benchmark configuration: {json.dumps(benchmark_config)}")
        
        # SECURITY: Commented out to prevent potential credential exposure
        # logger.info(f"POST payload: {json.dumps(benchmark_config['conf'])}")
        
        # Add specific logging for base model and repo ID
        repo_id = benchmark_config.get('conf', {}).get('repo_id', 'unknown')
        base_model = benchmark_config.get('conf', {}).get('base_model', 'unknown')
        # logger.info(f"SENDING TO AIRFLOW - REPO_ID: {repo_id}, BASE_MODEL: {base_model}")
        
        # Log to dedicated submissions log file
        log_model_submission(repo_id, base_model)
        
        # Send the benchmark request
        try:
            logger.info("Submitting benchmark task to execution system")
            result = self.client.send_dag_request(
                dag_id=DAG_IDS["unified"],
                conf=benchmark_config["conf"]
            )
            
            return {
                "status": "success",
                "message": "Benchmark started successfully",
                "results": {"unified": result}
            }
            
        except Exception as e:
            logger.error(f"Benchmark submission failed: {str(e)}")
            logger.error(f"Exception details: {traceback.format_exc()}")
            return {
                "status": "error",
                "message": f"Error running benchmark: {str(e)}",
                "results": {}
            }

# Email validation function with email-validator
def is_valid_email(email: str) -> bool:
    """
    Validate email using email-validator library
    
    Args:
        email: Email string to validate
        
    Returns:
        bool: True if email is valid according to email-validator
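
    Examples (illustrative):
        is_valid_email("user@example.com")  # True
        is_valid_email("not-an-email")      # False
        is_valid_email("")                  # False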
    """
    if not email:
        return False
    
    # Use email-validator library if available
    if validate_email is not None:
        try:
            # Validate the email (no deliverability check needed for our case)
            validate_email(email, check_deliverability=False)
            logger.info("Email validation successful")
            return True
        except EmailNotValidError:
            # Validation failed; keep the log message generic
            logger.info("Email validation failed")
            return False

    # If library not installed, fall back to simple regex validation
    logger.warning("Using fallback email validation")
    basic_pattern = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
    if basic_pattern.match(email):
        logger.info("Email validation successful")
        return True

    logger.info("Email validation failed")
    return False

def submit_unified_benchmark(hf_repo: str, base_model: Optional[str] = None, reasoning: bool = False, email: Optional[str] = None, profile=None) -> Tuple[str, Dict[str, Any]]:
    """
    Submit unified benchmark for a single model
    
    Args:
        hf_repo: Name of the model repository to evaluate
        base_model: Base model information (no longer used)
        reasoning: Whether to enable reasoning capability during evaluation
        email: Email address for notification (required)
        profile: User profile object from Hugging Face login (required)
        
    Returns:
        Tuple[str, Dict]: A tuple containing:
            - result_text: A markdown string with the status message
            - dag_run_info: A dictionary with the DAG run information
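
    Example (illustrative; profile is the object provided by the Hugging Face login):
        result_text, dag_run_info = submit_unified_benchmark(
            "my-org/my-model",
            reasoning=False,
            email="user@example.com",
            profile=profile,
        )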
    """
    try:
        
        # Verify user is logged in before allowing submission
        if profile is None:
            return styled_error("Authentication required. Please log in with your Hugging Face account to submit models."), {}
        
        # Get username from profile for logging
        username = None
        try:
            if hasattr(profile, 'username'):
                username = profile.username
            elif hasattr(profile, 'name'):
                username = profile.name
            elif isinstance(profile, str) and profile.strip():
                username = profile
                
                # Extract username from "Logout (username)" format if present
                logout_pattern = re.compile(r'Logout \(([^)]+)\)')
                match = logout_pattern.search(username)
                if match:
                    username = match.group(1)
                    
                # Also try other common formats
                # Handle any string with parentheses containing the username
                elif '(' in username and ')' in username:
                    # Extract the text between the last '(' and the first ')' after it
                    start = username.rindex('(') + 1
                    end = username.find(')', start)
                    if start < end:
                        extracted = username[start:end].strip()
                        if extracted:  # Only use if not empty
                            username = extracted
            # If none of the above conditions are met, keep username as None
                
            # If username is not None, ensure it's a string
            if username is not None:
                if not isinstance(username, str):
                    username = str(username)
        except Exception as e:
            username = None
            logger.warning(f"Failed to extract username from profile: {str(e)}")
            
        # Log successful auth
        logger.info(f"Submission authorized for user: {username}")
        logger.info(f"Benchmark process started")

        # Validate the email address (required)
        valid_email = None
        if email:
            try:
                # Use the full email validation to get the normalized form
                if validate_email is not None:
                    # Validate and normalize the email
                    emailinfo = validate_email(email, check_deliverability=False)
                    valid_email = emailinfo.normalized  # Use normalized form
                    logger.info("Email validation completed")
                else:
                    # Fallback if library not available
                    if is_valid_email(email):
                        valid_email = email
                        logger.info("Email validation completed")
                    else:
                        logger.warning("Email validation failed")
                        return styled_warning("Invalid email address. Please enter a valid email address."), {}
            except EmailNotValidError as e:
                logger.warning("Email validation failed")
                return styled_warning(f"Invalid email address: {str(e)}"), {}
        else:
            # Email is required
            logger.warning("Email required but not provided")
            return styled_warning("Please provide an email address to receive benchmark results."), {}

        # First, analyze the model to get information without displaying details
        _, model_data = get_model_information(hf_repo, display_full_info=False)
        
        # Base model detection and validation have been removed entirely
        
        # Determine model type
        model_type, type_message = determine_model_type(hf_repo)
        logger.info("Model type determination completed")
        
        if model_type == "unknown":
            return styled_warning("Could not determine model type. Benchmark not submitted."), {}
            
        # New check: Don't allow merged models
        if model_type in ("merge", "merged_model"):
            logger.warning("Merged model detected. Currently not supported.")
            return styled_warning("Merged models are not supported yet. Benchmark not submitted."), {}
            
        # Generate a model evaluation name - short, with "eval", under 28 chars
        # Extract short name from repo ID
        if "/" in hf_repo:
            org, model_name = hf_repo.split("/", 1)
            short_name = model_name
        else:
            short_name = hf_repo
        
        # Clean the name and make it shorter if needed
        # Remove special characters and replace with hyphens
        clean_name = re.sub(r'[^a-zA-Z0-9]', '-', short_name)
        # Truncate if too long
        if len(clean_name) > 20:  # Leave room for eval suffix
            clean_name = clean_name[:20]
        
        # Add eval suffix if not already present
        if "eval" not in clean_name.lower():
            eval_name = f"{clean_name}-eval"
        else:
            eval_name = clean_name
            
        # Ensure the final name is under 28 characters
        if len(eval_name) > 28:
            eval_name = eval_name[:28]
            
        logger.info(f"Evaluation name generation completed")
        
        # Create benchmark runner
        runner = BenchmarkRunner()
        
        # Get the benchmark configuration and add model type parameter
        benchmark_config = get_unified_benchmark_config(hf_repo, base_model)
        
        # Make sure hf_repo is set correctly in the configuration
        if "conf" in benchmark_config:
            # Ensure hf_repo is set properly
            benchmark_config["conf"]["hf_repo"] = hf_repo
            # Also set repo_id for backwards compatibility
            benchmark_config["conf"]["repo_id"] = hf_repo
            
            # Add model type and model name to the configuration
            benchmark_config["conf"]["model_type"] = model_type
            benchmark_config["conf"]["unique_model_name"] = eval_name
            benchmark_config["conf"]["reasoning"] = reasoning
            # Set base_model 
            benchmark_config["conf"]["base_model"] = base_model
            
            # Add email if valid
            if valid_email:
                benchmark_config["conf"]["email"] = valid_email
                
            # Create a unique user_id based ONLY on username
            if username is not None:
                # Ensure username is a simple string
                if not isinstance(username, str):
                    username = str(username)
                # Limit username length to avoid issues
                if len(username) > 100:
                    username = username[:100]
                
                # Create a unique hash from username only
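                # (MD5 is used only as a stable identifier here, not for security)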
                user_id = hashlib.md5(username.encode()).hexdigest()
                # Add user_id to the configuration
                benchmark_config["conf"]["user_id"] = user_id
                
                # Create a separate request_id based on repo_id, base_model and reasoning
                request_hash_input = f"{hf_repo}_{base_model}_{reasoning}"
                request_id = hashlib.md5(request_hash_input.encode()).hexdigest()
                # Add request_id to the configuration
                benchmark_config["conf"]["request_id"] = request_id
                
                # Still add username for backward compatibility
                benchmark_config["conf"]["username"] = username
            else:
                # Username is required for the request, so don't proceed
                logger.error("Username not available, cannot submit benchmark request")
                return styled_error("Authentication error. Username not available."), {}
        
        # Execute the unified benchmark request
        logger.info("Submitting benchmark task")
        results = runner.run_all_benchmarks_with_config(benchmark_config)
        
        # Format result for UI display
        dag_run_info = {}
        if results.get("status") == "success" and "unified" in results.get("results", {}):
            unified_result = results["results"]["unified"]
            if "run_id" in unified_result:
                dag_run_info = {
                    "dag_run_id": unified_result["run_id"],
                    "dag_id": DAG_IDS["unified"],
                    "status": "queued"
                }
        
        # Create simple success/error message
        if results["status"] == "success":
            success_msg = f"Benchmark started for {hf_repo} (Type: {model_type})"
            if valid_email:
                success_msg += f". Results will be sent to {valid_email}."
            result_message = styled_message(success_msg)
            logger.info("Benchmark successfully submitted")
        else:
            # Log the error detail but show a simplified message to the user
            logger.error(f"Benchmark submission failed: {results.get('message', '')}")
            result_message = styled_error("Failed to start benchmark")
        
        # Return message and run info
        return result_message, dag_run_info
    
    except Exception as e:
        # Log the full error
        logger.error(f"Error during benchmark submission: {str(e)}")
        logger.error(f"Exception details: {traceback.format_exc()}")
        # Return simplified error message
        return styled_error("An error occurred while submitting the benchmark"), {}

def get_model_information(hf_repo: str, display_full_info: bool = True) -> Tuple[str, dict]:
    """
    Get model type and information.
    
    Args:
        hf_repo: Model repository ID
        display_full_info: Whether to include detailed information in the returned message
        
    Returns:
        Tuple[str, dict]: A tuple containing:
            - message: Formatted message about the model
            - model_info: Dictionary with model information
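
    Example (illustrative):
        message, model_data = get_model_information(
            "my-org/my-model", display_full_info=False
        )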
    """
    try:
        logger.info("Analyzing model information")
        model_data = check_model_type(hf_repo)
        
        if "error" in model_data.get("info", {}):
            error_message = model_data["info"]["error"]
            logger.error(f"Model analysis failed: {error_message}")
            return styled_error("Failed to analyze model"), {}
        
        # If we don't need to display full info, return minimal message
        if not display_full_info:
            logger.info("Model analysis completed")
            return f"Model analysis completed.", model_data
        
        model_type = model_data.get("model_type", "unknown")
        info = model_data.get("info", {})
        
        # Format a nice message with full information
        message = f"<div style='text-align: left;'>"
        message += f"<h3>Model: {hf_repo}</h3>"
        message += f"<p><b>Type:</b> {model_type.capitalize()}</p>"
        
        if "base_model" in info:
            message += f"<p><b>Base Model:</b> {info['base_model']}</p>"
        message += "</div>"
        
        logger.info("Model analysis completed with full information")
        return message, model_data
        
    except Exception as e:
        # Log the error but don't show details to user
        logger.error("Error during model analysis")
        return styled_error("Failed to get model information"), {}

def check_user_login(profile):
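    """
    Check whether the user is logged in.

    Args:
        profile: User profile object (or username string) from the Hugging Face login

    Returns:
        Tuple[bool, str]: (is_logged_in, status_message)
    """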
    if profile is None:
        return False, "Please log in with your Hugging Face account to submit models for benchmarking."
    
    # In some environments, profile may be a string instead of a profile object
    if isinstance(profile, str):
        if profile == "":
            return False, "Please log in with your Hugging Face account to submit models for benchmarking."
        return True, f"Logged in as {profile}"
        
    # Normal case where profile is an object with username attribute
    return True, f"Logged in as {profile.username}"