"""Utility functions for Salesforce B2B Commerce Migration Assistant"""

import json
import logging
import re
from typing import Any, Dict, List, Optional, Tuple

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Apex syntax patterns for validation
APEX_PATTERNS = {
    "class_declaration": r"(?:public|private|global|protected)\s+(?:virtual|abstract|with sharing|without sharing|inherited sharing)?\s*class\s+\w+",
    "trigger_declaration": r"trigger\s+\w+\s+on\s+\w+\s*\([^)]+\)",
    "method_declaration": r"(?:public|private|global|protected)\s+(?:static)?\s*(?:void|\w+)\s+\w+\s*\([^)]*\)",
    "soql_query": r"(?:\[|Database\.query\s*\()\s*SELECT\s+.*?\s+FROM\s+\w+.*?(?:\]|\))",
    "dml_operation": r"(?:insert|update|delete|undelete|upsert|merge)\s+\w+",
    "bulkification_issue": r"for\s*\([^)]+\)\s*{[^}]*(?:insert|update|delete|undelete)\s+",
    "hardcoded_id": r"(?:\'[a-zA-Z0-9]{15}\'|\'[a-zA-Z0-9]{18}\')",
    "missing_null_check": r"(\w+)\.(\w+)(?!\s*(?:!=|==)\s*null)",
    "governor_limit_risk": r"(?:for\s*\([^)]+\)\s*{[^}]*\[SELECT|Database\.query)",
}

# Common Apex errors and their fixes
APEX_ERRORS = {
    "missing_semicolon": {
        "pattern": r"[^{};]\s*\n\s*(?:public|private|global|protected|if|for|while|try)",
        "message": "Missing semicolon at end of statement",
        "severity": "error",
    },
    "unclosed_bracket": {
        "pattern": r"(?:\{(?:[^{}]|(?:\{[^{}]*\}))*$)|(?:^[^{}]*\})",
        "message": "Unclosed or extra bracket detected",
        "severity": "error",
    },
    "invalid_soql": {
        "pattern": r"\[\s*SELECT\s+FROM\s+\w+",
        "message": "Invalid SOQL: Missing field selection",
        "severity": "error",
    },
    # NOTE(review): the reviewed text of this file is truncated from here. A
    # fourth entry, "missing_try_catch_dml" (its pattern began with r"(?"),
    # and anything that followed it in this table were lost and are NOT
    # reconstructed here - restore them from version control.
}

# NOTE(review): B2B_COMMERCE_PATTERNS (keys "cloudcraze_reference" and
# "deprecated_method", read by perform_skeptical_evaluation) was defined in
# the same lost span. It is deliberately not guessed at; restore it from
# version control, otherwise perform_skeptical_evaluation raises NameError.

# Metrics reported on a 0..1 scale ("quality_rating" is separate: 0..10).
_UNIT_METRICS = (
    "accuracy",
    "completeness",
    "best_practices_alignment",
    "syntax_validity",
    "security_score",
    "performance_score",
)

# Pattern-based checks run after the APEX_ERRORS table, in reporting order:
# (APEX_PATTERNS key, regex flags, severity, message).
_PATTERN_CHECKS = (
    (
        "bulkification_issue",
        re.DOTALL,
        "error",
        "DML operation inside loop - violates bulkification best practices",
    ),
    (
        "hardcoded_id",
        0,
        "warning",
        "Hardcoded Salesforce ID detected - use Custom Settings or Custom Metadata",
    ),
    (
        "governor_limit_risk",
        re.DOTALL,
        "warning",
        "SOQL query inside loop - potential governor limit issue",
    ),
)

# User-facing messages for specific HTTP status codes.
_STATUS_MESSAGES = {
    401: "Authentication failed. Please check API configuration.",
    429: "Rate limit exceeded. Please try again later.",
    403: "Access forbidden. Please check your permissions.",
}

