import streamlit as st import openai import requests import json import asyncio import aiohttp from typing import Dict, Any, List from datetime import datetime import os # Page configuration st.set_page_config( page_title="AI Assistant with SAP & News Integration", page_icon="🤖", layout="wide" ) # Custom CSS for better UI st.markdown(""" """, unsafe_allow_html=True) class MCPClient: """MCP Client for communicating with the MCP server""" def __init__(self, server_url: str): self.server_url = server_url.rstrip('/') self.session = None async def initialize_session(self): """Initialize aiohttp session""" if not self.session: self.session = aiohttp.ClientSession() async def close_session(self): """Close aiohttp session""" if self.session: await self.session.close() self.session = None async def call_tool(self, tool_name: str, arguments: Dict[str, Any] = None) -> Dict[str, Any]: """Call a tool on the MCP server""" if arguments is None: arguments = {} await self.initialize_session() mcp_request = { "jsonrpc": "2.0", "id": 1, "method": "tools/call", "params": { "name": tool_name, "arguments": arguments } } try: async with self.session.post( f"{self.server_url}/mcp", json=mcp_request, headers={"Content-Type": "application/json"} ) as response: if response.status == 200: result = await response.json() if "result" in result and "content" in result["result"]: # Extract the actual data from MCP response content = result["result"]["content"][0]["text"] return json.loads(content) return result else: return { "success": False, "error": f"HTTP {response.status}: {await response.text()}" } except Exception as e: return { "success": False, "error": f"Connection error: {str(e)}" } async def list_tools(self) -> List[Dict[str, Any]]: """List available tools on the MCP server""" await self.initialize_session() mcp_request = { "jsonrpc": "2.0", "id": 1, "method": "tools/list" } try: async with self.session.post( f"{self.server_url}/mcp", json=mcp_request, headers={"Content-Type": "application/json"} ) as response: if response.status == 200: result = await response.json() return result.get("result", {}).get("tools", []) return [] except Exception as e: st.error(f"Error listing tools: {str(e)}") return [] class AIAssistant: """AI Assistant with MCP integration""" def __init__(self, openai_api_key: str, mcp_client: MCPClient): self.openai_client = openai.OpenAI(api_key=openai_api_key) self.mcp_client = mcp_client self.available_tools = [] async def initialize(self): """Initialize the assistant by fetching available tools""" self.available_tools = await self.mcp_client.list_tools() def get_system_prompt(self) -> str: """Generate system prompt with available tools""" tools_description = "\n".join([ f"- {tool['name']}: {tool['description']}" for tool in self.available_tools ]) return f"""You are an AI assistant with access to SAP business systems and news data through specialized tools. Available tools: {tools_description} When a user asks for information that can be retrieved using these tools, you should: 1. Identify which tool(s) would be helpful 2. Call the appropriate tool(s) with the right parameters 3. Interpret and present the results in a user-friendly way For SAP-related queries (purchase orders, requisitions), use the SAP tools. For news-related queries, use the news tools. Always explain what you're doing and present results clearly. If a tool call fails, explain the error and suggest alternatives. You can call tools by responding with: CALL_TOOL: tool_name(parameter1=value1, parameter2=value2) """ def extract_tool_calls(self, response: str) -> List[Dict[str, Any]]: """Extract tool calls from AI response""" tool_calls = [] lines = response.split('\n') for line in lines: if line.strip().startswith('CALL_TOOL:'): try: # Parse tool call: CALL_TOOL: tool_name(param1=value1, param2=value2) tool_part = line.strip()[10:].strip() # Remove 'CALL_TOOL:' if '(' in tool_part and ')' in tool_part: tool_name = tool_part.split('(')[0].strip() params_str = tool_part.split('(')[1].split(')')[0] # Parse parameters params = {} if params_str.strip(): for param in params_str.split(','): if '=' in param: key, value = param.split('=', 1) key = key.strip() value = value.strip().strip('"\'') # Try to convert to appropriate type try: if value.isdigit(): value = int(value) elif value.lower() in ['true', 'false']: value = value.lower() == 'true' except: pass params[key] = value tool_calls.append({ 'name': tool_name, 'arguments': params }) except Exception as e: st.error(f"Error parsing tool call: {e}") return tool_calls async def process_message(self, user_message: str) -> str: """Process user message and handle tool calls""" try: # First, get AI response to understand what tools to call messages = [ {"role": "system", "content": self.get_system_prompt()}, {"role": "user", "content": user_message} ] response = self.openai_client.chat.completions.create( model="gpt-3.5-turbo", messages=messages, temperature=0.7, max_tokens=1000 ) ai_response = response.choices[0].message.content # Check if AI wants to call any tools tool_calls = self.extract_tool_calls(ai_response) if tool_calls: tool_results = [] for tool_call in tool_calls: st.info(f"🔧 Calling tool: {tool_call['name']} with parameters: {tool_call['arguments']}") result = await self.mcp_client.call_tool( tool_call['name'], tool_call['arguments'] ) tool_results.append({ 'tool': tool_call['name'], 'result': result }) # Display tool result if result.get('success'): st.success(f"✅ Tool {tool_call['name']} executed successfully") with st.expander(f"📊 {tool_call['name']} Results", expanded=False): st.json(result) else: st.error(f"❌ Tool {tool_call['name']} failed: {result.get('error', 'Unknown error')}") # Get final response with tool results tool_results_text = "\n\n".join([ f"Tool: {tr['tool']}\nResult: {json.dumps(tr['result'], indent=2)}" for tr in tool_results ]) final_messages = messages + [ {"role": "assistant", "content": ai_response}, {"role": "user", "content": f"Here are the tool results:\n\n{tool_results_text}\n\nPlease interpret these results and provide a helpful response to the user."} ] final_response = self.openai_client.chat.completions.create( model="gpt-3.5-turbo", messages=final_messages, temperature=0.7, max_tokens=1000 ) return final_response.choices[0].message.content else: return ai_response except Exception as e: return f"❌ Error processing your request: {str(e)}" # Streamlit App def main(): st.markdown('