"""DQ Micro Agent: a Gradio chat UI over an OpenAI-compatible LLM endpoint.

The agent optionally exposes a Tavily-backed ``web_search`` tool to the model.
Connection settings are read from environment variables at import time
(``public_ip``, ``port``, ``api_key``, ``model``, ``tavily_key``,
``cf_bypass_key``).
"""

import json  # noqa: F401  (kept: unused here, but part of the original import block)
import logging
import logging.handlers
import os
import re
import traceback
import uuid
from datetime import datetime
from typing import List, Optional

import gradio as gr
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langchain_tavily import TavilySearch

# Configuration - set to False to disable detailed logging
ENABLE_DETAILED_LOGGING = True

# Ports for which the LLM endpoint is assumed to speak HTTPS.
_HTTPS_PORTS = ("443", "11443")

# Keywords in the user's message that trigger the manual web-search fallback
# when the model did not emit a structured tool call.
_FALLBACK_SEARCH_KEYWORDS = ("search", "today", "current", "recent")

# Matches each run of non-whitespace; used to stream a response word by word
# while keeping the original whitespace between words.
_WORD_RE = re.compile(r"\S+")

# Setup logging with rotation (7 days max)
if ENABLE_DETAILED_LOGGING:
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)

    # Rotating file handler (daily rotation at midnight, 7 backups kept)
    file_handler = logging.handlers.TimedRotatingFileHandler(
        'agent.log',
        when='midnight',
        interval=1,
        backupCount=7,  # Keep 7 days of logs
        encoding='utf-8',
    )
    file_handler.setFormatter(formatter)

    logging.basicConfig(
        level=logging.INFO,
        handlers=[console_handler, file_handler],
    )
else:
    logging.basicConfig(level=logging.WARNING)

logger = logging.getLogger(__name__)


def _log_detail(level: int, msg: str, *args) -> None:
    """Emit a log record only when ENABLE_DETAILED_LOGGING is on."""
    if ENABLE_DETAILED_LOGGING:
        logger.log(level, msg, *args)


def get_current_date() -> str:
    """Get current date in YYYY-MM-DD format"""
    return datetime.now().strftime("%Y-%m-%d")


def create_system_prompt() -> str:
    """Create dynamic system prompt with current date"""
    current_date = get_current_date()
    return f"""You are a helpful AI assistant with web search capabilities.

TODAY'S DATE: {current_date}

IMPORTANT: You have access to a web_search tool. Consider your knowledge cutoff date and today's date to decide when to search:

USE WEB SEARCH when users ask about:
- Events after your knowledge cutoff date
- Current events, breaking news, or recent developments
- Today's date, current time, or "what's happening now"
- Real-time data (weather, stock prices, sports scores)
- Recent updates to ongoing situations
- Information that changes frequently
- When users explicitly ask you to search the web

DO NOT use web search for:
- Historical facts before your cutoff date
- General knowledge that doesn't change (capitals, basic science, etc.)
- Established facts and concepts
- Personal advice or opinions

When in doubt about whether information might be outdated, use web search to get the most current information."""


def _build_base_url(ip: Optional[str], port: Optional[str]) -> str:
    """Build the ``/v1`` base URL of the OpenAI-compatible endpoint.

    HTTPS is selected when the port is one of ``_HTTPS_PORTS`` or the host
    string contains "https" (same rule as before). Any scheme prefix already
    present in ``ip`` is stripped so the result never has a doubled scheme
    such as ``https://https://host``. An empty/missing port is omitted.

    Raises:
        ValueError: if ``ip`` is empty or missing.
    """
    raw_host = (ip or "").strip()
    if not raw_host:
        raise ValueError("LLM host is not configured (environment variable 'public_ip')")

    port_str = "" if port is None else str(port).strip()
    protocol = "https" if port_str in _HTTPS_PORTS or "https" in raw_host else "http"

    # Drop an explicit scheme and trailing slashes from the configured host.
    host = raw_host.split("://", 1)[-1].rstrip("/")
    authority = f"{host}:{port_str}" if port_str else host
    return f"{protocol}://{authority}/v1"


def _history_to_messages(history) -> list:
    """Convert Gradio chat history into LangChain messages.

    Handles the pair format (``[user, assistant]`` per turn) used by this app.
    Dict entries with ``role``/``content`` keys are also accepted.
    NOTE(review): dict handling assumes Gradio's "messages" history format -
    confirm against the installed Gradio version. Dict entries whose content
    is not a non-empty string are skipped.
    """
    converted = []
    for entry in history or []:
        if isinstance(entry, dict):
            content = entry.get("content")
            if not isinstance(content, str) or not content:
                continue
            role = entry.get("role")
            if role == "user":
                converted.append(HumanMessage(content=content))
            elif role == "assistant":
                converted.append(AIMessage(content=content))
            continue

        user_msg, assistant_msg = entry
        converted.append(HumanMessage(content=user_msg))
        if assistant_msg:  # Only add if assistant responded
            converted.append(AIMessage(content=assistant_msg))
    return converted