_TRIGGER_TEST_TEMPLATE = """
// Test class for the trigger
@isTest
private class Test_MigratedTrigger {
    @TestSetup
    static void setup() {
        // Create test data
        // TODO: Add specific test data setup
    }

    @isTest
    static void testBulkInsert() {
        // Test bulk insert scenario
        List<SObject> testRecords = new List<SObject>();
        for(Integer i = 0; i < 200; i++) {
            // TODO: Create test records
        }

        Test.startTest();
        insert testRecords;
        Test.stopTest();

        // TODO: Add assertions
        System.assert(true, 'Bulk insert test needs implementation');
    }

    @isTest
    static void testBulkUpdate() {
        // Test bulk update scenario
        // TODO: Implement bulk update test
    }

    @isTest
    static void testErrorHandling() {
        // Test error scenarios
        // TODO: Test validation rules, required fields, etc.
    }

    @isTest
    static void testGovernorLimits() {
        // Test near governor limits
        // TODO: Test with large data volumes
    }
}
"""

_OBJECT_TEST_TEMPLATE = """
// Test data creation for migrated object
@isTest
public class Test_MigratedObjectData {
    public static SObject createTestRecord() {
        // TODO: Create and return test instance
        return null;
    }

    public static List<SObject> createBulkTestRecords(Integer count) {
        List<SObject> records = new List<SObject>();
        for(Integer i = 0; i < count; i++) {
            // TODO: Create test records
        }
        return records;
    }

    public static void validateMigrationMapping() {
        // Validate that all fields are properly mapped
        // TODO: Add field mapping validation
    }
}
"""


def _issue_at(code: str, severity: str, message: str, offset: int) -> Dict[str, Any]:
    """Build one issue record for the character *offset* inside *code*."""
    return {
        "type": severity,
        "message": message,
        # 1-based line number: newlines before the offset, plus one.
        "line": code.count("\n", 0, offset) + 1,
        "position": offset,
    }


# NOTE(review): the "def" line of this function was lost in the truncated
# text; the signature is reconstructed from the surviving return annotation
# and from the call in perform_skeptical_evaluation - confirm.
def validate_apex_syntax(code: str) -> Tuple[bool, List[Dict[str, Any]]]:
    """Validate Apex syntax and return errors/warnings.

    Args:
        code: Apex source text to scan.

    Returns:
        ``(is_valid, issues)``. Each issue is a dict with the keys ``type``
        ("error" or "warning"), ``message``, ``line`` (1-based int) and
        ``position`` (0-based character offset). ``is_valid`` is False when
        at least one issue has type "error".
    """
    issues: List[Dict[str, Any]] = []

    # Check for basic syntax errors
    for error_info in APEX_ERRORS.values():
        for match in re.finditer(error_info["pattern"], code, re.MULTILINE | re.DOTALL):
            issues.append(
                _issue_at(code, error_info["severity"], error_info["message"], match.start())
            )

    # Check for Apex-specific patterns: the text must declare a class or a trigger
    declares_type = any(
        re.search(APEX_PATTERNS[key], code)
        for key in ("class_declaration", "trigger_declaration")
    )
    if not declares_type:
        issues.append(
            _issue_at(code, "error", "No valid Apex class or trigger declaration found", 0)
        )

    # Bulkification, hardcoded IDs, governor-limit risks
    for key, flags, severity, message in _PATTERN_CHECKS:
        for match in re.finditer(APEX_PATTERNS[key], code, flags):
            issues.append(_issue_at(code, severity, message, match.start()))

    has_errors = any(issue["type"] == "error" for issue in issues)
    return not has_errors, issues


