# client2 / app.py
# PD03's picture
# Update app.py
# 77f67f6 verified
import gradio as gr
import openai
import requests
import json
from typing import Dict, Any, List, Tuple
from datetime import datetime
import os
class MCPClient:
    """Synchronous JSON-RPC client for an MCP server reachable over HTTP.

    Every request is POSTed to ``<server_url>/mcp``. Failures are reported
    through return values (an error dict or an empty list), never raised.
    """

    # Headers sent with every request. The second one is presumably there to
    # skip ngrok's browser warning page when the server is tunnelled -- confirm.
    _HEADERS = {
        "Content-Type": "application/json",
        "ngrok-skip-browser-warning": "true"
    }
    _TIMEOUT = 30  # seconds, applied to every request

    def __init__(self, server_url: str):
        """Store the server base URL with trailing slashes removed."""
        self.server_url = server_url.rstrip('/')

    def _post(self, method: str, params: Dict[str, Any] = None):
        """POST one JSON-RPC request and return the raw HTTP response."""
        mcp_request = {"jsonrpc": "2.0", "id": 1, "method": method}
        if params is not None:
            mcp_request["params"] = params
        return requests.post(
            f"{self.server_url}/mcp",
            json=mcp_request,
            headers=self._HEADERS,
            timeout=self._TIMEOUT
        )

    @staticmethod
    def _unwrap_tool_result(payload: Any) -> Dict[str, Any]:
        """Turn a decoded ``tools/call`` response into a result dict.

        The first content item's ``text`` is expected to hold a JSON object,
        which is returned as-is. Text that is not a JSON object is wrapped so
        callers always receive a dict carrying a ``success`` flag. Responses
        without ``result.content`` (e.g. JSON-RPC errors) are returned
        unchanged.
        """
        if not isinstance(payload, dict):
            return {
                "success": False,
                "error": f"Unexpected response: {payload!r}"
            }
        rpc_result = payload.get("result")
        if not isinstance(rpc_result, dict) or "content" not in rpc_result:
            return payload

        content = rpc_result["content"]
        first = content[0] if isinstance(content, list) and content else None
        text = first.get("text") if isinstance(first, dict) else None
        if not isinstance(text, str):
            return {
                "success": False,
                "error": "Tool returned no text content"
            }

        # MCP marks failed tool executions with ``isError`` on the result.
        succeeded = not rpc_result.get("isError", False)
        try:
            decoded = json.loads(text)
        except ValueError:
            # Plain-text tool output is a valid answer, not a transport error.
            return {"success": succeeded, "text": text}
        if isinstance(decoded, dict):
            return decoded
        return {"success": succeeded, "data": decoded}

    def call_tool_sync(self, tool_name: str, arguments: Dict[str, Any] = None) -> Dict[str, Any]:
        """Invoke ``tool_name`` on the server and return its result dict.

        Args:
            tool_name: Name of the MCP tool to call.
            arguments: Tool arguments; ``None`` is sent as an empty dict.

        Returns:
            The tool's decoded result, or ``{"success": False, "error": ...}``
            when the request fails, the status is not 200, or the body is not
            valid JSON.
        """
        if arguments is None:
            arguments = {}
        try:
            response = self._post(
                "tools/call",
                {"name": tool_name, "arguments": arguments}
            )
        except Exception as e:  # boundary: callers expect an error dict
            return {
                "success": False,
                "error": f"Connection error: {str(e)}"
            }
        if response.status_code != 200:
            return {
                "success": False,
                "error": f"HTTP {response.status_code}: {response.text}"
            }
        try:
            payload = response.json()
        except ValueError as e:
            return {
                "success": False,
                "error": f"Invalid JSON response: {str(e)}"
            }
        return self._unwrap_tool_result(payload)

    def list_tools_sync(self) -> List[Dict[str, Any]]:
        """Return the server's tool descriptors, or ``[]`` on any failure."""
        try:
            response = self._post("tools/list")
            if response.status_code != 200:
                print(f"Error listing tools: HTTP {response.status_code}")
                return []
            tools = response.json().get("result", {}).get("tools", [])
            return tools if isinstance(tools, list) else []
        except Exception as e:  # best effort: an empty list means "no tools"
            print(f"Error listing tools: {str(e)}")
            return []
