Spaces:
Sleeping
Sleeping
| import os | |
| import gradio as gr | |
| import pandas as pd | |
| from bs4 import BeautifulSoup | |
| import datetime | |
| import pytz | |
| import math | |
| import re | |
| import requests | |
| import traceback | |
| import sys | |
| import torch | |
| import transformers | |
| from torch.cuda import memory_allocated, memory_reserved | |
| # --- Transformers Imports --- | |
| from transformers import AutoTokenizer, AutoModelForCausalLM | |
| # --- LangChain Imports (Core) --- | |
| from langchain_huggingface import HuggingFacePipeline | |
| from langchain_core.prompts import PromptTemplate | |
| from langchain.tools import Tool | |
| from langchain.memory import ConversationBufferWindowMemory | |
| from langchain.llms.base import LLM | |
# Report the installed transformers version when this module is imported.
print(f"--- Using transformers version: {transformers.__version__} ---")
# --- Constants ---
# Base URL of the scoring service; run_and_submit_all appends "/questions" and "/submit" to it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
| # --- Tool Definitions (LangChain Style) --- | |
# Tool callback: formats the current wall-clock time in the requested IANA timezone.
# The docstring below is passed verbatim as the tool description (via __doc__), so it
# is deliberately kept as a single line written for the model rather than for humans.
# Never raises: every failure is reported back as an "Error..." string.
def get_current_time_in_timezone_func(timezone: str) -> str:
    """A tool that fetches the current local time in a specified IANA timezone. Always use this tool for questions about the current time. Input should be a valid timezone string (e.g., 'America/New_York', 'Europe/London')."""
    print(f"--- Tool: Executing get_current_time_in_timezone for: {timezone} ---")
    try:
        now_in_zone = datetime.datetime.now(pytz.timezone(timezone))
        stamp = now_in_zone.strftime("%Y-%m-%d %H:%M:%S %Z%z")
    except pytz.exceptions.UnknownTimeZoneError:
        # The name is not in the tz database known to pytz.
        return f"Error: Unknown timezone '{timezone}'. Please use a valid IANA timezone name."
    except Exception as exc:
        # Anything else (e.g. a non-string input) is still reported as text.
        return f"Error fetching time for timezone '{timezone}': {exc!s}"
    return f"The current local time in {timezone} is: {stamp}"
# Tool callback: evaluates a small arithmetic expression and returns the result as text.
#
# The expression is parsed with `ast` and evaluated by walking a whitelist of node
# types instead of calling eval(), so attribute access, subscripts, comprehensions,
# arbitrary names and arbitrary calls are all rejected. Supported syntax:
#   * int/float literals, parentheses
#   * binary + - * / // % ** and unary + -
#   * sqrt(x), pow(x, y), pi -- also accepted as math.sqrt, math.pow, math.pi
#
# Never raises: any parse or evaluation failure is returned as an
# "Error calculating ..." string so the agent can read it as an observation.
# The docstring is passed verbatim as the tool description (via __doc__), so it is
# kept as a single line addressed to the model.
# NOTE: very large exponents (e.g. 9**9**9) can still take a long time to compute.
def safe_calculator_func(expression: str) -> str:
    """A tool for evaluating simple mathematical expressions. Use this tool *only* for calculations involving numbers, +, -, *, /, %, parentheses, and the math functions: sqrt, pow. Do not use it to run other code."""
    # Imported locally so this function does not depend on edits to the file's import block.
    import ast
    import operator

    print(f"--- Tool: Executing safe_calculator with expression: {expression} ---")

    binary_ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Mod: operator.mod,
        ast.Pow: operator.pow,
    }
    unary_ops = {ast.UAdd: operator.pos, ast.USub: operator.neg}
    functions = {"sqrt": math.sqrt, "pow": math.pow}
    constants = {"pi": math.pi}

    def resolve_name(node):
        # Return the identifier for `name` or `math.name`; None for anything else.
        if isinstance(node, ast.Name):
            return node.id
        if (
            isinstance(node, ast.Attribute)
            and isinstance(node.value, ast.Name)
            and node.value.id == "math"
        ):
            return node.attr
        return None

    def evaluate(node):
        # Recursively evaluate a whitelisted AST node; raise ValueError otherwise.
        if isinstance(node, ast.Constant):
            value = node.value
            # bool is a subclass of int, so exclude it explicitly.
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                raise ValueError("Only numeric literals are allowed")
            return value
        if isinstance(node, ast.BinOp):
            apply_binary = binary_ops.get(type(node.op))
            if apply_binary is None:
                raise ValueError("Unsupported operator in expression")
            return apply_binary(evaluate(node.left), evaluate(node.right))
        if isinstance(node, ast.UnaryOp):
            apply_unary = unary_ops.get(type(node.op))
            if apply_unary is None:
                raise ValueError("Unsupported operator in expression")
            return apply_unary(evaluate(node.operand))
        if isinstance(node, ast.Call):
            target = functions.get(resolve_name(node.func))
            if target is None or node.keywords:
                raise ValueError("Unsupported function call in expression")
            return target(*(evaluate(arg) for arg in node.args))
        name = resolve_name(node)
        if name in constants:
            return constants[name]
        raise ValueError("Invalid characters in expression")

    try:
        # compile() in "eval" mode rejects leading whitespace, so strip first.
        tree = ast.parse(expression.strip(), mode="eval")
        return str(evaluate(tree.body))
    except Exception as e:
        print(f"Error during calculation for '{expression}': {e}")
        return f"Error calculating '{expression}': Invalid expression or calculation error ({e})."