def perform_skeptical_evaluation(code: str, context: str = "trigger") -> Dict[str, Any]:
    """Perform skeptical evaluation of code looking for common issues.

    Args:
        code: Apex source text to evaluate.
        context: Accepted for interface compatibility; not read by this
            function.

    Returns:
        A dict with the keys ``syntax_issues``, ``security_concerns``,
        ``performance_issues``, ``best_practice_violations`` and
        ``b2b_commerce_issues``, each holding a list of issue dicts.
    """
    evaluation: Dict[str, Any] = {
        "syntax_issues": [],
        "security_concerns": [],
        "performance_issues": [],
        "best_practice_violations": [],
        "b2b_commerce_issues": [],
    }

    # Syntax validation (only the issue list is kept, not the validity flag)
    _, syntax_issues = validate_apex_syntax(code)
    evaluation["syntax_issues"] = syntax_issues

    # Security checks
    if re.search(r"without\s+sharing", code, re.IGNORECASE):
        evaluation["security_concerns"].append({
            "type": "warning",
            "message": "Class declared 'without sharing' - ensure this is intentional",
        })

    if not re.search(r"\.stripInaccessible\(", code) and re.search(r"(insert|update)\s+", code):
        evaluation["security_concerns"].append({
            "type": "warning",
            "message": "DML operations without stripInaccessible - potential FLS violation",
        })

    # Performance checks
    nested_loops = re.findall(
        r"for\s*\([^)]+\)\s*\{[^}]*for\s*\([^)]+\)", code, re.DOTALL
    )
    if nested_loops:
        evaluation["performance_issues"].append({
            "type": "warning",
            "message": f"Nested loops detected ({len(nested_loops)} occurrences) - review for O(n²) complexity",
        })

    # Check for missing test assertions (if it's a test class)
    if re.search(r"@isTest|testMethod", code, re.IGNORECASE):
        if not re.search(r"System\.assert|Assert\.", code):
            evaluation["best_practice_violations"].append({
                "type": "error",
                "message": "Test class without assertions - tests must verify behavior",
            })

    # B2B Commerce specific checks
    cloudcraze_refs = re.findall(B2B_COMMERCE_PATTERNS["cloudcraze_reference"], code)
    if cloudcraze_refs:
        evaluation["b2b_commerce_issues"].append({
            "type": "error",
            "message": f"CloudCraze references found ({len(set(cloudcraze_refs))} unique) - must be migrated to B2B LEX",
        })

    deprecated_methods = re.findall(B2B_COMMERCE_PATTERNS["deprecated_method"], code)
    if deprecated_methods:
        # Sorted so the message does not depend on set iteration order.
        evaluation["b2b_commerce_issues"].append({
            "type": "error",
            "message": f"Deprecated CloudCraze methods found: {', '.join(sorted(set(deprecated_methods)))}",
        })

    return evaluation


def extract_code_blocks(text: str) -> str:
    """Enhanced code extraction with multiple strategies.

    Strategies are tried in order and a later one runs only when the earlier
    ones found nothing: fenced code blocks, then Apex declaration patterns,
    then code following phrases such as "corrected code:".

    Args:
        text: Free-form text that may contain Apex code.

    Returns:
        The extracted blocks joined by a blank line, or "" when none match.
    """
    # Strategy 1: Standard code blocks with language markers
    # (IGNORECASE already covers every casing of the language tag.)
    fenced = re.findall(r"```(?:apex|java)?\s*(.*?)```", text, re.DOTALL | re.IGNORECASE)
    code_blocks = [block.strip() for block in fenced if block.strip()]

    # Strategy 2: Improved fallback detection for Apex-specific patterns
    if not code_blocks:
        apex_patterns = [
            # Class declarations (including inner classes)
            r"((?:public|private|global|protected)\s+(?:virtual|abstract|with sharing|without sharing|inherited sharing)?\s*class\s+\w+(?:\s+extends\s+\w+)?(?:\s+implements\s+[\w\s,]+)?\s*\{(?:[^{}]|\{[^{}]*\})*\})",
            # Trigger declarations
            r"(trigger\s+\w+\s+on\s+\w+\s*\([^)]+\)\s*\{(?:[^{}]|\{[^{}]*\})*\})",
            # Interface declarations
            r"((?:public|private|global)\s+interface\s+\w+(?:\s+extends\s+[\w\s,]+)?\s*\{(?:[^{}]|\{[^{}]*\})*\})",
            # Enum declarations
            r"((?:public|private|global)\s+enum\s+\w+\s*\{[^}]+\})",
            # Annotated methods or classes
            r"(@\w+(?:\([^)]*\))?\s*(?:public|private|global|protected).*?(?:\{(?:[^{}]|\{[^{}]*\})*\}|;))",
        ]
        for pattern in apex_patterns:
            code_blocks.extend(re.findall(pattern, text, re.DOTALL | re.MULTILINE))

    # Strategy 3: Look for code between specific markers
    if not code_blocks:
        # Look for code after phrases like "corrected code:", "here's the code:", etc.
        marker_patterns = [
            r"(?:corrected|fixed|updated|converted|modified)\s+code\s*:\s*\n((?:(?:public|private|global|trigger).*?)(?=\n\n|\Z))",
            r"(?:here'?s?|below is)\s+(?:the|your)\s+(?:corrected|fixed|updated)\s+\w+\s*:\s*\n((?:(?:public|private|global|trigger).*?)(?=\n\n|\Z))",
        ]
        for pattern in marker_patterns:
            code_blocks.extend(re.findall(pattern, text, re.DOTALL | re.IGNORECASE))

    return "\n\n".join(filter(None, code_blocks))


