Spaces:
Sleeping
Sleeping
"""
app.py
This script provides the Gradio web interface to run the evaluation.
This version focuses on robust image detection and processing.
"""
import os | |
import re | |
import gradio as gr | |
import requests | |
import pandas as pd | |
from urllib.parse import urlparse | |
import mimetypes | |
from typing import Optional, Tuple | |
from agent import create_agent_executor | |
# --- Constants --- | |
# Base URL of the scoring service; the questions, files and submit endpoints
# are all built from it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# --- Helper function to parse the agent's output --- | |
def parse_final_answer(agent_response: str) -> str:
    """Extract the answer to submit from the agent's raw response.

    The answer is the last non-blank line of the response with surrounding
    whitespace removed.  No ``FINAL ANSWER`` marker is searched for.

    Args:
        agent_response: Raw text produced by the agent.  ``None`` is treated
            as an empty response and any other non-string value is converted
            with ``str()`` instead of raising ``AttributeError``.

    Returns:
        The stripped last non-blank line, or ``"Could not parse answer."``
        when the response contains no non-blank line.
    """
    fallback = "Could not parse answer."
    if agent_response is None:
        return fallback
    text = agent_response if isinstance(agent_response, str) else str(agent_response)

    # Walk backwards so we can stop at the first non-blank line from the end.
    for line in reversed(text.split("\n")):
        candidate = line.strip()
        if candidate:
            return candidate
    return fallback
def detect_file_type_robust(url: str) -> Tuple[str, dict]:
    """Decide whether ``url`` points at an image, trying several heuristics.

    The checks run in this order and the first one that succeeds wins:

    1. the file extension of the URL *path* (query string and fragment are
       ignored);
    2. the ``Content-Type`` header of a ``HEAD`` request (retried as a
       streamed ``GET`` when the server answers 405/501 to ``HEAD``);
    3. the host name being, or being a sub-domain of, a known image host;
    4. ``mimetypes.guess_type`` applied to the URL path.

    Args:
        url: Candidate URL.  Leading/trailing whitespace is ignored.

    Returns:
        ``(file_type, metadata)`` where ``file_type`` is ``"image"`` or
        ``"unknown"``.  ``metadata`` always carries an ``"error"`` key for
        rejected input; otherwise it holds ``"original_url"``, ``"domain"``
        and ``"path"`` plus whatever the successful check recorded
        (``"detection_method"``, ``"extension"``, ``"content_type"``,
        ``"status_code"``, ``"mime_type"``) and possibly an ``"error"`` from
        the network check.
    """
    if not url or not url.strip():
        return "unknown", {"error": "Empty URL"}
    url = url.strip()
    metadata = {"original_url": url}

    # URL schemes are case-insensitive, so compare on a lowered copy.
    if not url.lower().startswith(("http://", "https://")):
        return "unknown", {"error": "Invalid URL format - must start with http/https"}

    try:
        parsed = urlparse(url)
        # ``hostname`` drops credentials and port, unlike ``netloc``.
        hostname = (parsed.hostname or "").lower()
    except ValueError as e:
        return "unknown", {"error": f"URL parsing failed: {e}"}
    metadata["domain"] = parsed.netloc
    metadata["path"] = parsed.path

    # Method 1: file extension of the path component only.  Matching the
    # extension as a substring of the whole URL would flag hosts such as
    # "www.icons.com" (".ico") or "svgrepo.com" (".svg") as images.
    image_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.svg', '.tiff', '.ico'}
    extension = os.path.splitext(parsed.path.rstrip("/"))[1].lower()
    if extension in image_extensions:
        metadata["detection_method"] = "file_extension"
        metadata["extension"] = extension
        return "image", metadata

    # Method 2: Content-Type header check.
    try:
        print(f"Checking content type for: {url}")
        response = requests.head(url, timeout=10, allow_redirects=True)
        if response.status_code in (405, 501):
            # The server does not implement HEAD.  Retry with a streamed GET
            # so only the headers are read, never the body.
            response.close()
            response = requests.get(url, timeout=10, allow_redirects=True, stream=True)
        content_type = response.headers.get('content-type', '').lower()
        status_code = response.status_code
        response.close()

        metadata["content_type"] = content_type
        metadata["status_code"] = status_code
        if status_code == 200:
            if content_type.strip().startswith('image/'):
                metadata["detection_method"] = "content_type"
                return "image", metadata
        else:
            metadata["error"] = f"HTTP {status_code}"
    except requests.RequestException as e:
        # Best effort: a network failure must not abort the remaining checks.
        metadata["error"] = f"Network error: {e}"
        print(f"Network error checking {url}: {e}")

    # Method 3: known image hosts.  Require an exact host or a sub-domain so
    # that look-alikes ("notimgur.com", "imgur.com.evil.tld") do not match.
    image_domains = {
        'imgur.com', 'i.imgur.com',
        'cdn.discordapp.com', 'media.discordapp.net',
        'pbs.twimg.com', 'abs.twimg.com',
        'i.redd.it', 'preview.redd.it',
        'images.unsplash.com',
        'via.placeholder.com',
        'picsum.photos',
    }
    if any(hostname == host or hostname.endswith("." + host) for host in image_domains):
        metadata["detection_method"] = "domain_based"
        return "image", metadata

    # Method 4: MIME guess from the path, which catches image extensions not
    # listed in ``image_extensions``.
    mime_type, _ = mimetypes.guess_type(parsed.path)
    if mime_type and mime_type.startswith('image/'):
        metadata["detection_method"] = "mime_guess"
        metadata["mime_type"] = mime_type
        return "image", metadata

    return "unknown", metadata
