# Agent-Example / app.py
from smolagents import CodeAgent, DuckDuckGoSearchTool, HfApiModel, load_tool, tool, VisitWebpageTool
import datetime
import requests
import pytz
import yaml
import os
from datasets import Dataset
from huggingface_hub import HfApi, InferenceClient
from openai import OpenAI
from tools.final_answer import FinalAnswerTool
from Gradio_UI import GradioUI
# Define the Perplexity system prompt
Perplex_Assistant_Prompt = """You are a helpful AI assistant that searches the web for accurate information."""
# Set up API key in environment variable as expected by HfApiModel
os.environ["HUGGINGFACE_API_TOKEN"] = os.getenv("HUGGINGFACE_API_KEY", "")
# Initialize search tools with fallback capability
try:
    # Try DuckDuckGo first (default)
    print("Initializing DuckDuckGo search tool...")
    ddg_search_tool = DuckDuckGoSearchTool(max_results=10)
    # Test the tool with a simple query
    test_result = ddg_search_tool("test query")
    print("DuckDuckGo search tool initialized successfully.")
    # Use DuckDuckGo as the primary search tool
    primary_search_tool = ddg_search_tool
    search_tool_name = "DuckDuckGo"
except Exception as e:
    print(f"Error initializing DuckDuckGo search tool: {str(e)}")
    print("Falling back to Google search tool...")
    try:
        # Import GoogleSearchTool only if needed
        from smolagents import GoogleSearchTool
        google_search_tool = GoogleSearchTool()
        # Test the Google search tool
        test_result = google_search_tool("test query")
        print("Google search tool initialized successfully.")
        # Use Google as the fallback search tool
        primary_search_tool = google_search_tool
        search_tool_name = "Google"
    except Exception as google_error:
        print(f"Error initializing Google search tool: {str(google_error)}")
        print("WARNING: No working search tool available. Agent functionality will be limited.")

        # Create a minimal replacement that returns an explanatory message
        def search_fallback(query):
            return f"Search functionality unavailable. Both DuckDuckGo and Google search tools failed to initialize. Query was: {query}"

        primary_search_tool = search_fallback
        search_tool_name = "Unavailable"
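# Note: search_fallback above is a plain function rather than a smolagents Tool instance.
# Depending on the smolagents version, CodeAgent may reject it in its tools list, so it may
# need to be wrapped with the @tool decorator if this fallback path is ever exercised.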
# Initialize the VisitWebpageTool
visit_webpage_tool = VisitWebpageTool()
#@weave.op()
def tracked_perplexity_call(prompt: str, system_messages: str, model_name: str = "sonar-pro", assistant_meta: bool = False):
"""Enhanced Perplexity API call with explicit model tracking."""
client = OpenAI(api_key=os.getenv("PERPLEXITY_API_KEY"), base_url="https://api.perplexity.ai")
system_message = Perplex_Assistant_Prompt
if assistant_meta:
system_message += f"\n\n{system_messages}"
# Minimal parameters for Perplexity
return client.chat.completions.create(
model=model_name,
messages=[
{"role": "system", "content": system_message},
{"role": "user", "content": prompt},
],
stream=False,
).choices[0].message.content
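# Illustrative usage of the helper above (not executed here; assumes PERPLEXITY_API_KEY is set
# and that the "sonar-pro" model is available on the account):
#   tracked_perplexity_call(
#       "What changed in the latest smolagents release?",
#       "Prefer official documentation and release notes",
#       assistant_meta=True,
#   )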
@tool
def Sonar_Web_Search_Tool(arg1: str, arg2: str) -> str:
"""A tool that accesses Perplexity Sonar to search the web when the answer requires or would benefit from a real world web reference.
Args:
arg1: User Prompt
arg2: Details on the desired web search results as system message for sonar web search
"""
try:
sonar_response = tracked_perplexity_call(arg1, arg2)
return sonar_response
except Exception as e:
return f"Error using Sonar Websearch tool '{arg1} {arg2}': {str(e)}"
def parse_json(text: str):
"""
A safer JSON parser using ast.literal_eval.
Converts JSON-like strings to Python objects without executing code.
Handles common JSON literals (true, false, null) by converting them to Python equivalents.
"""
# Replace JSON literals with Python equivalents
prepared_text = text.replace("true", "True").replace("false", "False").replace("null", "None")
try:
import ast
return ast.literal_eval(prepared_text)
except (SyntaxError, ValueError) as e:
raise ValueError(f"Failed to parse JSON: {str(e)}")
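# Example of the conversion parse_json performs (illustrative only):
#   parse_json('[{"name": "Product A", "in_stock": true}]')
#   -> [{'name': 'Product A', 'in_stock': True}]
# Caveat: the string replacement also rewrites "true"/"false"/"null" when those words occur
# inside string values, so this is a best-effort parser rather than a strict JSON parser.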
def Dataset_Creator_Function(dataset_name: str, conversation_data: str) -> str:
"""Creates and pushes a dataset to Hugging Face with the conversation history.
Args:
dataset_name: Name for the dataset (will be prefixed with username)
conversation_data: String representing the conversation data. Can be:
- JSON array of objects (each object becomes a row)
- Pipe-separated values (first row as headers, subsequent rows as values)
- Plain text (stored in a single 'text' column)
Returns:
URL of the created dataset or error message along with the log output.
"""
log_text = ""
try:
# Required imports
import pandas as pd
from datasets import Dataset, DatasetDict
from huggingface_hub import HfApi
# Get API key
api_key = os.getenv("HF_API_KEY") or os.getenv("HUGGINGFACE_API_KEY")
if not api_key:
return "Error: No Hugging Face API key found in environment variables"
# Set fixed username
username = "Misfits-and-Machines"
safe_dataset_name = dataset_name.replace(" ", "_").lower()
repo_id = f"{username}/{safe_dataset_name}"
log_text += f"Creating dataset: {repo_id}\n"
# Ensure repository exists
hf_api = HfApi(token=api_key)
try:
if not hf_api.repo_exists(repo_id=repo_id, repo_type="dataset"):
hf_api.create_repo(repo_id=repo_id, repo_type="dataset")
log_text += f"Created repository: {repo_id}\n"
else:
log_text += f"Repository already exists: {repo_id}\n"
except Exception as e:
log_text += f"Note when checking/creating repository: {str(e)}\n"
# Process input data
created_ds = None
try:
# Try parsing as JSON using the safer parse_json function
try:
json_data = parse_json(conversation_data)
# Process based on data structure
if isinstance(json_data, list) and all(isinstance(item, dict) for item in json_data):
log_text += f"Processing JSON array with {len(json_data)} items\n"
# Create a dataset with columns for all keys in the first item
# This ensures the dataset structure is consistent
first_item = json_data[0]
columns = list(first_item.keys())
log_text += f"Detected columns: {columns}\n"
# Initialize data dictionary with empty lists for each column
data_dict = {col: [] for col in columns}
# Process each item
for item in json_data:
for col in columns:
# Get the value for this column, or empty string if missing
value = item.get(col, "")
data_dict[col].append(value)
# Debug output to verify data structure
for col in columns:
log_text += f"Column '{col}' has {len(data_dict[col])} entries\n"
# Create dataset from dictionary
ds = Dataset.from_dict(data_dict)
log_text += f"Created dataset with {len(ds)} rows\n"
created_ds = DatasetDict({"train": ds})
elif isinstance(json_data, dict):
log_text += "Processing single JSON object\n"
# For a single object, create a dataset with one row
data_dict = {k: [v] for k, v in json_data.items()}
ds = Dataset.from_dict(data_dict)
created_ds = DatasetDict({"train": ds})
else:
raise ValueError("JSON not recognized as array or single object")
except Exception as json_error:
log_text += f"Not processing as JSON: {str(json_error)}\n"
raise json_error # Propagate to next handler
except Exception:
# Try pipe-separated format
lines = conversation_data.strip().split('\n')
if '|' in conversation_data and len(lines) > 1:
log_text += "Processing as pipe-separated data\n"
headers = [h.strip() for h in lines[0].split('|')]
log_text += f"Detected headers: {headers}\n"
# Initialize data dictionary
data_dict = {header: [] for header in headers}
# Process each data row
for i, line in enumerate(lines[1:], 1):
if not line.strip():
continue
values = [val.strip() for val in line.split('|')]
if len(values) == len(headers):
for j, header in enumerate(headers):
data_dict[header].append(values[j])
else:
log_text += f"Warning: Skipping row {i} (column count mismatch)\n"
# Create dataset from dictionary
if all(len(values) > 0 for values in data_dict.values()):
ds = Dataset.from_dict(data_dict)
log_text += f"Created dataset with {len(ds)} rows\n"
created_ds = DatasetDict({"train": ds})
else:
log_text += "No valid rows found in pipe-separated data\n"
created_ds = DatasetDict({"train": Dataset.from_dict({"text": [conversation_data]})})
else:
# Fallback for plain text
log_text += "Processing as plain text\n"
created_ds = DatasetDict({"train": Dataset.from_dict({"text": [conversation_data]})})
# Push using the DatasetDict push_to_hub method.
log_text += f"Pushing dataset to {repo_id}\n"
created_ds.push_to_hub(
repo_id=repo_id,
token=api_key,
commit_message=f"Upload dataset: {dataset_name}"
)
dataset_url = f"https://huggingface.co/datasets/{repo_id}"
log_text += f"Dataset successfully pushed to: {dataset_url}\n"
return f"Successfully created dataset at {dataset_url}\nLogs:\n{log_text}"
except Exception as e:
import traceback
error_trace = traceback.format_exc()
log_text += f"Dataset creation error: {str(e)}\n{error_trace}\n"
return f"Error creating dataset: {str(e)}\nLogs:\n{log_text}"
@tool
def Dataset_Creator_Tool(dataset_name: str, conversation_data: str) -> str:
"""A tool that creates and pushes a dataset to Hugging Face.
Args:
dataset_name: Name for the dataset (will be prefixed with 'Misfits-and-Machines/')
conversation_data: Data content to save in the dataset. Formats supported:
1. JSON array of objects – Each object becomes a row (keys as columns).
Example: [{"name": "Product A", "brand": "Company X"}, {"name": "Product B", "brand": "Company Y"}]
2. Pipe-separated values – First row as headers, remaining rows as values.
Example: "name | brand\nProduct A | Company X\nProduct B | Company Y"
3. Plain text – Stored in a single 'text' column.
Returns:
A link to the created dataset on the Hugging Face Hub or an error message, along with log details.
"""
try:
log_text = f"Creating dataset '{dataset_name}' with {len(conversation_data)} characters of data\n"
log_text += f"Dataset will be created at Misfits-and-Machines/{dataset_name.replace(' ', '_').lower()}\n"
# Call Dataset_Creator_Function directly without trying to define any new functions
result = Dataset_Creator_Function(dataset_name, conversation_data)
log_text += f"Dataset creation result: {result}\n"
return log_text
except Exception as e:
import traceback
error_trace = traceback.format_exc()
return f"Error using Dataset Creator tool: {str(e)}\n{error_trace}"
def verify_dataset_exists(repo_id: str) -> dict:
"""Verify that a dataset exists and is valid on the Hugging Face Hub.
Args:
repo_id: Full repository ID in format "username/dataset_name"
Returns:
Dict with "exists" boolean and "message" string
"""
try:
# Check if dataset exists using the datasets-server API
api_url = f"https://datasets-server.huggingface.co/is-valid?dataset={repo_id}"
response = requests.get(api_url)
# Parse the response
if response.status_code == 200:
data = response.json()
# If any of these are True, the dataset exists in some form
if data.get("viewer", False) or data.get("preview", False):
return {"exists": True, "message": "Dataset is valid and accessible"}
else:
return {"exists": False, "message": "Dataset exists but may not be fully processed yet"}
else:
return {"exists": False, "message": f"API returned status code {response.status_code}"}
except Exception as e:
return {"exists": False, "message": f"Error verifying dataset: {str(e)}"}
@tool
def Check_Dataset_Validity(dataset_name: str) -> str:
"""A tool that checks if a dataset exists and is valid on Hugging Face.
Args:
dataset_name: Name of the dataset to check (with or without organization prefix)
Returns:
Status message about the dataset validity
"""
try:
# Ensure the dataset name has the organization prefix
if "/" not in dataset_name:
dataset_name = f"Misfits-and-Machines/{dataset_name.replace(' ', '_').lower()}"
# Check dataset validity
result = verify_dataset_exists(dataset_name)
if result["exists"]:
return f"Dataset '{dataset_name}' exists and is valid. You can access it at https://huggingface.co/datasets/{dataset_name}"
else:
return f"Dataset '{dataset_name}' could not be verified: {result['message']}. It may still be processing or may not exist."
except Exception as e:
return f"Error checking dataset validity: {str(e)}"
@tool
def get_current_time_in_timezone(timezone: str) -> str:
"""A tool that fetches the current local time in a specified timezone.
Args:
timezone: A string representing a valid timezone (e.g., 'America/New_York').
"""
try:
# Create timezone object
tz = pytz.timezone(timezone)
# Get current time in that timezone
local_time = datetime.datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S")
return f"The current local time in {timezone} is: {local_time}"
except Exception as e:
return f"Error fetching time for timezone '{timezone}': {str(e)}"
final_answer = FinalAnswerTool()
# Keep the original endpoint as a backup
backup_model = HfApiModel(
    max_tokens=2096,
    temperature=0.5,
    model_id='https://pflgm2locj2t89co.us-east-1.aws.endpoints.huggingface.cloud',
)
def model_with_fallback(prompt, **kwargs):
"""Simple model function with fallback to the original endpoint."""
try:
print("Using primary model: DeepSeek-R1-Distill-Qwen-32B")
# Get API key
api_key = os.getenv("HF_API_KEY") or os.getenv("HUGGINGFACE_API_KEY")
if not api_key:
raise ValueError("No Hugging Face API key found")
# Format prompt for the API
if isinstance(prompt, (dict, list)):
import json
prompt_text = json.dumps(prompt)
else:
prompt_text = str(prompt)
# Create client and call model
client = InferenceClient(
provider="hf-inference",
api_key=api_key
)
# Extract parameters
temperature = kwargs.get('temperature', 0.5)
max_tokens = kwargs.get('max_tokens', 2096)
stop_sequences = kwargs.get('stop_sequences', None)
# Call the API
messages = [{"role": "user", "content": prompt_text}]
completion = client.chat.completions.create(
model="deepseek-ai/DeepSeek-R1-Distill-Qwen-32B",
messages=messages,
max_tokens=max_tokens,
temperature=temperature,
stop=stop_sequences
)
print("Primary model successful")
return completion.choices[0].message.content
except Exception as e:
print(f"Primary model failed: {str(e)}")
print("Falling back to backup model")
# Use the backup model
return backup_model(prompt, **kwargs)
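# Note: model_with_fallback returns a plain string, so it may need adapting to the message
# interface CodeAgent expects before it could replace backup_model below.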
# Set up the model for the agent
model = backup_model # Set to backup model directly for now to ensure it works
# Import tool from Hub
image_generation_tool = load_tool("agents-course/text-to-image", trust_remote_code=True)
with open("prompts.yaml", 'r') as stream:
    prompt_templates = yaml.safe_load(stream)
# Initialize the agent using standard smolagents patterns
agent = CodeAgent(
    model=model,
    tools=[
        final_answer,
        Sonar_Web_Search_Tool,
        primary_search_tool,  # Already set to DuckDuckGo, Google, or the fallback function
        get_current_time_in_timezone,
        image_generation_tool,
        Dataset_Creator_Tool,
        Check_Dataset_Validity,
        visit_webpage_tool,  # Initialized above as VisitWebpageTool()
    ],
    max_steps=6,
    verbosity_level=1,
    grammar=None,
    planning_interval=3,
    name="Research Assistant",
    description="An AI assistant that can search the web, create datasets, and answer questions.",
    prompt_templates=prompt_templates
)
# Add informative message about which search tool is being used
print(f"Agent initialized with {search_tool_name} as primary search tool")
print(f"Available tools: final_answer, Sonar_Web_Search_Tool, {search_tool_name}, get_current_time_in_timezone, image_generation_tool, Dataset_Creator_Tool, Check_Dataset_Validity, visit_webpage_tool")
print(f"Using DeepSeek-R1-Distill-Qwen-32B as primary model, with HfApiModel as backup")
# Note about working within token limits
# When using with queries that might exceed token limits, consider:
# 1. Breaking tasks into smaller sub-tasks
# 2. Limiting the amount of data returned by search tools
# 3. Using the planning_interval to enable more effective reasoning
# To fix the TypeError in Gradio_UI.py, you would need to modify that file
# For now, we'll just use the agent directly
try:
    GradioUI(agent).launch()
except TypeError as e:
    if "unsupported operand type(s) for +=" in str(e):
        print("Error: Token counting issue in Gradio UI")
        print("To fix, edit Gradio_UI.py and change:")
        print("total_input_tokens += agent.model.last_input_token_count")
        print("To:")
        print("total_input_tokens += (agent.model.last_input_token_count or 0)")
    else:
        raise e