class SlicedLLM(LLM):
    """
    Light wrapper around another LLM object (here: the HuggingFacePipeline wrapper).

    Responsibilities:
    - Call the inner LLM
    - Extract text robustly from different return shapes
    - Strip instruction echoing by keeping from the last 'Thought:' if present
    - Truncate to `max_chars` from the end (keeps the most recent reasoning)
    """

    # LangChain's LLM base class is a pydantic model, so instance state must be
    # declared as fields and populated through the base-class constructor;
    # assigning undeclared attributes in __init__ is rejected.
    # `object` is used as an "anything goes" annotation to avoid a new import.
    inner_llm: object = None
    max_chars: int = 2048
    # Kept for backward compatibility with callers that pass `max_retries=...`.
    max_retries: int = 1

    def __init__(self, inner_llm, max_chars: int = 2048, **kwargs):
        """
        Args:
            inner_llm: the wrapped model; may expose `_call`, `invoke`, or be a plain callable.
            max_chars: maximum number of trailing characters of the output to keep.
            **kwargs: forwarded to the base-class constructor (e.g. `max_retries`).
        """
        super().__init__(inner_llm=inner_llm, max_chars=int(max_chars), **kwargs)

    @property
    def _llm_type(self) -> str:
        """Identifier for this LLM type (read as a property by LangChain)."""
        return "sliced-llm"

    @property
    def _identifying_params(self):
        """Parameters that identify this wrapper: inner LLM type and the character limit."""
        return {"inner": getattr(self.inner_llm, "_llm_type", None), "max_chars": self.max_chars}

    def _call(self, prompt: str, stop=None, run_manager=None, **kwargs) -> str:
        """
        Core call entrypoint used by LangChain. Calls the inner LLM and post-processes the result.

        Args:
            prompt: the fully formatted prompt text.
            stop: optional stop sequences, forwarded to the inner LLM when it accepts them.
            run_manager: accepted for compatibility with LangChain's calling convention; unused.
            **kwargs: accepted for compatibility; unused.

        Returns:
            The cleaned, truncated and whitespace-stripped model output.
        """
        # 1) Call the inner LLM through the most specific interface it offers.
        inner = self.inner_llm
        if hasattr(inner, "_call"):
            raw = inner._call(prompt, stop=stop)
        elif hasattr(inner, "invoke"):
            # Public runnable interface; preferred over the legacy direct call.
            raw = inner.invoke(prompt, stop=stop)
        else:
            # Plain callable (e.g. a raw pipeline) that takes the prompt string.
            raw = inner(prompt)

        # 2) Extract text from common return shapes
        text = self._extract_text(raw)

        # 3) Keep only the text from the last "Thought:" line onward, which drops
        #    prompt-echoed instruction blocks that may appear earlier in the output.
        last_thought_idx = text.rfind("\nThought:")
        if last_thought_idx >= 0:
            # +1 skips the newline so the kept text starts at the "Thought:" marker.
            text = text[last_thought_idx + 1 :]

        # 4) Truncate from the front so the most recent reasoning / final answer survives
        if len(text) > self.max_chars:
            text = text[-self.max_chars :]

        # 5) Strip leading/trailing whitespace
        return text.strip()

    def _extract_text(self, raw):
        """
        Convert the inner LLM's return value to a string.

        Handles:
        - plain str (returned unchanged)
        - non-empty list/tuple: uses the first element; if it is a dict, the first of
          'generated_text', 'text', 'output_text' present, else str() of the element
        - dict: the first of the same keys present, else str() of the dict
        - anything else: str() of the value
        """
        text_keys = ("generated_text", "text", "output_text")

        # Direct string
        if isinstance(raw, str):
            return raw

        # Sequence result (a transformers pipeline may return a list of dicts)
        if isinstance(raw, (list, tuple)) and len(raw) > 0:
            first = raw[0]
            if isinstance(first, dict):
                for k in text_keys:
                    if k in first:
                        return str(first[k])
            return str(first)

        # Single dict result
        if isinstance(raw, dict):
            for k in text_keys:
                if k in raw:
                    return str(raw[k])
            return str(raw)

        # Last resort: string conversion
        return str(raw)