def create_structured_prompt(question_text: str, file_url: str = None) -> str:
    """Build the task prompt handed to the agent.

    Three shapes are produced, depending on the attachment:

    * no ``file_url`` -> a text-only prompt;
    * ``file_url`` classified as an image by ``detect_file_type_robust`` ->
      a prompt instructing the agent to look at the image;
    * any other ``file_url`` -> a prompt telling the agent to treat the URL
      as a generic web resource, including the detection error if any.

    Args:
        question_text: The question to answer.
        file_url: URL of the attachment, or a falsy value when there is none.

    Returns:
        The prompt as a single newline-separated string.
    """
    task_line = f"TASK: {question_text}"

    # Guard clause: nothing attached, so no detection is needed.
    if not file_url:
        return "\n".join((
            task_line,
            "ANALYSIS: This is a text-only question with no attachments.",
            "APPROACH: Use available tools (web search, Wikipedia, etc.) as needed to answer accurately.",
        ))

    detected_kind, details = detect_file_type_robust(file_url)

    if detected_kind != "image":
        failure_reason = details.get('error', 'Unknown file type')
        return "\n".join((
            task_line,
            "ATTACHMENT ANALYSIS:",
            f"- URL: {file_url}",
            "- Type: Could not identify as supported file type",
            f"- Error: {failure_reason}",
            f"- Metadata: {details}",
            "REASONING REQUIRED:",
            "1. There is an attachment but it's not a recognized image format",
            "2. You should attempt to process it as a regular web resource",
            "3. Use web search or other tools to gather information about the URL content",
            "APPROACH: Use web search or other available tools to gather information about this resource.",
        ))

    how_detected = details.get('detection_method', 'unknown')
    return "\n".join((
        task_line,
        "ATTACHMENT ANALYSIS:",
        "- Type: Image file detected",
        f"- URL: {file_url}",
        f"- Detection method: {how_detected}",
        f"- Metadata: {details}",
        "REASONING REQUIRED:",
        "1. This question involves an image that needs to be analyzed",
        "2. You must examine the image content to answer the question",
        "3. The image URL should be processed directly by your vision capabilities",
        "APPROACH: Process the image URL directly with your vision model, then provide a comprehensive answer based on what you see.",
    ))
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """Fetch all questions, run the agent on each, submit the answers, and report.

    Args:
        profile: The logged-in Hugging Face user, or ``None`` when nobody is
            logged in.

    Returns:
        A ``(status_message, results)`` tuple for the two UI outputs.
        ``results`` is ``None`` when the user is not logged in or the agent
        cannot be created, an empty ``DataFrame`` when the questions cannot
        be fetched, and otherwise a ``DataFrame`` with one row per attempted
        task.  Agent failures on individual tasks do not abort the run: the
        error text is recorded and submitted as that task's answer.
    """
    if not profile:
        return "Please log in to Hugging Face with the button above to submit.", None
    username = profile.username
    print(f"User logged in: {username}")

    space_id = os.getenv("SPACE_ID")
    if not space_id:
        # Without SPACE_ID the link below points at ".../spaces/None/...".
        print("Warning: SPACE_ID is not set; the submitted agent_code link will not resolve.")
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    questions_url = f"{DEFAULT_API_URL}/questions"
    submit_url = f"{DEFAULT_API_URL}/submit"

    # 1. Instantiate Agent
    print("Initializing your custom agent...")
    try:
        agent_executor = create_agent_executor(provider="groq")
    except Exception as e:
        return f"Fatal Error: Could not initialize agent. Check logs. Details: {e}", None

    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=20)
        response.raise_for_status()
        questions_data = response.json()
        if not isinstance(questions_data, list):
            # e.g. an error object; iterating it would yield keys, not questions.
            raise ValueError(
                f"expected a JSON list of questions, got {type(questions_data).__name__}"
            )
        print(f"Fetched {len(questions_data)} questions.")
    except Exception as e:
        return f"Error fetching questions: {e}", pd.DataFrame()

    total = len(questions_data)

    # DEBUG: Print format of each question
    print("\n=== QUESTION FORMATS DEBUG ===")
    for i, item in enumerate(questions_data, start=1):
        if isinstance(item, dict):
            print(f"Question {i} keys: {list(item.keys())}")
        print(f"Question {i} full data: {item}")
        print("-" * 50)
    print("=== END DEBUG ===\n")

    # 3. Run your Agent
    results_log, answers_payload = [], []
    failed_tasks = 0  # counted explicitly rather than inferred from answer text
    print(f"Running agent on {total} questions...")
    for i, item in enumerate(questions_data, start=1):
        if not isinstance(item, dict):
            continue
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            continue

        print(f"\n--- Running Task {i}/{total} (ID: {task_id}) ---")

        # Get file URL if it exists
        file_url = f"{DEFAULT_API_URL}/files/{task_id}" if item.get("has_file") else None
        if file_url is None:
            print("No File Url")

        # Create structured prompt with robust file analysis
        structured_prompt = create_structured_prompt(question_text, file_url)

        # Classify the attachment once here and reuse the result for every
        # log row below; each classification may cost a network round trip.
        file_type, detection_method = "None", "N/A"
        if file_url:
            file_type, metadata = detect_file_type_robust(file_url)
            detection_method = metadata.get('detection_method', 'N/A')
            print(f"File analysis: {file_url}")
            print(f" - Type: {file_type}")
            print(f" - Detection method: {metadata.get('detection_method', 'unknown')}")
            if metadata.get('error'):
                print(f" - Error: {metadata['error']}")

        print(f"Structured Prompt for Agent:\n{structured_prompt}")

        try:
            # Pass the structured prompt to the agent
            result = agent_executor.invoke({"messages": [("user", structured_prompt)]})
            raw_answer = result['messages'][-1].content
            submitted_answer = parse_final_answer(raw_answer)
            print(f"Raw LLM Response: '{raw_answer}'")
            print(f"PARSED FINAL ANSWER: '{submitted_answer}'")
        except Exception as e:
            # Per-task boundary: record the failure and keep going.
            print(f"!! AGENT ERROR on task {task_id}: {e}")
            submitted_answer = f"AGENT RUNTIME ERROR: {e}"
            detection_method = "Error"
            failed_tasks += 1

        answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
        results_log.append({
            "Task ID": task_id,
            "Question": question_text,
            "File URL": file_url or "None",
            "File Type": file_type,
            "Detection Method": detection_method,
            "Submitted Answer": submitted_answer,
        })

    if not answers_payload:
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    # 4. Prepare and 5. Submit
    submission_data = {"username": username, "agent_code": agent_code, "answers": answers_payload}
    print(f"\nSubmitting {len(answers_payload)} answers for user '{username}'...")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (f"Submission Successful!\nUser: {result_data.get('username')}\n"
                        f"Overall Score: {result_data.get('score', 'N/A')}%\n"
                        f"Processed {len(results_log) - failed_tasks} successful tasks")
        return final_status, pd.DataFrame(results_log)
    except Exception as e:
        status_message = f"Submission Failed: {e}"
        print(status_message)
        return status_message, pd.DataFrame(results_log)
