|
import streamlit as st |
|
import openai |
|
import requests |
|
import json |
|
import asyncio |
|
import aiohttp |
|
from typing import Dict, Any, List |
|
from datetime import datetime |
|
import os |
|
|
|
|
|
st.set_page_config( |
|
page_title="AI Assistant with SAP & News Integration", |
|
page_icon="π€", |
|
layout="wide" |
|
) |
|
|
|
|
|
st.markdown(""" |
|
<style> |
|
.main-header { |
|
font-size: 2.5rem; |
|
font-weight: bold; |
|
text-align: center; |
|
color: #1f77b4; |
|
margin-bottom: 2rem; |
|
} |
|
.chat-message { |
|
padding: 1rem; |
|
border-radius: 0.5rem; |
|
margin: 0.5rem 0; |
|
} |
|
.user-message { |
|
background-color: #e3f2fd; |
|
border-left: 4px solid #2196f3; |
|
} |
|
.assistant-message { |
|
background-color: #f5f5f5; |
|
border-left: 4px solid #4caf50; |
|
} |
|
.tool-result { |
|
background-color: #fff3e0; |
|
border: 1px solid #ff9800; |
|
border-radius: 0.5rem; |
|
padding: 1rem; |
|
margin: 1rem 0; |
|
} |
|
.error-message { |
|
background-color: #ffebee; |
|
border: 1px solid #f44336; |
|
border-radius: 0.5rem; |
|
padding: 1rem; |
|
margin: 1rem 0; |
|
} |
|
</style> |
|
""", unsafe_allow_html=True) |
|
|
|
class MCPClient: |
|
"""MCP Client for communicating with the MCP server""" |
|
|
|
def __init__(self, server_url: str): |
|
self.server_url = server_url.rstrip('/') |
|
self.session = None |
|
|
|
async def initialize_session(self): |
|
"""Initialize aiohttp session""" |
|
if not self.session: |
|
self.session = aiohttp.ClientSession() |
|
|
|
async def close_session(self): |
|
"""Close aiohttp session""" |
|
if self.session: |
|
await self.session.close() |
|
self.session = None |
|
|
|
async def call_tool(self, tool_name: str, arguments: Dict[str, Any] = None) -> Dict[str, Any]: |
|
"""Call a tool on the MCP server""" |
|
if arguments is None: |
|
arguments = {} |
|
|
|
await self.initialize_session() |
|
|
|
mcp_request = { |
|
"jsonrpc": "2.0", |
|
"id": 1, |
|
"method": "tools/call", |
|
"params": { |
|
"name": tool_name, |
|
"arguments": arguments |
|
} |
|
} |
|
|
|
try: |
|
async with self.session.post( |
|
f"{self.server_url}/mcp", |
|
json=mcp_request, |
|
headers={"Content-Type": "application/json"} |
|
) as response: |
|
if response.status == 200: |
|
result = await response.json() |
|
if "result" in result and "content" in result["result"]: |
|
|
|
content = result["result"]["content"][0]["text"] |
|
return json.loads(content) |
|
return result |
|
else: |
|
return { |
|
"success": False, |
|
"error": f"HTTP {response.status}: {await response.text()}" |
|
} |
|
except Exception as e: |
|
return { |
|
"success": False, |
|
"error": f"Connection error: {str(e)}" |
|
} |
|
|
|
async def list_tools(self) -> List[Dict[str, Any]]: |
|
"""List available tools on the MCP server""" |
|
await self.initialize_session() |
|
|
|
mcp_request = { |
|
"jsonrpc": "2.0", |
|
"id": 1, |
|
"method": "tools/list" |
|
} |
|
|
|
try: |
|
async with self.session.post( |
|
f"{self.server_url}/mcp", |
|
json=mcp_request, |
|
headers={"Content-Type": "application/json"} |
|
) as response: |
|
if response.status == 200: |
|
result = await response.json() |
|
return result.get("result", {}).get("tools", []) |
|
return [] |
|
except Exception as e: |
|
st.error(f"Error listing tools: {str(e)}") |
|
return [] |
|
|
|
class AIAssistant: |
|
"""AI Assistant with MCP integration""" |
|
|
|
def __init__(self, openai_api_key: str, mcp_client: MCPClient): |
|
self.openai_client = openai.OpenAI(api_key=openai_api_key) |
|
self.mcp_client = mcp_client |
|
self.available_tools = [] |
|
|
|
async def initialize(self): |
|
"""Initialize the assistant by fetching available tools""" |
|
self.available_tools = await self.mcp_client.list_tools() |
|
|
|
def get_system_prompt(self) -> str: |
|
"""Generate system prompt with available tools""" |
|
tools_description = "\n".join([ |
|
f"- {tool['name']}: {tool['description']}" |
|
for tool in self.available_tools |
|
]) |
|
|
|
return f"""You are an AI assistant with access to SAP business systems and news data through specialized tools. |
|
|
|
Available tools: |
|
{tools_description} |
|
|
|
When a user asks for information that can be retrieved using these tools, you should: |
|
1. Identify which tool(s) would be helpful |
|
2. Call the appropriate tool(s) with the right parameters |
|
3. Interpret and present the results in a user-friendly way |
|
|
|
For SAP-related queries (purchase orders, requisitions), use the SAP tools. |
|
For news-related queries, use the news tools. |
|
|
|
Always explain what you're doing and present results clearly. If a tool call fails, explain the error and suggest alternatives. |
|
|
|
You can call tools by responding with: CALL_TOOL: tool_name(parameter1=value1, parameter2=value2) |
|
""" |
|
|
|
def extract_tool_calls(self, response: str) -> List[Dict[str, Any]]: |
|
"""Extract tool calls from AI response""" |
|
tool_calls = [] |
|
lines = response.split('\n') |
|
|
|
for line in lines: |
|
if line.strip().startswith('CALL_TOOL:'): |
|
try: |
|
|
|
tool_part = line.strip()[10:].strip() |
|
|
|
if '(' in tool_part and ')' in tool_part: |
|
tool_name = tool_part.split('(')[0].strip() |
|
params_str = tool_part.split('(')[1].split(')')[0] |
|
|
|
|
|
params = {} |
|
if params_str.strip(): |
|
for param in params_str.split(','): |
|
if '=' in param: |
|
key, value = param.split('=', 1) |
|
key = key.strip() |
|
value = value.strip().strip('"\'') |
|
|
|
try: |
|
if value.isdigit(): |
|
value = int(value) |
|
elif value.lower() in ['true', 'false']: |
|
value = value.lower() == 'true' |
|
except: |
|
pass |
|
params[key] = value |
|
|
|
tool_calls.append({ |
|
'name': tool_name, |
|
'arguments': params |
|
}) |
|
except Exception as e: |
|
st.error(f"Error parsing tool call: {e}") |
|
|
|
return tool_calls |
|
|
|
async def process_message(self, user_message: str) -> str: |
|
"""Process user message and handle tool calls""" |
|
try: |
|
|
|
messages = [ |
|
{"role": "system", "content": self.get_system_prompt()}, |
|
{"role": "user", "content": user_message} |
|
] |
|
|
|
response = self.openai_client.chat.completions.create( |
|
model="gpt-3.5-turbo", |
|
messages=messages, |
|
temperature=0.7, |
|
max_tokens=1000 |
|
) |
|
|
|
ai_response = response.choices[0].message.content |
|
|
|
|
|
tool_calls = self.extract_tool_calls(ai_response) |
|
|
|
if tool_calls: |
|
tool_results = [] |
|
|
|
for tool_call in tool_calls: |
|
st.info(f"π§ Calling tool: {tool_call['name']} with parameters: {tool_call['arguments']}") |
|
|
|
result = await self.mcp_client.call_tool( |
|
tool_call['name'], |
|
tool_call['arguments'] |
|
) |
|
|
|
tool_results.append({ |
|
'tool': tool_call['name'], |
|
'result': result |
|
}) |
|
|
|
|
|
if result.get('success'): |
|
st.success(f"β
Tool {tool_call['name']} executed successfully") |
|
with st.expander(f"π {tool_call['name']} Results", expanded=False): |
|
st.json(result) |
|
else: |
|
st.error(f"β Tool {tool_call['name']} failed: {result.get('error', 'Unknown error')}") |
|
|
|
|
|
tool_results_text = "\n\n".join([ |
|
f"Tool: {tr['tool']}\nResult: {json.dumps(tr['result'], indent=2)}" |
|
for tr in tool_results |
|
]) |
|
|
|
final_messages = messages + [ |
|
{"role": "assistant", "content": ai_response}, |
|
{"role": "user", "content": f"Here are the tool results:\n\n{tool_results_text}\n\nPlease interpret these results and provide a helpful response to the user."} |
|
] |
|
|
|
final_response = self.openai_client.chat.completions.create( |
|
model="gpt-3.5-turbo", |
|
messages=final_messages, |
|
temperature=0.7, |
|
max_tokens=1000 |
|
) |
|
|
|
return final_response.choices[0].message.content |
|
|
|
else: |
|
return ai_response |
|
|
|
except Exception as e: |
|
return f"β Error processing your request: {str(e)}" |
|
|
|
|
|
def main(): |
|
st.markdown('<h1 class="main-header">π€ AI Assistant with SAP & News Integration</h1>', unsafe_allow_html=True) |
|
|
|
|
|
with st.sidebar: |
|
st.header("βοΈ Configuration") |
|
|
|
|
|
openai_api_key = st.text_input( |
|
"OpenAI API Key", |
|
type="password", |
|
help="Enter your OpenAI API key" |
|
) |
|
|
|
|
|
mcp_server_url = st.text_input( |
|
"MCP Server URL", |
|
value="https://your-ngrok-url.ngrok.io", |
|
help="Enter your ngrok URL where the MCP server is running" |
|
) |
|
|
|
|
|
if st.button("π Test MCP Connection"): |
|
if mcp_server_url: |
|
try: |
|
response = requests.get(f"{mcp_server_url.rstrip('/')}/health", timeout=10) |
|
if response.status_code == 200: |
|
st.success("β
MCP Server connected successfully!") |
|
st.json(response.json()) |
|
else: |
|
st.error(f"β Connection failed: HTTP {response.status_code}") |
|
except Exception as e: |
|
st.error(f"β Connection error: {str(e)}") |
|
else: |
|
st.error("Please enter MCP Server URL") |
|
|
|
st.markdown("---") |
|
st.markdown("### π Available Commands") |
|
st.markdown(""" |
|
- **SAP Purchase Orders**: "Show me recent purchase orders" |
|
- **SAP Requisitions**: "Get purchase requisitions" |
|
- **News Headlines**: "What's the latest tech news?" |
|
- **News by Source**: "Get news from BBC" |
|
""") |
|
|
|
|
|
if not openai_api_key: |
|
st.warning("β οΈ Please enter your OpenAI API key in the sidebar to continue.") |
|
return |
|
|
|
if not mcp_server_url or mcp_server_url == "https://your-ngrok-url.ngrok.io": |
|
st.warning("β οΈ Please enter your MCP server URL in the sidebar.") |
|
return |
|
|
|
|
|
if 'messages' not in st.session_state: |
|
st.session_state.messages = [] |
|
|
|
if 'assistant' not in st.session_state: |
|
mcp_client = MCPClient(mcp_server_url) |
|
st.session_state.assistant = AIAssistant(openai_api_key, mcp_client) |
|
|
|
|
|
async def init_assistant(): |
|
await st.session_state.assistant.initialize() |
|
|
|
try: |
|
asyncio.run(init_assistant()) |
|
st.success("π AI Assistant initialized successfully!") |
|
except Exception as e: |
|
st.error(f"β Failed to initialize assistant: {str(e)}") |
|
return |
|
|
|
|
|
for message in st.session_state.messages: |
|
with st.chat_message(message["role"]): |
|
st.markdown(message["content"]) |
|
|
|
|
|
if prompt := st.chat_input("Ask me about SAP data, news, or anything else..."): |
|
|
|
st.session_state.messages.append({"role": "user", "content": prompt}) |
|
|
|
with st.chat_message("user"): |
|
st.markdown(prompt) |
|
|
|
|
|
with st.chat_message("assistant"): |
|
with st.spinner("π€ Thinking and processing..."): |
|
try: |
|
response = asyncio.run( |
|
st.session_state.assistant.process_message(prompt) |
|
) |
|
st.markdown(response) |
|
|
|
|
|
st.session_state.messages.append({"role": "assistant", "content": response}) |
|
|
|
except Exception as e: |
|
error_msg = f"β Sorry, I encountered an error: {str(e)}" |
|
st.error(error_msg) |
|
st.session_state.messages.append({"role": "assistant", "content": error_msg}) |
|
|
|
|
|
st.markdown("---") |
|
st.markdown( |
|
"π‘ **Tip**: Try asking about purchase orders, requisitions, or latest news. " |
|
"The AI will automatically use the appropriate tools to fetch the data." |
|
) |
|
|
|
if __name__ == "__main__": |
|
main() |