| # --- Completely rewritten LangChainAgentWrapper (drop-in) --- | |
class LangChainAgentWrapper:
    """
    Callable wrapper that builds a LangChain ReAct agent around a local Hugging Face model.

    Construction:
    - loads the tokenizer and causal LM named by `model_id`
    - wraps a transformers text-generation pipeline into HuggingFacePipeline (LangChain)
    - wraps that into SlicedLLM to truncate / clean model outputs
    - builds a ReAct prompt (contains {input}, {tools}, {tool_names}, {agent_scratchpad})
    - creates the agent with create_react_agent + AgentExecutor

    Use: call the instance with a question string; it returns the agent's answer as a string.
    """

    def __init__(
        self,
        model_id: str = "google/gemma-2b-it",
        max_new_tokens: int = 96,
        max_chars: int = 2048,
        max_iterations: int = 2,
    ):
        """
        Args:
            model_id: model identifier passed to `from_pretrained` for tokenizer and model.
            max_new_tokens: generation length limit passed to the transformers pipeline.
            max_chars: character limit passed to SlicedLLM.
            max_iterations: iteration limit passed to AgentExecutor.

        Raises:
            RuntimeError: if any step of the initialization fails (original error chained).
        """
        print("Initializing LangChainAgentWrapper...")
        try:
            # Lazy/delayed imports
            from langchain.agents import AgentExecutor, create_react_agent
            from langchain_community.tools import DuckDuckGoSearchRun

            # --- Tokenizer & Model ---
            print(f"Loading tokenizer for: {model_id}")
            tokenizer = AutoTokenizer.from_pretrained(model_id)
            print(f"Loading model: {model_id}")
            model = AutoModelForCausalLM.from_pretrained(
                model_id,
                torch_dtype=torch.bfloat16,
                device_map="auto",
                offload_folder="offload",
            )
            print("Model loaded successfully.")
            print(f"Allocated: {memory_allocated()/1e9:.2f} GB | Reserved: {memory_reserved()/1e9:.2f} GB")

            # --- HF pipeline (transformers) with safe defaults ---
            llm_pipeline = transformers.pipeline(
                "text-generation",
                model=model,
                tokenizer=tokenizer,
                max_new_tokens=max_new_tokens,
                return_full_text=False,
                pad_token_id=tokenizer.eos_token_id,
                eos_token_id=tokenizer.eos_token_id,
            )
            print("Transformers pipeline created successfully.")

            # --- Wrap pipeline into LangChain HuggingFacePipeline LLM ---
            base_lc_llm = HuggingFacePipeline(pipeline=llm_pipeline)

            # --- Wrap that LLM into our slicer to keep outputs trimmed and to strip instruction echoes ---
            self.llm = SlicedLLM(base_lc_llm, max_chars=max_chars)
            print("SlicedLLM wrapper created successfully.")

            # --- Tools ---
            print("Defining tools...")
            search_tool = DuckDuckGoSearchRun(
                name="web_search",
                description="Web search via DuckDuckGo for up-to-date facts/events."
            )
            # Each function's docstring doubles as the tool description shown to the model.
            self.tools = [
                Tool(
                    name="get_current_time_in_timezone",
                    func=get_current_time_in_timezone_func,
                    description=get_current_time_in_timezone_func.__doc__
                ),
                search_tool,
                Tool(
                    name="safe_calculator",
                    func=safe_calculator_func,
                    description=safe_calculator_func.__doc__
                ),
            ]
            print(f"Tools prepared: {[t.name for t in self.tools]}")

            # --- ReAct prompt ---
            # {tools}, {tool_names} and {agent_scratchpad} are filled in by the agent;
            # {input} carries the user's question. Without an {input} placeholder the
            # question passed to invoke({"input": ...}) would never reach the model.
            react_prompt = PromptTemplate(
                input_variables=["input", "tools", "tool_names", "agent_scratchpad"],
                template="""
DO NOT REPEAT OR PARAPHRASE ANY PART OF THIS PROMPT.
You are an assistant that strictly follows the ReAct format.
You can use these tools:
{tools}
Valid tool names: {tool_names}
When responding, follow this exact grammar and include nothing else:
Thought: <brief reasoning>
Action: <one of {tool_names} OR "none">
Action Input: <input for the action>
(If you choose an action other than "none", the system will insert an Observation before you continue.)
If Action is "none", finish by outputting:
Final Answer: <short direct answer>
Question: {input}
{agent_scratchpad}
Thought:
""",
            )

            # --- Create agent + executor ---
            print("Creating agent...")
            agent = create_react_agent(self.llm, self.tools, react_prompt)
            self.agent_executor = AgentExecutor(
                agent=agent,
                tools=self.tools,
                verbose=True,
                handle_parsing_errors=True,
                max_iterations=max_iterations,
            )
            print("LangChain agent created successfully.")
        except Exception as e:
            print(f"CRITICAL ERROR: Failed to initialize LangChain agent: {e}")
            traceback.print_exc()
            raise RuntimeError(f"LangChain agent initialization failed: {e}") from e

    def __call__(self, question: str) -> str:
        """
        Run the agent on a single question.

        The AgentExecutor manages the ReAct loop and tool invocations. Exceptions are
        caught, printed with a traceback, and reported in the returned string so the
        caller always receives something it can submit.

        Args:
            question: the question text; passed to the executor under the key "input".

        Returns:
            The executor result's "output" value, "No output found." if that key is
            missing, or an "Agent Error: ..." message if execution raised.
        """
        print(f"\n--- LangChainAgentWrapper received question: {question[:100]}... ---")
        try:
            response = self.agent_executor.invoke({"input": question})
            return response.get("output", "No output found.")
        except Exception as e:
            print(f"ERROR: LangChain agent execution failed: {e}")
            traceback.print_exc()
            return f"Agent Error: Failed to process the question. Details: {e}"
