"""Gradio chat UI for an OpenAI-backed assistant that calls tools on an MCP server.

The assistant asks the model to emit ``CALL_TOOL: name(arg=value, ...)`` lines,
executes those calls against the MCP server over JSON-RPC/HTTP, and then asks
the model to interpret the tool results for the user.
"""

import json
import os
import traceback
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import gradio as gr
import openai
import requests

# Headers sent with every MCP JSON-RPC request.
MCP_HEADERS = {
    "Content-Type": "application/json",
    "ngrok-skip-browser-warning": "true",
}

# Default value of the URL textbox; treated as "not configured yet".
PLACEHOLDER_MCP_URL = "https://your-ngrok-url.ngrok.io"

# Chat-completion settings shared by both model calls in process_message.
MODEL_NAME = "gpt-3.5-turbo"
MODEL_TEMPERATURE = 0.7
MODEL_MAX_TOKENS = 800  # Reduced to leave more room for context

# Marker the model is instructed to start a tool-call line with.
TOOL_CALL_PREFIX = "CALL_TOOL:"

# Kept at column 0 so the text sent to the model carries no code indentation.
# extract_tool_calls() only recognises CALL_TOOL: at the start of a line, so
# the format examples below must stay on lines of their own.
SYSTEM_PROMPT_TEMPLATE = """You are an AI assistant with access to SAP business systems and news data through specialized tools.

Available tools:
{tools_description}

When a user asks for information that can be retrieved using these tools, you should:
1. Identify which tool(s) would be helpful
2. Call the appropriate tool(s) with the right parameters
3. Wait for the results before providing your final response

For SAP-related queries (purchase orders, requisitions), use the SAP tools.
For news-related queries, use the news tools.

To call a tool, use this exact format:
CALL_TOOL: tool_name
or
CALL_TOOL: tool_name(parameter1=value1, parameter2=value2)

Examples:
- For "show me purchase orders": CALL_TOOL: get_purchase_orders
- For "get 20 purchase orders": CALL_TOOL: get_purchase_orders(top=20)
- For "latest tech news": CALL_TOOL: get_news_headlines(category=technology)
- For "get news from BBC": CALL_TOOL: get_news_by_source(source_id=bbc-news)
- For "get news from CNN": CALL_TOOL: get_news_by_source(source_id=cnn)
- For "get news from Reuters": CALL_TOOL: get_news_by_source(source_id=reuters)

IMPORTANT: For news by source queries, always include the source_id parameter:
- BBC: source_id=bbc-news
- CNN: source_id=cnn
- Reuters: source_id=reuters
- Associated Press: source_id=associated-press
- The Guardian: source_id=the-guardian
- Washington Post: source_id=the-washington-post

After calling a tool, I will provide you with the results to interpret for the user.
"""