# Configuration from environment variables
llm_ip = os.environ.get('public_ip')
llm_port = os.environ.get('port')
llm_key = os.environ.get('api_key')
llm_model = os.environ.get('model')

# Tavily API configuration
tavily_key = os.environ.get('tavily_key', '')

if ENABLE_DETAILED_LOGGING:
    logger.info("Tavily API key present: %s", bool(tavily_key))
    if tavily_key:
        logger.info("Tavily API key length: %s", len(tavily_key))
    else:
        logger.warning("No Tavily API key found in environment variables")


# Tool calling agent implementation
class ToolCallingAgentChat:
    """Chat agent that lets the LLM call a ``web_search`` tool when available.

    The LLM client is (re)built by ``_setup_agent``; ``chat`` performs at most
    one round of tool execution before asking the model for a final answer.
    """

    def __init__(self, ip: str, port: str, api_key: str, model: str):
        self.ip = ip
        self.port = port
        self.api_key = api_key
        self.model = model
        self.llm = None
        self.llm_with_tools = None
        self.tools = []
        self.conversation_id = None
        self._setup_agent()

    def reset_conversation(self):
        """Reset conversation state"""
        self.conversation_id = str(uuid.uuid4())
        _log_detail(logging.INFO, "=== CONVERSATION RESET ===")
        _log_detail(logging.INFO, "New conversation ID: %s", self.conversation_id)

    def _create_web_search_tool(self):
        """Return the LangChain ``web_search`` tool backed by Tavily."""

        @tool
        def web_search(query: str) -> str:
            """Search the web for current information about any topic.
            Use this when you need up-to-date information, current events, or real-time data."""
            try:
                tavily_tool = TavilySearch(
                    tavily_api_key=tavily_key,
                    max_results=5,
                    topic="general",
                    include_answer=True,
                    search_depth="advanced",
                )
                result = tavily_tool.invoke({"query": query})
                _log_detail(logging.INFO, "Tavily search successful for query: %s", query)
                return result
            except Exception as e:
                # Tool failures are returned as text so the model can tell the
                # user, instead of aborting the whole chat turn.
                error_str = str(e).lower()
                _log_detail(logging.ERROR, "Tavily search failed for query '%s': %s", query, e)
                _log_detail(logging.ERROR, "Exception type: %s", type(e).__name__)

                # Check for rate limit or quota issues
                quota_markers = ('rate limit', 'quota', 'limit exceeded', 'usage limit', 'billing')
                if any(marker in error_str for marker in quota_markers):
                    _log_detail(logging.WARNING, "Tavily rate limit/quota exceeded: %s", e)
                    return "I can't search the web right now due to rate limits."

                _log_detail(logging.ERROR, "Tavily API error: %s", e)
                return f"I can't search the web right now. Error: {str(e)[:100]}"

        return web_search

    def _setup_agent(self):
        """Initialize the tool calling agent.

        Builds the ChatOpenAI client from the current ip/port/key/model,
        registers the web search tool when a Tavily key is configured, and
        binds the tools to the model. Any failure is logged and re-raised.
        """
        try:
            base_url = _build_base_url(self.ip, self.port)
            protocol = base_url.split("://", 1)[0]

            _log_detail(logging.INFO, "=== SETTING UP TOOL CALLING AGENT ===")
            _log_detail(logging.INFO, "LLM URL: %s", base_url)
            _log_detail(logging.INFO, "Model: %s", self.model)
            _log_detail(logging.INFO, "Using protocol: %s", protocol)

            # Custom header for Cloudflare bypass.
            # NOTE(review): the fallback value is a secret hard-coded in source;
            # prefer requiring the 'cf_bypass_key' environment variable.
            cf_bypass_key = os.environ.get('cf_bypass_key', 'devquasar2025')

            self.llm = ChatOpenAI(
                base_url=base_url,
                api_key=self.api_key or "ollama",  # Use provided key or default
                model=self.model,
                temperature=0.7,
                default_headers={
                    "x-cf-bypass": cf_bypass_key,
                    "User-Agent": "DevQuasar-Agent/1.0",
                },
            )
            _log_detail(logging.INFO, "LLM created successfully")

            # Define web search tool
            if tavily_key:
                _log_detail(logging.INFO, "Setting up Tavily search tool")
                try:
                    self.tools = [self._create_web_search_tool()]
                    _log_detail(logging.INFO, "Tavily search tool created successfully")
                except Exception as e:
                    # Best effort: run without web search if the tool cannot be built.
                    _log_detail(logging.ERROR, "Failed to create Tavily tool: %s", e)
                    self.tools = []
            else:
                _log_detail(logging.WARNING, "No Tavily API key found, no web search tool available")
                self.tools = []

            # Bind tools to the model
            if self.tools:
                self.llm_with_tools = self.llm.bind_tools(self.tools)
                _log_detail(logging.INFO, "Tools bound to model: %s", [t.name for t in self.tools])
            else:
                self.llm_with_tools = self.llm
                _log_detail(logging.INFO, "No tools available, using base model")

            _log_detail(logging.INFO, "Tool calling agent created successfully")

        except Exception as e:
            logger.error("=== AGENT SETUP ERROR ===")
            logger.error("Failed to setup agent: %s", e)
            logger.error("Traceback: %s", traceback.format_exc())
            raise

    def update_config(self, ip: str, port: str, api_key: str, model: str):
        """Update LLM configuration, rebuilding the agent only if something changed."""
        current = (self.ip, self.port, self.api_key, self.model)
        if (ip, port, api_key, model) != current:
            self.ip = ip
            self.port = port
            self.api_key = api_key
            self.model = model
            self._setup_agent()

    def _run_tool_call(self, tool_call) -> str:
        """Execute one tool call dict (name/args/id) and return its result as text."""
        name = tool_call['name']
        _log_detail(logging.INFO, "Executing tool: %s with args: %s", name, tool_call['args'])

        selected = next((t for t in self.tools if t.name == name), None)
        if selected is None:
            return f"Tool {name} not found"

        try:
            result = selected.invoke(tool_call['args'])
        except Exception as e:
            _log_detail(logging.ERROR, "Tool execution failed: %s", e)
            return f"Tool execution failed: {str(e)}"

        _log_detail(logging.INFO, "Tool executed successfully: %s", name)
        if result is None:
            return f"Tool {name} not found"
        return str(result)

    def chat(self, message: str, history: List[List[str]], max_tokens: Optional[int] = None) -> str:
        """Generate chat response using tool calling.

        Args:
            message: the new user message.
            history: prior turns as provided by Gradio.
            max_tokens: optional cap on generated tokens, forwarded to the
                model on each invocation; ``None`` leaves the server default.

        Returns:
            The assistant's reply, or an ``"Agent error: ..."`` string if
            anything raised while producing it.
        """
        try:
            if not self.llm_with_tools:
                return "Error: Agent not initialized"

            _log_detail(logging.INFO, "=== USER INPUT ===")
            _log_detail(logging.INFO, "Message: %s", message)
            _log_detail(logging.INFO, "History length: %s", len(history))

            # System prompt is rebuilt every turn so the date stays current.
            system_prompt = create_system_prompt()
            _log_detail(logging.INFO, "System prompt includes today's date: %s", get_current_date())

            messages = [SystemMessage(content=system_prompt)]
            messages.extend(_history_to_messages(history))
            messages.append(HumanMessage(content=message))

            invoke_kwargs = {"max_tokens": int(max_tokens)} if max_tokens else {}

            # Get initial response from LLM
            _log_detail(logging.INFO, "=== INVOKING LLM ===")
            _log_detail(logging.INFO, "Total messages in context: %s", len(messages))

            response = self.llm_with_tools.invoke(messages, **invoke_kwargs)
            native_tool_calls = getattr(response, 'tool_calls', None)

            _log_detail(logging.INFO, "=== LLM RESPONSE ===")
            _log_detail(logging.INFO, "Response type: %s", type(response))
            _log_detail(logging.INFO, "Response content: %s", response.content)
            _log_detail(logging.INFO, "Has tool calls: %s", bool(native_tool_calls))
            if native_tool_calls:
                _log_detail(logging.INFO, "Tool calls: %s", native_tool_calls)
            _log_detail(
                logging.INFO,
                "Response additional_kwargs: %s",
                getattr(response, 'additional_kwargs', {}),
            )

            tool_calls_to_execute = []
            assistant_turn = response

            if native_tool_calls:
                # Method 1: Proper tool calls
                tool_calls_to_execute = native_tool_calls
            elif self.tools and any(kw in message.lower() for kw in _FALLBACK_SEARCH_KEYWORDS):
                # Method 2: Fallback - the user's wording suggests current info
                # is needed but the model did not request a search.
                _log_detail(logging.INFO, "=== FALLBACK TOOL CALLING ===")
                _log_detail(logging.INFO, "Detected need for search based on keywords")

                tool_calls_to_execute = [{
                    'name': 'web_search',
                    'args': {'query': message},
                    'id': str(uuid.uuid4()),
                    'type': 'tool_call',
                }]
                # A tool message must answer an assistant message that carries
                # the matching tool call, so synthesize one instead of reusing
                # the model's tool-less reply.
                assistant_turn = AIMessage(
                    content=response.content or "",
                    tool_calls=tool_calls_to_execute,
                )

            if not tool_calls_to_execute:
                # No tool calls, use the direct response
                final_message = response.content
            else:
                _log_detail(logging.INFO, "=== TOOL CALLS DETECTED ===")
                _log_detail(logging.INFO, "Number of tool calls: %s", len(tool_calls_to_execute))

                messages.append(assistant_turn)
                for tool_call in tool_calls_to_execute:
                    messages.append(ToolMessage(
                        content=self._run_tool_call(tool_call),
                        tool_call_id=tool_call['id'],
                    ))

                # Get final response from LLM after tool execution
                _log_detail(logging.INFO, "=== GETTING FINAL RESPONSE ===")
                final_response = self.llm_with_tools.invoke(messages, **invoke_kwargs)
                final_message = final_response.content

            _log_detail(logging.INFO, "=== FINAL MESSAGE ===")
            _log_detail(logging.INFO, "Final message: %s", final_message)

            return final_message

        except Exception as e:
            # Top-level boundary: report the failure to the user as text.
            error_msg = f"Agent error: {str(e)}"
            logger.error("=== AGENT ERROR ===")
            logger.error("Error: %s", e)
            logger.error("Error type: %s", type(e))
            logger.error("Traceback: %s", traceback.format_exc())
            return error_msg


