# Hugging Face Spaces status header (scrape residue): Space status "Running"
import os | |
import gradio as gr | |
from typing import List | |
import logging | |
import logging.handlers | |
import json | |
from datetime import datetime | |
from langchain_openai import ChatOpenAI | |
from langchain_core.tools import tool | |
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage | |
from langchain_tavily import TavilySearch | |
# Configuration - set to False to disable detailed logging
ENABLE_DETAILED_LOGGING = True

# Setup logging with rotation (7 days max)
if ENABLE_DETAILED_LOGGING:
    # One shared formatter for every handler
    log_format = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    handlers = []

    # Console output
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(log_format)
    handlers.append(stream_handler)

    # Daily-rotating log file, keeping one week of history
    rotating_handler = logging.handlers.TimedRotatingFileHandler(
        'agent.log',
        when='midnight',
        interval=1,
        backupCount=7,  # Keep 7 days of logs
        encoding='utf-8'
    )
    rotating_handler.setFormatter(log_format)
    handlers.append(rotating_handler)

    # Configure root logger with both handlers
    logging.basicConfig(level=logging.INFO, handlers=handlers)
else:
    logging.basicConfig(level=logging.WARNING)

logger = logging.getLogger(__name__)
def get_current_date() -> str:
    """Return today's date formatted as ``YYYY-MM-DD``."""
    today = datetime.now()
    return today.strftime("%Y-%m-%d")
def create_system_prompt() -> str:
    """Build the system prompt, embedding today's date so the model can
    judge whether a question needs a live web search."""
    today = get_current_date()
    return f"""You are a helpful AI assistant with web search capabilities.
TODAY'S DATE: {today}
IMPORTANT: You have access to a web_search tool. Consider your knowledge cutoff date and today's date to decide when to search:
USE WEB SEARCH when users ask about:
- Events after your knowledge cutoff date
- Current events, breaking news, or recent developments
- Today's date, current time, or "what's happening now"
- Real-time data (weather, stock prices, sports scores)
- Recent updates to ongoing situations
- Information that changes frequently
- When users explicitly ask you to search the web
DO NOT use web search for:
- Historical facts before your cutoff date
- General knowledge that doesn't change (capitals, basic science, etc.)
- Established facts and concepts
- Personal advice or opinions
When in doubt about whether information might be outdated, use web search to get the most current information."""
# Configuration from environment variables (read once at import time)
_env = os.environ.get
llm_ip = _env('public_ip')
llm_port = _env('port')
llm_key = _env('api_key')
llm_model = _env('model')

# Tavily API configuration (empty string when the key is absent)
tavily_key = _env('tavily_key', '')
if ENABLE_DETAILED_LOGGING:
    logger.info(f"Tavily API key present: {bool(tavily_key)}")
    if tavily_key:
        logger.info(f"Tavily API key length: {len(tavily_key)}")
    else:
        logger.warning("No Tavily API key found in environment variables")
