|
import json
import os
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import gradio as gr
import openai
import requests
|
|
|
class MCPClient:
    """Synchronous JSON-RPC client for an MCP server reachable over HTTP.

    Every request is POSTed to ``<server_url>/mcp`` using ``requests``.
    """

    # Headers sent with every request. The ngrok header is presumably there to
    # bypass ngrok's browser warning page when the server is tunnelled.
    _HEADERS = {
        "Content-Type": "application/json",
        "ngrok-skip-browser-warning": "true",
    }

    def __init__(self, server_url: str, timeout: float = 30):
        """Remember the server base URL (trailing slashes removed).

        Args:
            server_url: Base URL of the MCP server.
            timeout: Per-request timeout in seconds.
        """
        self.server_url = server_url.rstrip('/')
        self.timeout = timeout

    def _post_rpc(self, method: str, params: Optional[Dict[str, Any]] = None):
        """POST one JSON-RPC 2.0 request and return the raw HTTP response."""
        payload = {"jsonrpc": "2.0", "id": 1, "method": method}
        if params is not None:
            payload["params"] = params
        return requests.post(
            f"{self.server_url}/mcp",
            json=payload,
            headers=self._HEADERS,
            timeout=self.timeout,
        )

    @staticmethod
    def _unwrap_tool_result(payload: Any) -> Any:
        """Pull the tool's own result out of a ``tools/call`` response body.

        A body without ``result.content`` is returned untouched. Otherwise the
        ``text`` of the first content item is decoded as JSON; text that is
        not JSON is returned as ``{"success": ..., "text": ...}`` rather than
        being reported as a failure, and missing text yields an error dict.
        """
        result = payload.get("result") if isinstance(payload, dict) else None
        if not isinstance(result, dict) or "content" not in result:
            return payload

        content = result["content"]
        first = content[0] if content else None
        if not isinstance(first, dict) or "text" not in first:
            return {"success": False, "error": "Tool returned no text content"}

        text = first["text"]
        try:
            return json.loads(text)
        except (TypeError, ValueError):
            # Plain-text tool output: honour the response's isError flag.
            return {"success": not result.get("isError", False), "text": text}

    def call_tool_sync(self, tool_name: str, arguments: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Invoke a tool through ``tools/call`` and return its decoded result.

        Args:
            tool_name: Name of the tool to call.
            arguments: Tool arguments; ``None`` is sent as an empty dict.

        Returns:
            The decoded tool result (normally a dict). On a non-200 response
            or any exception, ``{"success": False, "error": <message>}``.
        """
        try:
            response = self._post_rpc(
                "tools/call",
                {"name": tool_name, "arguments": arguments or {}},
            )
            if response.status_code != 200:
                return {
                    "success": False,
                    "error": f"HTTP {response.status_code}: {response.text}",
                }
            return self._unwrap_tool_result(response.json())
        except Exception as e:
            # Deliberate catch-all: callers expect an error dict, never a raise.
            return {
                "success": False,
                "error": f"Connection error: {str(e)}",
            }

    def list_tools_sync(self) -> List[Dict[str, Any]]:
        """Return the server's tool descriptors from ``tools/list``.

        Returns an empty list on a non-200 response or any exception (the
        exception is printed).
        """
        try:
            response = self._post_rpc("tools/list")
            if response.status_code == 200:
                return response.json().get("result", {}).get("tools", [])
            return []
        except Exception as e:
            print(f"Error listing tools: {str(e)}")
            return []
|
|
|
class AIAssistant:
    """Chat assistant that lets an OpenAI model drive MCP tools.

    The system prompt instructs the model to emit ``CALL_TOOL:`` lines. Those
    lines are parsed, executed through the MCP client, and the (truncated)
    results are sent back to the model for a final answer.
    """

    # Marker that introduces a tool call in the model's reply.
    _TOOL_PREFIX = "CALL_TOOL:"

    def __init__(self, openai_api_key: str, mcp_client: "MCPClient", model: str = "gpt-3.5-turbo"):
        """Create the OpenAI client and remember the MCP client.

        Args:
            openai_api_key: API key passed to the OpenAI client.
            mcp_client: Client used to list and call tools.
            model: Chat model name used for every completion.
        """
        try:
            self.openai_client = openai.OpenAI(
                api_key=openai_api_key,
                timeout=30.0,
            )
        except Exception:
            # Fall back to the module-level API; presumably for openai
            # releases that do not provide the OpenAI client class.
            openai.api_key = openai_api_key
            self.openai_client = openai
        self.mcp_client = mcp_client
        self.model = model
        self.available_tools = []

    def initialize(self):
        """Fetch the tool list from the MCP server and cache it."""
        self.available_tools = self.mcp_client.list_tools_sync()

    def get_system_prompt(self) -> str:
        """Build the system prompt, listing every cached tool by name."""
        # The description is read with .get so a tool without one does not
        # abort prompt construction.
        tools_description = "\n".join(
            f"- {tool['name']}: {tool.get('description', '')}"
            for tool in self.available_tools
        )

        return f"""You are an AI assistant with access to SAP business systems and news data through specialized tools.

Available tools:
{tools_description}

When a user asks for information that can be retrieved using these tools, you should:
1. Identify which tool(s) would be helpful
2. Call the appropriate tool(s) with the right parameters
3. Wait for the results before providing your final response

For SAP-related queries (purchase orders, requisitions), use the SAP tools.
For news-related queries, use the news tools.

To call a tool, use this exact format:
CALL_TOOL: tool_name
or
CALL_TOOL: tool_name(parameter1=value1, parameter2=value2)

Examples:
- For "show me purchase orders": CALL_TOOL: get_purchase_orders
- For "get 20 purchase orders": CALL_TOOL: get_purchase_orders(top=20)
- For "latest tech news": CALL_TOOL: get_news_headlines(category=technology)
- For "get news from BBC": CALL_TOOL: get_news_by_source(source_id=bbc-news)
- For "get news from CNN": CALL_TOOL: get_news_by_source(source_id=cnn)
- For "get news from Reuters": CALL_TOOL: get_news_by_source(source_id=reuters)

IMPORTANT: For news by source queries, always include the source_id parameter:
- BBC: source_id=bbc-news
- CNN: source_id=cnn
- Reuters: source_id=reuters
- Associated Press: source_id=associated-press
- The Guardian: source_id=the-guardian
- Washington Post: source_id=the-washington-post

After calling a tool, I will provide you with the results to interpret for the user.
"""

    @staticmethod
    def _coerce_param(raw: str) -> Any:
        """Turn one raw parameter value into a bool, int or plain string."""
        value = raw.strip().strip('"\'')
        lowered = value.lower()
        if lowered in ("true", "false"):
            return lowered == "true"
        if value.lstrip("-").isdigit():
            try:
                return int(value)
            except ValueError:
                # e.g. "--5" or digit-like characters int() rejects.
                pass
        return value

    def extract_tool_calls(self, response: str) -> List[Dict[str, Any]]:
        """Parse ``CALL_TOOL:`` lines out of a model reply.

        Accepts ``CALL_TOOL: name`` and ``CALL_TOOL: name(k=v, ...)``. Values
        lose surrounding quotes; integers (including negative ones) and
        ``true``/``false`` are converted. Parameters are split on commas, so
        a quoted value that itself contains a comma is not supported.

        Args:
            response: Model reply text; ``None`` or empty yields no calls.

        Returns:
            A list of ``{"name": ..., "arguments": {...}}`` dicts in the order
            the calls appear.
        """
        tool_calls = []
        for line in (response or "").split('\n'):
            line = line.strip()
            if not line.startswith(self._TOOL_PREFIX):
                continue

            call_text = line[len(self._TOOL_PREFIX):].strip()
            name, paren, rest = call_text.partition('(')
            name = name.strip()
            if not name:
                # A marker with no tool name cannot be executed.
                continue

            arguments = {}
            if paren:
                # Tolerate a missing closing parenthesis.
                params_str = rest.split(')', 1)[0]
                for param in params_str.split(','):
                    key, sep, raw_value = param.partition('=')
                    if sep:
                        arguments[key.strip()] = self._coerce_param(raw_value)

            tool_calls.append({'name': name, 'arguments': arguments})

        return tool_calls

    def truncate_tool_result(self, result: Dict[str, Any], max_chars: int = 2000) -> Dict[str, Any]:
        """Shrink a tool result so its JSON form stays within ``max_chars``.

        Non-dict results are returned unchanged. Otherwise, when the indented
        JSON dump is too long, top-level lists are cut to three items and
        top-level strings to 500 characters. If that is still too long, a
        summary dict holding a 1000-character preview is returned instead.
        The input dict is never modified.

        Args:
            result: Decoded tool result.
            max_chars: Maximum length of the indented JSON dump.
        """
        if not isinstance(result, dict):
            return result

        # Measured before any trimming so the summary reports the real size.
        original_len = len(json.dumps(result, indent=2))
        if original_len <= max_chars:
            return result.copy()

        trimmed = {}
        for key, value in result.items():
            if isinstance(value, list) and len(value) > 3:
                value = value[:3] + [f"... ({len(value) - 3} more items truncated)"]
            elif isinstance(value, str) and len(value) > 500:
                value = value[:500] + "... (truncated)"
            trimmed[key] = value

        if len(json.dumps(trimmed, indent=2)) <= max_chars:
            return trimmed

        preview = str(result)
        if len(preview) > 1000:
            preview = preview[:1000] + "..."
        return {
            "success": result.get("success", False),
            "truncated": True,
            "message": f"Result truncated due to size. Original had {original_len} characters.",
            "sample_data": preview,
        }

    def _chat(self, messages: List[Dict[str, str]]) -> str:
        """Run one chat completion and return the reply text ('' if none)."""
        if hasattr(self.openai_client, 'chat'):
            create = self.openai_client.chat.completions.create
        else:
            # Module-level fallback selected in __init__.
            create = self.openai_client.ChatCompletion.create
        completion = create(
            model=self.model,
            messages=messages,
            temperature=0.7,
            max_tokens=800,
        )
        return completion.choices[0].message.content or ""

    def process_message(self, user_message: str) -> Tuple[str, str]:
        """Answer one user message, executing any tool calls the model makes.

        Args:
            user_message: The user's text.

        Returns:
            ``(reply, tool_info)`` where ``tool_info`` is a human-readable log
            of the tool calls, or ``""`` when none were made. Any exception is
            converted into an error reply with an empty ``tool_info``.
        """
        try:
            messages = [
                {"role": "system", "content": self.get_system_prompt()},
                {"role": "user", "content": user_message},
            ]

            ai_response = self._chat(messages)
            tool_calls = self.extract_tool_calls(ai_response)

            print(f"AI Response: {ai_response}")
            print(f"Extracted tool calls: {tool_calls}")

            if not tool_calls:
                return ai_response, ""

            tool_info = ""
            sections = []
            for call in tool_calls:
                tool_info += f"🔧 Calling: {call['name']}\n"

                result = self.mcp_client.call_tool_sync(call['name'], call['arguments'])

                # A tool may decode to something other than a dict.
                is_dict = isinstance(result, dict)
                if is_dict and result.get('success'):
                    tool_info += f"✅ {call['name']} completed\n"
                else:
                    error = result.get('error', 'Unknown error') if is_dict else 'Unknown error'
                    tool_info += f"❌ {call['name']} failed: {error}\n"

                rendered = json.dumps(self.truncate_tool_result(result), indent=2)
                if len(rendered) > 1500:
                    rendered = rendered[:1500] + "...(truncated)"
                sections.append(f"Tool: {call['name']}\nResult: {rendered}")

            tool_results_text = "\n\n".join(sections)

            final_messages = messages + [
                {"role": "assistant", "content": ai_response},
                {"role": "user", "content": f"Here are the tool results:\n\n{tool_results_text}\n\nPlease interpret these results and provide a helpful response to the user."},
            ]
            return self._chat(final_messages), tool_info

        except Exception as e:
            return f"❌ Error processing your request: {str(e)}", ""
|
|
|
|
|
# Module-level singletons shared by the Gradio callbacks. Both stay None
# until initialize_assistant() assigns them.
assistant = None
mcp_client = None
|
|
|
def test_connection(mcp_url):
    """Probe an MCP server and describe the outcome as a status string.

    Performs ``GET <url>/health`` followed by a JSON-RPC ``tools/list`` POST
    to ``<url>/mcp``, each with a 10 second timeout.

    Args:
        mcp_url: Base URL entered by the user. Surrounding whitespace and a
            trailing slash are ignored; an empty value or the placeholder
            URL is rejected without any network access.

    Returns:
        A message starting with ✅ or ❌. Exceptions are caught and reported
        as a connection error.
    """
    mcp_url = (mcp_url or "").strip()
    if not mcp_url or mcp_url == "https://your-ngrok-url.ngrok.io":
        return "❌ Please enter a valid MCP server URL"

    base_url = mcp_url.rstrip('/')
    # Sent on both requests so the health probe behaves like the MCP call.
    tunnel_headers = {"ngrok-skip-browser-warning": "true"}

    try:
        health = requests.get(f"{base_url}/health", headers=tunnel_headers, timeout=10)
        if health.status_code != 200:
            return f"❌ Connection failed: HTTP {health.status_code}"
        data = health.json()

        mcp_response = requests.post(
            f"{base_url}/mcp",
            json={
                "jsonrpc": "2.0",
                "id": 1,
                "method": "tools/list",
            },
            headers={"Content-Type": "application/json", **tunnel_headers},
            timeout=10,
        )
        if mcp_response.status_code != 200:
            return f"✅ Health OK, but MCP endpoint failed: HTTP {mcp_response.status_code}"

        tools = mcp_response.json().get("result", {}).get("tools", [])
        tool_names = [tool.get("name", "Unknown") for tool in tools]
        return f"✅ Connected successfully!\nHealth Status: {data.get('status', 'Unknown')}\nMCP Tools: {len(tools)}\nAvailable: {', '.join(tool_names)}"
    except Exception as e:
        return f"❌ Connection error: {str(e)}"
|
|
|
def initialize_assistant(openai_key, mcp_url):
    """Create the shared MCP client and assistant and load the tool list.

    The module-level ``assistant`` and ``mcp_client`` are assigned only after
    construction and tool loading have both succeeded, so a failure never
    leaves a half-initialized assistant published.

    Args:
        openai_key: OpenAI API key; empty or whitespace-only is rejected.
        mcp_url: MCP server base URL; empty or the placeholder is rejected.

    Returns:
        A status message starting with ✅ on success or ❌ on failure.
    """
    global assistant, mcp_client

    openai_key = (openai_key or "").strip()
    mcp_url = (mcp_url or "").strip()

    if not openai_key:
        return "❌ Please enter your OpenAI API key"

    if not mcp_url or mcp_url == "https://your-ngrok-url.ngrok.io":
        return "❌ Please enter a valid MCP server URL"

    try:
        new_client = MCPClient(mcp_url)
        new_assistant = AIAssistant(openai_key, new_client)
        new_assistant.initialize()
    except Exception as e:
        return f"❌ Failed to initialize: {str(e)}"

    mcp_client = new_client
    assistant = new_assistant
    return f"✅ AI Assistant initialized with {len(assistant.available_tools)} tools available"
|
|
|
def chat_interface(message, history, openai_key, mcp_url):
    """Handle one chat turn for the Gradio UI.

    Initializes the shared assistant on first use, asks it to process the
    message, and appends ``[message, reply]`` to the history.

    Args:
        message: Text typed by the user. Empty or whitespace-only input is
            ignored and the history is returned unchanged.
        history: List of ``[user, assistant]`` pairs; ``None`` is treated as
            an empty list. Only the last 10 pairs are kept before the new
            one is appended.
        openai_key: OpenAI API key, used only when initialization is needed.
        mcp_url: MCP server URL, used only when initialization is needed.

    Returns:
        ``(history, "")``; the empty string clears the input box.
    """
    global assistant

    if history is None:
        history = []

    if not message or not message.strip():
        return history, ""

    if not assistant:
        init_result = initialize_assistant(openai_key, mcp_url)
        # Check the assistant itself as well as the failure marker, so the
        # outcome does not hinge on the wording of the status message alone.
        if assistant is None or "❌" in init_result:
            history.append([message, init_result])
            return history, ""

    try:
        print(f"Calling process_message with: {message}")

        if len(history) > 10:
            history = history[-10:]

        result = assistant.process_message(message)
        print(f"process_message returned: {type(result)} - {result}")

        if isinstance(result, tuple) and len(result) == 2:
            response, tool_info = result
            print(f"Unpacked: response={response}, tool_info={tool_info}")
        else:
            response = str(result)
            tool_info = ""
            print(f"Single result: {response}")

        if tool_info:
            full_response = f"**Tool Execution:**\n{tool_info}\n\n**Response:**\n{response}"
        else:
            full_response = response

        history.append([message, full_response])
        return history, ""
    except Exception as e:
        import traceback
        error_response = f"❌ Error: {str(e)}\n\nTraceback:\n{traceback.format_exc()}"
        print(f"Error in chat_interface: {error_response}")
        history.append([message, error_response])
        return history, ""
|
|
|
|
|
# Gradio UI: chat panel on the left, configuration and examples on the right.
with gr.Blocks(title="AI Assistant with SAP & News Integration", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🤖 AI Assistant with SAP & News Integration")
    gr.Markdown("Chat with an AI that can access SAP business data and news through natural language queries.")

    with gr.Row():
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(
                height=500,
                show_label=False,
                container=True,
                bubble_full_width=False,
            )

            msg = gr.Textbox(
                placeholder="Ask me about SAP data, news, or anything else...",
                show_label=False,
                container=False,
            )

            with gr.Row():
                submit_btn = gr.Button("Send", variant="primary")
                clear_btn = gr.Button("Clear", variant="secondary")

        with gr.Column(scale=1):
            gr.Markdown("### ⚙️ Configuration")

            openai_key = gr.Textbox(
                label="OpenAI API Key",
                type="password",
                placeholder="sk-...",
            )

            mcp_url = gr.Textbox(
                label="MCP Server URL",
                value="https://your-ngrok-url.ngrok.io",
                placeholder="https://abc123.ngrok.io",
            )

            test_btn = gr.Button("Test Connection", variant="secondary")
            connection_status = gr.Textbox(label="Connection Status", interactive=False)

            # NOTE(review): the original heading glyph was unrecoverable;
            # confirm the intended emoji.
            gr.Markdown("### 📝 Example Queries")
            gr.Markdown(
                '\n'
                '- "Show me recent purchase orders"\n'
                '- "Get purchase requisitions"\n'
                '- "What\'s the latest tech news?"\n'
                '- "Get news from BBC"\n'
                '- "Show me business news from the US"\n'
            )

    def respond(message, history, openai_key, mcp_url):
        """Forward a chat event to chat_interface."""
        return chat_interface(message, history, openai_key, mcp_url)

    # The Send button and pressing Enter in the textbox run the same handler.
    for register in (submit_btn.click, msg.submit):
        register(
            respond,
            [msg, chatbot, openai_key, mcp_url],
            [chatbot, msg],
        )

    # Reset both the conversation and the input box.
    clear_btn.click(lambda: ([], ""), outputs=[chatbot, msg])

    test_btn.click(
        test_connection,
        [mcp_url],
        [connection_status],
    )

if __name__ == "__main__":
    demo.launch()