"""Vendor document compliance checker.

Extracts text from an uploaded PDF (PaddleOCR on rendered pages, with a
PyMuPDF embedded-text fallback), checks it for a fixed list of required
values, derives a compliance category/score, and records the outcome in
Salesforce. The pipeline is exposed through a FastAPI endpoint and a Gradio
interface.
"""

import base64
import json
import logging
import os
import re
import tempfile
import time
from io import BytesIO

import fitz  # PyMuPDF
import gradio as gr
from dotenv import load_dotenv
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
from paddleocr import PaddleOCR
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
from simple_salesforce import Salesforce

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load environment variables from .env file
load_dotenv()

# Salesforce credentials from the .env file
SF_USERNAME = os.getenv('SF_USERNAME')
SF_PASSWORD = os.getenv('SF_PASSWORD')
SF_SECURITY_TOKEN = os.getenv('SF_SECURITY_TOKEN')

# Initialize PaddleOCR with use_angle_cls=True to handle text orientation.
# A failed initialization leaves `ocr` as None; process_pdf() reports that
# to the caller instead of crashing at import time.
try:
    ocr = PaddleOCR(use_angle_cls=True, lang='en')  # Use English language model
    logger.info("PaddleOCR initialized successfully")
except Exception as e:
    logger.error(f"Failed to initialize PaddleOCR: {str(e)}")
    ocr = None

# List of required values
required_values = [
    "Vendor Name",
    "Tax Identification Number (TIN)",
    "Address",
    "Certification Details",
    "Contract Terms",
    "Payment Terms",
    "Signature",
]

# Define valid flags picklist values (for Category_Match__c)
VALID_CATEGORIES = ['Compliant', 'Partially Compliant', 'Non-Compliant', 'Not Applicable']

# Define possible flags for Flags__c (multi-select picklist)
VALID_FLAGS = ['Compliant', 'Partially Compliant', 'Non-Compliant', 'Not Applicable']

# Label patterns tried in order by extract_vendor_name(); the first hit wins.
_VENDOR_NAME_PATTERNS = (
    r"Vendor Name[:\s]*([^\n]+)",
    r"Vendor[:\s]*([^\n]+)",
    r"Company Name[:\s]*([^\n]+)",
    r"Supplier[:\s]*([^\n]+)",
    r"Name[:\s]*([^\n]+)",
)
_EMAIL_PATTERN = r"[\w\.-]+@[\w\.-]+\.\w+"

# Maximum number of characters of extracted text stored on the scorecard.
_MAX_UPLOADED_TEXT_CHARS = 32768

# Margin (in points) used on every side of the generated PDF pages.
_PDF_MARGIN = 40

# FastAPI app initialization
app = FastAPI()


def _error_result(message):
    """Return the 5-tuple process_pdf() uses to signal a failure."""
    return message, "Error", 0, "Error", "Error"


# PDF generation helper
def generate_pdf_from_text(text, vendor_name):
    """Render plain text into an in-memory, letter-sized PDF.

    Lines are written top to bottom; when the cursor passes the bottom
    margin a new page is started, so long text is not drawn off the page.

    Args:
        text: Text to render; split on newlines, one PDF text line each.
        vendor_name: Unused; kept so existing callers keep working.

    Returns:
        A BytesIO positioned at offset 0 containing the PDF, or None if
        generation failed (the error is logged).
    """
    try:
        pdf_buffer = BytesIO()
        pdf_canvas = canvas.Canvas(pdf_buffer, pagesize=letter)
        _, page_height = letter
        top = page_height - _PDF_MARGIN
        text_object = pdf_canvas.beginText(_PDF_MARGIN, top)
        for line in text.split('\n'):
            if text_object.getY() < _PDF_MARGIN:
                # Current page is full: flush it and continue on a new one.
                pdf_canvas.drawText(text_object)
                pdf_canvas.showPage()
                text_object = pdf_canvas.beginText(_PDF_MARGIN, top)
            text_object.textLine(line)
        pdf_canvas.drawText(text_object)
        pdf_canvas.showPage()
        pdf_canvas.save()
        pdf_buffer.seek(0)
        return pdf_buffer
    except Exception as e:
        logger.error(f"Error generating PDF: {e}")
        return None