# Tool calling agent implementation | |
class ToolCallingAgentChat:
    """Chat agent backed by an OpenAI-compatible endpoint that can invoke a
    Tavily-powered ``web_search`` tool when an API key is configured.

    Public interface (unchanged): ``__init__``, ``reset_conversation``,
    ``update_config``, ``chat``.
    """

    def __init__(self, ip: str, port: str, api_key: str, model: str):
        # Connection settings for the OpenAI-compatible endpoint
        self.ip = ip
        self.port = port
        self.api_key = api_key
        self.model = model
        self.llm = None            # ChatOpenAI client, created in _setup_agent
        self.tools = []            # LangChain tools bound to the model
        self.conversation_id = None
        self._setup_agent()

    def reset_conversation(self):
        """Reset conversation state by assigning a fresh conversation id."""
        import uuid
        self.conversation_id = str(uuid.uuid4())
        if ENABLE_DETAILED_LOGGING:
            logger.info("=== CONVERSATION RESET ===")
            logger.info(f"New conversation ID: {self.conversation_id}")

    def _base_url(self) -> str:
        """Build the OpenAI-compatible base URL.

        Chooses https for well-known TLS ports or when the host string hints
        at https.  Computed in one place (the original duplicated this
        expression).
        """
        protocol = "https" if self.port in ["443", "11443"] or "https" in self.ip else "http"
        return f"{protocol}://{self.ip}:{self.port}/v1"

    def _setup_agent(self):
        """Initialize the LLM client and the optional web search tool, then bind them."""
        try:
            base_url = self._base_url()
            if ENABLE_DETAILED_LOGGING:
                logger.info("=== SETTING UP TOOL CALLING AGENT ===")
                logger.info(f"LLM URL: {base_url}")
                logger.info(f"Model: {self.model}")
                logger.info(f"Using protocol: {base_url.split(':', 1)[0]}")
            # Custom header lets requests through a Cloudflare access rule
            cf_bypass_key = os.environ.get('cf_bypass_key', 'devquasar2025')
            self.llm = ChatOpenAI(
                base_url=base_url,
                api_key=self.api_key or "ollama",  # Use provided key or default
                model=self.model,
                temperature=0.7,
                default_headers={
                    "x-cf-bypass": cf_bypass_key,
                    "User-Agent": "DevQuasar-Agent/1.0"
                }
            )
            if ENABLE_DETAILED_LOGGING:
                logger.info("LLM created successfully")
            # Define web search tool (only when a Tavily key is available)
            if tavily_key:
                if ENABLE_DETAILED_LOGGING:
                    logger.info("Setting up Tavily search tool")
                try:
                    # BUG FIX: the original defined web_search as a plain
                    # function, but later code relies on ``.name`` and
                    # ``.invoke`` — the ``@tool`` decorator (imported at the
                    # top of the file, previously unused) is required to wrap
                    # it as a LangChain StructuredTool.
                    @tool
                    def web_search(query: str) -> str:
                        """Search the web for current information about any topic. Use this when you need up-to-date information, current events, or real-time data."""
                        try:
                            tavily_tool = TavilySearch(
                                tavily_api_key=tavily_key,
                                max_results=5,
                                topic="general",
                                include_answer=True,
                                search_depth="advanced"
                            )
                            result = tavily_tool.invoke({"query": query})
                            if ENABLE_DETAILED_LOGGING:
                                logger.info(f"Tavily search successful for query: {query}")
                            return result
                        except Exception as e:
                            error_str = str(e).lower()
                            if ENABLE_DETAILED_LOGGING:
                                logger.error(f"Tavily search failed for query '{query}': {e}")
                                logger.error(f"Exception type: {type(e).__name__}")
                            # Check for rate limit or quota issues
                            if any(keyword in error_str for keyword in ['rate limit', 'quota', 'limit exceeded', 'usage limit', 'billing']):
                                if ENABLE_DETAILED_LOGGING:
                                    logger.warning(f"Tavily rate limit/quota exceeded: {e}")
                                return "I can't search the web right now due to rate limits."
                            else:
                                if ENABLE_DETAILED_LOGGING:
                                    logger.error(f"Tavily API error: {e}")
                                return f"I can't search the web right now. Error: {str(e)[:100]}"
                    self.tools = [web_search]
                    if ENABLE_DETAILED_LOGGING:
                        logger.info("Tavily search tool created successfully")
                except Exception as e:
                    if ENABLE_DETAILED_LOGGING:
                        logger.error(f"Failed to create Tavily tool: {e}")
                    self.tools = []
            else:
                if ENABLE_DETAILED_LOGGING:
                    logger.warning("No Tavily API key found, no web search tool available")
                self.tools = []
            # Bind tools to the model so it can emit structured tool calls
            if self.tools:
                self.llm_with_tools = self.llm.bind_tools(self.tools)
                if ENABLE_DETAILED_LOGGING:
                    # Renamed loop var: the original used ``tool``, shadowing
                    # the imported decorator of the same name.
                    logger.info(f"Tools bound to model: {[t.name for t in self.tools]}")
            else:
                self.llm_with_tools = self.llm
                if ENABLE_DETAILED_LOGGING:
                    logger.info("No tools available, using base model")
            if ENABLE_DETAILED_LOGGING:
                logger.info("Tool calling agent created successfully")
        except Exception as e:
            logger.error("=== AGENT SETUP ERROR ===")
            logger.error(f"Failed to setup agent: {e}")
            import traceback
            logger.error(f"Traceback: {traceback.format_exc()}")
            raise

    def update_config(self, ip: str, port: str, api_key: str, model: str):
        """Update LLM configuration; rebuild the agent only if something changed."""
        if (ip, port, api_key, model) != (self.ip, self.port, self.api_key, self.model):
            self.ip = ip
            self.port = port
            self.api_key = api_key
            self.model = model
            self._setup_agent()

    def _run_tool(self, tool_call) -> str:
        """Execute one tool-call dict against ``self.tools``.

        Returns the tool output, an error string on failure, or a not-found
        message when no tool matches.  The loop variable no longer shadows
        the imported ``tool`` decorator.
        """
        for candidate in self.tools:
            if candidate.name == tool_call['name']:
                try:
                    result = candidate.invoke(tool_call['args'])
                    if ENABLE_DETAILED_LOGGING:
                        logger.info(f"Tool executed successfully: {tool_call['name']}")
                    return result
                except Exception as e:
                    if ENABLE_DETAILED_LOGGING:
                        logger.error(f"Tool execution failed: {e}")
                    return f"Tool execution failed: {str(e)}"
        return f"Tool {tool_call['name']} not found"

    def chat(self, message: str, history: List[List[str]]) -> str:
        """Generate a chat response, executing tool calls when requested.

        Args:
            message: The new user message.
            history: Prior ``[user, assistant]`` turn pairs from Gradio.

        Returns:
            The assistant's final text reply, or an error string on failure.
        """
        try:
            if not self.llm_with_tools:
                return "Error: Agent not initialized"
            if ENABLE_DETAILED_LOGGING:
                logger.info("=== USER INPUT ===")
                logger.info(f"Message: {message}")
                logger.info(f"History length: {len(history)}")
            # Build the context: dated system prompt, prior turns, new message
            from langchain_core.messages import SystemMessage
            system_prompt = create_system_prompt()
            if ENABLE_DETAILED_LOGGING:
                logger.info(f"System prompt includes today's date: {get_current_date()}")
            messages = [SystemMessage(content=system_prompt)]
            for user_msg, assistant_msg in history:
                messages.append(HumanMessage(content=user_msg))
                if assistant_msg:  # Only add if assistant responded
                    messages.append(AIMessage(content=assistant_msg))
            messages.append(HumanMessage(content=message))
            # First LLM pass: may answer directly or request tools
            if ENABLE_DETAILED_LOGGING:
                logger.info("=== INVOKING LLM ===")
                logger.info(f"Total messages in context: {len(messages)}")
            response = self.llm_with_tools.invoke(messages)
            if ENABLE_DETAILED_LOGGING:
                logger.info("=== LLM RESPONSE ===")
                logger.info(f"Response type: {type(response)}")
                logger.info(f"Response content: {response.content}")
                logger.info(f"Has tool calls: {bool(response.tool_calls if hasattr(response, 'tool_calls') else False)}")
                if hasattr(response, 'tool_calls') and response.tool_calls:
                    logger.info(f"Tool calls: {response.tool_calls}")
                logger.info(f"Response additional_kwargs: {getattr(response, 'additional_kwargs', {})}")
            # Decide whether any tools should run
            tool_calls_to_execute = []
            if hasattr(response, 'tool_calls') and response.tool_calls:
                # Method 1: the model emitted proper structured tool calls
                tool_calls_to_execute = response.tool_calls
            elif ("search" in message.lower() or "today" in message.lower() or "current" in message.lower() or "recent" in message.lower()) and self.tools:
                # Method 2: keyword fallback for models that never emit tool
                # calls but the user clearly asked for current information
                if ENABLE_DETAILED_LOGGING:
                    logger.info("=== FALLBACK TOOL CALLING ===")
                    logger.info("Detected need for search based on keywords")
                import uuid
                tool_calls_to_execute = [{
                    'name': 'web_search',
                    'args': {'query': message},
                    'id': str(uuid.uuid4())
                }]
            if tool_calls_to_execute:
                if ENABLE_DETAILED_LOGGING:
                    logger.info("=== TOOL CALLS DETECTED ===")
                    logger.info(f"Number of tool calls: {len(tool_calls_to_execute)}")
                # The assistant turn that requested the tools must precede
                # the ToolMessage results in the transcript
                messages.append(response)
                for tool_call in tool_calls_to_execute:
                    if ENABLE_DETAILED_LOGGING:
                        logger.info(f"Executing tool: {tool_call['name']} with args: {tool_call['args']}")
                    tool_result = self._run_tool(tool_call)
                    messages.append(ToolMessage(
                        content=str(tool_result),
                        tool_call_id=tool_call['id']
                    ))
                # Second LLM pass: compose the answer from the tool output
                if ENABLE_DETAILED_LOGGING:
                    logger.info("=== GETTING FINAL RESPONSE ===")
                final_response = self.llm_with_tools.invoke(messages)
                final_message = final_response.content
            else:
                # No tool calls, use the direct response
                final_message = response.content
            if ENABLE_DETAILED_LOGGING:
                logger.info("=== FINAL MESSAGE ===")
                logger.info(f"Final message: {final_message}")
            return final_message
        except Exception as e:
            error_msg = f"Agent error: {str(e)}"
            logger.error("=== AGENT ERROR ===")
            logger.error(f"Error: {e}")
            logger.error(f"Error type: {type(e)}")
            import traceback
            logger.error(f"Traceback: {traceback.format_exc()}")
            return error_msg