# Global agent instance
tool_calling_agent = ToolCallingAgentChat(llm_ip, llm_port, llm_key, llm_model)


def generate_response(message: str, history: List[List[str]], max_tokens: int):
    """Generate response using tool calling agent with dynamic system prompt.

    Yields progressively longer prefixes of the reply (one more word each
    time) so the UI appears to stream. Whitespace between words, including
    newlines, is preserved so markdown and code blocks render correctly.
    On failure a single ``"Error: ..."`` string is yielded.
    """
    try:
        # Configuration is pre-loaded from environment variables
        # No runtime config changes allowed for security

        # Reset conversation if history is empty (indicates clear button was pressed)
        if not history:
            tool_calling_agent.reset_conversation()

        response = tool_calling_agent.chat(message, history, max_tokens=max_tokens)

        # Stream the response word by word for better UX
        text = response.strip()
        for word in _WORD_RE.finditer(text):
            yield text[:word.end()]

    except Exception as e:
        error_msg = f"Error: {str(e)}"
        logger.error(error_msg)
        yield error_msg


# CSS to fix avatar distortion and positioning issues
avatar_css = """
/* Fix avatar image distortion with multiple selectors for compatibility */
#chatbot .avatar-container img,
#chatbot .message-row img,
#chatbot .message img,
#chatbot img[src*="twemoji"],
#chatbot img[src*="huggingface"],
.gr-chatbot img {
    width: 40px !important;
    height: 40px !important;
    min-width: 40px !important;
    min-height: 40px !important;
    max-width: 40px !important;
    max-height: 40px !important;
    border-radius: 50% !important;
    object-fit: cover !important;
    aspect-ratio: 1 / 1 !important;
    margin: 0px !important;
    padding: 0px !important;
    display: block !important;
    flex-shrink: 0 !important;
}

/* Force square containers */
#chatbot .avatar-container,
.gr-chatbot .avatar-container {
    width: 40px !important;
    height: 40px !important;
    min-width: 40px !important;
    min-height: 40px !important;
    flex-shrink: 0 !important;
    display: flex !important;
    align-items: center !important;
    justify-content: center !important;
}
"""

# Create Gradio ChatInterface
chatbot = gr.ChatInterface(
    generate_response,
    chatbot=gr.Chatbot(
        avatar_images=[
            "https://cdn.jsdelivr.net/gh/twitter/twemoji@latest/assets/72x72/1f464.png",  # User avatar (person emoji)
            "https://cdn-avatars.huggingface.co/v1/production/uploads/64e6d37e02dee9bcb9d9fa18/o_HhUnXb_PgyYlqJ6gfEO.png"  # Bot avatar
        ],
        height="64vh",
        elem_id="chatbot"
    ),
    additional_inputs=[
        gr.Slider(50, 8192, label="Max Tokens", value=1024, info="Maximum number of tokens in the response"),
    ],
    title="🤖 DQ Micro Agent",
    description="DevQuasar self hosted Micro Agent with websearch capabilities",
    theme="finlaymacklon/smooth_slate",
    css=avatar_css
)

if __name__ == "__main__":
    chatbot.queue().launch()