def _extract_section(pattern: str, text: str) -> str:
    """Return the stripped first group of *pattern* in *text*, or ""."""
    match = re.search(pattern, text, re.IGNORECASE | re.DOTALL)
    return match.group(1).strip() if match else ""


def _newline_terminated(text: str) -> str:
    """Return *text* ending in a newline.

    Every section regex requires each bullet/row/step to end in a newline, so
    without this the final item of a response lacking a trailing newline
    would be silently dropped.
    """
    return text if text.endswith("\n") else text + "\n"


def _render_summary(heading: str, labelled_sections, fallback: str) -> str:
    """Join the non-empty ``(label, content)`` pairs under *heading*.

    Returns *fallback* when every section is empty.
    """
    blocks = [label + content for label, content in labelled_sections if content]
    if not blocks:
        return fallback
    return "\n\n".join([heading] + blocks)


def format_structured_explanation(response: str, code_output: str) -> str:
    """Format the explanation in a structured, brief manner.

    Args:
        response: Model response expected to contain "## KEY CHANGES",
            "## CRITICAL ISSUES" and/or "## REMAINING WARNINGS" sections.
        code_output: Accepted for interface compatibility; not read by this
            function.

    Returns:
        A Markdown summary of the sections found, or a generic summary when
        none could be extracted.
    """
    text = _newline_terminated(response)
    sections = [
        (
            "**Key Changes:**\n",
            _extract_section(r"##\s*KEY CHANGES.*?\n((?:[-•]\s*.*?\n)+)", text),
        ),
        (
            "**Critical Issues Fixed:**\n",
            _extract_section(r"##\s*CRITICAL ISSUES.*?\n((?:\d+\..*?\n)+)", text),
        ),
        (
            "**⚠️ Remaining Warnings:**\n",
            _extract_section(r"##\s*REMAINING WARNINGS.*?\n((?:[-•]\s*.*?\n)*)", text),
        ),
    ]
    # If structured extraction failed, provide a brief summary
    fallback = (
        "### Code Correction Summary\n\n"
        "The code has been corrected and optimized. "
        "Check the code output for inline comments explaining specific changes.\n\n"
        "For detailed analysis, see the Full Model Response."
    )
    return _render_summary("### Summary of Changes", sections, fallback)


def format_object_conversion_explanation(response: str, code_output: str) -> str:
    """Format the object conversion explanation in a structured manner.

    Args:
        response: Model response expected to contain "## B2B LEX OBJECT
            MAPPING", "## FIELD MAPPINGS", "## MIGRATION STEPS" and/or
            "## WARNINGS" sections.
        code_output: Accepted for interface compatibility; not read by this
            function.

    Returns:
        A Markdown summary of the sections found, or a generic summary when
        none could be extracted.
    """
    text = _newline_terminated(response)
    sections = [
        (
            "**Object Mapping:**\n",
            _extract_section(r"##\s*B2B LEX OBJECT MAPPING.*?\n((?:[-•]\s*.*?\n)+)", text),
        ),
        (
            "**Field Mappings:**\n",
            _extract_section(r"##\s*FIELD MAPPINGS.*?\n((?:\|.*?\|.*?\n)+)", text),
        ),
        (
            "**Migration Steps:**\n",
            _extract_section(r"##\s*MIGRATION STEPS.*?\n((?:\d+\..*?\n)+)", text),
        ),
        (
            "**⚠️ Important Notes:**\n",
            _extract_section(r"##\s*WARNINGS.*?\n((?:[-•]\s*.*?\n)*)", text),
        ),
    ]
    # Fallback if structured extraction failed
    fallback = (
        "### Conversion Summary\n\n"
        "The CloudCraze object has been converted to B2B Lightning Experience format. "
        "Check the code output for the complete implementation.\n\n"
        "For detailed field mappings and migration steps, see the Full Model Response."
    )
    return _render_summary("### Conversion Summary", sections, fallback)