class AIAssistant:
    """Chat assistant that lets an OpenAI model call MCP tools.

    The model is prompted to emit ``CALL_TOOL:`` lines. Those lines are
    parsed, executed through the MCP client, and the (truncated) results are
    sent back to the model for a final answer.
    """

    _MODEL = "gpt-3.5-turbo"
    _TEMPERATURE = 0.7
    _MAX_TOKENS = 800  # kept small to leave room for context
    _RESULT_PREVIEW_CHARS = 1500  # per-tool cap on text sent back to the model

    def __init__(self, openai_api_key: str, mcp_client: "MCPClient"):
        """Create the OpenAI client and remember the MCP client.

        Args:
            openai_api_key: API key handed to the OpenAI library.
            mcp_client: Client used to list and call tools.
        """
        try:
            self.openai_client = openai.OpenAI(
                api_key=openai_api_key,
                timeout=30.0
            )
        except Exception:
            # Fallback for older OpenAI versions
            openai.api_key = openai_api_key
            self.openai_client = openai
        self.mcp_client = mcp_client
        self.available_tools = []

    def initialize(self):
        """Initialize the assistant by fetching available tools"""
        self.available_tools = self.mcp_client.list_tools_sync()

    def get_system_prompt(self) -> str:
        """Build the system prompt, listing every available tool.

        Tools lacking a name or description are still listed instead of
        raising ``KeyError``.
        """
        tools_description = "\n".join(
            f"- {tool.get('name', 'Unknown')}: {tool.get('description') or ''}"
            for tool in self.available_tools
        )
        return f"""You are an AI assistant with access to SAP business systems and news data through specialized tools.
Available tools:
{tools_description}
When a user asks for information that can be retrieved using these tools, you should:
1. Identify which tool(s) would be helpful
2. Call the appropriate tool(s) with the right parameters
3. Wait for the results before providing your final response
For SAP-related queries (purchase orders, requisitions), use the SAP tools.
For news-related queries, use the news tools.
To call a tool, use this exact format:
CALL_TOOL: tool_name
or
CALL_TOOL: tool_name(parameter1=value1, parameter2=value2)
Examples:
- For "show me purchase orders": CALL_TOOL: get_purchase_orders
- For "get 20 purchase orders": CALL_TOOL: get_purchase_orders(top=20)
- For "latest tech news": CALL_TOOL: get_news_headlines(category=technology)
- For "get news from BBC": CALL_TOOL: get_news_by_source(source_id=bbc-news)
- For "get news from CNN": CALL_TOOL: get_news_by_source(source_id=cnn)
- For "get news from Reuters": CALL_TOOL: get_news_by_source(source_id=reuters)
IMPORTANT: For news by source queries, always include the source_id parameter:
- BBC: source_id=bbc-news
- CNN: source_id=cnn
- Reuters: source_id=reuters
- Associated Press: source_id=associated-press
- The Guardian: source_id=the-guardian
- Washington Post: source_id=the-washington-post
After calling a tool, I will provide you with the results to interpret for the user.
"""

    @staticmethod
    def _coerce_argument(value: str) -> Any:
        """Convert a raw argument string to int or bool where it looks like one."""
        digits = value[1:] if value.startswith('-') else value
        if digits.isdigit():
            try:
                return int(value)
            except ValueError:
                # str.isdigit() accepts characters int() rejects (e.g. "²").
                return value
        if value.lower() in ('true', 'false'):
            return value.lower() == 'true'
        return value

    @classmethod
    def _parse_arguments(cls, params_str: str) -> Dict[str, Any]:
        """Parse ``key=value, key=value`` text into an arguments dict."""
        params = {}
        for param in params_str.split(','):
            if '=' not in param:
                continue
            key, value = param.split('=', 1)
            params[key.strip()] = cls._coerce_argument(value.strip().strip('"\''))
        return params

    def extract_tool_calls(self, response: str) -> List[Dict[str, Any]]:
        """Extract ``CALL_TOOL:`` directives from a model response.

        Args:
            response: Raw model output; ``None`` or empty yields no calls.

        Returns:
            One ``{'name': ..., 'arguments': {...}}`` dict per directive line,
            in order of appearance. Directives without a tool name are skipped.
        """
        tool_calls = []
        if not response:
            return tool_calls
        prefix = 'CALL_TOOL:'
        for raw_line in response.split('\n'):
            line = raw_line.strip()
            if not line.startswith(prefix):
                continue
            tool_part = line[len(prefix):].strip()
            open_idx = tool_part.find('(')
            close_idx = tool_part.find(')', open_idx + 1) if open_idx != -1 else -1
            if open_idx != -1 and close_idx != -1:
                tool_name = tool_part[:open_idx].strip()
                arguments = self._parse_arguments(tool_part[open_idx + 1:close_idx])
            else:
                # Simple tool call without parameters
                tool_name = tool_part
                arguments = {}
            if not tool_name:
                print(f"Error parsing tool call '{line}': missing tool name")
                continue
            tool_calls.append({'name': tool_name, 'arguments': arguments})
        return tool_calls

    def truncate_tool_result(self, result: Dict[str, Any], max_chars: int = 2000) -> Dict[str, Any]:
        """Shrink a tool result so its JSON form fits within ``max_chars``.

        Top-level lists are cut to three items and top-level strings to 500
        characters. If that is still too large, a summary dict is returned
        that reports the size of the *original* result. Non-dict inputs are
        returned unchanged; the input dict is never mutated.
        """
        if not isinstance(result, dict):
            return result
        original_len = len(json.dumps(result, indent=2))
        if original_len <= max_chars:
            return result.copy()

        trimmed = {}
        for key, value in result.items():
            if isinstance(value, list) and len(value) > 3:
                trimmed[key] = value[:3] + [f"... ({len(value) - 3} more items truncated)"]
            elif isinstance(value, str) and len(value) > 500:
                trimmed[key] = value[:500] + "... (truncated)"
            else:
                trimmed[key] = value
        if len(json.dumps(trimmed, indent=2)) <= max_chars:
            return trimmed

        # Still too long: fall back to a fixed-size summary.
        raw = str(result)
        return {
            "success": result.get("success", False),
            "truncated": True,
            "message": f"Result truncated due to size. Original had {original_len} characters.",
            "sample_data": (raw[:1000] + "...") if len(raw) > 1000 else raw
        }

    def _chat(self, messages: List[Dict[str, str]]) -> str:
        """Run one chat completion and return the reply text ('' if empty)."""
        if hasattr(self.openai_client, 'chat'):
            create = self.openai_client.chat.completions.create
        else:
            # Fallback for older API
            create = self.openai_client.ChatCompletion.create
        response = create(
            model=self._MODEL,
            messages=messages,
            temperature=self._TEMPERATURE,
            max_tokens=self._MAX_TOKENS
        )
        return response.choices[0].message.content or ""

    def process_message(self, user_message: str) -> Tuple[str, str]:
        """Answer ``user_message``, running any tools the model requests.

        Returns:
            ``(reply, tool_info)`` where ``tool_info`` is a log of the tool
            calls made (empty when none were). On failure the reply is an
            error message and ``tool_info`` holds whatever was logged so far.
        """
        tool_info = ""
        try:
            messages = [
                {"role": "system", "content": self.get_system_prompt()},
                {"role": "user", "content": user_message}
            ]
            ai_response = self._chat(messages)
            tool_calls = self.extract_tool_calls(ai_response)
            # Debug information
            print(f"AI Response: {ai_response}")
            print(f"Extracted tool calls: {tool_calls}")
            if not tool_calls:
                return ai_response, ""

            sections = []
            for tool_call in tool_calls:
                name = tool_call['name']
                tool_info += f"🔧 Calling: {name}\n"
                result = self.mcp_client.call_tool_sync(name, tool_call['arguments'])
                # A tool may hand back something other than a dict; treat
                # that as a failure rather than crashing on ``.get``.
                if isinstance(result, dict) and result.get('success'):
                    tool_info += f"✅ {name} completed\n"
                else:
                    if isinstance(result, dict):
                        error = result.get('error', 'Unknown error')
                    else:
                        error = 'Unexpected result format'
                    tool_info += f"❌ {name} failed: {error}\n"
                # Truncate large results to prevent context overflow
                rendered = json.dumps(self.truncate_tool_result(result), indent=2)
                if len(rendered) > self._RESULT_PREVIEW_CHARS:
                    rendered = rendered[:self._RESULT_PREVIEW_CHARS] + "...(truncated)"
                sections.append(f"Tool: {name}\nResult: {rendered}")

            tool_results_text = "\n\n".join(sections)
            final_messages = messages + [
                {"role": "assistant", "content": ai_response},
                {"role": "user", "content": f"Here are the tool results:\n\n{tool_results_text}\n\nPlease interpret these results and provide a helpful response to the user."}
            ]
            # Get final response with tool results
            return self._chat(final_messages), tool_info
        except Exception as e:
            return f"❌ Error processing your request: {str(e)}", tool_info
