|
import gradio as gr |
|
import openai |
|
import requests |
|
import json |
|
import os |
|
from typing import Dict, List, Any |
|
from datetime import datetime |
|
from pydantic import BaseModel |
|
|
|
|
|
openai.api_key = os.getenv("OPENAI_API_KEY") |
|
client = openai.OpenAI() |
|
|
|
|
|
SAP_API_KEY = os.getenv("SAP_API_KEY") |
|
BASE_URL = "https://sandbox.api.sap.com/s4hanacloud/sap/opu/odata/sap/API_BUSINESS_PARTNER" |
|
CREDIT_API_URL = "https://sandbox.api.sap.com/s4hanacloud/sap/opu/odata/sap/API_CRDTMBUSINESSPARTNER" |
|
|
|
class WorkflowStep(BaseModel): |
|
"""Model for workflow step tracking""" |
|
step_number: int |
|
description: str |
|
status: str |
|
result: Any = None |
|
timestamp: datetime = datetime.now() |
|
|
|
class MultiStepWorkflow(BaseModel): |
|
"""Model for tracking multi-step workflows""" |
|
workflow_id: str |
|
name: str |
|
steps: List[WorkflowStep] |
|
current_step: int = 0 |
|
status: str = "pending" |
|
final_result: Any = None |
|
|
|
class SAPBusinessPartnerAgent: |
|
def __init__(self): |
|
self.conversation_history = [] |
|
self.active_workflows = {} |
|
|
|
def parse_search_query(self, query: str) -> Dict[str, Any]: |
|
"""Parse natural language query into SAP API parameters using OpenAI""" |
|
system_prompt = """ |
|
Parse this business partner search query into SAP OData parameters. |
|
|
|
Available fields for A_BusinessPartner: |
|
- BusinessPartner (ID) |
|
- BusinessPartnerFullName (Name) |
|
- BusinessPartnerCategory (1=Person, 2=Organization, 3=Group) |
|
- BusinessPartnerGrouping |
|
- CreationDate |
|
- IsMarkedForArchiving |
|
- SearchTerm1, SearchTerm2 |
|
|
|
For country-specific queries, use contains() with country names. |
|
|
|
Return JSON with OData query parameters. |
|
|
|
Examples: |
|
- "Find customers in Germany" β {"$filter": "contains(BusinessPartnerFullName,'Germany') or contains(SearchTerm1,'DE')", "$top": "20"} |
|
- "Show me organizations" β {"$filter": "BusinessPartnerCategory eq '2'", "$top": "10"} |
|
- "Active partners only" β {"$filter": "IsMarkedForArchiving eq false", "$top": "10"} |
|
|
|
Always include $top with a reasonable limit (max 50 for workflows). |
|
""" |
|
|
|
try: |
|
response = client.chat.completions.create( |
|
model="gpt-4o-mini", |
|
messages=[ |
|
{"role": "system", "content": system_prompt}, |
|
{"role": "user", "content": f"Query: {query}"} |
|
], |
|
response_format={"type": "json_object"} |
|
) |
|
|
|
result = json.loads(response.choices[0].message.content) |
|
if "$top" not in result: |
|
result["$top"] = "20" |
|
return result |
|
|
|
except Exception as e: |
|
|
|
if "germany" in query.lower(): |
|
return { |
|
"$filter": "contains(BusinessPartnerFullName,'Germany')", |
|
"$top": "20" |
|
} |
|
return { |
|
"$filter": f"contains(BusinessPartnerFullName,'{query}')", |
|
"$top": "10" |
|
} |
|
|
|
def call_sap_api(self, entity_set: str, params: Dict[str, Any], base_url: str = None) -> Dict[str, Any]: |
|
"""Execute SAP API call""" |
|
url = f"{base_url or BASE_URL}/{entity_set}" |
|
|
|
|
|
query_params = { |
|
"$format": "json", |
|
"$inlinecount": "allpages" |
|
} |
|
|
|
|
|
for key, value in params.items(): |
|
query_params[key] = value |
|
|
|
headers = { |
|
"Accept": "application/json", |
|
"APIKey": SAP_API_KEY or "demo_key" |
|
} |
|
|
|
try: |
|
response = requests.get(url, params=query_params, headers=headers, timeout=30) |
|
|
|
if response.status_code == 200: |
|
return response.json() |
|
else: |
|
|
|
return self.get_mock_data(entity_set) |
|
|
|
except Exception as e: |
|
|
|
return self.get_mock_data(entity_set) |
|
|
|
def get_mock_data(self, entity_set: str) -> Dict[str, Any]: |
|
"""Return mock SAP data for demo purposes""" |
|
if entity_set == "CreditManagementAccount": |
|
return { |
|
"d": { |
|
"results": [ |
|
{ |
|
"BusinessPartner": "1000000001", |
|
"CreditLimitAmount": "50000.00", |
|
"Currency": "EUR", |
|
"CreditLimitValidFrom": "/Date(1640995200000)/", |
|
"CreditLimitValidTo": "/Date(1672531200000)/", |
|
"CreditExposureAmount": "25000.00" |
|
}, |
|
{ |
|
"BusinessPartner": "1000000002", |
|
"CreditLimitAmount": "75000.00", |
|
"Currency": "EUR", |
|
"CreditLimitValidFrom": "/Date(1640995200000)/", |
|
"CreditLimitValidTo": "/Date(1672531200000)/", |
|
"CreditExposureAmount": "15000.00" |
|
} |
|
], |
|
"__count": "2" |
|
} |
|
} |
|
else: |
|
return { |
|
"d": { |
|
"results": [ |
|
{ |
|
"BusinessPartner": "1000000001", |
|
"BusinessPartnerFullName": "Munich Manufacturing GmbH", |
|
"BusinessPartnerCategory": "2", |
|
"CreationDate": "/Date(1640995200000)/", |
|
"IsMarkedForArchiving": False, |
|
"SearchTerm1": "MUNICH" |
|
}, |
|
{ |
|
"BusinessPartner": "1000000002", |
|
"BusinessPartnerFullName": "Berlin Tech Solutions AG", |
|
"BusinessPartnerCategory": "2", |
|
"CreationDate": "/Date(1641081600000)/", |
|
"IsMarkedForArchiving": False, |
|
"SearchTerm1": "BERLIN" |
|
}, |
|
{ |
|
"BusinessPartner": "1000000003", |
|
"BusinessPartnerFullName": "Hamburg Logistics Ltd", |
|
"BusinessPartnerCategory": "2", |
|
"CreationDate": "/Date(1641168000000)/", |
|
"IsMarkedForArchiving": False, |
|
"SearchTerm1": "HAMBURG" |
|
} |
|
], |
|
"__count": "3" |
|
} |
|
} |
|
|
|
def execute_multi_step_workflow(self, workflow_type: str, query: str) -> str: |
|
"""Execute multi-step workflows with progress tracking""" |
|
|
|
if workflow_type == "credit_analysis": |
|
return self.execute_credit_analysis_workflow(query) |
|
else: |
|
return f"β Unknown workflow type: {workflow_type}" |
|
|
|
def execute_credit_analysis_workflow(self, query: str) -> str: |
|
"""Execute the credit limit analysis workflow""" |
|
workflow_id = f"credit_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}" |
|
|
|
|
|
workflow = MultiStepWorkflow( |
|
workflow_id=workflow_id, |
|
name="Credit Limit Analysis for German Customers", |
|
steps=[ |
|
WorkflowStep(step_number=1, description="Search for customers in Germany", status="pending"), |
|
WorkflowStep(step_number=2, description="Extract customer IDs", status="pending"), |
|
WorkflowStep(step_number=3, description="Fetch credit limits for each customer", status="pending"), |
|
WorkflowStep(step_number=4, description="Analyze and summarize findings", status="pending") |
|
] |
|
) |
|
|
|
self.active_workflows[workflow_id] = workflow |
|
|
|
try: |
|
|
|
workflow.steps[0].status = "in_progress" |
|
workflow.current_step = 1 |
|
|
|
|
|
if "germany" not in query.lower(): |
|
query = "Find customers in Germany" |
|
|
|
params = self.parse_search_query(query) |
|
customers_response = self.call_sap_api("A_BusinessPartner", params) |
|
|
|
workflow.steps[0].status = "completed" |
|
workflow.steps[0].result = customers_response |
|
|
|
|
|
workflow.steps[1].status = "in_progress" |
|
workflow.current_step = 2 |
|
|
|
customers = customers_response.get('d', {}).get('results', []) |
|
if not customers: |
|
workflow.status = "failed" |
|
return "β No customers found in Germany" |
|
|
|
customer_ids = [customer['BusinessPartner'] for customer in customers] |
|
workflow.steps[1].status = "completed" |
|
workflow.steps[1].result = customer_ids |
|
|
|
|
|
workflow.steps[2].status = "in_progress" |
|
workflow.current_step = 3 |
|
|
|
credit_data = [] |
|
for customer_id in customer_ids: |
|
credit_params = { |
|
"$filter": f"BusinessPartner eq '{customer_id}'", |
|
"$top": "1" |
|
} |
|
credit_response = self.call_sap_api("CreditManagementAccount", credit_params, CREDIT_API_URL) |
|
|
|
credit_results = credit_response.get('d', {}).get('results', []) |
|
if credit_results: |
|
credit_info = credit_results[0] |
|
|
|
customer_info = next((c for c in customers if c['BusinessPartner'] == customer_id), {}) |
|
|
|
credit_data.append({ |
|
'customer_id': customer_id, |
|
'customer_name': customer_info.get('BusinessPartnerFullName', 'Unknown'), |
|
'credit_limit': credit_info.get('CreditLimitAmount', '0'), |
|
'currency': credit_info.get('Currency', 'EUR'), |
|
'exposure': credit_info.get('CreditExposureAmount', '0'), |
|
'utilization': self.calculate_utilization( |
|
credit_info.get('CreditExposureAmount', '0'), |
|
credit_info.get('CreditLimitAmount', '0') |
|
) |
|
}) |
|
|
|
workflow.steps[2].status = "completed" |
|
workflow.steps[2].result = credit_data |
|
|
|
|
|
workflow.steps[3].status = "in_progress" |
|
workflow.current_step = 4 |
|
|
|
summary = self.generate_credit_analysis_summary(credit_data, customers) |
|
|
|
workflow.steps[3].status = "completed" |
|
workflow.steps[3].result = summary |
|
workflow.status = "completed" |
|
workflow.final_result = summary |
|
|
|
return summary |
|
|
|
except Exception as e: |
|
workflow.status = "failed" |
|
return f"β Workflow failed at step {workflow.current_step}: {str(e)}" |
|
|
|
def calculate_utilization(self, exposure: str, limit: str) -> float: |
|
"""Calculate credit utilization percentage""" |
|
try: |
|
exposure_val = float(exposure) |
|
limit_val = float(limit) |
|
if limit_val > 0: |
|
return round((exposure_val / limit_val) * 100, 2) |
|
return 0.0 |
|
except: |
|
return 0.0 |
|
|
|
def generate_credit_analysis_summary(self, credit_data: List[Dict], customers: List[Dict]) -> str: |
|
"""Generate comprehensive credit analysis summary""" |
|
|
|
if not credit_data: |
|
return "β No credit data found for German customers" |
|
|
|
|
|
total_customers = len(customers) |
|
customers_with_credit = len(credit_data) |
|
total_credit_limit = sum(float(item['credit_limit']) for item in credit_data) |
|
total_exposure = sum(float(item['exposure']) for item in credit_data) |
|
avg_utilization = sum(item['utilization'] for item in credit_data) / len(credit_data) |
|
|
|
|
|
high_risk = [item for item in credit_data if item['utilization'] > 80] |
|
low_risk = [item for item in credit_data if item['utilization'] < 30] |
|
|
|
|
|
summary = f"""## π Credit Limit Analysis - German Customers |
|
|
|
### π **Workflow Execution Summary** |
|
β
**Step 1:** Found {total_customers} German customers |
|
β
**Step 2:** Extracted customer IDs |
|
β
**Step 3:** Retrieved credit data for {customers_with_credit} customers |
|
β
**Step 4:** Completed analysis and summary |
|
|
|
### π **Key Financial Metrics** |
|
- **Total Credit Limits:** β¬{total_credit_limit:,.2f} |
|
- **Total Credit Exposure:** β¬{total_exposure:,.2f} |
|
- **Average Utilization:** {avg_utilization:.1f}% |
|
- **Overall Exposure Ratio:** {(total_exposure/total_credit_limit*100):.1f}% |
|
|
|
### π¨ **Risk Analysis** |
|
|
|
#### High Risk Customers (>80% utilization): |
|
""" |
|
|
|
if high_risk: |
|
for customer in high_risk: |
|
summary += f""" |
|
**{customer['customer_name']}** (ID: {customer['customer_id']}) |
|
- Credit Limit: β¬{float(customer['credit_limit']):,.2f} |
|
- Current Exposure: β¬{float(customer['exposure']):,.2f} |
|
- Utilization: **{customer['utilization']}%** β οΈ |
|
""" |
|
else: |
|
summary += "\nβ
No high-risk customers found\n" |
|
|
|
summary += f""" |
|
#### Low Risk Customers (<30% utilization): |
|
""" |
|
|
|
if low_risk: |
|
for customer in low_risk[:3]: |
|
summary += f""" |
|
**{customer['customer_name']}** (ID: {customer['customer_id']}) |
|
- Credit Limit: β¬{float(customer['credit_limit']):,.2f} |
|
- Utilization: {customer['utilization']}% β
|
|
""" |
|
if len(low_risk) > 3: |
|
summary += f"\n... and {len(low_risk) - 3} more low-risk customers\n" |
|
else: |
|
summary += "\nβ οΈ No low-risk customers found\n" |
|
|
|
summary += f""" |
|
### π‘ **Recommendations** |
|
1. **Monitor High-Risk Accounts:** Review customers with >80% utilization |
|
2. **Credit Line Reviews:** Consider adjusting limits based on utilization patterns |
|
3. **Payment Terms:** Evaluate payment terms for high-exposure customers |
|
4. **Regular Monitoring:** Set up alerts for utilization threshold breaches |
|
|
|
### π **Detailed Customer List** |
|
""" |
|
|
|
for i, customer in enumerate(credit_data, 1): |
|
risk_indicator = "π΄" if customer['utilization'] > 80 else "π‘" if customer['utilization'] > 50 else "π’" |
|
summary += f""" |
|
{i}. {risk_indicator} **{customer['customer_name']}** |
|
- ID: {customer['customer_id']} |
|
- Credit Limit: β¬{float(customer['credit_limit']):,.2f} |
|
- Exposure: β¬{float(customer['exposure']):,.2f} ({customer['utilization']}%) |
|
""" |
|
|
|
return summary |
|
|
|
def format_business_partner_response(self, response: Dict, original_query: str) -> str: |
|
"""Format business partner API response into readable text""" |
|
try: |
|
if 'd' in response and 'results' in response['d']: |
|
results = response['d']['results'] |
|
total_count = response['d'].get('__count', len(results)) |
|
|
|
if not results: |
|
return f"β No business partners found for: '{original_query}'" |
|
|
|
|
|
formatted_response = f"## π Business Partner Search Results\n\n" |
|
formatted_response += f"**Query:** {original_query}\n" |
|
formatted_response += f"**Found:** {total_count} business partner(s)\n\n" |
|
|
|
for i, partner in enumerate(results, 1): |
|
bp_id = partner.get('BusinessPartner', 'N/A') |
|
bp_name = partner.get('BusinessPartnerFullName', 'N/A') |
|
bp_category = partner.get('BusinessPartnerCategory', 'N/A') |
|
is_archived = partner.get('IsMarkedForArchiving', False) |
|
|
|
category_text = { |
|
'1': 'π€ Person', |
|
'2': 'π’ Organization', |
|
'3': 'π Group' |
|
}.get(bp_category, 'β Unknown') |
|
|
|
status = 'π’ Active' if not is_archived else 'π΄ Archived' |
|
|
|
formatted_response += f"### {i}. {bp_name}\n" |
|
formatted_response += f"- **ID:** {bp_id}\n" |
|
formatted_response += f"- **Type:** {category_text}\n" |
|
formatted_response += f"- **Status:** {status}\n\n" |
|
|
|
return formatted_response |
|
else: |
|
return f"β οΈ Received unexpected response format for: '{original_query}'" |
|
|
|
except Exception as e: |
|
return f"β Error formatting response: {str(e)}" |
|
|
|
def search_business_partners(self, query: str) -> str: |
|
"""Main search function for business partners""" |
|
try: |
|
|
|
params = self.parse_search_query(query) |
|
|
|
|
|
response = self.call_sap_api("A_BusinessPartner", params) |
|
|
|
|
|
return self.format_business_partner_response(response, query) |
|
|
|
except Exception as e: |
|
return f"β Error searching business partners: {str(e)}" |
|
|
|
def process_user_query(self, user_query: str, history: List) -> tuple: |
|
"""Process user query and return response with updated history""" |
|
|
|
|
|
history.append([user_query, None]) |
|
|
|
try: |
|
|
|
if any(keyword in user_query.lower() for keyword in ['credit limit', 'credit analysis', 'germany credit', 'german customers credit']): |
|
response = self.execute_multi_step_workflow("credit_analysis", user_query) |
|
elif any(keyword in user_query.lower() for keyword in ['search', 'find', 'show', 'list', 'get']): |
|
response = self.search_business_partners(user_query) |
|
else: |
|
|
|
system_prompt = """ |
|
You are a SAP Business Partner Assistant with multi-step workflow capabilities. |
|
|
|
Available capabilities: |
|
1. Search business partners |
|
2. Credit limit analysis workflows |
|
3. Multi-step customer analysis |
|
|
|
Guide users to: |
|
- Use search terms for finding partners |
|
- Ask for "credit analysis for German customers" for workflows |
|
- Request specific business partner operations |
|
|
|
Be helpful and explain what you can do. |
|
""" |
|
|
|
ai_response = client.chat.completions.create( |
|
model="gpt-4o-mini", |
|
messages=[ |
|
{"role": "system", "content": system_prompt}, |
|
{"role": "user", "content": user_query} |
|
] |
|
) |
|
|
|
response = ai_response.choices[0].message.content |
|
|
|
|
|
history[-1][1] = response |
|
|
|
except Exception as e: |
|
error_response = f"β Sorry, I encountered an error: {str(e)}\n\nTry asking: 'Run credit analysis for German customers'" |
|
history[-1][1] = error_response |
|
|
|
return "", history |
|
|
|
|
|
sap_agent = SAPBusinessPartnerAgent() |
|
|
|
|
|
def create_interface(): |
|
with gr.Blocks( |
|
title="SAP Business Partner Agent with Workflows", |
|
theme=gr.themes.Soft(), |
|
css=""" |
|
.gradio-container { |
|
max-width: 1200px !important; |
|
} |
|
.agent-header { |
|
text-align: center; |
|
background: linear-gradient(90deg, #0070f3, #00d4ff); |
|
color: white; |
|
padding: 20px; |
|
border-radius: 10px; |
|
margin-bottom: 20px; |
|
} |
|
""" |
|
) as demo: |
|
|
|
|
|
gr.HTML(""" |
|
<div class="agent-header"> |
|
<h1>π€ SAP Business Partner Agent</h1> |
|
<p>Intelligent assistant with multi-step workflow capabilities</p> |
|
</div> |
|
""") |
|
|
|
|
|
with gr.Row(): |
|
with gr.Column(scale=4): |
|
chatbot = gr.Chatbot( |
|
height=600, |
|
label="Chat with SAP Business Partner Agent", |
|
placeholder="Start by asking about business partners or request a workflow..." |
|
) |
|
|
|
with gr.Row(): |
|
msg = gr.Textbox( |
|
placeholder="Try: 'Run credit analysis for German customers' or 'Find partners with name Demo'", |
|
label="Your Message", |
|
scale=4 |
|
) |
|
submit_btn = gr.Button("Send", variant="primary", scale=1) |
|
|
|
|
|
gr.Markdown("### π‘ Quick Examples:") |
|
with gr.Row(): |
|
example1 = gr.Button("Credit Analysis Workflow", size="sm", variant="primary") |
|
example2 = gr.Button("Find German customers", size="sm") |
|
example3 = gr.Button("Search Demo partners", size="sm") |
|
clear_btn = gr.Button("Clear Chat", size="sm", variant="secondary") |
|
|
|
|
|
with gr.Column(scale=1): |
|
gr.Markdown(""" |
|
### π§ Agent Capabilities |
|
|
|
**Multi-Step Workflows:** |
|
- π¦ Credit limit analysis |
|
- π Risk assessment |
|
- π Sequential API calls |
|
|
|
**Search & Retrieve:** |
|
- Find business partners |
|
- Filter by criteria |
|
- Location-based search |
|
|
|
**Partner Types:** |
|
- π€ Persons |
|
- π’ Organizations |
|
- π Groups |
|
|
|
### π Advanced Features |
|
- **Workflow Tracking:** Step-by-step progress |
|
- **Error Handling:** Graceful failure recovery |
|
- **Data Integration:** Multiple SAP APIs |
|
- **Smart Analysis:** AI-powered insights |
|
|
|
### π Workflow Examples |
|
- "Run credit analysis for German customers" |
|
- "Analyze credit limits for suppliers" |
|
- "Find high-risk customers" |
|
""") |
|
|
|
|
|
msg.submit( |
|
sap_agent.process_user_query, |
|
inputs=[msg, chatbot], |
|
outputs=[msg, chatbot] |
|
) |
|
|
|
submit_btn.click( |
|
sap_agent.process_user_query, |
|
inputs=[msg, chatbot], |
|
outputs=[msg, chatbot] |
|
) |
|
|
|
|
|
example1.click(lambda: "Run credit analysis for German customers", outputs=msg) |
|
example2.click(lambda: "Find customers in Germany", outputs=msg) |
|
example3.click(lambda: "Find partners with name Demo", outputs=msg) |
|
clear_btn.click(lambda: [], outputs=chatbot) |
|
|
|
|
|
gr.Markdown(""" |
|
--- |
|
**SAP Business Partner Agent with Multi-Step Workflows** | Powered by OpenAI & SAP APIs | Built for Advanced Agentic AI Learning |
|
""") |
|
|
|
return demo |
|
|
|
|
|
if __name__ == "__main__": |
|
demo = create_interface() |
|
demo.launch() |
|
|