|
import base64
import io
import json
import os
import random
import re
import sys
import time
from datetime import datetime, date
from io import StringIO

import gradio as gr
import pandas as pd
import requests
from datasets import load_dataset, Dataset
from huggingface_hub import login
from PIL import Image, ImageDraw, ImageFont
|
|
|
|
|
# Authenticate with the Hugging Face Hub when a token secret is available;
# later dataset pushes rely on this login having succeeded.
try:
    HF_TOKEN = os.environ.get("HUGGINGFACE_TOKEN")
    if not HF_TOKEN:
        print("HUGGINGFACE_TOKEN environment variable not set.")
    else:
        login(token=HF_TOKEN)
        print("Logged in to Hugging Face Hub successfully.")
except Exception as e:
    print(f"Error logging in to Hugging Face Hub: {e}")
|
|
|
|
|
# Hub dataset holding per-student Unit 4 scores (read-only here).
SCORES_DATASET = "agents-course/unit4-students-scores"
# Hub dataset recording issued certificates (read and updated here).
CERTIFICATES_DATASET = "agents-course/course-certificates-of-excellence"
# Minimum score required before a certificate can be generated.
THRESHOLD_SCORE = 30

# Base URL of the GAIA scoring Space (serves /questions, /files, /submit).
GAIA_API_BASE_URL = "https://agents-course-unit4-scoring.hf.space"

# Gemini generateContent endpoint; the API key is appended per request.
GEMINI_API_URL_BASE = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent"
|
|
|
|
|
def check_user_score(username):
    """Return the first score row for *username* from the scores dataset, or None."""
    try:
        score_rows = load_dataset(
            SCORES_DATASET,
            split="train",
            download_mode="force_redownload",
            token=HF_TOKEN if HF_TOKEN else True,
        )
        for row in score_rows:
            if row["username"] == username:
                return row
        return None
    except Exception as e:
        print(f"Error checking user score: {e}")
        return None
|
|
|
def has_certificate_entry(username):
    """Return True when *username* already has a row in the certificates dataset."""
    try:
        cert_rows = load_dataset(
            CERTIFICATES_DATASET,
            split="train",
            download_mode="force_redownload",
            token=HF_TOKEN if HF_TOKEN else True,
        )
        for row in cert_rows:
            if row["username"] == username:
                return True
        return False
    except Exception as e:
        print(f"Error checking certificate entry: {e}")
        return False
|
|
|
def add_certificate_entry(username, name, score):
    """Insert or replace *username*'s row in the certificates dataset on the Hub."""
    try:
        existing = load_dataset(
            CERTIFICATES_DATASET,
            split="train",
            download_mode="force_redownload",
            token=HF_TOKEN if HF_TOKEN else True,
        )
        # Keep everyone else's rows, then append a fresh entry for this user.
        rows = [row for row in existing if row["username"] != username]
        rows.append({
            "username": username,
            "score": score,
            "name_on_certificate": name,
            "timestamp": datetime.now().isoformat(),
        })
        Dataset.from_list(rows).push_to_hub(
            CERTIFICATES_DATASET, token=HF_TOKEN if HF_TOKEN else None
        )
        print(f"Certificate entry added/updated for {username}.")
    except Exception as e:
        print(f"Error adding certificate entry: {e}")