def upload_pdf_to_salesforce(sf, pdf_buffer, vendor_name):
    """Upload a PDF as a Salesforce ContentVersion and build its download URL.

    Args:
        sf: Authenticated simple_salesforce client.
        pdf_buffer: BytesIO holding the PDF bytes.
        vendor_name: Used as the prefix of the generated file name.

    Returns:
        The download URL string built from the instance host and the new
        ContentVersion id, or None if the upload failed (the error is logged).
    """
    try:
        encoded_pdf = base64.b64encode(pdf_buffer.getvalue()).decode('utf-8')
        timestamp = int(time.time())
        file_name = f"{vendor_name}_ExtractedText_{timestamp}.pdf"
        content_version_data = {
            "Title": file_name,
            "PathOnClient": file_name,
            "VersionData": encoded_pdf,
        }
        content_version = sf.ContentVersion.create(content_version_data)
        file_url = (
            f"https://{sf.sf_instance}/sfc/servlet.shepherd/version/download/"
            f"{content_version['id']}"
        )
        logger.info(f"PDF uploaded to Salesforce: {file_url}")
        return file_url
    except Exception as e:
        logger.error(f"Error uploading PDF to Salesforce: {e}")
        return None


def _render_page_image(page, page_number, out_dir):
    """Render one PDF page to a PNG in out_dir; return its path or None.

    Tries a 2x zoomed render first and falls back to the default resolution
    if that fails. `page_number` is 1-based and only used for names/logs.
    """
    try:
        zoom = 2
        mat = fitz.Matrix(zoom, zoom)
        pix = page.get_pixmap(matrix=mat, alpha=False)
        page_path = os.path.join(out_dir, f"page_{page_number}.png")
        pix.save(page_path)
        logger.info(f"Processing page {page_number} saved at {page_path}.")
        return page_path
    except Exception as e:
        logger.error(f"Error creating pixmap for page {page_number}: {e}")

    try:
        pix = page.get_pixmap(alpha=False)
        page_path = os.path.join(out_dir, f"page_{page_number}_fallback.png")
        pix.save(page_path)
        logger.info(f"Fallback processing succeeded for page {page_number} at {page_path}.")
        return page_path
    except Exception as e2:
        logger.error(f"Fallback failed for page {page_number}: {e2}")
        return None


def _ocr_pages(pdf_document, num_pages, out_dir):
    """OCR every page of the document and return the concatenated text.

    Pages that cannot be rendered or OCR'd are logged and skipped.
    """
    page_texts = []
    for page_index in range(num_pages):
        page_number = page_index + 1
        page = pdf_document.load_page(page_index)
        page_path = _render_page_image(page, page_number, out_dir)
        if page_path is None:
            continue
        try:
            # `cls` is deliberately not passed: the original author noted
            # that passing cls=True here caused an error.
            result = ocr.ocr(page_path)
            if result and result[0]:
                page_text = "\n".join(line[1][0] for line in result[0] if line[1][0]) + "\n"
                page_texts.append(page_text)
                logger.info(
                    f"Extracted text from page {page_number} (first 200 chars): {page_text[:200]}..."
                )
            else:
                logger.warning(f"No text extracted from page {page_number}")
        except Exception as e:
            logger.error(f"Error performing OCR on page {page_number}: {e}")
    return "".join(page_texts)


def _extract_embedded_text(pdf_document, num_pages):
    """Return the text layer of every page via PyMuPDF (no OCR)."""
    page_texts = []
    for page_index in range(num_pages):
        page_text = pdf_document.load_page(page_index).get_text("text")
        if page_text:
            page_texts.append(page_text + "\n")
            logger.info(f"Fallback text extracted from page {page_index + 1} using PyMuPDF.")
    return "".join(page_texts)


def _score_document(missing_count):
    """Map the number of missing required values to (category, score, comments, flags)."""
    if missing_count == 0:
        return 'Compliant', 100, 'Document contains all required values.', 'Compliant'
    if missing_count == 1:
        return (
            'Partially Compliant',
            85,
            'Document is missing one required value.',
            'Partially Compliant',
        )
    if missing_count == 2:
        return 'Non-Compliant', 60, 'Document is missing two required values.', 'Non-Compliant'
    return (
        'Not Applicable',
        40,
        'Document is missing three or more required values.',
        'Not Applicable',
    )


