# Setup Hugging Face Inference API for LLAMA3
import os
import requests
import json
import gradio as gr
from typing import List, Dict, Any, Optional
import logging
import time

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configuration - set these as environment variables in Hugging Face Spaces
SAP_API_KEY = os.getenv('SAP_API_KEY')  # Set in Space secrets
HF_TOKEN = os.getenv('HF_TOKEN')  # Set in Space secrets
SAP_BASE_URL = "https://sandbox.api.sap.com/s4hanacloud/sap/opu/odata/sap"

# Hugging Face Inference API endpoints
HF_API_BASE = "https://api-inference.huggingface.co/models"
LLAMA3_MODEL = "meta-llama/Meta-Llama-3-8B-Instruct"  # Using the Inference API
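
# Illustrative sketch (not part of the original app): an example of a fully composed
# OData V2 query URL, matching the pattern SAPDataFetcher builds below against the
# SAP sandbox base URL. The $top value here is just an example.
_EXAMPLE_SALES_ORDER_URL = (
    f"{SAP_BASE_URL}/API_SALES_ORDER_SRV/A_SalesOrder"
    "?$top=5&$inlinecount=allpages"
)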

class LLAMA3Client:
    def __init__(self, hf_token: str):
        self.hf_token = hf_token
        self.api_url = f"{HF_API_BASE}/{LLAMA3_MODEL}"
        self.headers = {
            "Authorization": f"Bearer {hf_token}",
            "Content-Type": "application/json"
        }
        # Warm up the model
        self._warm_up_model()

    def _warm_up_model(self):
        """Warm up the model to avoid cold start delays"""
        try:
            logger.info("Warming up LLAMA3 model...")
            self._make_inference_request("Hello", max_new_tokens=10)
            logger.info("Model warmed up successfully")
        except Exception as e:
            logger.warning(f"Model warm-up failed: {e}")

    def _make_inference_request(self, prompt: str, max_new_tokens: int = 500, temperature: float = 0.1, max_retries: int = 3) -> str:
        """Make inference request to Hugging Face API with retry logic"""
        # Sampling with a temperature of 0 is rejected by the generation backend,
        # so only sample (and send a temperature) when a positive value is requested.
        parameters = {
            "max_new_tokens": max_new_tokens,
            "do_sample": temperature > 0,
            "top_p": 0.9,
            "return_full_text": False
        }
        if temperature > 0:
            parameters["temperature"] = temperature
        payload = {
            "inputs": prompt,
            "parameters": parameters
        }
        for attempt in range(max_retries):
            try:
                response = requests.post(
                    self.api_url,
                    headers=self.headers,
                    json=payload,
                    timeout=60
                )
                if response.status_code == 503:
                    # Model is loading, wait and retry
                    wait_time = min(20 * (attempt + 1), 60)
                    logger.info(f"Model loading, waiting {wait_time}s...")
                    time.sleep(wait_time)
                    continue
                response.raise_for_status()
                result = response.json()
                if isinstance(result, list) and len(result) > 0:
                    return result[0].get('generated_text', '').strip()
                elif isinstance(result, dict) and 'generated_text' in result:
                    return result['generated_text'].strip()
                else:
                    logger.error(f"Unexpected response format: {result}")
                    return "I received an unexpected response format."
            except requests.exceptions.RequestException as e:
                logger.error(f"Request failed (attempt {attempt + 1}): {e}")
                if attempt == max_retries - 1:
                    return f"Failed to get response after {max_retries} attempts: {str(e)}"
                time.sleep(2 ** attempt)  # Exponential backoff
            except Exception as e:
                logger.error(f"Unexpected error: {e}")
                return f"An unexpected error occurred: {str(e)}"
        return "Failed to generate response"

    def generate_response(self, prompt: str, max_length: int = 500, temperature: float = 0.1) -> str:
        """Generate response using LLAMA3 via Inference API"""
        # Format prompt for the LLAMA3 instruction format
        formatted_prompt = f"""<|begin_of_text|><|start_header_id|>system<|end_header_id|>
You are a helpful SAP data analyst. Provide clear, concise answers based on the provided data. Keep responses under 300 words.<|eot_id|><|start_header_id|>user<|end_header_id|>
{prompt}<|eot_id|><|start_header_id|>assistant<|end_header_id|>
"""
        try:
            response = self._make_inference_request(
                formatted_prompt,
                max_new_tokens=min(max_length, 400),  # Limit tokens to avoid timeouts
                temperature=temperature
            )
            # Clean up the response
            if response and len(response.strip()) > 0:
                return response
            else:
                return "I couldn't generate a proper response. Please try rephrasing your question."
        except Exception as e:
            logger.error(f"Error generating response: {e}")
            return f"I encountered an error while processing your question: {str(e)}"

class SAPDataFetcher:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.headers = {
            "APIKey": api_key,
            "Accept": "application/json",
            "Content-Type": "application/json"
        }

    def _make_request(self, url: str, timeout: int = 30) -> Optional[Dict]:
        """Make HTTP request with proper error handling"""
        try:
            logger.info(f"Making request to: {url}")
            response = requests.get(url, headers=self.headers, timeout=timeout)
            response.raise_for_status()
            data = response.json()
            logger.info(f"Request successful. Response size: {len(str(data))}")
            return data
        except requests.exceptions.RequestException as e:
            logger.error(f"Request failed: {e}")
            return None
        except json.JSONDecodeError as e:
            logger.error(f"JSON decode error: {e}")
            return None

    def fetch_sales_orders(self, top: int = 30) -> List[Dict]:
        """Fetch sales orders with error handling"""
        url = f"{SAP_BASE_URL}/API_SALES_ORDER_SRV/A_SalesOrder?$top={top}&$inlinecount=allpages"
        data = self._make_request(url)
        if data and 'd' in data and 'results' in data['d']:
            orders = data['d']['results']
            # Simplify the data structure
            simplified_orders = []
            for order in orders:
                simplified_order = {
                    "SalesOrder": order.get("SalesOrder", ""),
                    "SalesOrderType": order.get("SalesOrderType", ""),
                    "SalesOrganization": order.get("SalesOrganization", ""),
                    "SoldToParty": order.get("SoldToParty", ""),
                    "CreationDate": order.get("CreationDate", ""),
                    "CreatedByUser": order.get("CreatedByUser", ""),
                    "TransactionCurrency": order.get("TransactionCurrency", ""),
                    "TotalNetAmount": order.get("TotalNetAmount", "0")
                }
                simplified_orders.append(simplified_order)
            return simplified_orders
        else:
            logger.error("Failed to fetch sales orders or invalid response format")
            return []

    def fetch_purchase_orders(self, top: int = 30) -> List[Dict]:
        """Fetch purchase order headers"""
        url = f"{SAP_BASE_URL}/API_PURCHASEORDER_PROCESS_SRV/A_PurchaseOrder?$top={top}&$inlinecount=allpages"
        data = self._make_request(url)
        if data and 'd' in data and 'results' in data['d']:
            orders = data['d']['results']
            simplified_orders = []
            for order in orders:
                simplified_order = {
                    "PurchaseOrder": order.get("PurchaseOrder", ""),
                    "CompanyCode": order.get("CompanyCode", ""),
                    "PurchaseOrderType": order.get("PurchaseOrderType", ""),
                    "CreatedByUser": order.get("CreatedByUser", ""),
                    "CreationDate": order.get("CreationDate", ""),
                    "Supplier": order.get("Supplier", ""),
                    "PurchasingOrganization": order.get("PurchasingOrganization", ""),
                    "PurchasingGroup": order.get("PurchasingGroup", ""),
                    "PurchaseOrderDate": order.get("PurchaseOrderDate", ""),
                    "DocumentCurrency": order.get("DocumentCurrency", ""),
                    "ExchangeRate": order.get("ExchangeRate", "1.0")
                }
                simplified_orders.append(simplified_order)
            return simplified_orders
        else:
            logger.error("Failed to fetch purchase orders or invalid response format")
            return []

    def fetch_purchase_order_items(self, purchase_orders: List[str]) -> List[Dict]:
        """Fetch purchase order items for given order numbers"""
        all_items = []
        for po_number in purchase_orders[:5]:  # Reduced limit for faster processing
            url = f"{SAP_BASE_URL}/API_PURCHASEORDER_PROCESS_SRV/A_PurchaseOrderItem?$filter=PurchaseOrder eq '{po_number}'"
            data = self._make_request(url)
            if data and 'd' in data and 'results' in data['d']:
                items = data['d']['results']
                for item in items:
                    simplified_item = {
                        "PurchaseOrder": item.get("PurchaseOrder", ""),
                        "PurchaseOrderItem": item.get("PurchaseOrderItem", ""),
                        "Plant": item.get("Plant", ""),
                        "StorageLocation": item.get("StorageLocation", ""),
                        "MaterialGroup": item.get("MaterialGroup", ""),
                        "OrderQuantity": item.get("OrderQuantity", "0"),
                        "PurchaseOrderQuantityUnit": item.get("PurchaseOrderQuantityUnit", ""),
                        "DocumentCurrency": item.get("DocumentCurrency", ""),
                        "NetPriceAmount": item.get("NetPriceAmount", "0"),
                        "NetPriceQuantity": item.get("NetPriceQuantity", "0")
                    }
                    all_items.append(simplified_item)
        return all_items
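
# Illustrative sketch (not part of the original app): the OData V2 JSON envelope that the
# fetch_* methods above expect, i.e. payload['d']['results'] holding the entity list.
# All field values below are made up for illustration.
_EXAMPLE_ODATA_ENVELOPE = {
    "d": {
        "results": [
            {
                "SalesOrder": "0000000001",
                "SoldToParty": "EXAMPLE01",
                "TransactionCurrency": "EUR",
                "TotalNetAmount": "1000.00"
            }
        ]
    }
}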

class SAPAgent:
    def __init__(self, data_fetcher: SAPDataFetcher, llama_client: LLAMA3Client):
        self.data_fetcher = data_fetcher
        self.llama_client = llama_client

    def categorize_query(self, question: str) -> str:
        """Determine if query is about sales or purchase orders"""
        category_prompt = f"""Analyze this question and determine if it's about Sales Orders or Purchase Orders:
Question: "{question}"
Guidelines:
- Sales Orders: customer orders, sales transactions, revenue, sold to party
- Purchase Orders: supplier orders, procurement, purchasing, vendor transactions
Respond with exactly one word: "sales" or "purchase" """
        try:
            response = self.llama_client.generate_response(category_prompt, max_length=20, temperature=0)
            category = response.strip().lower()
            return "sales" if "sales" in category else "purchase"
        except Exception as e:
            logger.error(f"Error in categorization: {e}")
            return "purchase"  # Default to purchase

    def needs_item_details(self, question: str) -> bool:
        """Determine if question requires item-level details"""
        detail_prompt = f"""Does this question require detailed item-level information (quantities, prices, materials, line items)?
Question: "{question}"
Answer only "yes" or "no" """
        try:
            response = self.llama_client.generate_response(detail_prompt, max_length=20, temperature=0)
            answer = response.strip().lower()
            return "yes" in answer
        except Exception as e:
            logger.error(f"Error determining detail needs: {e}")
            return False

    def process_query(self, question: str) -> str:
        """Main function to process user queries"""
        logger.info(f"Processing query: {question}")

        # Categorize the query
        category = self.categorize_query(question)
        logger.info(f"Query categorized as: {category}")

        # Fetch appropriate data
        if category == "sales":
            data = self.data_fetcher.fetch_sales_orders()
            data_type = "Sales Orders"
            context = {"orders": data}
        else:
            # Fetch purchase order headers
            po_headers = self.data_fetcher.fetch_purchase_orders()
            context = {"headers": po_headers}
            data_type = "Purchase Order Headers"

            # Check if item details are needed
            if self.needs_item_details(question) and po_headers:
                logger.info("Fetching item-level details")
                po_numbers = [po["PurchaseOrder"] for po in po_headers[:5] if po["PurchaseOrder"]]  # Limit for performance
                po_items = self.data_fetcher.fetch_purchase_order_items(po_numbers)
                context["items"] = po_items
                data_type = "Purchase Orders with Item Details"

                # Calculate total value
                total_value = 0.0
                for item in po_items:
                    try:
                        net_price = float(item.get("NetPriceAmount", 0))
                        quantity = float(item.get("OrderQuantity", 0))
                        total_value += net_price * quantity
                    except (ValueError, TypeError):
                        continue
                context["total_value"] = total_value

        # Generate response using LLAMA3
        return self.generate_response(question, context, data_type)

    def generate_response(self, question: str, context: Dict, data_type: str) -> str:
        """Generate response using LLAMA3"""
        # Limit context size for API efficiency
        context_str = json.dumps(context, indent=2)
        if len(context_str) > 2000:  # Smaller limit for API
            context_str = context_str[:2000] + "... (truncated)"

        prompt = f"""Data Type: {data_type}
Available Data:
{context_str}
User Question: {question}
Instructions:
1. Provide a clear, concise answer based on the data
2. Include specific numbers, dates, or values when relevant
3. If the data doesn't contain enough information to answer fully, mention this
4. Format your response in a user-friendly way
5. Keep response under 250 words"""
        try:
            return self.llama_client.generate_response(prompt, max_length=400, temperature=0.1)
        except Exception as e:
            logger.error(f"Error generating response: {e}")
            return f"I encountered an error while processing your question: {str(e)}"

# Initialize the system
try:
    if not HF_TOKEN:
        logger.error("HF_TOKEN not found in environment variables")
        sap_agent = None
    else:
        llama_client = LLAMA3Client(HF_TOKEN)
        if SAP_API_KEY:
            data_fetcher = SAPDataFetcher(SAP_API_KEY)
            sap_agent = SAPAgent(data_fetcher, llama_client)
            logger.info("SAP Agent initialized successfully")
        else:
            logger.warning("SAP_API_KEY not found. SAP Agent disabled.")
            sap_agent = None
except Exception as e:
    logger.error(f"Failed to initialize SAP Agent: {e}")
    sap_agent = None

# Gradio interface
def chat_with_sap(message, history):
    """Handle chat interactions"""
    history = history or []
    if not sap_agent:
        # This function is a generator, so errors must be yielded (a bare `return value`
        # would never reach Gradio)
        yield history + [("System", "SAP Agent not initialized. Please check your HF_TOKEN and SAP_API_KEY in Space secrets.")]
        return
    if not message.strip():
        yield history
        return
    try:
        # Add typing indicator
        history.append((message, "🤔 Thinking..."))
        yield history

        # Process the query
        response = sap_agent.process_query(message)
        history[-1] = (message, response)
        yield history
    except Exception as e:
        error_msg = f"Error processing your request: {str(e)}"
        if history and history[-1][1] == "🤔 Thinking...":
            history[-1] = (message, error_msg)
        else:
            history.append((message, error_msg))
        yield history
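
# Illustrative sketch (not part of the original app): chat_with_sap is a generator, so
# Gradio renders each yielded history in turn (the "Thinking..." placeholder, then the
# final answer). A plain Python caller could drain it like this hypothetical helper:
def _example_chat_stream() -> None:
    for state in chat_with_sap("How many sales orders do we have?", []):
        print(state[-1][1])  # placeholder first, then the final response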

def clear_chat():
    return []

# Create Gradio interface
with gr.Blocks(title="SAP Order Analytics Agent with LLAMA3") as demo:
    gr.Markdown("""
# SAP Order Analytics Agent (Powered by LLAMA3 via Inference API)

This AI agent uses Meta's LLAMA3 model via the Hugging Face Inference API to analyze SAP data. Ask questions like:
- "How many sales orders do we have?"
- "What's the total value of all purchase orders?"
- "Show me recent purchase orders"
- "What are the top suppliers?"

**Setup Required:**
1. Set `HF_TOKEN` in Space secrets (your Hugging Face token)
2. Set `SAP_API_KEY` in Space secrets (your SAP API key)
3. Ensure you have access to the LLAMA3 model on Hugging Face
""")

    chatbot = gr.Chatbot(
        height=500,
        placeholder="Ask me anything about your SAP orders...",
        show_copy_button=True
    )

    with gr.Row():
        msg = gr.Textbox(
            label="Your Question",
            placeholder="Type your question here...",
            scale=4
        )
        submit_btn = gr.Button("Send", scale=1, variant="primary")
        clear_btn = gr.Button("Clear", scale=1)

    # Event handlers
    submit_btn.click(chat_with_sap, [msg, chatbot], [chatbot])
    msg.submit(chat_with_sap, [msg, chatbot], [chatbot])
    clear_btn.click(clear_chat, outputs=[chatbot])

    # Clear input after submission
    submit_btn.click(lambda: "", outputs=[msg])
    msg.submit(lambda: "", outputs=[msg])

# Launch the interface
if __name__ == "__main__":
    demo.launch(share=True)