# Client / streamlit_app.py
# PD03's picture
# Rename app.py to streamlit_app.py
# 4a37ac7 verified
import streamlit as st
import openai
import requests
import json
import asyncio
import aiohttp
from typing import Dict, Any, List
from datetime import datetime
import os
# Page configuration: browser-tab title, tab icon and a full-width layout.
_PAGE_SETTINGS = {
    "page_title": "AI Assistant with SAP & News Integration",
    "page_icon": "πŸ€–",
    "layout": "wide",
}
st.set_page_config(**_PAGE_SETTINGS)

# Custom CSS for better UI. The stylesheet is injected as raw HTML, which is
# why unsafe_allow_html is required. It defines the page header style plus
# chat-message, tool-result and error-message boxes.
_CUSTOM_CSS = """
<style>
.main-header {
font-size: 2.5rem;
font-weight: bold;
text-align: center;
color: #1f77b4;
margin-bottom: 2rem;
}
.chat-message {
padding: 1rem;
border-radius: 0.5rem;
margin: 0.5rem 0;
}
.user-message {
background-color: #e3f2fd;
border-left: 4px solid #2196f3;
}
.assistant-message {
background-color: #f5f5f5;
border-left: 4px solid #4caf50;
}
.tool-result {
background-color: #fff3e0;
border: 1px solid #ff9800;
border-radius: 0.5rem;
padding: 1rem;
margin: 1rem 0;
}
.error-message {
background-color: #ffebee;
border: 1px solid #f44336;
border-radius: 0.5rem;
padding: 1rem;
margin: 1rem 0;
}
</style>
"""
st.markdown(_CUSTOM_CSS, unsafe_allow_html=True)
class MCPClient:
    """MCP Client for communicating with the MCP server.

    Requests are JSON-RPC 2.0 messages POSTed to ``<server_url>/mcp``.

    Session handling: ``call_tool`` and ``list_tools`` open a temporary
    aiohttp session when no usable one exists and close it again before
    returning, so no session is left attached to an event loop that has
    already finished (for example when every call runs under its own
    ``asyncio.run``). A session opened explicitly with
    ``initialize_session`` is reused by later calls on the same loop and
    stays open until ``close_session`` is called.
    """

    def __init__(self, server_url: str):
        """Remember the server base URL (trailing slashes removed); performs no I/O."""
        self.server_url = server_url.rstrip('/')
        self.session = None
        # Event loop on which this class created ``self.session``. None when
        # there is no session, or when the caller assigned ``session`` directly.
        self._session_loop = None

    def _session_is_usable(self) -> bool:
        """Return True when ``self.session`` is open and belongs to the running loop."""
        if self.session is None or self.session.closed:
            return False
        # A session assigned by the caller has no recorded loop; trust it.
        return (
            self._session_loop is None
            or self._session_loop is asyncio.get_running_loop()
        )

    async def initialize_session(self):
        """Initialize aiohttp session.

        Creates a new session when there is none, when the current one is
        closed, or when it was created on a different event loop than the
        one running now. A stale session from another loop is only
        dereferenced here; it cannot be awaited on its original loop anymore.
        """
        if not self._session_is_usable():
            self.session = aiohttp.ClientSession()
            self._session_loop = asyncio.get_running_loop()

    async def close_session(self):
        """Close aiohttp session (no-op when none is open)."""
        if self.session is not None:
            await self.session.close()
            self.session = None
        self._session_loop = None

    async def _post(self, payload: Dict[str, Any]):
        """POST ``payload`` to the MCP endpoint.

        Returns:
            ``(status, body)`` where ``body`` is the decoded JSON document for
            HTTP 200 and the raw response text for any other status.

        Raises:
            Whatever aiohttp raises for connection or decoding failures.
        """
        # Only close the session afterwards if this call had to open it.
        owns_session = not self._session_is_usable()
        await self.initialize_session()
        try:
            async with self.session.post(
                f"{self.server_url}/mcp",
                json=payload,
                headers={"Content-Type": "application/json"},
            ) as response:
                if response.status == 200:
                    return response.status, await response.json()
                return response.status, await response.text()
        finally:
            if owns_session:
                await self.close_session()

    @staticmethod
    def _unwrap_tool_result(body: Any) -> Dict[str, Any]:
        """Extract the tool payload from a ``tools/call`` response body.

        When ``body["result"]["content"]`` exists, the ``text`` field of its
        first item is parsed as JSON and returned. A body without that
        structure (for example a JSON-RPC error object) is returned unchanged.
        Structurally broken responses yield a ``success: False`` dict that
        names the problem instead of being reported as a connection error.
        """
        if not isinstance(body, dict):
            return {
                "success": False,
                "error": (
                    "Malformed MCP response: expected a JSON object, "
                    f"got {type(body).__name__}"
                ),
            }
        result = body.get("result")
        if not (isinstance(result, dict) and "content" in result):
            return body
        try:
            text = result["content"][0]["text"]
        except (IndexError, KeyError, TypeError):
            return {
                "success": False,
                "error": "Malformed MCP response: tool result has no text content",
            }
        try:
            # NOTE(review): the decoded payload is returned as-is, so a server
            # that encodes a JSON list here hands callers a list, not a dict.
            return json.loads(text)
        except (TypeError, ValueError) as e:
            return {
                "success": False,
                "error": f"Malformed MCP response: tool text is not valid JSON ({e})",
            }

    async def call_tool(self, tool_name: str, arguments: Dict[str, Any] = None) -> Dict[str, Any]:
        """Call a tool on the MCP server.

        Args:
            tool_name: Name of the tool to invoke.
            arguments: Tool arguments; ``None`` is sent as an empty object.

        Returns:
            The decoded tool payload on success. Failures never raise; they
            are returned as ``{"success": False, "error": <message>}`` with a
            message prefixed ``Connection error``, ``HTTP <status>`` or
            ``Malformed MCP response`` depending on where the call failed.
        """
        if arguments is None:
            arguments = {}
        mcp_request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "tools/call",
            "params": {
                "name": tool_name,
                "arguments": arguments
            }
        }
        try:
            status, body = await self._post(mcp_request)
        except Exception as e:
            return {
                "success": False,
                "error": f"Connection error: {str(e)}"
            }
        if status != 200:
            return {
                "success": False,
                "error": f"HTTP {status}: {body}"
            }
        return self._unwrap_tool_result(body)

    async def list_tools(self) -> List[Dict[str, Any]]:
        """List available tools on the MCP server.

        Returns:
            The ``result.tools`` list of the ``tools/list`` response, or an
            empty list for a non-200 status or any error (errors are also
            reported through ``st.error``).
        """
        mcp_request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "tools/list"
        }
        try:
            status, body = await self._post(mcp_request)
            if status == 200:
                return body.get("result", {}).get("tools", [])
            return []
        except Exception as e:
            st.error(f"Error listing tools: {str(e)}")
            return []