|
|
|
def generate_certificate_image(name_on_cert):
    """
    Render the course certificate for *name_on_cert*.

    Returns a tuple (image, pdf_buffer):
      - image: the PIL.Image with the name and today's date drawn on it.
      - pdf_buffer: an io.BytesIO positioned at 0, holding a PDF rendering.

    Raises FileNotFoundError when the certificate template or font cannot
    be located, and re-raises any other rendering error after logging it.
    """
    current_dir = os.path.dirname(__file__)

    def _resolve_asset(filename, fallback_dirs, description):
        # Look next to app.py first, then in each fallback directory
        # (expressed relative to app.py). The original code re-checked the
        # primary path as its own "root" fallback, which could never succeed;
        # this helper removes that dead branch and the duplicated logic.
        candidates = [os.path.join(current_dir, filename)]
        candidates += [os.path.join(current_dir, *parts, filename) for parts in fallback_dirs]
        for candidate in candidates:
            if os.path.exists(candidate):
                return candidate
        raise FileNotFoundError(f"{description} not found. Checked: {', '.join(candidates)}")

    try:
        certificate_template_path = _resolve_asset(
            "certificate.png", [("..", "templates")], "Certificate template"
        )
        font_path = _resolve_asset(
            "Quattrocento-Regular.ttf", [("..",)], "Font file"
        )

        im = Image.open(certificate_template_path)
        d = ImageDraw.Draw(im)
        name_font = ImageFont.truetype(font_path, 100)
        date_font = ImageFont.truetype(font_path, 48)
        # Title-case the name so it reads consistently on the certificate.
        name_on_cert = name_on_cert.title()
        # Coordinates are tuned to the certificate.png template layout.
        d.text((1000, 740), name_on_cert, fill="black", anchor="mm", font=name_font)
        d.text((1480, 1170), str(date.today()), fill="black", anchor="mm", font=date_font)
        # Produce a PDF copy alongside the image for download.
        pdf_buffer = io.BytesIO()
        im.convert("RGB").save(pdf_buffer, format="PDF")
        pdf_buffer.seek(0)
        return im, pdf_buffer
    except FileNotFoundError as fnf_error:
        print(fnf_error)
        raise
    except Exception as e:
        print(f"Error generating certificate image: {e}")
        raise
|
|
|
def handle_certificate(name_on_certificate_input, profile: gr.OAuthProfile):
    """Validate the user's login and score, then render and record a certificate.

    Returns (status_message, certificate_image_or_None, pdf_path_or_None).
    """
    if not profile:
        return "You must be logged in with your Hugging Face account.", None, None

    username = profile.username
    if not name_on_certificate_input.strip():
        return "Please enter the name you want on the certificate.", None, None

    user_score_info = check_user_score(username)
    if not user_score_info:
        return f"No score found for {username}. Please complete Unit 4 first by submitting your agent's answers.", None, None

    score = user_score_info.get("score", 0)
    if score < THRESHOLD_SCORE:
        return f"Your score is {score}. You need at least {THRESHOLD_SCORE} to pass and receive a certificate.", None, None

    try:
        cert_image, pdf_buffer = generate_certificate_image(name_on_certificate_input)
        add_certificate_entry(username, name_on_certificate_input, score)
        # Persist the PDF to a local path so gr.File can serve it.
        temp_pdf_path = f"certificate_{username}.pdf"
        with open(temp_pdf_path, "wb") as pdf_file:
            pdf_file.write(pdf_buffer.getvalue())
        return f"Congratulations, {name_on_certificate_input}! You scored {score}. Here's your certificate:", cert_image, temp_pdf_path
    except FileNotFoundError as e:
        return f"Critical error: A required file for certificate generation was not found: {e}. Please check Space file structure.", None, None
    except Exception as e:
        print(f"An unexpected error occurred in handle_certificate: {e}")
        return "An unexpected error occurred while generating your certificate. Please try again later.", None, None
|
|
|
|
|
def get_gaia_api_questions():
    """Fetch the GAIA question list.

    Returns (questions, None) on success or (None, error_message) on failure.
    """
    try:
        questions_url = f"{GAIA_API_BASE_URL}/questions"
        print(f"Attempting to fetch questions from: {questions_url}")
        response = requests.get(questions_url, timeout=30)
        response.raise_for_status()
        return response.json(), None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching GAIA questions: {e}")
        return None, f"Error fetching GAIA questions: {e}"
    except Exception as e:
        print(f"An unexpected error occurred while fetching questions: {e}")
        return None, f"An unexpected error occurred: {e}"