# Module-wide singletons; both stay None until initialize_assistant() sets them
assistant, mcp_client = None, None
def test_connection(mcp_url):
    """Check that an MCP server is reachable and report what it offers.

    Calls ``GET <mcp_url>/health`` and then a JSON-RPC ``tools/list`` on
    ``<mcp_url>/mcp``.

    Args:
        mcp_url: Base URL of the MCP server.

    Returns:
        A human-readable status string; never raises.
    """
    if not mcp_url or mcp_url == "https://your-ngrok-url.ngrok.io":
        return "❌ Please enter a valid MCP server URL"
    base_url = mcp_url.rstrip('/')
    # Same header the MCP POST requests send -- presumably to skip ngrok's
    # browser warning page; confirm against the deployment.
    skip_warning = {"ngrok-skip-browser-warning": "true"}
    try:
        # Test health endpoint
        response = requests.get(f"{base_url}/health", headers=skip_warning, timeout=10)
        if response.status_code != 200:
            return f"❌ Connection failed: HTTP {response.status_code}"
        try:
            data = response.json()
        except ValueError:
            # A 200 with a non-JSON body still counts as reachable.
            data = {}
        if not isinstance(data, dict):
            data = {}

        # Test MCP tools list
        mcp_request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "tools/list"
        }
        mcp_response = requests.post(
            f"{base_url}/mcp",
            json=mcp_request,
            headers={"Content-Type": "application/json", **skip_warning},
            timeout=10
        )
        if mcp_response.status_code != 200:
            return f"✅ Health OK, but MCP endpoint failed: HTTP {mcp_response.status_code}"

        mcp_data = mcp_response.json()
        tools = mcp_data.get("result", {}).get("tools", [])
        tool_names = [tool.get("name", "Unknown") for tool in tools]
        return f"✅ Connected successfully!\nHealth Status: {data.get('status', 'Unknown')}\nMCP Tools: {len(tools)}\nAvailable: {', '.join(tool_names)}"
    except Exception as e:  # UI boundary: always answer with a status string
        return f"❌ Connection error: {str(e)}"