class MCPClient:
    """MCP Client for communicating with the MCP server"""

    def __init__(self, server_url: str):
        # Strip trailing slashes so "/mcp" can be appended unconditionally.
        self.server_url = server_url.rstrip('/')

    def _post(self, method: str, params: Optional[Dict[str, Any]] = None) -> requests.Response:
        """POST a JSON-RPC 2.0 request to ``<server_url>/mcp`` and return the raw response."""
        payload: Dict[str, Any] = {"jsonrpc": "2.0", "id": 1, "method": method}
        if params is not None:
            payload["params"] = params
        return requests.post(
            f"{self.server_url}/mcp",
            json=payload,
            headers=MCP_HEADERS,
            timeout=30,
        )

    def call_tool_sync(self, tool_name: str, arguments: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Synchronous tool call using requests instead of aiohttp.

        Args:
            tool_name: Name of the MCP tool to invoke.
            arguments: Keyword arguments for the tool; ``None`` means no arguments.

        Returns:
            The JSON decoded from the first text content item of the result, or
            the raw JSON-RPC response when it carries no ``result.content``.
            On any failure a ``{"success": False, "error": ...}`` dict is
            returned instead of raising.
        """
        if arguments is None:
            arguments = {}

        try:
            response = self._post("tools/call", {"name": tool_name, "arguments": arguments})
        except requests.RequestException as e:
            return {"success": False, "error": f"Connection error: {str(e)}"}

        if response.status_code != 200:
            return {
                "success": False,
                "error": f"HTTP {response.status_code}: {response.text}",
            }

        # Decoding problems are reported separately from transport problems so
        # a malformed tool payload is not mislabelled as a connection error.
        try:
            result = response.json()
            if "result" in result and "content" in result["result"]:
                content = result["result"]["content"][0]["text"]
                return json.loads(content)
            return result
        except (ValueError, KeyError, IndexError, TypeError) as e:
            return {"success": False, "error": f"Invalid response from MCP server: {str(e)}"}

    def list_tools_sync(self) -> List[Dict[str, Any]]:
        """Synchronous tool listing using requests.

        Returns:
            The ``result.tools`` list from the server, or ``[]`` on a non-200
            status or any error (the error is printed).
        """
        try:
            response = self._post("tools/list")
            if response.status_code == 200:
                return response.json().get("result", {}).get("tools", [])
            return []
        except Exception as e:
            print(f"Error listing tools: {str(e)}")
            return []


class AIAssistant:
    """AI Assistant with MCP integration"""

    def __init__(self, openai_api_key: str, mcp_client: MCPClient):
        try:
            self.openai_client = openai.OpenAI(
                api_key=openai_api_key,
                timeout=30.0,
            )
        except Exception:
            # Fallback for older OpenAI versions
            openai.api_key = openai_api_key
            self.openai_client = openai

        self.mcp_client = mcp_client
        self.available_tools: List[Dict[str, Any]] = []

    def initialize(self):
        """Initialize the assistant by fetching available tools"""
        self.available_tools = self.mcp_client.list_tools_sync()

    def get_system_prompt(self) -> str:
        """Generate system prompt with available tools"""
        # .get() keeps a tool entry without a name/description from raising
        # KeyError and breaking every subsequent chat message.
        tools_description = "\n".join(
            f"- {tool.get('name', 'Unknown')}: {tool.get('description', '')}"
            for tool in self.available_tools
        )
        return SYSTEM_PROMPT_TEMPLATE.format(tools_description=tools_description)

    @staticmethod
    def _coerce_value(raw: str) -> Any:
        """Strip whitespace and quotes from a parameter value, converting digit strings to int and true/false to bool."""
        value = raw.strip().strip('"\'')
        if value.isdigit():
            try:
                return int(value)
            except ValueError:
                # str.isdigit() accepts characters such as superscript digits
                # that int() rejects; keep those as plain strings.
                return value
        if value.lower() in ('true', 'false'):
            return value.lower() == 'true'
        return value

    def extract_tool_calls(self, response: str) -> List[Dict[str, Any]]:
        """Extract tool calls from AI response.

        Each line starting with ``CALL_TOOL:`` yields one call, written either
        as ``CALL_TOOL: name`` or ``CALL_TOOL: name(key=value, ...)``.

        Returns:
            A list of ``{'name': str, 'arguments': dict}`` entries in the order
            they appear in the response.
        """
        tool_calls: List[Dict[str, Any]] = []
        if not response:
            return tool_calls

        for line in response.split('\n'):
            line = line.strip()
            if not line.startswith(TOOL_CALL_PREFIX):
                continue

            tool_part = line[len(TOOL_CALL_PREFIX):].strip()
            params: Dict[str, Any] = {}

            # Handle cases with or without parentheses
            if '(' in tool_part and ')' in tool_part:
                tool_name = tool_part.split('(')[0].strip()
                params_str = tool_part.split('(')[1].split(')')[0]
                if params_str.strip():
                    for param in params_str.split(','):
                        if '=' in param:
                            key, value = param.split('=', 1)
                            params[key.strip()] = self._coerce_value(value)
            else:
                # Simple tool call without parameters
                tool_name = tool_part

            if not tool_name:
                # A bare "CALL_TOOL:" names no tool; there is nothing to call.
                print(f"Error parsing tool call '{line}': missing tool name")
                continue

            tool_calls.append({'name': tool_name, 'arguments': params})

        return tool_calls

    @staticmethod
    def _shrink_value(value: Any) -> Any:
        """Cut a list down to its first 3 items or a string to 500 characters, appending a truncation marker."""
        if isinstance(value, list) and len(value) > 3:
            return value[:3] + [f"... ({len(value) - 3} more items truncated)"]
        if isinstance(value, str) and len(value) > 500:
            return value[:500] + "... (truncated)"
        return value

    def truncate_tool_result(self, result: Dict[str, Any], max_chars: int = 2000) -> Dict[str, Any]:
        """Truncate tool results to prevent context overflow.

        Args:
            result: Decoded tool result; non-dict values are returned unchanged.
            max_chars: Maximum length of the result's indented JSON rendering.

        Returns:
            A shallow copy of ``result`` when it fits; otherwise a copy with
            long top-level lists/strings shortened; if that is still too long,
            a summary dict with a 1000-character sample. ``result`` itself is
            never modified.
        """
        if not isinstance(result, dict):
            return result

        original_length = len(json.dumps(result, indent=2))
        if original_length <= max_chars:
            return result.copy()

        # Try to truncate data arrays/lists first
        shrunk = {key: self._shrink_value(value) for key, value in result.items()}
        if len(json.dumps(shrunk, indent=2)) <= max_chars:
            return shrunk

        # If still too long, replace the payload with a truncation notice.
        # The reported size is that of the untouched result, not the shrunk one.
        raw = str(result)
        return {
            "success": result.get("success", False),
            "truncated": True,
            "message": f"Result truncated due to size. Original had {original_length} characters.",
            "sample_data": (raw[:1000] + "...") if len(raw) > 1000 else raw,
        }

    def _complete(self, messages: List[Dict[str, str]]) -> Optional[str]:
        """Run one chat completion and return the first choice's message content."""
        # Check if we have a proper OpenAI client
        if hasattr(self.openai_client, 'chat'):
            create = self.openai_client.chat.completions.create
        else:
            # Fallback for older API
            create = self.openai_client.ChatCompletion.create

        response = create(
            model=MODEL_NAME,
            messages=messages,
            temperature=MODEL_TEMPERATURE,
            max_tokens=MODEL_MAX_TOKENS,
        )
        return response.choices[0].message.content

    @staticmethod
    def _format_tool_result(entry: Dict[str, Any]) -> str:
        """Render one tool result as text for the model, capped at 1500 characters of JSON."""
        rendered = json.dumps(entry['result'], indent=2)
        suffix = '...(truncated)' if len(rendered) > 1500 else ''
        return f"Tool: {entry['tool']}\nResult: {rendered[:1500]}{suffix}"

    def process_message(self, user_message: str) -> Tuple[str, str]:
        """Process user message and handle tool calls.

        Returns:
            ``(response_text, tool_info)`` where ``tool_info`` is a
            human-readable log of the tool calls made (empty when none were
            made or when an error occurred).
        """
        tool_info = ""

        try:
            messages = [
                {"role": "system", "content": self.get_system_prompt()},
                {"role": "user", "content": user_message},
            ]

            # Guard against a completion with no text content.
            ai_response = self._complete(messages) or ""
            tool_calls = self.extract_tool_calls(ai_response)

            # Debug information
            print(f"AI Response: {ai_response}")
            print(f"Extracted tool calls: {tool_calls}")

            if not tool_calls:
                return ai_response, ""

            tool_results = []
            for tool_call in tool_calls:
                name = tool_call['name']
                tool_info += f"🔧 Calling: {name}\n"

                result = self.mcp_client.call_tool_sync(name, tool_call['arguments'])

                # Truncate large results to prevent context overflow
                tool_results.append({
                    'tool': name,
                    'result': self.truncate_tool_result(result),
                })

                # The tool payload is arbitrary JSON, so it may decode to a
                # non-dict (e.g. a list) that carries no 'success' flag; such a
                # payload is reported as completed rather than crashing on .get.
                if not isinstance(result, dict) or result.get('success'):
                    tool_info += f"✅ {name} completed\n"
                else:
                    tool_info += f"❌ {name} failed: {result.get('error', 'Unknown error')}\n"

            # Create concise tool results summary
            tool_results_text = "\n\n".join(
                self._format_tool_result(entry) for entry in tool_results
            )

            final_messages = messages + [
                {"role": "assistant", "content": ai_response},
                {"role": "user", "content": f"Here are the tool results:\n\n{tool_results_text}\n\nPlease interpret these results and provide a helpful response to the user."},
            ]

            # Get final response with tool results
            return self._complete(final_messages), tool_info

        except Exception as e:
            return f"❌ Error processing your request: {str(e)}", ""


# Global variables
# NOTE: the assistant is created once by chat_interface() and then reused, so
# a key or URL edited in the UI afterwards does not take effect on its own.
assistant = None
mcp_client = None


def test_connection(mcp_url):
    """Test MCP server connection.

    Checks ``<mcp_url>/health`` and then issues a ``tools/list`` request.

    Returns:
        A human-readable status string; never raises.
    """
    if not mcp_url or mcp_url == PLACEHOLDER_MCP_URL:
        return "❌ Please enter a valid MCP server URL"

    base_url = mcp_url.rstrip('/')
    try:
        # Test health endpoint
        response = requests.get(f"{base_url}/health", timeout=10)
        if response.status_code != 200:
            return f"❌ Connection failed: HTTP {response.status_code}"
        data = response.json()

        # Test MCP tools list
        mcp_response = requests.post(
            f"{base_url}/mcp",
            json={"jsonrpc": "2.0", "id": 1, "method": "tools/list"},
            headers=MCP_HEADERS,
            timeout=10,
        )
        if mcp_response.status_code != 200:
            return f"✅ Health OK, but MCP endpoint failed: HTTP {mcp_response.status_code}"

        tools = mcp_response.json().get("result", {}).get("tools", [])
        tool_names = [tool.get("name", "Unknown") for tool in tools]
        return f"✅ Connected successfully!\nHealth Status: {data.get('status', 'Unknown')}\nMCP Tools: {len(tools)}\nAvailable: {', '.join(tool_names)}"

    except Exception as e:
        return f"❌ Connection error: {str(e)}"


def initialize_assistant(openai_key, mcp_url):
    """Initialize the AI assistant.

    Sets the module-level ``assistant`` and ``mcp_client`` on success.

    Returns:
        A status string starting with ✅ on success or ❌ on failure.
    """
    global assistant, mcp_client

    if not openai_key:
        return "❌ Please enter your OpenAI API key"

    if not mcp_url or mcp_url == PLACEHOLDER_MCP_URL:
        return "❌ Please enter a valid MCP server URL"

    try:
        mcp_client = MCPClient(mcp_url)
        assistant = AIAssistant(openai_key, mcp_client)
        assistant.initialize()
        return f"✅ AI Assistant initialized with {len(assistant.available_tools)} tools available"
    except Exception as e:
        return f"❌ Failed to initialize: {str(e)}"


def chat_interface(message, history, openai_key, mcp_url):
    """Main chat interface.

    Args:
        message: The user's new message.
        history: Chatbot history as a list of ``[user, assistant]`` pairs.
        openai_key: OpenAI API key, used only if the assistant is not yet initialized.
        mcp_url: MCP server URL, used only if the assistant is not yet initialized.

    Returns:
        ``(history, "")`` — the updated history and an empty string that
        clears the input textbox.
    """
    global assistant

    # Tolerate a missing history instead of failing on .append below.
    if history is None:
        history = []

    if not assistant:
        init_result = initialize_assistant(openai_key, mcp_url)
        if "❌" in init_result:
            history.append([message, init_result])
            return history, ""

    try:
        print(f"Calling process_message with: {message}")

        # Limit conversation history to prevent context overflow.
        # Each entry is one [user, assistant] exchange, so this keeps the
        # last 10 exchanges.
        if len(history) > 10:
            history = history[-10:]

        # Make sure we call the synchronous method
        result = assistant.process_message(message)
        print(f"process_message returned: {type(result)} - {result}")

        # Check if result is a tuple (response, tool_info)
        if isinstance(result, tuple) and len(result) == 2:
            response, tool_info = result
            print(f"Unpacked: response={response}, tool_info={tool_info}")
        else:
            response = str(result)
            tool_info = ""
            print(f"Single result: {response}")

        # Format response with tool info if available
        if tool_info:
            full_response = f"**Tool Execution:**\n{tool_info}\n\n**Response:**\n{response}"
        else:
            full_response = response

        history.append([message, full_response])
        return history, ""

    except Exception as e:
        error_response = f"❌ Error: {str(e)}\n\nTraceback:\n{traceback.format_exc()}"
        print(f"Error in chat_interface: {error_response}")
        history.append([message, error_response])
        return history, ""


# Create Gradio interface
with gr.Blocks(title="AI Assistant with SAP & News Integration", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🤖 AI Assistant with SAP & News Integration")
    gr.Markdown("Chat with an AI that can access SAP business data and news through natural language queries.")

    with gr.Row():
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(
                height=500,
                show_label=False,
                container=True,
                bubble_full_width=False
            )
            msg = gr.Textbox(
                placeholder="Ask me about SAP data, news, or anything else...",
                show_label=False,
                container=False
            )
            with gr.Row():
                submit_btn = gr.Button("Send", variant="primary")
                clear_btn = gr.Button("Clear", variant="secondary")

        with gr.Column(scale=1):
            gr.Markdown("### ⚙️ Configuration")
            openai_key = gr.Textbox(
                label="OpenAI API Key",
                type="password",
                placeholder="sk-..."
            )
            mcp_url = gr.Textbox(
                label="MCP Server URL",
                value="https://your-ngrok-url.ngrok.io",
                placeholder="https://abc123.ngrok.io"
            )
            test_btn = gr.Button("Test Connection", variant="secondary")
            connection_status = gr.Textbox(label="Connection Status", interactive=False)

            gr.Markdown("### 📋 Example Queries")
            gr.Markdown("""
            - "Show me recent purchase orders"
            - "Get purchase requisitions"
            - "What's the latest tech news?"
            - "Get news from BBC"
            - "Show me business news from the US"
            """)

    # Event handlers
    def respond(message, history, openai_key, mcp_url):
        return chat_interface(message, history, openai_key, mcp_url)

    submit_btn.click(
        respond,
        [msg, chatbot, openai_key, mcp_url],
        [chatbot, msg]
    )
    msg.submit(
        respond,
        [msg, chatbot, openai_key, mcp_url],
        [chatbot, msg]
    )
    clear_btn.click(lambda: ([], ""), outputs=[chatbot, msg])
    test_btn.click(
        test_connection,
        [mcp_url],
        [connection_status]
    )

if __name__ == "__main__":
    demo.launch()