| # --- Main Evaluation Logic --- | |
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetch the evaluation questions, answer each one with the agent, and submit the answers.

    Args:
        profile: the logged-in user's OAuth profile, or None when nobody is logged in.

    Returns:
        A `(status_message, results)` tuple. `results` is a pandas DataFrame with the
        columns "Task ID", "Question" and "Submitted Answer" once the agent has run,
        and None when the function stops earlier (no login, agent initialization
        failure, or failure to fetch questions).
    """
    # Guard: a username is required for the submission payload.
    if not profile:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None
    username = f"{profile.username}"
    print(f"User logged in: {username}")

    space_id = os.getenv("SPACE_ID")
    questions_url = f"{DEFAULT_API_URL}/questions"
    submit_url = f"{DEFAULT_API_URL}/submit"

    # 1) Build the agent (loads the model, so this can fail or take a while).
    try:
        agent = LangChainAgentWrapper()
    except Exception as init_error:
        print(f"Error instantiating agent: {init_error}")
        return f"Error initializing agent: {init_error}", None

    # Link to this Space's code, sent along with the answers.
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    # 2) Download the question list.
    print(f"Fetching questions from: {questions_url}")
    try:
        fetch_response = requests.get(questions_url, timeout=15)
        fetch_response.raise_for_status()
        questions_data = fetch_response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except Exception as fetch_error:
        print(f"An unexpected error occurred fetching questions: {fetch_error}")
        traceback.print_exc()
        return f"An unexpected error occurred fetching questions: {fetch_error}", None

    # 3) Answer every well-formed question; failures are logged but not submitted.
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for entry in questions_data:
        task_id = entry.get("task_id")
        question_text = entry.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {entry}")
            continue
        try:
            submitted_answer = agent(question_text)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append(
                {"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer}
            )
        except Exception as run_error:
            print(f"Error running agent on task {task_id}: {run_error}")
            traceback.print_exc()
            results_log.append(
                {"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {run_error}"}
            )

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    # 4) Submit all answers in one request and report the score.
    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload,
    }
    answer_count = len(answers_payload)
    status_update = f"Agent finished. Submitting {answer_count} answers for user '{username}'..."
    print(status_update)
    print(f"Submitting {answer_count} answers to: {submit_url}")
    try:
        submit_response = requests.post(submit_url, json=submission_data, timeout=60)
        submit_response.raise_for_status()
        result_data = submit_response.json()
        reported_user = result_data.get('username')
        score = result_data.get('score', 'N/A')
        correct = result_data.get('correct_count', '?')
        attempted = result_data.get('total_attempted', '?')
        message = result_data.get('message', 'No message received.')
        final_status = (
            "Submission Successful!\n"
            f"User: {reported_user}\n"
            f"Overall Score: {score}% ({correct}/{attempted} correct)\n"
            f"Message: {message}"
        )
        print("Submission successful.")
        return final_status, pd.DataFrame(results_log)
    except Exception as submit_error:
        status_message = f"An unexpected error occurred during submission: {submit_error}"
        print(status_message)
        traceback.print_exc()
        return status_message, pd.DataFrame(results_log)
# --- Build Gradio Interface using Blocks ---
# Declares the page: a title, usage instructions, a login button, the run button,
# and two output components (status text and a results table).
with gr.Blocks() as demo:
    gr.Markdown("# Basic Agent Evaluation Runner")
    gr.Markdown(
        """
