Spaces:
Running
Running
import os | |
import random | |
import gradio as gr | |
from datetime import datetime | |
from transformers import pipeline | |
from simple_salesforce import Salesforce, SalesforceLogin | |
from dotenv import load_dotenv | |
import xml.etree.ElementTree as ET | |
# ---------- Load Environment Variables ---------- | |
load_dotenv() | |
SF_USERNAME = os.getenv("SF_USERNAME") | |
SF_PASSWORD = os.getenv("SF_PASSWORD") | |
SF_SECURITY_TOKEN = os.getenv("SF_SECURITY_TOKEN") | |
# ---------- Label Mapping ---------- | |
label_to_issue_type = { | |
"LABEL_0": "Performance", | |
"LABEL_1": "Error", | |
"LABEL_2": "Security", | |
"LABEL_3": "Best Practice" | |
} | |
suggestions = { | |
"Performance": "Consider optimizing loops and database access. Use collections to reduce SOQL queries.", | |
"Error": "Add proper error handling and null checks. Use try-catch blocks effectively.", | |
"Security": "Avoid dynamic SOQL. Use binding variables to prevent SOQL injection.", | |
"Best Practice": "Refactor for readability and use bulk-safe patterns, such as processing records in batches." | |
} | |
severities = { | |
"Performance": "Medium", | |
"Error": "High", | |
"Security": "High", | |
"Best Practice": "Low" | |
} | |
# ---------- Load QnA Model (no fallback) ---------- | |
qa_pipeline = pipeline("text2text-generation", model="google/flan-t5-large") | |
# ---------- Logging ---------- | |
def log_to_console(data, log_type): | |
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
print(f"[{timestamp}] {log_type} Log: {data}") | |
# ---------- Salesforce Connection ---------- | |
try: | |
session_id, instance = SalesforceLogin( | |
username=SF_USERNAME, | |
password=SF_PASSWORD, | |
security_token=SF_SECURITY_TOKEN | |
) | |
sf = Salesforce(instance=instance, session_id=session_id) | |
print("β Connected to Salesforce successfully") | |
except Exception as e: | |
sf = None | |
print(f"β Failed to connect to Salesforce: {e}") | |
# ---------- Code Analyzer ---------- | |
def analyze_code(code): | |
if not code.strip(): | |
return "No code provided.", "", "" | |
label = random.choice(list(label_to_issue_type.keys())) | |
issue_type = label_to_issue_type[label] | |
suggestion = suggestions[issue_type] | |
severity = severities[issue_type] | |
review_data = { | |
"Name": f"Review_{issue_type}", | |
"CodeSnippet__c": code, | |
"IssueType__c": issue_type, | |
"Suggestion__c": suggestion, | |
"Severity__c": severity | |
} | |
log_to_console(review_data, "Code Review") | |
if sf: | |
try: | |
result = sf.CodeReviewResult__c.create(review_data) | |
if result.get("success"): | |
log_to_console({"Salesforce Record ID": result["id"]}, "Salesforce Create") | |
else: | |
log_to_console(result, "Salesforce Error") | |
except Exception as e: | |
log_to_console({"Salesforce Exception": str(e)}, "Salesforce Error") | |
else: | |
log_to_console("Salesforce not connected.", "Salesforce Error") | |
return issue_type, suggestion, severity | |
# ---------- Metadata Validator ---------- | |
def validate_metadata(metadata, admin_id=None): | |
if not metadata.strip(): | |
return "No metadata provided.", "", "" | |
mtype = "Field" | |
issue = "Unknown" | |
recommendation = "No recommendation found." | |
try: | |
root = ET.fromstring(metadata) | |
description_found = any(elem.tag.endswith('description') for elem in root) | |
if not description_found: | |
issue = "Missing description" | |
recommendation = "Add a meaningful <description> to improve maintainability and clarity." | |
else: | |
issue = "Unused field detected" | |
recommendation = "Remove it to improve performance or document its purpose." | |
except Exception as e: | |
issue = "Invalid XML" | |
recommendation = f"Could not parse metadata XML. Error: {str(e)}" | |
log_data = { | |
"Name": f"MetadataLog_{mtype}", | |
"MetadataType__c": mtype, | |
"IssueDescription__c": issue, | |
"Recommendation__c": recommendation, | |
"Status__c": "Open" | |
} | |
if admin_id: | |
log_data["Admin__c"] = admin_id | |
log_to_console(log_data, "Metadata Validation") | |
if sf: | |
try: | |
result = sf.MetadataAuditLog__c.create(log_data) | |
if result.get("success"): | |
log_to_console({"Salesforce MetadataAuditLog Record ID": result["id"]}, "Salesforce Create") | |
else: | |
log_to_console(result, "Salesforce Metadata Error") | |
except Exception as e: | |
log_to_console({"Salesforce Exception": str(e)}, "Salesforce Error") | |
else: | |
log_to_console("Salesforce not connected.", "Salesforce Error") | |
return mtype, issue, recommendation | |
# ---------- Salesforce Chatbot (Improved Prompt) ---------- | |
conversation_history = [] | |
def salesforce_chatbot(query, history=[]): | |
global conversation_history | |
if not query.strip(): | |
return "Please provide a valid Salesforce-related question." | |
salesforce_keywords = [ | |
"apex", "soql", "trigger", "lwc", "aura", "visualforce", "salesforce", "governor limits", | |
"dml", "metadata", "batch apex", "queueable", "future method", "api", "sfdc", "heap", "limits" | |
] | |
if not any(keyword.lower() in query.lower() for keyword in salesforce_keywords): | |
return "Please ask a Salesforce-related question." | |
history_summary = "\n".join([f"User: {q}\nAssistant: {a}" for q, a in conversation_history[-4:]]) | |
prompt = f""" | |
You are a certified Salesforce developer and architect. Your role is to answer with 100% accurate and detailed technical explanations, especially about limits, code, and platform best practices. | |
Your answers MUST: | |
- Always be at least two lines long. | |
- Be correct, clear, and production-safe. | |
- Include official Salesforce governor limits when applicable. | |
- Use bullet points or code snippets when needed. | |
- Recommend Trailhead or official docs if the answer isn't definitive. | |
- Follow real-world practices (bulkification, error handling, etc). | |
Conversation History: | |
{history_summary} | |
User: {query.strip()} | |
Assistant: | |
""" | |
try: | |
result = qa_pipeline(prompt, max_new_tokens=1024, do_sample=False, temperature=0.1, top_k=50) | |
output = result[0]["generated_text"].strip() | |
if output.startswith("Assistant:"): | |
output = output.replace("Assistant:", "").strip() | |
if len(output.split()) < 15: | |
output += "\n\nRefer to: https://developer.salesforce.com/docs for more." | |
conversation_history.append((query, output)) | |
conversation_history = conversation_history[-6:] | |
log_to_console({"Question": query, "Answer": output}, "Chatbot Query") | |
return output | |
except Exception as e: | |
return f"β οΈ Error generating response: {str(e)}" | |
# ---------- Gradio UI ---------- | |
with gr.Blocks(theme=gr.themes.Soft()) as demo: | |
gr.Markdown("# π€ Advanced Salesforce AI Code Review & Chatbot") | |
with gr.Tab("Code Review"): | |
code_input = gr.Textbox(label="Apex / LWC Code", lines=8, placeholder="Enter your Apex or LWC code here") | |
issue_type = gr.Textbox(label="Issue Type") | |
suggestion = gr.Textbox(label="AI Suggestion") | |
severity = gr.Textbox(label="Severity") | |
code_button = gr.Button("Analyze Code") | |
code_button.click(analyze_code, inputs=code_input, outputs=[issue_type, suggestion, severity]) | |
with gr.Tab("Metadata Validation"): | |
metadata_input = gr.Textbox(label="Metadata XML", lines=8, placeholder="Enter your metadata XML here") | |
mtype = gr.Textbox(label="Type") | |
issue = gr.Textbox(label="Issue") | |
recommendation = gr.Textbox(label="Recommendation") | |
metadata_button = gr.Button("Validate Metadata") | |
metadata_button.click(validate_metadata, inputs=metadata_input, outputs=[mtype, issue, recommendation]) | |
with gr.Tab("Salesforce Chatbot"): | |
chatbot_output = gr.Chatbot(label="Conversation History", height=400) | |
query_input = gr.Textbox(label="Your Question", placeholder="e.g., How many DML operations are allowed in Apex?") | |
with gr.Row(): | |
chatbot_button = gr.Button("Ask") | |
clear_button = gr.Button("Clear Chat") | |
chat_state = gr.State(value=[]) | |
def update_chatbot(query, chat_history): | |
if not query.strip(): | |
return chat_history, "Please enter a valid question." | |
response = salesforce_chatbot(query, chat_history) | |
chat_history.append((query, response)) | |
return chat_history, "" | |
def clear_chat(): | |
global conversation_history | |
conversation_history = [] | |
return [], "" | |
chatbot_button.click(fn=update_chatbot, inputs=[query_input, chat_state], outputs=[chatbot_output, query_input]) | |
clear_button.click(fn=clear_chat, inputs=None, outputs=[chatbot_output, query_input]) | |
if __name__ == "__main__": | |
demo.launch() | |