# Global agent instance, built once at import time from the environment
# configuration above and shared by every Gradio request.
tool_calling_agent = ToolCallingAgentChat(llm_ip, llm_port, llm_key, llm_model)
def generate_response(message: str, history: List[List[str]], max_tokens: int):
    """Generate a streamed response using the global tool-calling agent.

    Yields progressively longer prefixes of the reply so Gradio renders a
    typing effect.  BUG FIX: the original used ``response.split()`` and
    re-joined words with single spaces, which destroyed all newlines and
    markdown structure in the reply; streaming now preserves whitespace.

    Args:
        message: The new user message.
        history: Prior [user, assistant] turn pairs; empty means the chat
            was just cleared.
        max_tokens: Slider value from the UI.  NOTE(review): currently not
            forwarded to the LLM — confirm whether it should be wired through.

    Yields:
        Growing prefixes of the final reply (or an error message).
    """
    global tool_calling_agent
    try:
        # Configuration is pre-loaded from environment variables;
        # no runtime config changes allowed for security.
        # Reset conversation if history is empty (clear button was pressed).
        if not history:
            tool_calling_agent.reset_conversation()
        # Generate the full response, then stream it word by word
        response = tool_calling_agent.chat(message, history)
        import re
        # Each chunk is a word plus the whitespace that follows it, so
        # newlines and indentation survive the streaming.
        shown = ""
        for chunk in re.findall(r'\S+\s*', response):
            shown += chunk
            yield shown
        if not response:
            # Still emit one (empty) message so the UI gets a reply
            yield response
    except Exception as e:
        error_msg = f"Error: {str(e)}"
        logger.error(error_msg)
        yield error_msg
