print("Application script started.") # Debugging print statement | |
import os | |
import gradio as gr | |
import requests | |
import inspect | |
import pandas as pd | |
import cv2 # Import opencv-python for video processing | |
import speech_recognition as sr # Import SpeechRecognition for audio processing | |
from pydub import AudioSegment # Import pydub for audio manipulation | |
import tempfile # Import tempfile for temporary file handling | |
import numpy as np # Import numpy for image processing | |
print("All libraries imported successfully.") # Debugging print | |
# Import the SerpAPI client.
# The google-search-results package exposes its client as the "serpapi" module.
from serpapi import GoogleSearch
import google.generativeai as genai  # Gemini client, used to synthesize answers from search results

print("SerpAPI and GenAI libraries imported successfully.")  # Debugging print

# Removed the import of google.colab.userdata as it's not available outside Colab
# from google.colab import userdata  # To access the API key from secrets
# --- Get API Keys from Environment Variables ---
# SERPAPI_API_KEY and GOOGLE_API_KEY should be set as secrets in your Hugging Face Space.
SERPAPI_API_KEY = os.getenv('SERPAPI_API_KEY')
print(f"SERPAPI_API_KEY (first 5 chars): {SERPAPI_API_KEY[:5] if SERPAPI_API_KEY else 'None'}...")  # Debugging API key

GOOGLE_API_KEY = os.getenv('GOOGLE_API_KEY')
print(f"GOOGLE_API_KEY (first 5 chars): {GOOGLE_API_KEY[:5] if GOOGLE_API_KEY else 'None'}...")  # Debugging API key

print("API keys retrieved (or attempted).")  # Debugging print

# --- Define the default API URL ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
print(f"DEFAULT_API_URL set to: {DEFAULT_API_URL}")  # Debugging print
# --- Google Generative AI LLM Initialization ---
# Keep LLM initialization but handle potential errors and a None state.
print("Attempting to initialize Google Generative AI model...")  # Debugging print before loading
gemini_model = None  # Initialize to None

if not GOOGLE_API_KEY:
    print("Warning: GOOGLE_API_KEY environment variable not set. LLM will not be available.")
else:
    try:
        # Configure the generative AI library
        genai.configure(api_key=GOOGLE_API_KEY)
        print("Google Generative AI configured.")
        # Initialize the Generative Model.
        # gemini-1.5-flash is fast and efficient; 'gemini-1.5-pro' may give better results.
        gemini_model = genai.GenerativeModel('gemini-1.5-flash')
        print("Gemini model initialized successfully.")  # Debugging print after successful init
    except Exception as e:
        print(f"An error occurred during Google Generative AI initialization: {e}")
        gemini_model = None  # Ensure model is None if initialization fails

print("LLM initialization attempted.")  # Debugging print
# --- Web Search Function (using SerpAPI) ---
print("Defining web_search function...")  # Debugging print

def web_search(query: str) -> list[dict]:
    """
    Performs a web search using SerpAPI and returns relevant information.

    Args:
        query: The search query string.

    Returns:
        A list of dictionaries, where each dictionary represents a search result
        with keys 'title', 'snippet', and 'url'. Returns an empty list if no
        results are found or an error occurs.
    """
    print(f"web_search called with query: {query[:50]}...")  # Debugging web_search call
    if not SERPAPI_API_KEY:
        print("SerpAPI key not found in environment variables.")
        return []

    params = {
        "q": query,
        "api_key": SERPAPI_API_KEY,
        "engine": "google",  # Use Google search engine
        "num": 5             # Number of results to fetch
    }
    search_results = []  # Initialize before the try block so it always exists
    try:
        search = GoogleSearch(params)
        search_results_dict = search.get_dict()  # Get results as a dictionary
        print(f"SerpAPI raw response keys: {search_results_dict.keys() if isinstance(search_results_dict, dict) else 'Response is not a dictionary'}")  # Debugging response keys

        # Log the full SerpAPI response for debugging if organic_results is missing or empty
        organic = search_results_dict.get("organic_results") if isinstance(search_results_dict, dict) else None
        if not isinstance(organic, list) or not organic:
            print(f"SerpAPI response did not contain organic results or had invalid format. Response: {search_results_dict}")
            return []

        # Extract organic results
        print(f"Found {len(organic)} organic results.")  # Debugging result count
        for result in organic:
            # Skip None or non-dict result items
            if not isinstance(result, dict):
                print(f"Skipping invalid search result item: {result}")
                continue
            search_results.append({
                'title': result.get('title'),
                'url': result.get('link'),
                'snippet': result.get('snippet', 'No snippet available')
            })
    except Exception as e:
        print(f"An error occurred during SerpAPI web search: {e}")
        return []  # Per the docstring, return an empty list on error

    print(f"web_search returning {len(search_results)} results.")  # Debugging return count
    return search_results
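
# Example (illustrative, not executed): calling web_search directly.
# Assumes SERPAPI_API_KEY is set; each item has 'title', 'url', and 'snippet' keys.
#
#   for hit in web_search("hugging face agents course"):
#       print(hit['title'], hit['url'])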

# --- Basic Agent Definition ---
print("Defining BasicAgent class...")  # Debugging print

class BasicAgent:
    def __init__(self):
        # The agent relies on the module-level gemini_model and web_search helpers,
        # so there is nothing to set up here beyond logging.
        print("BasicAgent initialized.")  # Debugging print
    def process_video(self, video_source: str) -> str:
        """
        Processes a video source (file path or URL), extracts frames, and
        performs placeholder visual analysis.

        Args:
            video_source: Path to the video file or a video URL.

        Returns:
            A string summarizing the video processing result or an error message.
        """
        print(f"Processing video source: {video_source}")
        cap = None
        try:
            # Attempt to open the video source.
            # cv2.CAP_FFMPEG may help with URLs, but requires FFmpeg:
            # cap = cv2.VideoCapture(video_source, cv2.CAP_FFMPEG)
            cap = cv2.VideoCapture(video_source)
            if not cap.isOpened():
                print(f"Error: Could not open video source {video_source}")
                return f"Error: Could not open video source {video_source}"

            frame_count = 0
            while True:
                # Read a frame from the video
                ret, frame = cap.read()
                # If the frame was not read successfully, we've reached the end of the video
                if not ret:
                    print("End of video stream.")
                    break
                frame_count += 1
                # --- Placeholder for visual analysis ---
                # In a real application, you would analyse the 'frame' object here
                # (object detection, scene recognition, etc.). Example placeholder:
                # gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                if frame_count % 100 == 0:  # Print progress every 100 frames
                    print(f"Processed {frame_count} frames.")

            print(f"Finished processing video. Total frames extracted: {frame_count}")
            return f"Successfully processed video. Extracted {frame_count} frames."
        except Exception as e:
            print(f"An error occurred during video processing: {e}")
            return f"An error occurred during video processing: {e}"
        finally:
            # Release the video capture object
            if cap:
                cap.release()
                print("Video capture released.")
    def process_audio(self, audio_source: str) -> str:
        """
        Processes an audio source (file path), extracts speech, and performs
        placeholder audio analysis.

        Args:
            audio_source: Path to the audio file.

        Returns:
            A string summarizing the audio processing result or an error message.
        """
        print(f"Processing audio source: {audio_source}")
        recognizer = sr.Recognizer()
        temp_wav_file = None
        try:
            # Load the audio file
            audio = AudioSegment.from_file(audio_source)
            print(f"Audio loaded. Duration: {len(audio)} ms")

            # Export to a format SpeechRecognition can handle (e.g., WAV)
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as fp:
                audio.export(fp.name, format="wav")
                temp_wav_file = fp.name
            print(f"Audio exported to temporary WAV: {temp_wav_file}")

            # Use SpeechRecognition to transcribe the audio
            with sr.AudioFile(temp_wav_file) as source:
                print("Reading audio file for transcription...")
                audio_data = recognizer.record(source)  # Read the entire audio file
                print("Audio data recorded.")

            # Attempt to recognize speech
            try:
                print("Attempting speech recognition...")
                text = recognizer.recognize_google(audio_data)  # Google Web Speech API
                print(f"Transcription result: {text}")
                return f"Audio processed. Transcription: '{text}'"
            except sr.UnknownValueError:
                print("Speech Recognition could not understand audio")
                return "Audio processed, but could not understand speech."
            except sr.RequestError as e:
                print(f"Could not request results from Google Speech Recognition service; {e}")
                return f"Audio processed, but speech recognition service failed: {e}"
            except Exception as e:
                print(f"An unexpected error occurred during speech recognition: {e}")
                return f"An unexpected error occurred during speech recognition: {e}"
        except Exception as e:
            print(f"An error occurred during audio processing: {e}")
            return f"An error occurred during audio processing: {e}"
        finally:
            # Clean up the temporary WAV file
            if temp_wav_file and os.path.exists(temp_wav_file):
                os.remove(temp_wav_file)
                print(f"Temporary WAV file removed: {temp_wav_file}")
    def __call__(self, question: str, video_source: str | None = None, audio_source: str | None = None) -> str:
        print(f"Agent received question (first 50 chars): {question[:50]}...")
        print(f"Video source provided: {video_source}")
        print(f"Audio source provided: {audio_source}")

        # --- Check for media processing tasks ---
        media_processing_results = []
        if video_source:
            print("Video source provided. Attempting video processing.")
            media_processing_results.append(f"Video processing result: {self.process_video(video_source)}")
        if audio_source:
            print("Audio source provided. Attempting audio processing.")
            media_processing_results.append(f"Audio processing result: {self.process_audio(audio_source)}")

        # If media was processed, return the results for now
        if media_processing_results:
            return "\n".join(media_processing_results)

        # Simple heuristic to decide whether a web search is needed (only if no media source)
        question_lower = question.lower()
        search_keywords = ["what is", "how to", "where is", "who is", "when did", "define", "explain", "tell me about"]
        needs_search = any(keyword in question_lower for keyword in search_keywords) or "?" in question
        print(f"Needs search: {needs_search}")  # Debugging search decision
        # --- Analyze question and refine the search query ---
        # Simplified query generation: strip the leading question phrase and add light context.
        search_query = question  # Default search query is the original question
        if needs_search:
            print("Analyzing question for keywords and refining search query...")
            # Basic keyword extraction: split on the first matching question phrase
            # and keep the remainder; otherwise use the whole (lower-cased) question.
            for keyword in search_keywords:
                parts = question_lower.split(keyword, 1)
                if len(parts) > 1:
                    search_query = parts[1].strip()
                    break
            else:
                # If no specific question keyword was found, use the whole question
                search_query = question_lower.strip()

            # Note: a more sophisticated approach would identify multi-word entities
            # (e.g., "New York City") and wrap them in quotation marks.

            # Optional: add contextual terms based on the topic of the question
            if any(word in question_lower for word in ["musician", "band", "artist", "singer"]):
                search_query += " discography"
            elif any(word in question_lower for word in ["movie", "film", "actor", "actress"]):
                search_query += " plot summary"
            elif any(word in question_lower for word in ["book", "author", "novel"]):
                search_query += " plot summary"
            print(f"Final search query used: {search_query}")  # Debugging final query
        search_results = []  # Initialized before use so the checks below are always safe
        if needs_search:
            print(f"Question likely requires search. Searching for: {search_query}")
            # Reuse the module-level web_search helper instead of duplicating the SerpAPI call
            # here; it returns a (possibly empty) list of result dicts.
            search_results = web_search(search_query)
            print(f"web_search returned {len(search_results)} results.")

            # --- Use the LLM to synthesize an answer from search results, if available ---
            if isinstance(search_results, list) and search_results and gemini_model is not None:
                print("Using Google LLM to process search results.")  # Debugging print before LLM call
                # Format search results for the LLM
                context = ""
                for i, result in enumerate(search_results[:5]):  # Use top 5 results for context
                    if not isinstance(result, dict):
                        print(f"Skipping invalid result at index {i} in LLM context formatting: {result}")
                        continue
                    context += f"Source {i+1}:\n"
                    if result.get('title'):
                        context += f"Title: {result['title']}\n"
                    if result.get('snippet'):
                        context += f"Snippet: {result['snippet']}\n"
                    if result.get('url'):
                        context += f"URL: {result['url']}\n"
                    context += "---\n"  # Separator

                # Refined prompt for the LLM
                prompt = f"""Carefully read the following search results and answer the user's question based *only* on the information provided in these results.
If the search results do not contain sufficient information to fully answer the question, explicitly state that you could not find enough information in the provided results.
Do not use any outside knowledge. Structure your answer clearly and concisely.

Question: {question}

Search Results:
{context}

Answer:"""
                print(f"LLM Prompt (first 500 chars):\n{prompt[:500]}...")  # Debugging prompt
                try:
                    # Generate content using the Gemini model
                    response = gemini_model.generate_content(prompt)
                    generated_text = response.text
                    # Guard against empty or whitespace-only output
                    if generated_text and generated_text.strip():
                        llm_answer = generated_text.strip()
                        print(f"LLM generated text (first 100 chars): {generated_text[:100]}...")  # Debugging raw output
                        print(f"Agent returning LLM-based answer (first 100 chars): {llm_answer[:100]}...")  # Debugging final answer
                        return llm_answer
                    else:
                        print("LLM generated empty or whitespace answer.")
                        return "I couldn't generate a specific answer based on the search results."
                except Exception as llm_e:
                    print(f"An error occurred during LLM generation: {llm_e}")
                    return f"An error occurred while processing search results with the LLM: {llm_e}"

            # Fallback: search results exist but the LLM is unavailable
            elif isinstance(search_results, list) and search_results:
                print("Google Generative AI model not loaded. Returning basic answer based on search results.")
                answer_parts = []
                for i, result in enumerate(search_results[:3]):
                    if not isinstance(result, dict):
                        print(f"Skipping invalid result at index {i} in basic answer formatting: {result}")
                        continue
                    if result.get('snippet'):
                        # Limit snippet length to avoid overly long responses
                        snippet = result['snippet']
                        if len(snippet) > 200:
                            snippet = snippet[:200] + "..."
                        answer_parts.append(f"Snippet {i+1}: {snippet}")
                    elif result.get('title'):
                        answer_parts.append(f"Result {i+1} Title: {result['title']}")
                if answer_parts:
                    return "Based on web search (LLM not available):\n" + "\n".join(answer_parts)
                # Fallback if no useful snippets/titles were found in the search results
                print("No useful snippets/titles found in search results.")
                return "I couldn't find useful information in the search results (LLM not available)."

            # Fallback: the search returned no usable results
            else:
                print(f"Web search returned no results or results in invalid format. Type: {type(search_results)}")
                return "I couldn't find any relevant information on the web for your question."
        else:
            # If no search is needed, return a default or simple response
            print("Question does not appear to require search. Returning fixed answer.")
            fixed_answer = "How can I help you?"
            return fixed_answer
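
# Example (illustrative, not executed): using BasicAgent directly. "clip.mp4" is a
# hypothetical local file path used only for illustration.
#
#   agent = BasicAgent()
#   print(agent("Who is Miles Davis?"))                           # triggers a web search
#   print(agent("Describe this clip.", video_source="clip.mp4"))  # triggers video processing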

def run_and_submit_all(profile: gr.OAuthProfile | None, other_arg=None):
    """
    Fetches all questions, runs the BasicAgent on them, submits all answers,
    and displays the results.
    """
    print("run_and_submit_all function started.")  # Debugging print at function start
    # --- Determine HF Space Runtime URL and Repo URL ---
    space_id = os.getenv("SPACE_ID")  # SPACE_ID is used to link back to the code

    if profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate the Agent (modify this part to create your agent)
    print("Attempting to instantiate BasicAgent...")
    try:
        agent = BasicAgent()
        print("BasicAgent instantiated successfully.")
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None

    # When the app runs as a Hugging Face Space, this link points to your codebase
    # (useful for others, so please keep it public).
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    questions_data = None
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        # The fetched data must be a non-empty list
        if not isinstance(questions_data, list) or not questions_data:
            print(f"Fetched questions_data is empty or not a list. Type: {type(questions_data)}")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.JSONDecodeError as e:
        # Must be caught before RequestException, which it subclasses
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500] if 'response' in locals() else 'No response object'}")
        return f"Error decoding server response for questions: {e}", None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    # 3. Run your Agent
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        # Skip None or non-dict items
        if not isinstance(item, dict):
            print(f"Skipping invalid item in questions_data: {item}")
            continue
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or not isinstance(task_id, (str, int)) or not question_text or not isinstance(question_text, str):
            print(f"Skipping item with missing or invalid task_id or question: {item}")
            continue
        print(f"Processing Task ID: {task_id}")  # Debugging task ID
        try:
            # Only the question text is passed for now; the scoring API does not supply media,
            # so the video/audio processing paths are not triggered from this function.
            submitted_answer = agent(question_text)
            print(f"Agent returned answer for {task_id}: {submitted_answer[:50]}...")  # Debugging returned answer
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
        except Exception as e:
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    # 4. Prepare Submission
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    # 5. Submit
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    results_df = pd.DataFrame(results_log)
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
        print(status_message)
        # On failure, still return the results log so the user can see what was attempted
        return status_message, results_df
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
        print(status_message)
        return status_message, results_df
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
        print(status_message)
        return status_message, results_df
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
        print(status_message)
        return status_message, results_df
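
# Example (illustrative): shape of the submission payload built in run_and_submit_all above.
#
#   {
#       "username": "<hf-username>",
#       "agent_code": "https://huggingface.co/spaces/<space_id>/tree/main",
#       "answers": [{"task_id": "...", "submitted_answer": "..."}, ...]
#   }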

# Function to call process_video directly for testing
def test_video_processing(video_source: str) -> str:
    print(f"Testing video processing with source: {video_source}")
    try:
        agent = BasicAgent()
        return agent.process_video(video_source)
    except Exception as e:
        return f"Error during video processing test: {e}"


# Function to call process_audio directly for testing
def test_audio_processing(audio_source: str) -> str:
    print(f"Testing audio processing with source: {audio_source}")
    try:
        agent = BasicAgent()
        return agent.process_audio(audio_source)
    except Exception as e:
        return f"Error during audio processing test: {e}"

# --- Gradio Interface (defined at module level so the Space can launch it) ---
print("Defining Gradio interface...")  # Debugging print
with gr.Blocks(theme=gr.themes.Soft(), title="Basic Agent Evaluation Runner") as demo:
    gr.Markdown(
        """
# Basic Agent Evaluation Runner

This application fetches a set of questions from a scoring API,
runs your custom agent against each question, and submits the answers for scoring.

**Instructions:**

1. Ensure your agent logic is defined in the `BasicAgent` class above.
2. **Get a SerpAPI key and a Google AI API key and add them as secrets in your Hugging Face Space settings (or as environment variables in your runtime environment).**
3. Log in to Hugging Face using the button below.
4. Click the "Run Evaluation & Submit All Answers" button to run on the predefined questions.
5. Use the "Test Video Processing" and "Test Audio Processing" sections to test media analysis.
        """
    )

    login_btn = gr.LoginButton()

    run_button = gr.Button("Run Evaluation & Submit All Answers")
    status_output = gr.Textbox(label="Run Status", interactive=False, lines=5)
    results_output = gr.DataFrame(label="Evaluation Results")

    run_button.click(
        run_and_submit_all,
        inputs=[login_btn],  # Pass the profile from the login button
        outputs=[status_output, results_output]
    )

    gr.Markdown("---")  # Separator
    gr.Markdown("## Test Media Processing")

    video_test_input = gr.Video(label="Upload Video or Paste URL")
    video_test_button = gr.Button("Test Video Processing")
    video_test_output = gr.Textbox(label="Video Processing Result", interactive=False)
    video_test_button.click(
        test_video_processing,
        inputs=[video_test_input],
        outputs=[video_test_output]
    )

    audio_test_input = gr.Audio(label="Upload Audio or Paste URL", type="filepath")  # filepath so process_audio receives a path
    audio_test_button = gr.Button("Test Audio Processing")
    audio_test_output = gr.Textbox(label="Audio Processing Result", interactive=False)
    audio_test_button.click(
        test_audio_processing,
        inputs=[audio_test_input],
        outputs=[audio_test_output]
    )
print("Gradio interface defined.") # Debugging print | |
# Ensure the app launches when the script is run | |
print("Checking if script is run directly...") # Debugging print | |
if __name__ == "__main__": | |
print("Launching Gradio demo...") # Debugging print | |
demo.launch(server_name="0.0.0.0") # Ensure binding to all interfaces | |
print("Script finished.") # Debugging print |