def process_pdf(pdf_file_path):
    """Extract text from a PDF, score its compliance and record it in Salesforce.

    Args:
        pdf_file_path: Path to the PDF on disk.

    Returns:
        A 5-tuple ``(extracted_text, category, score, comments, flags)``.
        On any failure the first element is an error message and the rest
        are ``"Error", 0, "Error", "Error"``; this function does not raise.
    """
    try:
        if not pdf_file_path or not os.path.exists(pdf_file_path):
            logger.error("No valid file path provided or file does not exist")
            return _error_result("No valid file provided")

        # Validate PDF file
        try:
            with open(pdf_file_path, 'rb') as f:
                if not f.read(4).startswith(b'%PDF'):
                    logger.error("Uploaded file is not a valid PDF")
                    return _error_result("Invalid PDF file")
        except Exception as e:
            logger.error(f"Error reading file: {e}")
            return _error_result(f"Error reading file: {e}")

        # Checked before any page is rendered so no work is wasted on
        # images that could never be OCR'd.
        if ocr is None:
            logger.error("PaddleOCR not initialized")
            return _error_result("OCR engine not initialized")

        try:
            pdf_document = fitz.open(pdf_file_path)
        except Exception as e:
            logger.error(f"Failed to open PDF with fitz: {e}")
            return _error_result(f"Failed to open PDF: {e}")

        # The document handle is closed on every exit path below.
        try:
            num_pages = pdf_document.page_count
            logger.info(f"PDF has {num_pages} pages.")

            with tempfile.TemporaryDirectory() as tmp_dir:
                logger.info(f"Temporary directory created at {tmp_dir}")
                extracted_text = _ocr_pages(pdf_document, num_pages, tmp_dir)

            logger.info(f"Full extracted text (first 200 chars): {extracted_text[:200]}...")

            if not extracted_text.strip():
                logger.error("No text extracted from PDF")
                # Fallback: Try extracting text directly from PDF using PyMuPDF
                try:
                    extracted_text += _extract_embedded_text(pdf_document, num_pages)
                except Exception as e:
                    logger.error(f"Error in fallback text extraction: {e}")
                    return _error_result("No text extracted from PDF even after fallback")
                if not extracted_text.strip():
                    return _error_result("No text extracted from PDF even after fallback")
        finally:
            pdf_document.close()

        vendor_name = extract_vendor_name(extracted_text)
        logger.info(f"Extracted Vendor Name: {vendor_name}")

        missing_values = analyze_document(extracted_text)
        logger.info(f"Missing values: {missing_values}")
        category, score, comments, flags = _score_document(len(missing_values))

        insert_result = insert_into_salesforce(
            vendor_name, extracted_text, category, score, comments, flags
        )
        logger.info(f"Salesforce Insert Result: {insert_result}")

        return extracted_text, category, score, comments, flags
    except Exception as e:
        logger.error(f"Error processing PDF: {e}")
        return _error_result(f"Error: {e}")


def extract_vendor_name(text):
    """Find a vendor identifier in the extracted text.

    Tries a series of label patterns ("Vendor Name", "Vendor", "Company
    Name", "Supplier", "Name") case-insensitively and returns the rest of
    the line after the first one that matches. If none match, returns the
    first e-mail address found, or ``"Unknown Vendor"`` as a last resort.
    """
    for pattern in _VENDOR_NAME_PATTERNS:
        match = re.search(pattern, text, re.IGNORECASE)
        if match:
            vendor_name = match.group(1).strip()
            logger.info(f"Vendor name extracted: {vendor_name}")
            return vendor_name

    logger.warning("No vendor name found in text, attempting to extract email")
    email_match = re.search(_EMAIL_PATTERN, text)
    if email_match:
        email = email_match.group(0)
        logger.info(f"Email extracted: {email}")
        return email

    logger.warning("No email found in text")
    return "Unknown Vendor"


def analyze_document(document_text):
    """Return the entries of `required_values` not found in the document.

    Matching is a case-insensitive substring test against the whole text.
    """
    # Normalize once instead of once per required value.
    haystack = document_text.lower().strip()
    return [value for value in required_values if value.lower() not in haystack]


def _escape_soql_literal(value):
    """Escape a string for use inside a single-quoted SOQL literal.

    SOQL uses backslash escapes; the backslash itself is escaped first so
    the backslash added for quotes is not doubled.
    """
    return value.replace("\\", "\\\\").replace("'", "\\'")