# --- Gradio UI --- | |
with gr.Blocks(title="Image-Capable Agent Evaluation") as demo:
    # Page heading and a one-line description of the agent.
    gr.Markdown("# Image-Capable Agent Evaluation Runner")
    gr.Markdown("This agent can process images and perform web searches using Groq's vision-capable models.")

    # Login control; run_and_submit_all refuses to run without a profile.
    gr.LoginButton()

    run_button = gr.Button("Run Evaluation & Submit All Answers", variant="primary")

    # Read-only box that receives the status string returned by the handler.
    status_output = gr.Textbox(
        interactive=False,
        lines=6,
        label="Run Status / Submission Result",
    )

    # One width per column of the rows built in run_and_submit_all
    # (Task ID, Question, File URL, File Type, Detection Method, Submitted Answer).
    results_table = gr.DataFrame(
        column_widths=[80, 200, 120, 100, 80, 200],
        row_count=10,
        wrap=True,
        label="Questions and Agent Answers",
    )

    # No explicit inputs: the handler's only parameter is the OAuth profile
    # (presumably injected by Gradio from the type annotation - TODO confirm).
    run_button.click(outputs=[status_output, results_table], fn=run_and_submit_all)
if __name__ == "__main__":
    # Start-up banner: the title framed by 30 dashes on each side, preceded
    # by a blank line, then hand control to the Gradio server.
    print("\n" + " Image Agent App Starting ".center(len(" Image Agent App Starting ") + 60, "-"))
    demo.launch()