def initialize_assistant(openai_key, mcp_url):
    """Build the global MCP client and assistant from the given settings.

    The module-level ``assistant`` and ``mcp_client`` are only replaced once
    the new assistant has been fully initialized, so a failed attempt leaves
    any previously working assistant in place.

    Args:
        openai_key: OpenAI API key.
        mcp_url: Base URL of the MCP server.

    Returns:
        A status string; failures start with the cross-mark emoji.
    """
    global assistant, mcp_client
    if not openai_key:
        return "❌ Please enter your OpenAI API key"
    if not mcp_url or mcp_url == "https://your-ngrok-url.ngrok.io":
        return "❌ Please enter a valid MCP server URL"
    try:
        new_client = MCPClient(mcp_url)
        new_assistant = AIAssistant(openai_key, new_client)
        new_assistant.initialize()
    except Exception as e:
        return f"❌ Failed to initialize: {str(e)}"
    mcp_client = new_client
    assistant = new_assistant
    return f"✅ AI Assistant initialized with {len(assistant.available_tools)} tools available"
def chat_interface(message, history, openai_key, mcp_url):
    """Handle one chat turn for the UI.

    The global assistant is created on first use and rebuilt whenever the
    supplied API key or server URL differs from the one it was built with, so
    correcting a setting takes effect without restarting the app.

    Args:
        message: The user's message.
        history: List of ``[user, assistant]`` pairs shown so far (or None).
        openai_key: OpenAI API key from the settings panel.
        mcp_url: MCP server URL from the settings panel.

    Returns:
        ``(history, "")`` -- the updated history and an empty string.
    """
    global assistant
    if history is None:
        history = []

    needs_init = not assistant
    if assistant and openai_key and mcp_url:
        # Compare against what the live assistant was actually built with.
        active_key = getattr(assistant.openai_client, "api_key", None)
        active_url = getattr(assistant.mcp_client, "server_url", None)
        needs_init = active_key != openai_key or active_url != mcp_url.rstrip('/')
    if needs_init:
        init_result = initialize_assistant(openai_key, mcp_url)
        if "❌" in init_result:
            history.append([message, init_result])
            return history, ""
    try:
        print(f"Calling process_message with: {message}")
        # Cap the retained history. Each entry is a [user, assistant] pair,
        # so this keeps the last 10 exchanges.
        if len(history) > 10:
            history = history[-10:]
        # Make sure we call the synchronous method
        result = assistant.process_message(message)
        print(f"process_message returned: {type(result)} - {result}")
        # Check if result is a tuple (response, tool_info)
        if isinstance(result, tuple) and len(result) == 2:
            response, tool_info = result
            print(f"Unpacked: response={response}, tool_info={tool_info}")
        else:
            response = str(result)
            tool_info = ""
            print(f"Single result: {response}")
        # Format response with tool info if available
        if tool_info:
            full_response = f"**Tool Execution:**\n{tool_info}\n\n**Response:**\n{response}"
        else:
            full_response = response
        history.append([message, full_response])
        return history, ""
    except Exception as e:
        import traceback
        error_response = f"❌ Error: {str(e)}\n\nTraceback:\n{traceback.format_exc()}"
        print(f"Error in chat_interface: {error_response}")
        history.append([message, error_response])
        return history, ""