**Instructions:**
1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc ...
2. Log in to your Hugging Face account using the button below. This uses your HF username for submission.
3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.
"""
    )
    gr.LoginButton()
    run_button = gr.Button("Run Evaluation & Submit All Answers")
    # Read-only text area for the status message returned by run_and_submit_all.
    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    # Table for the per-question results DataFrame returned by run_and_submit_all.
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)
    # The handler's two return values map to the two outputs in order.
    # NOTE(review): no `inputs` are wired here; the handler's `profile` argument is
    # presumably injected by Gradio from the OAuth login -- confirm against Gradio docs.
    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table]
    )
# Script entry point: print a startup banner with the Space's environment details
# (when available) and then launch the Gradio app.
if __name__ == "__main__":
    print(f"\n{'-' * 30} App Starting {'-' * 30}")
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")

    # Runtime host: only set when the environment provides SPACE_HOST.
    if not space_host_startup:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")
    else:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        print(f" Runtime URL should be: https://{space_host_startup}.hf.space")

    # Repository links: only printed when the environment provides SPACE_ID.
    if not space_id_startup:
        print("ℹ️ SPACE_ID in environment variable not found (running locally?).")
    else:
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")

    # Closing rule, as wide as the opening banner line.
    print(f"{'-' * (60 + len(' App Starting '))}\n")
    print("Launching Gradio Interface for Basic Agent Evaluation...")
    demo.launch(debug=True, share=False)