# CSS to fix avatar distortion and positioning issues in the Gradio chatbot.
# Injected into the page via gr.ChatInterface(css=...); the many selectors
# cover different Gradio versions' DOM structures.
avatar_css: str = """
/* Fix avatar image distortion with multiple selectors for compatibility */
#chatbot .avatar-container img,
#chatbot .message-row img,
#chatbot .message img,
#chatbot img[src*="twemoji"],
#chatbot img[src*="huggingface"],
.gr-chatbot img {
    width: 40px !important;
    height: 40px !important;
    min-width: 40px !important;
    min-height: 40px !important;
    max-width: 40px !important;
    max-height: 40px !important;
    border-radius: 50% !important;
    object-fit: cover !important;
    aspect-ratio: 1 / 1 !important;
    margin: 0px !important;
    padding: 0px !important;
    display: block !important;
    flex-shrink: 0 !important;
}
/* Force square containers */
#chatbot .avatar-container,
.gr-chatbot .avatar-container {
    width: 40px !important;
    height: 40px !important;
    min-width: 40px !important;
    min-height: 40px !important;
    flex-shrink: 0 !important;
    display: flex !important;
    align-items: center !important;
    justify-content: center !important;
}
"""
# Create Gradio ChatInterface.
# generate_response is a generator, so replies stream into the UI.
chatbot = gr.ChatInterface(
    generate_response,
    chatbot=gr.Chatbot(
        avatar_images=[
            "https://cdn.jsdelivr.net/gh/twitter/twemoji@latest/assets/72x72/1f464.png",  # User avatar (person emoji)
            "https://cdn-avatars.huggingface.co/v1/production/uploads/64e6d37e02dee9bcb9d9fa18/o_HhUnXb_PgyYlqJ6gfEO.png"  # Bot avatar
        ],
        height="64vh",
        elem_id="chatbot"
    ),
    additional_inputs=[
        # NOTE(review): this slider's value reaches generate_response as
        # max_tokens but is never forwarded to the LLM — confirm intent.
        gr.Slider(50, 8192, label="Max Tokens", value=1024,
                  info="Maximum number of tokens in the response"),
    ],
    title="🤖 DQ Micro Agent",
    description="DevQuasar self hosted Micro Agent with websearch capabilities",
    theme="finlaymacklon/smooth_slate",
    css=avatar_css
)

if __name__ == "__main__":
    # queue() enables request queuing, required for streamed (generator) replies
    chatbot.queue().launch()