|
|
|
def get_gaia_file_data_for_task(task_id_for_file_fetch, associated_file_metadata_list):
    """Download the attachment for a GAIA task.

    Returns (raw_bytes, mime_type, file_name) on success; (None, None, None)
    otherwise. A 404 is expected for tasks that simply have no file.
    """
    file_url = f"{GAIA_API_BASE_URL}/files/{task_id_for_file_fetch}"
    print(f"Attempting to fetch file for task {task_id_for_file_fetch} from {file_url}")
    try:
        response = requests.get(file_url, timeout=30)
        response.raise_for_status()

        raw_bytes = response.content
        detected_mime_type = response.headers.get('Content-Type', '').split(';')[0].strip()

        # Prefer the file name advertised in the task metadata, when present.
        file_name = "attached_file"
        meta_list = associated_file_metadata_list
        if isinstance(meta_list, list) and meta_list:
            first_meta = meta_list[0]
            if isinstance(first_meta, dict) and 'file_name' in first_meta:
                file_name = first_meta['file_name']

        print(f"File fetched for task {task_id_for_file_fetch}. Mime-type: {detected_mime_type}, Name: {file_name}, Size: {len(raw_bytes)} bytes")
        return raw_bytes, detected_mime_type, file_name
    except requests.exceptions.HTTPError as http_err:
        if http_err.response.status_code == 404:
            print(f"No file found (404) for task {task_id_for_file_fetch} at {file_url}.")
        else:
            print(f"HTTP error fetching file for task {task_id_for_file_fetch}: {http_err}")
        return None, None, None
    except requests.exceptions.RequestException as e:
        print(f"Could not fetch file for task {task_id_for_file_fetch}: {e}. Proceeding without file content.")
        return None, None, None
    except Exception as e_gen:
        print(f"Unexpected error fetching file for task {task_id_for_file_fetch}: {e_gen}")
        return None, None, None
|
|
|
def execute_python_code(code_string: str):
    """
    Execute a string of Python code and capture its standard output.

    Returns a (result, error) tuple where exactly one element is non-None.
    The result is, in order of preference: the str() of a `final_answer`
    variable the code defined, anything the code printed, or a placeholder
    message when the code produced neither.

    SECURITY NOTE: this calls exec() on the supplied string. GAIA task
    files are semi-trusted course material; never route arbitrary
    untrusted input through this function.

    Fixes over the original implementation:
      - `sys` is now imported at module level, so this function no longer
        raises NameError when called before run_agent_on_gaia.
      - The error diagnostic is printed AFTER stdout is restored, and the
        error is reported before any captured output; previously the error
        text leaked into the captured stream and was returned as success.
    """
    print(f"Attempting to execute Python code:\n{code_string[:500]}...")

    # Redirect stdout so anything the snippet print()s can be captured.
    old_stdout = sys.stdout
    sys.stdout = captured_output = StringIO()

    execution_result = None
    error_message = None

    try:
        local_namespace = {}
        exec(code_string, {"__builtins__": __builtins__}, local_namespace)
        if 'final_answer' in local_namespace:
            execution_result = str(local_namespace['final_answer'])
    except Exception as e:
        error_message = f"Execution Error: {type(e).__name__}: {e}"
    finally:
        # Always restore stdout, even if exec() raised.
        sys.stdout = old_stdout

    if error_message:
        # Printed only after stdout is restored so the diagnostic is not
        # mistaken for the snippet's own output.
        print(f"Error executing Python code: {error_message}")

    printed_output = captured_output.getvalue().strip()

    if execution_result:
        return execution_result, None
    if error_message:
        # Report failure before any partial printed output so callers are
        # not misled by incomplete results.
        return None, error_message
    if printed_output:
        return printed_output, None
    return "Python code executed without explicit output or 'final_answer' variable.", None