def extract_validation_metrics(validation_text: str) -> Optional[Dict[str, float]]:
    """Enhanced JSON extraction for validation metrics.

    Args:
        validation_text: Text that may embed a JSON assessment object or
            loose ``name: number`` pairs.

    Returns:
        The normalized metrics (see ``normalize_metrics``), or None when no
        metric can be found or an unexpected error occurs (which is logged).
    """
    try:
        # Strategy 1: Look for JSON after specific markers
        json_patterns = [
            r'(?:json|JSON|assessment|Assessment)[\s:]*({[^{}]*(?:{[^{}]*}[^{}]*)*})',
            r'```json\s*({[^`]+})\s*```',
            r'({[^{}]*"quality_rating"[^{}]*(?:{[^{}]*}[^{}]*)*})',
        ]
        for pattern in json_patterns:
            for candidate in re.findall(pattern, validation_text, re.DOTALL):
                try:
                    data = json.loads(candidate)
                    if "quality_rating" in data:
                        return normalize_metrics(data)
                except (ValueError, TypeError):
                    # Malformed JSON (JSONDecodeError is a ValueError) or a
                    # non-numeric/null metric value: try the next candidate
                    # instead of abandoning the remaining strategies.
                    continue

        # Strategy 2: Extract individual metrics if JSON parsing fails
        metrics = {}
        for name in ("quality_rating",) + _UNIT_METRICS:
            found = re.search(
                rf"{name}[\"']?\s*:\s*(\d+(?:\.\d+)?)", validation_text, re.IGNORECASE
            )
            if found:
                metrics[name] = float(found.group(1))

        return normalize_metrics(metrics) if metrics else None

    except Exception as e:
        # Best-effort boundary: callers get None rather than an exception.
        logger.error(f"Error extracting metrics: {e}")
        return None


def _clamp(value: float, low: float, high: float) -> float:
    """Limit *value* to the closed interval [low, high]."""
    return min(high, max(low, value))


def normalize_metrics(data: Dict) -> Dict[str, float]:
    """Ensure metrics are in the correct format and range.

    Args:
        data: Mapping of metric name to a number or numeric string. Missing
            metrics default to 0.

    Returns:
        A dict with ``quality_rating`` clamped to 0..10 and every other
        metric clamped to 0..1, all as floats.

    Raises:
        ValueError, TypeError: If a present value cannot be converted by
            ``float()``.
    """
    normalized = {
        "quality_rating": _clamp(float(data.get("quality_rating", 0)), 0.0, 10.0)
    }
    for name in _UNIT_METRICS:
        normalized[name] = _clamp(float(data.get(name, 0.0)), 0.0, 1.0)
    return normalized


def generate_test_cases(code_type: str, code: str) -> str:
    """Generate test cases for the given code.

    Args:
        code_type: "trigger" selects the trigger test-class skeleton; any
            other value selects the object-conversion test-data skeleton.
        code: Accepted for interface compatibility; the skeletons are static
            and do not depend on it.

    Returns:
        Apex source text for the selected skeleton.
    """
    if code_type == "trigger":
        return _TRIGGER_TEST_TEMPLATE
    # object conversion
    return _OBJECT_TEST_TEMPLATE


def handle_api_error(status_code: int, response_text: str) -> str:
    """Handle API errors with appropriate user-friendly messages.

    Args:
        status_code: HTTP status code of the failed request.
        response_text: Accepted for interface compatibility; not included in
            the returned message.

    Returns:
        A short message suitable for showing to the user.
    """
    specific = _STATUS_MESSAGES.get(status_code)
    if specific is not None:
        return specific
    if status_code >= 500:
        return "Service temporarily unavailable. Please try again."
    return f"Request failed with status {status_code}"