# Create Gradio interface: chat pane on the left, settings and examples on the right
with gr.Blocks(title="AI Assistant with SAP & News Integration", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🤖 AI Assistant with SAP & News Integration")
    gr.Markdown("Chat with an AI that can access SAP business data and news through natural language queries.")
    with gr.Row():
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(
                height=500,
                show_label=False,
                container=True,
                bubble_full_width=False
            )
            msg = gr.Textbox(
                placeholder="Ask me about SAP data, news, or anything else...",
                show_label=False,
                container=False
            )
            with gr.Row():
                submit_btn = gr.Button("Send", variant="primary")
                clear_btn = gr.Button("Clear", variant="secondary")
        with gr.Column(scale=1):
            gr.Markdown("### ⚙️ Configuration")
            openai_key = gr.Textbox(
                label="OpenAI API Key",
                type="password",
                placeholder="sk-..."
            )
            mcp_url = gr.Textbox(
                label="MCP Server URL",
                value="https://your-ngrok-url.ngrok.io",
                placeholder="https://abc123.ngrok.io"
            )
            test_btn = gr.Button("Test Connection", variant="secondary")
            connection_status = gr.Textbox(label="Connection Status", interactive=False)
            gr.Markdown("### 📋 Example Queries")
            gr.Markdown("""
- "Show me recent purchase orders"
- "Get purchase requisitions"
- "What's the latest tech news?"
- "Get news from BBC"
- "Show me business news from the US"
""")

    # Event handlers
    def respond(message, history, openai_key, mcp_url):
        """Forward a chat submission to chat_interface()."""
        return chat_interface(message, history, openai_key, mcp_url)

    # The Send button and pressing Enter in the textbox share the same wiring.
    chat_inputs = [msg, chatbot, openai_key, mcp_url]
    chat_outputs = [chatbot, msg]
    submit_btn.click(respond, chat_inputs, chat_outputs)
    msg.submit(respond, chat_inputs, chat_outputs)
    # Clear resets both the conversation and the input box.
    clear_btn.click(lambda: ([], ""), outputs=[chatbot, msg])
    test_btn.click(
        test_connection,
        [mcp_url],
        [connection_status]
    )

if __name__ == "__main__":
    demo.launch()