|
|
|
|
|
def clean_final_answer(raw_text: str) -> str:
    """Normalize a raw LLM reply down to the bare answer string.

    Strips a 'FINAL ANSWER:' marker, a single leading conversational
    phrase, one pair of wrapping quotes, whitespace around commas, and
    (conservatively) a trailing period.
    """
    if not isinstance(raw_text, str):
        return ""

    cleaned = raw_text.strip()

    # Keep only what follows an explicit FINAL ANSWER marker, if present.
    marker = re.search(r"FINAL ANSWER:\s*(.*)", cleaned, re.IGNORECASE | re.DOTALL)
    if marker:
        cleaned = marker.group(1).strip()

    # Drop at most one conversational lead-in (case-insensitive match).
    lead_ins = (
        "The answer is", "The final answer is", "So, the answer is", "Therefore, the answer is",
        "Based on the information, the answer is", "The correct answer is", "My answer is",
        "Okay, the answer is", "Sure, the answer is", "Here is the answer:", "The solution is",
        "Answer:", "Result:",
    )
    lowered = cleaned.lower()
    for lead_in in lead_ins:
        if lowered.startswith(lead_in.lower()):
            cleaned = cleaned[len(lead_in):].strip()
            # A leftover ':' or '.' immediately after the phrase is noise.
            if cleaned[:1] in (":", "."):
                cleaned = cleaned[1:].strip()
            break

    # Unwrap a single matching pair of surrounding quotes.
    if len(cleaned) >= 2 and cleaned[0] == cleaned[-1] and cleaned[0] in "\"'":
        cleaned = cleaned[1:-1].strip()

    # Canonical comma-separated form: no whitespace around commas.
    cleaned = re.sub(r'\s*,\s*', ',', cleaned)

    # Trim a trailing period, but only for numeric or short answers, and
    # never when an inner "x.y" pattern (decimal, domain, version) exists.
    if len(cleaned) > 1 and cleaned.endswith(".") and not re.search(r"[a-zA-Z0-9]\.[a-zA-Z0-9]", cleaned):
        core = cleaned[:-1].strip()
        if core.isdigit() or len(core.split()) <= 3:
            cleaned = core

    return cleaned
