import streamlit as st import openai import requests import json import asyncio import aiohttp from typing import Dict, Any, List from datetime import datetime import os # Page configuration st.set_page_config( page_title="AI Assistant with SAP & News Integration", page_icon="🤖", layout="wide" ) # Custom CSS for better UI st.markdown(""" """, unsafe_allow_html=True) class MCPClient: """MCP Client for communicating with the MCP server""" def __init__(self, server_url: str): self.server_url = server_url.rstrip('/') self.session = None async def initialize_session(self): """Initialize aiohttp session""" if not self.session: self.session = aiohttp.ClientSession() async def close_session(self): """Close aiohttp session""" if self.session: await self.session.close() self.session = None async def call_tool(self, tool_name: str, arguments: Dict[str, Any] = None) -> Dict[str, Any]: """Call a tool on the MCP server""" if arguments is None: arguments = {} await self.initialize_session() mcp_request = { "jsonrpc": "2.0", "id": 1, "method": "tools/call", "params": { "name": tool_name, "arguments": arguments } } try: async with self.session.post( f"{self.server_url}/mcp", json=mcp_request, headers={"Content-Type": "application/json"} ) as response: if response.status == 200: result = await response.json() if "result" in result and "content" in result["result"]: # Extract the actual data from MCP response content = result["result"]["content"][0]["text"] return json.loads(content) return result else: return { "success": False, "error": f"HTTP {response.status}: {await response.text()}" } except Exception as e: return { "success": False, "error": f"Connection error: {str(e)}" } async def list_tools(self) -> List[Dict[str, Any]]: """List available tools on the MCP server""" await self.initialize_session() mcp_request = { "jsonrpc": "2.0", "id": 1, "method": "tools/list" } try: async with self.session.post( f"{self.server_url}/mcp", json=mcp_request, headers={"Content-Type": "application/json"} ) as response: if response.status == 200: result = await response.json() return result.get("result", {}).get("tools", []) return [] except Exception as e: st.error(f"Error listing tools: {str(e)}") return [] class AIAssistant: """AI Assistant with MCP integration""" def __init__(self, openai_api_key: str, mcp_client: MCPClient): self.openai_client = openai.OpenAI(api_key=openai_api_key) self.mcp_client = mcp_client self.available_tools = [] async def initialize(self): """Initialize the assistant by fetching available tools""" self.available_tools = await self.mcp_client.list_tools() def get_system_prompt(self) -> str: """Generate system prompt with available tools""" tools_description = "\n".join([ f"- {tool['name']}: {tool['description']}" for tool in self.available_tools ]) return f"""You are an AI assistant with access to SAP business systems and news data through specialized tools. Available tools: {tools_description} When a user asks for information that can be retrieved using these tools, you should: 1. Identify which tool(s) would be helpful 2. Call the appropriate tool(s) with the right parameters 3. Interpret and present the results in a user-friendly way For SAP-related queries (purchase orders, requisitions), use the SAP tools. For news-related queries, use the news tools. Always explain what you're doing and present results clearly. If a tool call fails, explain the error and suggest alternatives. You can call tools by responding with: CALL_TOOL: tool_name(parameter1=value1, parameter2=value2) """ def extract_tool_calls(self, response: str) -> List[Dict[str, Any]]: """Extract tool calls from AI response""" tool_calls = [] lines = response.split('\n') for line in lines: if line.strip().startswith('CALL_TOOL:'): try: # Parse tool call: CALL_TOOL: tool_name(param1=value1, param2=value2) tool_part = line.strip()[10:].strip() # Remove 'CALL_TOOL:' if '(' in tool_part and ')' in tool_part: tool_name = tool_part.split('(')[0].strip() params_str = tool_part.split('(')[1].split(')')[0] # Parse parameters params = {} if params_str.strip(): for param in params_str.split(','): if '=' in param: key, value = param.split('=', 1) key = key.strip() value = value.strip().strip('"\'') # Try to convert to appropriate type try: if value.isdigit(): value = int(value) elif value.lower() in ['true', 'false']: value = value.lower() == 'true' except: pass params[key] = value tool_calls.append({ 'name': tool_name, 'arguments': params }) except Exception as e: st.error(f"Error parsing tool call: {e}") return tool_calls async def process_message(self, user_message: str) -> str: """Process user message and handle tool calls""" try: # First, get AI response to understand what tools to call messages = [ {"role": "system", "content": self.get_system_prompt()}, {"role": "user", "content": user_message} ] response = self.openai_client.chat.completions.create( model="gpt-3.5-turbo", messages=messages, temperature=0.7, max_tokens=1000 ) ai_response = response.choices[0].message.content # Check if AI wants to call any tools tool_calls = self.extract_tool_calls(ai_response) if tool_calls: tool_results = [] for tool_call in tool_calls: st.info(f"🔧 Calling tool: {tool_call['name']} with parameters: {tool_call['arguments']}") result = await self.mcp_client.call_tool( tool_call['name'], tool_call['arguments'] ) tool_results.append({ 'tool': tool_call['name'], 'result': result }) # Display tool result if result.get('success'): st.success(f"✅ Tool {tool_call['name']} executed successfully") with st.expander(f"📊 {tool_call['name']} Results", expanded=False): st.json(result) else: st.error(f"❌ Tool {tool_call['name']} failed: {result.get('error', 'Unknown error')}") # Get final response with tool results tool_results_text = "\n\n".join([ f"Tool: {tr['tool']}\nResult: {json.dumps(tr['result'], indent=2)}" for tr in tool_results ]) final_messages = messages + [ {"role": "assistant", "content": ai_response}, {"role": "user", "content": f"Here are the tool results:\n\n{tool_results_text}\n\nPlease interpret these results and provide a helpful response to the user."} ] final_response = self.openai_client.chat.completions.create( model="gpt-3.5-turbo", messages=final_messages, temperature=0.7, max_tokens=1000 ) return final_response.choices[0].message.content else: return ai_response except Exception as e: return f"❌ Error processing your request: {str(e)}" # Streamlit App def main(): st.markdown('