class AIAssistant:
    """AI Assistant with MCP integration.

    The assistant asks an OpenAI chat model what to do, executes any
    ``CALL_TOOL: name(key=value, ...)`` lines found in the reply through the
    MCP client, and then asks the model to turn the tool output into the
    final answer.
    """

    # The MCPClient annotation is quoted so defining this class does not
    # require MCPClient to be defined first.
    def __init__(self, openai_api_key: str, mcp_client: "MCPClient", model: str = "gpt-3.5-turbo"):
        """Create the OpenAI client and remember the MCP client.

        Args:
            openai_api_key: API key passed to ``openai.OpenAI``.
            mcp_client: Client used to list and call MCP tools.
            model: Chat model used for both completion rounds.
        """
        self.openai_client = openai.OpenAI(api_key=openai_api_key)
        self.mcp_client = mcp_client
        self.model = model
        self.available_tools = []

    async def initialize(self):
        """Initialize the assistant by fetching available tools"""
        self.available_tools = await self.mcp_client.list_tools()

    def get_system_prompt(self) -> str:
        """Generate system prompt with available tools.

        Each tool is listed as ``- name: description``; tools that carry no
        description are listed with a placeholder instead of raising.
        """
        tools_description = "\n".join(
            f"- {tool['name']}: {tool.get('description') or 'No description provided'}"
            for tool in self.available_tools
        )
        return (
            "You are an AI assistant with access to SAP business systems and news data through specialized tools.\n"
            "Available tools:\n"
            f"{tools_description}\n"
            "When a user asks for information that can be retrieved using these tools, you should:\n"
            "1. Identify which tool(s) would be helpful\n"
            "2. Call the appropriate tool(s) with the right parameters\n"
            "3. Interpret and present the results in a user-friendly way\n"
            "For SAP-related queries (purchase orders, requisitions), use the SAP tools.\n"
            "For news-related queries, use the news tools.\n"
            "Always explain what you're doing and present results clearly. If a tool call fails, explain the error and suggest alternatives.\n"
            "You can call tools by responding with: CALL_TOOL: tool_name(parameter1=value1, parameter2=value2)\n"
        )

    @staticmethod
    def _split_call_arguments(text: str):
        """Split the text following ``(`` into raw ``key=value`` pieces.

        Commas and parentheses inside a quoted value, and balanced
        parentheses inside an unquoted one, do not end a piece. A quote
        character only opens a quoted value when it directly follows ``=``,
        so an apostrophe inside an unquoted value is ordinary text.

        Returns:
            The list of pieces up to the closing ``)`` of the call, or
            ``None`` when quotes or parentheses never balance.
        """
        pieces = []
        current = []
        quote = None
        depth = 0
        value_start = False
        closed = False
        for ch in text:
            if quote is not None:
                current.append(ch)
                if ch == quote:
                    quote = None
                continue
            if value_start and ch in ('"', "'"):
                quote = ch
                value_start = False
                current.append(ch)
                continue
            if ch == ')' and depth == 0:
                closed = True
                break
            if ch == ',' and depth == 0:
                pieces.append(''.join(current))
                current = []
                value_start = False
                continue
            if ch == '(':
                depth += 1
            elif ch == ')':
                depth -= 1
            if ch == '=':
                value_start = True
            elif not ch.isspace():
                value_start = False
            current.append(ch)
        if not closed:
            return None
        pieces.append(''.join(current))
        return pieces

    @staticmethod
    def _coerce_argument(raw: str):
        """Strip whitespace and surrounding quotes, then convert ints and booleans.

        NOTE(review): all-digit values become ``int`` even when they were
        quoted, so identifiers with leading zeros lose them.
        """
        value = raw.strip().strip('"\'')
        if value.isdigit():
            try:
                return int(value)
            except ValueError:
                # isdigit() accepts characters (e.g. superscripts) int() rejects.
                return value
        if value.lower() in ('true', 'false'):
            return value.lower() == 'true'
        return value

    def extract_tool_calls(self, response: str) -> List[Dict[str, Any]]:
        """Extract tool calls from AI response.

        Args:
            response: Model reply; ``None`` or empty yields no calls.

        Returns:
            One ``{'name': ..., 'arguments': {...}}`` dict per line that
            starts with ``CALL_TOOL:`` and contains a parenthesised argument
            list. Parsing problems are reported via ``st.error`` and the
            offending line is skipped.
        """
        prefix = 'CALL_TOOL:'
        tool_calls = []
        for line in (response or "").split('\n'):
            stripped = line.strip()
            if not stripped.startswith(prefix):
                continue
            try:
                call_text = stripped[len(prefix):].strip()
                if '(' not in call_text or ')' not in call_text:
                    continue
                tool_name, _, rest = call_text.partition('(')
                pieces = self._split_call_arguments(rest)
                if pieces is None:
                    # Unbalanced quotes/parentheses: fall back to cutting at
                    # the first ')' and splitting on every comma.
                    pieces = rest.split(')')[0].split(',')
                params = {}
                for piece in pieces:
                    if '=' in piece:
                        key, raw_value = piece.split('=', 1)
                        params[key.strip()] = self._coerce_argument(raw_value)
                tool_calls.append({
                    'name': tool_name.strip(),
                    'arguments': params
                })
            except Exception as e:
                st.error(f"Error parsing tool call: {e}")
        return tool_calls

    def _complete(self, messages: List[Dict[str, Any]]) -> str:
        """Run one chat completion and return its text ('' when the model sent none)."""
        # NOTE: this is a synchronous call made from async code; it blocks the
        # running event loop until the completion returns.
        completion = self.openai_client.chat.completions.create(
            model=self.model,
            messages=messages,
            temperature=0.7,
            max_tokens=1000
        )
        return completion.choices[0].message.content or ""

    async def process_message(self, user_message: str) -> str:
        """Process user message and handle tool calls.

        Sends the message to the model, runs every tool call found in the
        reply (reporting progress through Streamlit widgets), and, if any
        tool ran, asks the model for a final answer based on the results.

        Returns:
            The assistant's reply, or an error string if anything failed.
        """
        try:
            # First, get AI response to understand what tools to call
            messages = [
                {"role": "system", "content": self.get_system_prompt()},
                {"role": "user", "content": user_message}
            ]
            ai_response = self._complete(messages)

            # Check if AI wants to call any tools
            tool_calls = self.extract_tool_calls(ai_response)
            if not tool_calls:
                return ai_response

            tool_results = []
            for tool_call in tool_calls:
                name = tool_call['name']
                st.info(f"πŸ”§ Calling tool: {name} with parameters: {tool_call['arguments']}")
                result = await self.mcp_client.call_tool(name, tool_call['arguments'])
                tool_results.append({
                    'tool': name,
                    'result': result
                })
                # Display tool result; only a dict can carry a success flag.
                is_mapping = isinstance(result, dict)
                if is_mapping and result.get('success'):
                    st.success(f"βœ… Tool {name} executed successfully")
                    with st.expander(f"πŸ“Š {name} Results", expanded=False):
                        st.json(result)
                else:
                    if is_mapping:
                        error = result.get('error', 'Unknown error')
                    else:
                        error = f"Unexpected result type: {type(result).__name__}"
                    st.error(f"❌ Tool {name} failed: {error}")

            # Get final response with tool results
            tool_results_text = "\n\n".join([
                f"Tool: {tr['tool']}\nResult: {json.dumps(tr['result'], indent=2)}"
                for tr in tool_results
            ])
            final_messages = messages + [
                {"role": "assistant", "content": ai_response},
                {"role": "user", "content": f"Here are the tool results:\n\n{tool_results_text}\n\nPlease interpret these results and provide a helpful response to the user."}
            ]
            return self._complete(final_messages)
        except Exception as e:
            return f"❌ Error processing your request: {str(e)}"