|
|
|
def my_agent_logic(task_id: str, question: str, files_metadata: list = None):
    """Answer one GAIA task with Gemini, pre-processing any attached file.

    Depending on the attachment's type: Python files are executed locally,
    images/audio are sent to Gemini inline, spreadsheets and text files are
    summarized into the prompt. Returns the cleaned answer string, or an
    ERROR_* sentinel string that encodes the failure mode and task id.
    """
    print(f"Agent (Enhanced Tools + Gemini) processing Task ID: {task_id}, Question: {question}")
    if files_metadata:
        print(f"File metadata associated: {files_metadata}")

    # Without an API key the agent cannot run; return a sentinel, not raise.
    gemini_api_key = os.environ.get("GEMINI_API_KEY")
    if not gemini_api_key:
        return f"ERROR_GEMINI_KEY_MISSING_FOR_TASK_{task_id}"

    # GAIA-style answer-format instructions, prepended to every question.
    system_prompt_lines = [
        "You are a general AI assistant. I will ask you a question.",
        "Your primary goal is to provide the single, exact, concise, and factual answer to the question.",
        "Do not include any conversational fluff, disclaimers, explanations, or any introductory phrases like 'The answer is:'. Your response should be ONLY the answer itself.",
        "Do not use markdown formatting unless the question explicitly asks for it.",
        "If the question implies a specific format (e.g., a number, a date, a comma-separated list), provide the answer in that format.",
        "Do NOT include the phrase 'FINAL ANSWER:' in your response to me.",
        "If you are asked for a number, don't use comma to write your number neither use units such as $ or percent sign unless specified otherwise by the question.",
        "If you are asked for a string, don't use articles, neither abbreviations (e.g. for cities), and write the digits in plain text unless specified otherwise.",
        "If you are asked for a comma separated list, apply the above rules depending of whether the element to be put in the list is a number or a string.",
        "If external files or tool outputs are provided below, use their content if relevant and accessible to answer the question.",
    ]

    user_question_text_for_gemini = "\n".join(system_prompt_lines) + f"\n\nGAIA Question: {question}"
    gemini_parts = []  # Gemini "parts" payload: text plus optional inline media

    # Accumulates human-readable tool results appended to the prompt text.
    tool_output_description = ""
    file_content_bytes, detected_mime_type, file_name = None, None, None

    if files_metadata:
        file_content_bytes, detected_mime_type, file_name = get_gaia_file_data_for_task(task_id, files_metadata)

    if file_content_bytes:
        # Dispatch on file type; each branch folds its result or failure note
        # into tool_output_description so Gemini always gets some context.
        if file_name and file_name.lower().endswith(".py") and detected_mime_type in ["text/x-python", "application/x-python-code", "text/plain"]:
            print(f"Detected Python file: {file_name}")
            try:
                python_code = file_content_bytes.decode('utf-8')
                execution_result, exec_error = execute_python_code(python_code)
                if exec_error:
                    tool_output_description += f"\n\nExecution of Python file '{file_name}' failed: {exec_error}"
                elif execution_result:
                    tool_output_description += f"\n\nOutput from executing Python file '{file_name}':\n{execution_result}"
                else:
                    tool_output_description += f"\n\nPython file '{file_name}' executed without specific return or error."
            except Exception as e_py_decode:
                tool_output_description += f"\n\nError decoding Python file '{file_name}': {e_py_decode}"

        elif detected_mime_type and detected_mime_type.startswith("image/"):
            # Images go to Gemini as inline base64 data for multimodal analysis.
            try:
                base64_image = base64.b64encode(file_content_bytes).decode('utf-8')
                gemini_parts.append({"inline_data": {"mime_type": detected_mime_type, "data": base64_image}})
                tool_output_description += f"\n\nAn image file '{file_name}' ({detected_mime_type}) is provided. Refer to it if relevant."
                print(f"Added image {file_name} to Gemini parts for task {task_id}.")
            except Exception as e_img:
                tool_output_description += f"\n\n[Agent note: Error processing image file '{file_name}': {e_img}]"

        elif detected_mime_type and detected_mime_type.startswith("audio/"):
            # Audio is likewise sent inline; Gemini handles transcription.
            try:
                base64_audio = base64.b64encode(file_content_bytes).decode('utf-8')
                gemini_parts.append({"inline_data": {"mime_type": detected_mime_type, "data": base64_audio}})
                tool_output_description += f"\n\nAn audio file '{file_name}' ({detected_mime_type}) is provided. Transcribe or analyze it if relevant to the question."
                print(f"Added audio {file_name} to Gemini parts for task {task_id}.")
            except Exception as e_audio:
                tool_output_description += f"\n\n[Agent note: Error processing audio file '{file_name}': {e_audio}]"

        elif detected_mime_type in ["application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", "application/vnd.ms-excel", "text/csv"]:
            # Spreadsheets are summarized as a small text preview (10x5 cells)
            # rather than sent whole, to keep the prompt size bounded.
            try:
                if "csv" in detected_mime_type: df = pd.read_csv(io.BytesIO(file_content_bytes))
                else: df = pd.read_excel(io.BytesIO(file_content_bytes))

                preview_rows = min(10, len(df))
                preview_cols = min(5, len(df.columns))
                preview_df = df.iloc[:preview_rows, :preview_cols]
                df_description = f"First {preview_rows} rows and first {preview_cols} columns (if available):\n{preview_df.to_string(index=True)}\nTotal rows: {len(df)}, Total columns: {len(df.columns)}."
                if len(df.columns) > preview_cols:
                    df_description += f"\nOther columns include: {list(df.columns[preview_cols:])}"

                tool_output_description += f"\n\nData from spreadsheet '{file_name}':\n{df_description}"
                print(f"Added spreadsheet preview for {file_name} to tool output description.")
            except Exception as e_xls:
                tool_output_description += f"\n\n[Agent note: Unable to parse spreadsheet '{file_name}': {e_xls}]"

        elif detected_mime_type == "text/plain":
            # Plain text is inlined, truncated to 2000 chars.
            try:
                text_content = file_content_bytes.decode('utf-8')
                tool_output_description += f"\n\nContent of attached text file '{file_name}':\n{text_content[:2000]}"
                print(f"Added text file content '{file_name}' to tool output description.")
            except Exception as e_txt:
                tool_output_description += f"\n\n[Agent note: A text file '{file_name}' was associated but could not be decoded: {e_txt}]"
        else:
            tool_output_description += f"\n\nNote: A file named '{file_name}' (type: {detected_mime_type or 'unknown'}) is associated. Its content could not be directly processed by current tools."
    elif files_metadata :
        # Metadata said there was a file, but the fetch above failed.
        tool_output_description += f"\n\nNote: File(s) {files_metadata} were listed for this task, but could not be fetched or processed."

    # Assemble the final parts list: text goes first when media is attached,
    # otherwise it is the sole part.
    final_user_text_for_gemini = user_question_text_for_gemini + tool_output_description
    if not any(p.get("inline_data") for p in gemini_parts):
        gemini_parts.append({"text": final_user_text_for_gemini})
    else:
        gemini_parts.insert(0, {"text": final_user_text_for_gemini})

    # Low temperature for deterministic, factual answers.
    payload = {
        "contents": [{"role": "user", "parts": gemini_parts}],
        "generationConfig": {"temperature": 0.1, "maxOutputTokens": 350}
    }
    api_url_with_key = f"{GEMINI_API_URL_BASE}?key={gemini_api_key}"
    agent_computed_answer = f"ERROR_CALLING_GEMINI_FOR_TASK_{task_id}"

    try:
        headers = {"Content-Type": "application/json"}
        # Log part keys/types only, never the (possibly large/base64) values.
        print(f"Calling Gemini API for task {task_id} with payload structure: {[(k, type(v)) for p in payload['contents'] for part in p['parts'] for k,v in part.items()]}")
        response = requests.post(api_url_with_key, headers=headers, json=payload, timeout=90)
        response.raise_for_status()
        result = response.json()

        # Expected shape: candidates[0].content.parts[0].text
        if (result.get("candidates") and
            result["candidates"][0].get("content") and
            result["candidates"][0]["content"].get("parts") and
            result["candidates"][0]["content"]["parts"][0].get("text")):
            raw_answer_from_gemini = result["candidates"][0]["content"]["parts"][0]["text"].strip()
            agent_computed_answer = clean_final_answer(raw_answer_from_gemini)
        else:
            print(f"Warning: Unexpected response structure from Gemini API for task {task_id}: {json.dumps(result, indent=2)}")
            # Distinguish a safety block from a plain parse failure.
            if result.get("promptFeedback") and result["promptFeedback"].get("blockReason"):
                block_reason = result["promptFeedback"]["blockReason"]
                agent_computed_answer = f"ERROR_GEMINI_PROMPT_BLOCKED_{block_reason}_FOR_TASK_{task_id}"
            else:
                agent_computed_answer = f"ERROR_PARSING_GEMINI_RESPONSE_FOR_TASK_{task_id}"
    except requests.exceptions.Timeout:
        agent_computed_answer = f"ERROR_GEMINI_TIMEOUT_FOR_TASK_{task_id}"
    except requests.exceptions.RequestException as e:
        if e.response is not None: print(f"Gemini API Error Response Status: {e.response.status_code}, Body: {e.response.text}")
        agent_computed_answer = f"ERROR_GEMINI_REQUEST_FAILED_FOR_TASK_{task_id}"
    except Exception as e:
        agent_computed_answer = f"ERROR_UNEXPECTED_IN_AGENT_LOGIC_FOR_TASK_{task_id}"
    print(f"Agent (Enhanced Tools + Gemini) computed answer for Task ID {task_id}: {agent_computed_answer}")
    return agent_computed_answer
