# dq_hosted1 / app.py
import os
import gradio as gr
from typing import List
import logging
import logging.handlers
import json
from datetime import datetime
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from langchain_tavily import TavilySearch
# Configuration - set to False to disable detailed logging
ENABLE_DETAILED_LOGGING = True
# Setup logging with rotation (7 days max)
if ENABLE_DETAILED_LOGGING:
    # Create formatter
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    # Setup console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)

    # Setup rotating file handler (7 days, daily rotation)
    file_handler = logging.handlers.TimedRotatingFileHandler(
        'agent.log',
        when='midnight',
        interval=1,
        backupCount=7,  # Keep 7 days of logs
        encoding='utf-8'
    )
    file_handler.setFormatter(formatter)

    # Configure root logger
    logging.basicConfig(
        level=logging.INFO,
        handlers=[console_handler, file_handler]
    )
else:
    logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)

def get_current_date() -> str:
    """Get current date in YYYY-MM-DD format"""
    return datetime.now().strftime("%Y-%m-%d")

def create_system_prompt() -> str:
    """Create dynamic system prompt with current date"""
    current_date = get_current_date()
    return f"""You are a helpful AI assistant with web search capabilities.
TODAY'S DATE: {current_date}
IMPORTANT: You have access to a web_search tool. Consider your knowledge cutoff date and today's date to decide when to search:
USE WEB SEARCH when users ask about:
- Events after your knowledge cutoff date
- Current events, breaking news, or recent developments
- Today's date, current time, or "what's happening now"
- Real-time data (weather, stock prices, sports scores)
- Recent updates to ongoing situations
- Information that changes frequently
- When users explicitly ask you to search the web
DO NOT use web search for:
- Historical facts before your cutoff date
- General knowledge that doesn't change (capitals, basic science, etc.)
- Established facts and concepts
- Personal advice or opinions
When in doubt about whether information might be outdated, use web search to get the most current information."""
# Configuration from environment variables
llm_ip = os.environ.get('public_ip')
llm_port = os.environ.get('port')
llm_key = os.environ.get('api_key')
llm_model = os.environ.get('model')
# Tavily API configuration
tavily_key = os.environ.get('tavily_key', '')
if ENABLE_DETAILED_LOGGING:
    logger.info(f"Tavily API key present: {bool(tavily_key)}")
    if tavily_key:
        logger.info(f"Tavily API key length: {len(tavily_key)}")
    else:
        logger.warning("No Tavily API key found in environment variables")
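
# Expected environment variables (a reference note inferred from the lookups in this
# file; actual values depend on the deployment):
#   public_ip     - host of the OpenAI-compatible LLM endpoint
#   port          - endpoint port ("443"/"11443" switch the client to https)
#   api_key       - key for that endpoint (falls back to "ollama" when empty)
#   model         - model name served by the endpoint
#   tavily_key    - Tavily API key; web search is disabled when missing
#   cf_bypass_key - value sent in the custom x-cf-bypass header (read in _setup_agent)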
# Tool calling agent implementation
class ToolCallingAgentChat:
    def __init__(self, ip: str, port: str, api_key: str, model: str):
        self.ip = ip
        self.port = port
        self.api_key = api_key
        self.model = model
        self.llm = None
        self.tools = []
        self.conversation_id = None
        self._setup_agent()

    def reset_conversation(self):
        """Reset conversation state"""
        import uuid
        self.conversation_id = str(uuid.uuid4())
        if ENABLE_DETAILED_LOGGING:
            logger.info(f"=== CONVERSATION RESET ===")
            logger.info(f"New conversation ID: {self.conversation_id}")

    def _setup_agent(self):
        """Initialize the tool calling agent"""
        try:
            if ENABLE_DETAILED_LOGGING:
                logger.info(f"=== SETTING UP TOOL CALLING AGENT ===")
                protocol = "https" if self.port in ["443", "11443"] or "https" in self.ip else "http"
                logger.info(f"LLM URL: {protocol}://{self.ip}:{self.port}/v1")
                logger.info(f"Model: {self.model}")
                logger.info(f"Using protocol: {protocol}")

            # Create OpenAI-compatible model with HTTPS support
            protocol = "https" if self.port in ["443", "11443"] or "https" in self.ip else "http"
            base_url = f"{protocol}://{self.ip}:{self.port}/v1"

            # Add custom headers for Cloudflare bypass
            cf_bypass_key = os.environ.get('cf_bypass_key', 'devquasar2025')

            self.llm = ChatOpenAI(
                base_url=base_url,
                api_key=self.api_key or "ollama",  # Use provided key or default
                model=self.model,
                temperature=0.7,
                default_headers={
                    "x-cf-bypass": cf_bypass_key,
                    "User-Agent": "DevQuasar-Agent/1.0"
                }
            )
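            # Note: any OpenAI-compatible /v1 server (e.g. vLLM, llama.cpp's server, or
            # Ollama's OpenAI endpoint) should work behind this base_url; which backend
            # actually serves the model is a deployment detail not visible in this file.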
            if ENABLE_DETAILED_LOGGING:
                logger.info("LLM created successfully")

            # Define web search tool
            if tavily_key:
                if ENABLE_DETAILED_LOGGING:
                    logger.info("Setting up Tavily search tool")
                try:
                    @tool
                    def web_search(query: str) -> str:
                        """Search the web for current information about any topic. Use this when you need up-to-date information, current events, or real-time data."""
                        try:
                            tavily_tool = TavilySearch(
                                tavily_api_key=tavily_key,
                                max_results=5,
                                topic="general",
                                include_answer=True,
                                search_depth="advanced"
                            )
                            result = tavily_tool.invoke({"query": query})
                            if ENABLE_DETAILED_LOGGING:
                                logger.info(f"Tavily search successful for query: {query}")
                            return result
                        except Exception as e:
                            error_str = str(e).lower()
                            if ENABLE_DETAILED_LOGGING:
                                logger.error(f"Tavily search failed for query '{query}': {e}")
                                logger.error(f"Exception type: {type(e).__name__}")
                            # Check for rate limit or quota issues
                            if any(keyword in error_str for keyword in ['rate limit', 'quota', 'limit exceeded', 'usage limit', 'billing']):
                                if ENABLE_DETAILED_LOGGING:
                                    logger.warning(f"Tavily rate limit/quota exceeded: {e}")
                                return "I can't search the web right now due to rate limits."
                            else:
                                if ENABLE_DETAILED_LOGGING:
                                    logger.error(f"Tavily API error: {e}")
                                return f"I can't search the web right now. Error: {str(e)[:100]}"
                    self.tools = [web_search]
                    if ENABLE_DETAILED_LOGGING:
                        logger.info("Tavily search tool created successfully")
                except Exception as e:
                    if ENABLE_DETAILED_LOGGING:
                        logger.error(f"Failed to create Tavily tool: {e}")
                    self.tools = []
            else:
                if ENABLE_DETAILED_LOGGING:
                    logger.warning("No Tavily API key found, no web search tool available")
                self.tools = []

            # Bind tools to the model
            if self.tools:
                self.llm_with_tools = self.llm.bind_tools(self.tools)
                if ENABLE_DETAILED_LOGGING:
                    logger.info(f"Tools bound to model: {[tool.name for tool in self.tools]}")
            else:
                self.llm_with_tools = self.llm
                if ENABLE_DETAILED_LOGGING:
                    logger.info("No tools available, using base model")

            if ENABLE_DETAILED_LOGGING:
                logger.info("Tool calling agent created successfully")
        except Exception as e:
            logger.error(f"=== AGENT SETUP ERROR ===")
            logger.error(f"Failed to setup agent: {e}")
            import traceback
            logger.error(f"Traceback: {traceback.format_exc()}")
            raise e
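
    # After _setup_agent, self.llm_with_tools is either the ChatOpenAI client with the
    # web_search tool bound (when a Tavily key is configured) or the bare client.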

    def update_config(self, ip: str, port: str, api_key: str, model: str):
        """Update LLM configuration"""
        if (ip != self.ip or port != self.port or
                api_key != self.api_key or model != self.model):
            self.ip = ip
            self.port = port
            self.api_key = api_key
            self.model = model
            self._setup_agent()

    def chat(self, message: str, history: List[List[str]]) -> str:
        """Generate chat response using tool calling"""
        try:
            if not self.llm_with_tools:
                return "Error: Agent not initialized"

            if ENABLE_DETAILED_LOGGING:
                logger.info(f"=== USER INPUT ===")
                logger.info(f"Message: {message}")
                logger.info(f"History length: {len(history)}")

            # Convert history to messages for context with dynamic system message
            from langchain_core.messages import SystemMessage

            # Create dynamic system prompt with current date
            system_prompt = create_system_prompt()
            if ENABLE_DETAILED_LOGGING:
                logger.info(f"System prompt includes today's date: {get_current_date()}")

            messages = [SystemMessage(content=system_prompt)]
            for user_msg, assistant_msg in history:
                messages.append(HumanMessage(content=user_msg))
                if assistant_msg:  # Only add if assistant responded
                    messages.append(AIMessage(content=assistant_msg))

            # Add current message
            messages.append(HumanMessage(content=message))

            # Get initial response from LLM
            if ENABLE_DETAILED_LOGGING:
                logger.info(f"=== INVOKING LLM ===")
                logger.info(f"Total messages in context: {len(messages)}")

            response = self.llm_with_tools.invoke(messages)

            if ENABLE_DETAILED_LOGGING:
                logger.info(f"=== LLM RESPONSE ===")
                logger.info(f"Response type: {type(response)}")
                logger.info(f"Response content: {response.content}")
                logger.info(f"Has tool calls: {bool(response.tool_calls if hasattr(response, 'tool_calls') else False)}")
                if hasattr(response, 'tool_calls') and response.tool_calls:
                    logger.info(f"Tool calls: {response.tool_calls}")
                logger.info(f"Response additional_kwargs: {getattr(response, 'additional_kwargs', {})}")

            # Check if LLM wants to call tools
            tool_calls_to_execute = []

            # Method 1: Proper tool calls
            if hasattr(response, 'tool_calls') and response.tool_calls:
                tool_calls_to_execute = response.tool_calls
            # Method 2: Fallback - check if model mentioned search in content and user asked for current info
            elif ("search" in message.lower() or "today" in message.lower() or "current" in message.lower() or "recent" in message.lower()) and self.tools:
                if ENABLE_DETAILED_LOGGING:
                    logger.info(f"=== FALLBACK TOOL CALLING ===")
                    logger.info(f"Detected need for search based on keywords")
                # Manually trigger web search
                import uuid
                tool_calls_to_execute = [{
                    'name': 'web_search',
                    'args': {'query': message},
                    'id': str(uuid.uuid4())
                }]
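            # Either path yields tool-call dicts in LangChain's usual shape
            # (name / args / id), so the execution loop below treats them uniformly.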
            if tool_calls_to_execute:
                if ENABLE_DETAILED_LOGGING:
                    logger.info(f"=== TOOL CALLS DETECTED ===")
                    logger.info(f"Number of tool calls: {len(tool_calls_to_execute)}")

                # Add the LLM response to messages
                messages.append(response)

                # Execute tool calls
                for tool_call in tool_calls_to_execute:
                    if ENABLE_DETAILED_LOGGING:
                        logger.info(f"Executing tool: {tool_call['name']} with args: {tool_call['args']}")

                    # Find and execute the tool
                    tool_result = None
                    for tool in self.tools:
                        if tool.name == tool_call['name']:
                            try:
                                tool_result = tool.invoke(tool_call['args'])
                                if ENABLE_DETAILED_LOGGING:
                                    logger.info(f"Tool executed successfully: {tool_call['name']}")
                                break
                            except Exception as e:
                                tool_result = f"Tool execution failed: {str(e)}"
                                if ENABLE_DETAILED_LOGGING:
                                    logger.error(f"Tool execution failed: {e}")

                    if tool_result is None:
                        tool_result = f"Tool {tool_call['name']} not found"

                    # Add tool result to messages
                    messages.append(ToolMessage(
                        content=str(tool_result),
                        tool_call_id=tool_call['id']
                    ))

                # Get final response from LLM after tool execution
                if ENABLE_DETAILED_LOGGING:
                    logger.info(f"=== GETTING FINAL RESPONSE ===")
                final_response = self.llm_with_tools.invoke(messages)
                final_message = final_response.content
            else:
                # No tool calls, use the direct response
                final_message = response.content

            if ENABLE_DETAILED_LOGGING:
                logger.info(f"=== FINAL MESSAGE ===")
                logger.info(f"Final message: {final_message}")

            return final_message

        except Exception as e:
            error_msg = f"Agent error: {str(e)}"
            logger.error(f"=== AGENT ERROR ===")
            logger.error(f"Error: {e}")
            logger.error(f"Error type: {type(e)}")
            import traceback
            logger.error(f"Traceback: {traceback.format_exc()}")
            return error_msg
# Global agent instance
tool_calling_agent = ToolCallingAgentChat(llm_ip, llm_port, llm_key, llm_model)

def generate_response(message: str, history: List[List[str]], max_tokens: int):
    """Generate response using tool calling agent with dynamic system prompt"""
    global tool_calling_agent
    try:
        # Configuration is pre-loaded from environment variables
        # No runtime config changes allowed for security

        # Reset conversation if history is empty (indicates clear button was pressed)
        if len(history) == 0:
            tool_calling_agent.reset_conversation()

        # Generate response
        response = tool_calling_agent.chat(message, history)

        # Stream the response word by word for better UX
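        # Note: this is cosmetic streaming only - the full reply is generated above and
        # then re-emitted word by word; the LLM call itself is not streamed, and the
        # max_tokens slider value is accepted but not currently forwarded to the agent.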
        words = response.split()
        current_response = ""
        for word in words:
            current_response += word + " "
            yield current_response.strip()
    except Exception as e:
        error_msg = f"Error: {str(e)}"
        logger.error(error_msg)
        yield error_msg
# CSS to fix avatar distortion and positioning issues
avatar_css = """
/* Fix avatar image distortion with multiple selectors for compatibility */
#chatbot .avatar-container img,
#chatbot .message-row img,
#chatbot .message img,
#chatbot img[src*="twemoji"],
#chatbot img[src*="huggingface"],
.gr-chatbot img {
    width: 40px !important;
    height: 40px !important;
    min-width: 40px !important;
    min-height: 40px !important;
    max-width: 40px !important;
    max-height: 40px !important;
    border-radius: 50% !important;
    object-fit: cover !important;
    aspect-ratio: 1 / 1 !important;
    margin: 0px !important;
    padding: 0px !important;
    display: block !important;
    flex-shrink: 0 !important;
}

/* Force square containers */
#chatbot .avatar-container,
.gr-chatbot .avatar-container {
    width: 40px !important;
    height: 40px !important;
    min-width: 40px !important;
    min-height: 40px !important;
    flex-shrink: 0 !important;
    display: flex !important;
    align-items: center !important;
    justify-content: center !important;
}
"""
# Create Gradio ChatInterface
chatbot = gr.ChatInterface(
    generate_response,
    chatbot=gr.Chatbot(
        avatar_images=[
            "https://cdn.jsdelivr.net/gh/twitter/twemoji@latest/assets/72x72/1f464.png",  # User avatar (person emoji)
            "https://cdn-avatars.huggingface.co/v1/production/uploads/64e6d37e02dee9bcb9d9fa18/o_HhUnXb_PgyYlqJ6gfEO.png"  # Bot avatar
        ],
        height="64vh",
        elem_id="chatbot"
    ),
    additional_inputs=[
        gr.Slider(50, 8192, label="Max Tokens", value=1024,
                  info="Maximum number of tokens in the response"),
    ],
    title="🤖 DQ Micro Agent",
    description="DevQuasar self-hosted Micro Agent with web search capabilities",
    theme="finlaymacklon/smooth_slate",
    css=avatar_css
)
if __name__ == "__main__":
    chatbot.queue().launch()
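
# Local run sketch (illustrative; the hosted Space reads these values from its secrets):
#   export public_ip=... port=... api_key=... model=... tavily_key=...
#   python app.py
# then open the URL Gradio prints.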