# Streamlit App
def main():
    """Render the page: sidebar configuration, chat history and chat input.

    Flow:
      1. The sidebar collects the OpenAI API key and MCP server URL and
         offers a button that probes ``<url>/health``.
      2. Rendering stops early until both the key and a non-placeholder URL
         are provided.
      3. An ``AIAssistant`` is built and initialized, then cached in
         ``st.session_state`` together with the configuration it was built
         from. It is cached only after initialization succeeded and is
         rebuilt when the key or URL changes.
      4. Each chat prompt is answered inside its own ``asyncio.run``; the
         MCP client's HTTP session is closed before that loop ends so no
         session outlives the event loop it was created on.
    """
    st.markdown('<h1 class="main-header">πŸ€– AI Assistant with SAP & News Integration</h1>', unsafe_allow_html=True)

    # Default shown in the URL field; treated as "not configured yet".
    placeholder_url = "https://your-ngrok-url.ngrok.io"

    # Sidebar for configuration
    with st.sidebar:
        st.header("βš™οΈ Configuration")
        # OpenAI API Key
        openai_api_key = st.text_input(
            "OpenAI API Key",
            type="password",
            help="Enter your OpenAI API key"
        )
        # MCP Server URL
        mcp_server_url = st.text_input(
            "MCP Server URL",
            value=placeholder_url,
            help="Enter your ngrok URL where the MCP server is running"
        )
        # Test connection button
        if st.button("πŸ” Test MCP Connection"):
            if mcp_server_url:
                try:
                    response = requests.get(f"{mcp_server_url.rstrip('/')}/health", timeout=10)
                    if response.status_code == 200:
                        st.success("βœ… MCP Server connected successfully!")
                        st.json(response.json())
                    else:
                        st.error(f"❌ Connection failed: HTTP {response.status_code}")
                except Exception as e:
                    st.error(f"❌ Connection error: {str(e)}")
            else:
                st.error("Please enter MCP Server URL")
        st.markdown("---")
        st.markdown("### πŸ“‹ Available Commands")
        st.markdown("""
- **SAP Purchase Orders**: "Show me recent purchase orders"
- **SAP Requisitions**: "Get purchase requisitions"
- **News Headlines**: "What's the latest tech news?"
- **News by Source**: "Get news from BBC"
""")

    # Main chat interface
    if not openai_api_key:
        st.warning("⚠️ Please enter your OpenAI API key in the sidebar to continue.")
        return
    if not mcp_server_url or mcp_server_url == placeholder_url:
        st.warning("⚠️ Please enter your MCP server URL in the sidebar.")
        return

    # Initialize session state
    if 'messages' not in st.session_state:
        st.session_state.messages = []

    # (Re)build the assistant when there is none yet or the sidebar values
    # differ from the ones the cached assistant was created with.
    current_config = (openai_api_key, mcp_server_url)
    if (
        'assistant' not in st.session_state
        or st.session_state.get('assistant_config') != current_config
    ):
        new_assistant = AIAssistant(openai_api_key, MCPClient(mcp_server_url))

        # Initialize assistant
        async def init_assistant():
            try:
                await new_assistant.initialize()
            finally:
                # Close the HTTP session on the loop that opened it.
                await new_assistant.mcp_client.close_session()

        try:
            asyncio.run(init_assistant())
        except Exception as e:
            # Nothing is cached on failure, so the next rerun retries.
            st.error(f"❌ Failed to initialize assistant: {str(e)}")
            return
        st.session_state.assistant = new_assistant
        st.session_state.assistant_config = current_config
        st.success("πŸš€ AI Assistant initialized successfully!")

    # Display chat messages
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

    # Chat input
    if prompt := st.chat_input("Ask me about SAP data, news, or anything else..."):
        # Add user message
        st.session_state.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.markdown(prompt)

        assistant = st.session_state.assistant

        async def answer_prompt():
            try:
                return await assistant.process_message(prompt)
            finally:
                # Close the HTTP session on the loop that opened it.
                await assistant.mcp_client.close_session()

        # Get assistant response
        with st.chat_message("assistant"):
            with st.spinner("πŸ€” Thinking and processing..."):
                try:
                    response = asyncio.run(answer_prompt())
                    st.markdown(response)
                    # Add assistant response to history
                    st.session_state.messages.append({"role": "assistant", "content": response})
                except Exception as e:
                    error_msg = f"❌ Sorry, I encountered an error: {str(e)}"
                    st.error(error_msg)
                    st.session_state.messages.append({"role": "assistant", "content": error_msg})

    # Footer
    st.markdown("---")
    st.markdown(
        "πŸ’‘ **Tip**: Try asking about purchase orders, requisitions, or latest news. "
        "The AI will automatically use the appropriate tools to fetch the data."
    )
# Entry point: render the app only when this file is run as the main module.
if __name__ == "__main__":
    main()