|
|
|
def run_agent_on_gaia(profile: gr.OAuthProfile, run_all_questions: bool = True):
    """
    Fetch GAIA questions and run the agent over them.

    profile: Hugging Face OAuth profile (injected by Gradio); must be truthy.
    run_all_questions: when False, one random question is processed instead
        of the whole set (useful for quick smoke tests).

    Returns (log_text, answers, answers) — the answer list is returned twice
    so it can populate both the JSON preview component and the submission
    state in a single click handler.

    Fix over the original: the `global sys; import sys` / local
    `import random` hack is gone; both modules are now regular top-level
    imports, so execute_python_code works regardless of call order.
    """
    if not profile:
        return "You must be logged in to run the agent.", None, None

    log_messages = ["Starting agent run..."]
    questions_data, error_msg = get_gaia_api_questions()
    if error_msg:
        log_messages.append(error_msg)
        return "\n".join(log_messages), None, None
    if not questions_data:
        log_messages.append("No questions retrieved from GAIA API.")
        return "\n".join(log_messages), None, None
    log_messages.append(f"Retrieved {len(questions_data)} questions from GAIA.")

    answers_to_submit = []
    tasks_to_process = questions_data
    if not run_all_questions and questions_data:
        if not isinstance(questions_data, list) or not questions_data:
            log_messages.append("Question data is not a list or is empty, cannot pick random.")
            return "\n".join(log_messages), None, None
        tasks_to_process = [random.choice(questions_data)]
        log_messages.append("Processing 1 random question based on user choice.")
    elif run_all_questions:
        log_messages.append(f"Processing all {len(tasks_to_process)} questions.")

    for task in tasks_to_process:
        task_id = task.get("task_id")
        question = task.get("question")
        associated_files_metadata = task.get("files", [])
        if task_id and question:
            log_messages.append(f"\nProcessing Task ID: {task_id}")
            log_messages.append(f"Question: {question}")
            if associated_files_metadata:
                log_messages.append(f"Associated files metadata: {associated_files_metadata}")
            submitted_answer = my_agent_logic(task_id, question, associated_files_metadata)
            log_messages.append(f"Agent's Answer: {submitted_answer}")
            answers_to_submit.append({"task_id": task_id, "submitted_answer": submitted_answer})
            if run_all_questions:
                # Brief pause between tasks to avoid hammering the APIs.
                time.sleep(1)
        else:
            log_messages.append(f"Skipping malformed task: {task}")

    if not answers_to_submit:
        log_messages.append("No answers were generated by the agent.")
    return "\n".join(log_messages), answers_to_submit, answers_to_submit