🤖 AI Assistant with SAP & News Integration

', unsafe_allow_html=True) # Sidebar for configuration with st.sidebar: st.header("⚙️ Configuration") # OpenAI API Key openai_api_key = st.text_input( "OpenAI API Key", type="password", help="Enter your OpenAI API key" ) # MCP Server URL mcp_server_url = st.text_input( "MCP Server URL", value="https://your-ngrok-url.ngrok.io", help="Enter your ngrok URL where the MCP server is running" ) # Test connection button if st.button("🔍 Test MCP Connection"): if mcp_server_url: try: response = requests.get(f"{mcp_server_url.rstrip('/')}/health", timeout=10) if response.status_code == 200: st.success("✅ MCP Server connected successfully!") st.json(response.json()) else: st.error(f"❌ Connection failed: HTTP {response.status_code}") except Exception as e: st.error(f"❌ Connection error: {str(e)}") else: st.error("Please enter MCP Server URL") st.markdown("---") st.markdown("### 📋 Available Commands") st.markdown(""" - **SAP Purchase Orders**: "Show me recent purchase orders" - **SAP Requisitions**: "Get purchase requisitions" - **News Headlines**: "What's the latest tech news?" - **News by Source**: "Get news from BBC" """) # Main chat interface if not openai_api_key: st.warning("⚠️ Please enter your OpenAI API key in the sidebar to continue.") return if not mcp_server_url or mcp_server_url == "https://your-ngrok-url.ngrok.io": st.warning("⚠️ Please enter your MCP server URL in the sidebar.") return # Initialize session state if 'messages' not in st.session_state: st.session_state.messages = [] if 'assistant' not in st.session_state: mcp_client = MCPClient(mcp_server_url) st.session_state.assistant = AIAssistant(openai_api_key, mcp_client) # Initialize assistant async def init_assistant(): await st.session_state.assistant.initialize() try: asyncio.run(init_assistant()) st.success("🚀 AI Assistant initialized successfully!") except Exception as e: st.error(f"❌ Failed to initialize assistant: {str(e)}") return # Display chat messages for message in st.session_state.messages: with st.chat_message(message["role"]): st.markdown(message["content"]) # Chat input if prompt := st.chat_input("Ask me about SAP data, news, or anything else..."): # Add user message st.session_state.messages.append({"role": "user", "content": prompt}) with st.chat_message("user"): st.markdown(prompt) # Get assistant response with st.chat_message("assistant"): with st.spinner("🤔 Thinking and processing..."): try: response = asyncio.run( st.session_state.assistant.process_message(prompt) ) st.markdown(response) # Add assistant response to history st.session_state.messages.append({"role": "assistant", "content": response}) except Exception as e: error_msg = f"❌ Sorry, I encountered an error: {str(e)}" st.error(error_msg) st.session_state.messages.append({"role": "assistant", "content": error_msg}) # Footer st.markdown("---") st.markdown( "💡 **Tip**: Try asking about purchase orders, requisitions, or latest news. " "The AI will automatically use the appropriate tools to fetch the data." ) if __name__ == "__main__": main()