def insert_into_salesforce(vendor_name_or_email, extracted_text, category, score, comments, flags):
    """Create a Vendor_Scorecard__c record for the processed document.

    Looks up the vendor (by Email__c when the identifier looks like an
    e-mail address, otherwise by Name), uploads the extracted text as a
    PDF, and inserts the scorecard. A vendor that is not found is only
    logged; the scorecard is still created.

    Args:
        vendor_name_or_email: Vendor name or e-mail taken from the document.
        extracted_text: Full extracted text of the document.
        category: Value for Category_Match__c.
        score: Value for Score__c.
        comments: Value for Comments__c.
        flags: Value for Flags__c.

    Returns:
        The create() result on success, the string "Failed to insert record"
        if the result carries no id, or "Error: <message>" on any exception.
    """
    try:
        sf = Salesforce(
            username=SF_USERNAME,
            password=SF_PASSWORD,
            security_token=SF_SECURITY_TOKEN,
        )
        logger.info("Salesforce authentication successful.")

        vendor_id = None
        vendor_name_clean = vendor_name_or_email.strip()
        logger.info(f"Querying Salesforce for Vendor: {vendor_name_clean}")

        # The identifier comes from OCR'd document text, so it is escaped
        # before being placed in a query. The escaped form is used ONLY in
        # the query; the stored name and file name keep the original text.
        soql_value = _escape_soql_literal(vendor_name_clean)

        is_email = bool(re.match(_EMAIL_PATTERN, vendor_name_clean))
        if is_email:
            query = f"SELECT Id, Name FROM Vendor__c WHERE Email__c = '{soql_value}' LIMIT 1"
            vendor_record = sf.query(query)
            if vendor_record['totalSize'] > 0:
                vendor_id = vendor_record['records'][0]['Id']
                vendor_name_clean = vendor_record['records'][0]['Name']
                logger.info(f"Vendor found by email with ID: {vendor_id}, Name: {vendor_name_clean}")
            else:
                logger.warning(f"Vendor with email '{vendor_name_clean}' not found!")
        else:
            query = f"SELECT Id FROM Vendor__c WHERE Name = '{soql_value}' LIMIT 1"
            vendor_record = sf.query(query)
            if vendor_record['totalSize'] > 0:
                vendor_id = vendor_record['records'][0]['Id']
                logger.info(f"Vendor found by name with ID: {vendor_id}")
            else:
                logger.warning(f"Vendor '{vendor_name_clean}' not found!")

        pdf_url = None
        pdf_buffer = generate_pdf_from_text(extracted_text, vendor_name_clean)
        if pdf_buffer:
            pdf_url = upload_pdf_to_salesforce(sf, pdf_buffer, vendor_name_clean)
        else:
            logger.error("Failed to generate PDF from extracted text.")

        vendor_field_value = vendor_name_clean
        logger.info(f"Setting Vendor_Name__c field to: {vendor_field_value}")

        # Slicing is a no-op for text already within the limit.
        extracted_text_truncated = extracted_text[:_MAX_UPLOADED_TEXT_CHARS]

        flags_value = flags  # Can be extended to "Value1;Value2" for multi-select

        scorecard_data = {
            'Vendor_Name__c': vendor_field_value,
            'Extracted_Text_URL__c': pdf_url or "",
            'Score__c': score,
            'Category_Match__c': category,
            'Comments__c': comments,
            'Flags__c': flags_value,
            'Uploaded_File__c': extracted_text_truncated,
        }

        result = sf.Vendor_Scorecard__c.create(scorecard_data)
        if result and 'id' in result:
            logger.info(f"Record inserted successfully with ID: {result['id']}")
            return result

        logger.error("Failed to insert Vendor_Scorecard__c record.")
        return "Failed to insert record"
    except Exception as e:
        logger.error(f"Error inserting into Salesforce: {e}")
        return f"Error: {e}"


@app.post("/process_pdf/")
async def process_pdf_api(file: UploadFile = File(...)):
    """HTTP endpoint: run process_pdf() on an uploaded file.

    Returns a JSON object with extracted_text, category, score, comments and
    flags; 400 if the upload is empty; 500 with an "error" field if an
    unexpected exception occurs.
    """
    try:
        contents = await file.read()
        if not contents:
            logger.error("Uploaded file is empty")
            return JSONResponse(content={"error": "Uploaded file is empty"}, status_code=400)

        # delete=False: the file must outlive the `with` so process_pdf()
        # can reopen it by path; it is removed in the `finally` below.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_file:
            temp_file.write(contents)
        temp_path = temp_file.name

        try:
            extracted_text, category, score, comments, flags = process_pdf(temp_path)
        finally:
            os.unlink(temp_path)

        return JSONResponse(content={
            "extracted_text": extracted_text,
            "category": category,
            "score": score,
            "comments": comments,
            "flags": flags,
        })
    except Exception as e:
        logger.error(f"Error processing the file via API: {e}")
        return JSONResponse(content={"error": str(e)}, status_code=500)


def gradio_interface(pdf_file):
    """Gradio callback: run process_pdf() on the uploaded file's path."""
    if pdf_file is None:
        return "No file uploaded", "Error", 0, "Error", "Error"
    return process_pdf(pdf_file.name)


gr_interface = gr.Interface(
    fn=gradio_interface,
    inputs=gr.File(label="Upload PDF Document"),
    outputs=[
        gr.Textbox(label="Extracted Text"),
        gr.Textbox(label="Category Match"),
        gr.Number(label="Score"),
        gr.Textbox(label="Comments"),
        gr.Textbox(label="Flags"),
    ],
    live=True,
)


if __name__ == "__main__":
    import threading

    def run_gradio():
        """Launch the Gradio UI (blocking; run in its own thread)."""
        gr_interface.launch()

    # Gradio runs in a background thread so uvicorn can own the main thread.
    threading.Thread(target=run_gradio).start()

    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)