|
|
|
def submit_agent_answers(profile: gr.OAuthProfile, answers_for_submission_state):
    """Submit the prepared answers to the GAIA /submit endpoint.

    Returns a human-readable, multi-line status message for the UI.
    """
    if not profile:
        return "You must be logged in to submit answers."
    if not answers_for_submission_state:
        return "No answers available to submit. Please run the agent first."

    username = profile.username
    space_id = os.getenv('SPACE_ID', '')
    agent_code_link = f"https://huggingface.co/spaces/{space_id}/tree/main"
    log_lines = [f"Preparing to submit answers for user: {username}"]

    if not space_id:
        # SPACE_ID is missing: guess the Space name from the app directory.
        guessed_name = os.path.basename(os.path.dirname(os.path.abspath(__file__)))
        if not guessed_name or guessed_name == 'app':
            guessed_name = "YOUR_SPACE_NAME_HERE"
        agent_code_link = f"https://huggingface.co/spaces/{username}/{guessed_name}/tree/main"
        log_lines.append(f"Warning: SPACE_ID not found. Constructed agent_code_link as: {agent_code_link}. Please verify this link is correct.")
    log_lines.append(f"Agent Code Link: {agent_code_link}")

    payload = {
        "username": username,
        "agent_code": agent_code_link,
        "answers": answers_for_submission_state,
    }

    try:
        submit_url = f"{GAIA_API_BASE_URL}/submit"
        print(f"Attempting to submit answers to: {submit_url} with payload: {payload}")
        response = requests.post(submit_url, json=payload, timeout=60)
        response.raise_for_status()
        submission_response = response.json()
        log_lines.append(f"Submission successful! Response: {submission_response}")

        # Pick the most informative field the API offered.
        message = submission_response.get("message")
        score = submission_response.get("score")
        score_string = submission_response.get("score_string")
        if message:
            final_message = message
        elif score_string:
            final_message = score_string
        elif score is not None:
            final_message = f"Score: {score}"
        else:
            final_message = "Submission processed."
        return "\n".join(log_lines) + f"\n\nβ‘οΈ Result: {final_message}"
    except requests.exceptions.Timeout:
        log_lines.append("Timeout error submitting answers to GAIA scoring API.")
        return "\n".join(log_lines)
    except requests.exceptions.RequestException as e:
        error_detail = f"Error submitting answers: {e}"
        if e.response is not None:
            error_detail += f"\nResponse status: {e.response.status_code}"
            try:
                error_detail += f"\nResponse body: {e.response.json()}"
            except ValueError:
                error_detail += f"\nResponse body (text): {e.response.text}"
        log_lines.append(error_detail)
        return "\n".join(log_lines)
    except Exception as e:
        log_lines.append(f"An unexpected error occurred during submission: {e}")
        return "\n".join(log_lines)
|
|
|
|
|
# ---------------------------------------------------------------------------
# Gradio UI: one tab to run/submit the agent, one tab to claim a certificate.
# NOTE(review): some emoji label strings appear garbled/mojibake in the
# source paste (e.g. "π", "β οΈ"); they are preserved as-is — confirm the
# intended emoji against the deployed Space.
# ---------------------------------------------------------------------------
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# π Agents Course - Unit 4 Final Project")
    gr.Markdown("β οΈ **Note**: Due to high demand, you might experience occasional bugs. If something doesn't work, please try again after a moment!")
    gr.Markdown("---")
    gr.Markdown("Your Hugging Face login token (`HUGGINGFACE_TOKEN`) should be set as a Space Secret for dataset pushes.")
    gr.Markdown("Your Gemini API Key (`GEMINI_API_KEY`) **MUST** be set as a Space Secret for the agent to function.")
    gr.Markdown(f"**GAIA API Base URL Used:** `{GAIA_API_BASE_URL}`")
    gr.LoginButton()

    # Carries the generated answers from the "run" step to the "submit" step.
    answers_to_submit_state = gr.State([])

    with gr.Tabs():
        with gr.TabItem("π€ Run Agent on GAIA Benchmark"):
            gr.Markdown("## Step 1: Run Your Agent & Generate Answers")
            gr.Markdown("This agent uses the Gemini API with enhanced tool handling (Python execution, audio, spreadsheets) to generate answers.")
            run_all_questions_checkbox = gr.Checkbox(label="Process all questions (unchecked processes 1 random question for testing)", value=True)
            run_agent_button = gr.Button("π Fetch Questions & Run My Agent")
            gr.Markdown("### Agent Run Log & Generated Answers:")
            agent_run_log_display = gr.Textbox(label="Agent Run Log", lines=10, interactive=False)
            generated_answers_display = gr.JSON(label="Generated Answers (for review before submission)")
            # The gr.OAuthProfile parameter of run_agent_on_gaia is injected
            # automatically by Gradio's OAuth support, so it is intentionally
            # absent from `inputs`.
            run_agent_button.click(
                fn=run_agent_on_gaia,
                inputs=[run_all_questions_checkbox],
                outputs=[agent_run_log_display, generated_answers_display, answers_to_submit_state]
            )
            gr.Markdown("## Step 2: Submit Agent's Answers")
            gr.Markdown("Once you have reviewed the generated answers, click below to submit them for scoring.")
            submit_button = gr.Button("π Submit Answers to GAIA Benchmark")
            submission_status_display = gr.Textbox(label="Submission Status", lines=5, interactive=False)
            submit_button.click(
                fn=submit_agent_answers,
                inputs=[answers_to_submit_state],
                outputs=[submission_status_display]
            )
        with gr.TabItem("π Get Certificate"):
            gr.Markdown("# β How to Get Your Certificate (After Scoring >= 30%)")
            gr.Markdown("""
1. Ensure you are logged in.
2. If your agent scored 30% or higher (after submitting on the 'Run Agent' tab), you can get your certificate.
3. Enter your full name as you want it to appear on the certificate.
4. Click 'Get My Certificate'.
""")
            gr.Markdown("---")
            gr.Markdown("π **Note**: You must have successfully submitted your agent's answers and achieved a score of **30% or higher**.")
            with gr.Row():
                name_input = gr.Text(label="Enter your full name (this will appear on the certificate)")
                generate_cert_btn = gr.Button("π Get My Certificate")
            output_text_cert = gr.Textbox(label="Certificate Result")
            cert_image_display = gr.Image(label="Your Certificate Image", type="pil")
            cert_file_download = gr.File(label="Download Certificate (PDF)")
            # handle_certificate's gr.OAuthProfile parameter is injected by
            # Gradio's OAuth support, same as above.
            generate_cert_btn.click(
                fn=handle_certificate,
                inputs=[name_input],
                outputs=[output_text_cert, cert_image_display, cert_file_download]
            )
|
|
|
# Entry point when run directly (how the Space launches the app).
if __name__ == "__main__":
    demo.launch()
|
|