# Spaces: Sleeping  (page-status text captured with this file; not part of the program)
import os | |
import re | |
import json | |
import tempfile | |
import gradio as gr | |
from paddleocr import PaddleOCR | |
import fitz # PyMuPDF | |
from simple_salesforce import Salesforce | |
from dotenv import load_dotenv | |
import logging | |
from fastapi import FastAPI, UploadFile, File | |
from fastapi.responses import JSONResponse | |
import time | |
import base64 | |
from reportlab.lib.pagesizes import letter | |
from reportlab.pdfgen import canvas | |
from io import BytesIO | |
# Logging: configure the root logger at INFO and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Read settings from the .env file into the process environment.
load_dotenv()

# Salesforce credentials; each one is None when its variable is not set.
SF_USERNAME = os.environ.get('SF_USERNAME')
SF_PASSWORD = os.environ.get('SF_PASSWORD')
SF_SECURITY_TOKEN = os.environ.get('SF_SECURITY_TOKEN')

# Build the OCR engine once at import time (English model, angle
# classification enabled to handle text orientation). If construction fails
# the module still imports and `ocr` is left as None for callers to check.
try:
    ocr = PaddleOCR(use_angle_cls=True, lang='en')
except Exception as e:
    logger.error(f"Failed to initialize PaddleOCR: {str(e)}")
    ocr = None
else:
    logger.info("PaddleOCR initialized successfully")
# Labels that analyze_document() looks for (case-insensitive substring match)
# in the extracted document text.
required_values = [
    "Vendor Name",
    "Tax Identification Number (TIN)",
    "Address",
    "Certification Details",
    "Contract Terms",
    "Payment Terms",
    "Signature",
]

# Valid picklist values for Category_Match__c.
VALID_CATEGORIES = [
    'Compliant',
    'Partially Compliant',
    'Non-Compliant',
    'Not Applicable',
]

# Valid values for the Flags__c multi-select picklist: the same four labels,
# held in a separate list object.
VALID_FLAGS = list(VALID_CATEGORIES)
# FastAPI application object; handed to uvicorn when this file runs as a script.
app = FastAPI()
# PDF generation helper
def generate_pdf_from_text(text, vendor_name):
    """Render plain text into an in-memory, letter-size PDF.

    Each newline-separated line of ``text`` becomes one PDF text line (long
    lines are not wrapped). When the cursor drops below the bottom margin a
    new page is started, so long documents are no longer drawn off the edge
    of a single page.

    Args:
        text: The text to render.
        vendor_name: Unused; kept so existing callers keep working.

    Returns:
        A ``BytesIO`` positioned at offset 0 containing the PDF, or ``None``
        if generation failed (the error is logged).
    """
    try:
        pdf_buffer = BytesIO()
        c = canvas.Canvas(pdf_buffer, pagesize=letter)
        _, height = letter
        margin = 40

        text_object = c.beginText(margin, height - margin)
        for line in text.split('\n'):
            # Flush the current page and start a fresh one before writing a
            # line that would land below the bottom margin.
            if text_object.getY() < margin:
                c.drawText(text_object)
                c.showPage()
                text_object = c.beginText(margin, height - margin)
            text_object.textLine(line)

        c.drawText(text_object)
        c.showPage()
        c.save()
        pdf_buffer.seek(0)
        return pdf_buffer
    except Exception as e:
        logger.error(f"Error generating PDF: {e}")
        return None
# Upload PDF to Salesforce ContentVersion and build its download URL
def upload_pdf_to_salesforce(sf, pdf_buffer, vendor_name):
    """Store a PDF as a Salesforce ContentVersion and return a download URL.

    Args:
        sf: Salesforce client; must expose ``ContentVersion.create`` and
            ``sf_instance``.
        pdf_buffer: Buffer whose ``getvalue()`` yields the PDF bytes.
        vendor_name: Used as the prefix of the uploaded file's name.

    Returns:
        The ``servlet.shepherd`` download URL for the new version, or ``None``
        if anything fails (the error is logged).
        NOTE(review): whether this URL is reachable without a Salesforce
        session is not established here - confirm before calling it public.
    """
    try:
        pdf_b64 = base64.b64encode(pdf_buffer.getvalue()).decode('utf-8')
        # A Unix timestamp keeps repeated uploads for one vendor distinct.
        file_name = f"{vendor_name}_ExtractedText_{int(time.time())}.pdf"
        payload = {
            "Title": file_name,
            "PathOnClient": file_name,
            "VersionData": pdf_b64,
        }
        created = sf.ContentVersion.create(payload)
        file_url = f"https://{sf.sf_instance}/sfc/servlet.shepherd/version/download/{created['id']}"
    except Exception as e:
        logger.error(f"Error uploading PDF to Salesforce: {e}")
        return None
    else:
        logger.info(f"PDF uploaded to Salesforce: {file_url}")
        return file_url
def _error_result(message):
    """Build the 5-tuple process_pdf returns to report a failure."""
    return message, "Error", 0, "Error", "Error"


def _has_pdf_header(pdf_file_path):
    """Return True when the file starts with the ``%PDF`` magic bytes."""
    with open(pdf_file_path, 'rb') as f:
        return f.read(4).startswith(b'%PDF')


def _render_page_png(page, page_num, out_dir):
    """Render one page to a PNG in ``out_dir``; return its path, or None if both attempts fail."""
    try:
        # First attempt: 2x zoom, no alpha channel.
        zoom = 2
        mat = fitz.Matrix(zoom, zoom)
        pix = page.get_pixmap(matrix=mat, alpha=False)
        page_path = os.path.join(out_dir, f"page_{page_num + 1}.png")
        pix.save(page_path)
        logger.info(f"Processing page {page_num + 1} saved at {page_path}.")
        return page_path
    except Exception as e:
        logger.error(f"Error creating pixmap for page {page_num + 1}: {e}")

    try:
        # Second attempt: default resolution.
        pix = page.get_pixmap(alpha=False)
        page_path = os.path.join(out_dir, f"page_{page_num + 1}_fallback.png")
        pix.save(page_path)
        logger.info(f"Fallback processing succeeded for page {page_num + 1} at {page_path}.")
        return page_path
    except Exception as e2:
        logger.error(f"Fallback failed for page {page_num + 1}: {e2}")
        return None


def _ocr_page_text(page_path, page_num):
    """OCR one rendered page image; return its text, or '' when nothing is read or OCR fails."""
    try:
        result = ocr.ocr(page_path)
        if result and result[0]:
            page_text = "\n".join([line[1][0] for line in result[0] if line[1][0]]) + "\n"
            logger.info(f"Extracted text from page {page_num + 1} (first 200 chars): {page_text[:200]}...")
            return page_text
        logger.warning(f"No text extracted from page {page_num + 1}")
    except Exception as e:
        logger.error(f"Error performing OCR on page {page_num + 1}: {e}")
    return ""


def _embedded_text(pdf_document, num_pages):
    """Collect the text layer of every page with PyMuPDF (no OCR)."""
    parts = []
    for page_num in range(num_pages):
        page_text = pdf_document.load_page(page_num).get_text("text")
        if page_text:
            parts.append(page_text + "\n")
            logger.info(f"Fallback text extracted from page {page_num + 1} using PyMuPDF.")
    return "".join(parts)


def _grade_missing_count(missing_count):
    """Map the number of missing required values to (category, score, comments, flags)."""
    if missing_count == 0:
        return 'Compliant', 100, 'Document contains all required values.', 'Compliant'
    if missing_count == 1:
        return 'Partially Compliant', 85, 'Document is missing one required value.', 'Partially Compliant'
    if missing_count == 2:
        return 'Non-Compliant', 60, 'Document is missing two required values.', 'Non-Compliant'
    return 'Not Applicable', 40, 'Document is missing three or more required values.', 'Not Applicable'


def process_pdf(pdf_file_path):
    """Extract text from a PDF, grade it, and record the result in Salesforce.

    Pages are rendered to PNG and read with PaddleOCR. If OCR yields no text,
    the PDF's own text layer is read with PyMuPDF instead. The text is then
    checked for the required values, graded, and passed to
    ``insert_into_salesforce``.

    Args:
        pdf_file_path: Path to the PDF on disk.

    Returns:
        ``(extracted_text, category, score, comments, flags)``. On failure the
        first element is an error message and the rest are
        ``"Error", 0, "Error", "Error"``. This function does not raise for
        ordinary errors; they are logged and reported through the tuple.
    """
    try:
        if not pdf_file_path or not os.path.exists(pdf_file_path):
            logger.error("No valid file path provided or file does not exist")
            return _error_result("No valid file provided")

        # Validate PDF file
        try:
            is_pdf = _has_pdf_header(pdf_file_path)
        except Exception as e:
            logger.error(f"Error reading file: {e}")
            return _error_result(f"Error reading file: {e}")
        if not is_pdf:
            logger.error("Uploaded file is not a valid PDF")
            return _error_result("Invalid PDF file")

        with tempfile.TemporaryDirectory() as work_dir:
            logger.info(f"Temporary directory created at {work_dir}")
            pdf_document = None
            try:
                try:
                    pdf_document = fitz.open(pdf_file_path)
                    num_pages = pdf_document.page_count
                    logger.info(f"PDF has {num_pages} pages.")
                except Exception as e:
                    logger.error(f"Failed to open PDF with fitz: {e}")
                    return _error_result(f"Failed to open PDF: {e}")

                page_texts = []
                for page_num in range(num_pages):
                    page = pdf_document.load_page(page_num)
                    page_path = _render_page_png(page, page_num, work_dir)
                    if page_path is None:
                        continue
                    # Checked only once a page has rendered, so a document
                    # whose pages all fail to render still reaches the
                    # text-layer fallback below.
                    if ocr is None:
                        logger.error("PaddleOCR not initialized")
                        return _error_result("OCR engine not initialized")
                    page_texts.append(_ocr_page_text(page_path, page_num))

                extracted_text = "".join(page_texts)
                logger.info(f"Full extracted text (first 200 chars): {extracted_text[:200]}...")

                if not extracted_text.strip():
                    logger.error("No text extracted from PDF")
                    # Fallback: read the PDF's text layer directly.
                    try:
                        extracted_text += _embedded_text(pdf_document, num_pages)
                    except Exception as e:
                        logger.error(f"Error in fallback text extraction: {e}")
                        return _error_result("No text extracted from PDF even after fallback")
                    if not extracted_text.strip():
                        return _error_result("No text extracted from PDF even after fallback")
            finally:
                # Release the document handle on every exit path.
                if pdf_document is not None:
                    pdf_document.close()

        vendor_name = extract_vendor_name(extracted_text)
        logger.info(f"Extracted Vendor Name: {vendor_name}")

        missing_values = analyze_document(extracted_text)
        logger.info(f"Missing values: {missing_values}")
        category, score, comments, flags = _grade_missing_count(len(missing_values))

        insert_result = insert_into_salesforce(vendor_name, extracted_text, category, score, comments, flags)
        logger.info(f"Salesforce Insert Result: {insert_result}")
        return extracted_text, category, score, comments, flags
    except Exception as e:
        logger.error(f"Error processing PDF: {e}")
        return f"Error: {e}", "Error", 0, "Error", "Error"
def extract_vendor_name(text):
    """Guess the vendor's name from extracted document text.

    Label patterns are tried in order, most specific first. The first one that
    matches anywhere in ``text`` (case-insensitively) wins, and the rest of
    that line, stripped, is returned. With no label match, the first
    email-looking token is returned instead; failing that, ``"Unknown Vendor"``.

    The patterns are not anchored to word boundaries, so ``Name`` can match
    inside a longer word.
    """
    label_patterns = (
        r"Vendor Name[:\s]*([^\n]+)",
        r"Vendor[:\s]*([^\n]+)",
        r"Company Name[:\s]*([^\n]+)",
        r"Supplier[:\s]*([^\n]+)",
        r"Name[:\s]*([^\n]+)",
    )
    # Lazy generator: searching stops at the first pattern that matches.
    searches = (re.search(p, text, re.IGNORECASE) for p in label_patterns)
    label_hit = next((m for m in searches if m), None)
    if label_hit is not None:
        vendor_name = label_hit.group(1).strip()
        logger.info(f"Vendor name extracted: {vendor_name}")
        return vendor_name

    logger.warning("No vendor name found in text, attempting to extract email")
    email_hit = re.search(r"[\w\.-]+@[\w\.-]+\.\w+", text)
    if email_hit is None:
        logger.warning("No email found in text")
        return "Unknown Vendor"

    email = email_hit.group(0)
    logger.info(f"Email extracted: {email}")
    return email
def analyze_document(document_text, required=None):
    """Return the required labels that do not appear in ``document_text``.

    Matching is a case-insensitive substring test of each label against the
    whole text.

    Args:
        document_text: Text to inspect. ``None`` is treated as empty text.
        required: Optional iterable of labels to look for. Defaults to the
            module-level ``required_values``.

    Returns:
        A list of the missing labels, in the order they were given.
    """
    labels = required_values if required is None else required
    # Lower-case the document once instead of once per label.
    haystack = (document_text or "").lower()
    return [label for label in labels if label.lower() not in haystack]
def _escape_soql_literal(value):
    """Escape a value for use inside a single-quoted SOQL string literal."""
    # Backslash first, so the backslashes added for quotes are not doubled.
    return value.replace("\\", "\\\\").replace("'", "\\'")


def insert_into_salesforce(vendor_name_or_email, extracted_text, category, score, comments, flags):
    """Create a ``Vendor_Scorecard__c`` record for a processed document.

    The vendor is looked up in ``Vendor__c`` by ``Email__c`` when the
    identifier starts with an email-looking token, otherwise by ``Name``.
    The extracted text is also rendered to a PDF and uploaded, and the
    resulting URL is stored on the scorecard.

    Args:
        vendor_name_or_email: Vendor name or email found in the document.
        extracted_text: Full extracted text; stored truncated to 32768 chars.
        category: Value for ``Category_Match__c``.
        score: Value for ``Score__c``.
        comments: Value for ``Comments__c``.
        flags: Value for ``Flags__c``.

    Returns:
        The create() result (containing ``'id'``) on success; otherwise a
        string, either ``"Failed to insert record"`` or ``"Error: <detail>"``.
        Exceptions are logged, not raised.
    """
    try:
        sf = Salesforce(username=SF_USERNAME, password=SF_PASSWORD, security_token=SF_SECURITY_TOKEN)
        logger.info("Salesforce authentication successful.")

        vendor_id = None
        vendor_name_clean = vendor_name_or_email.strip()
        logger.info(f"Querying Salesforce for Vendor: {vendor_name_clean}")

        # The identifier comes from OCR'd text, so escape it before embedding
        # it in a query. Only this copy is escaped; the unescaped value is
        # what gets logged, used in the file name and stored on the record.
        soql_value = _escape_soql_literal(vendor_name_clean)

        is_email = bool(re.match(r"[\w\.-]+@[\w\.-]+\.\w+", vendor_name_clean))
        if is_email:
            query = f"SELECT Id, Name FROM Vendor__c WHERE Email__c = '{soql_value}' LIMIT 1"
            vendor_record = sf.query(query)
            if vendor_record['totalSize'] > 0:
                vendor_id = vendor_record['records'][0]['Id']
                # Prefer the vendor's stored name over the email from here on.
                vendor_name_clean = vendor_record['records'][0]['Name']
                logger.info(f"Vendor found by email with ID: {vendor_id}, Name: {vendor_name_clean}")
            else:
                logger.warning(f"Vendor with email '{vendor_name_clean}' not found!")
        else:
            query = f"SELECT Id FROM Vendor__c WHERE Name = '{soql_value}' LIMIT 1"
            vendor_record = sf.query(query)
            if vendor_record['totalSize'] > 0:
                vendor_id = vendor_record['records'][0]['Id']
                logger.info(f"Vendor found by name with ID: {vendor_id}")
            else:
                logger.warning(f"Vendor '{vendor_name_clean}' not found!")

        pdf_buffer = generate_pdf_from_text(extracted_text, vendor_name_clean)
        pdf_url = None
        if pdf_buffer:
            pdf_url = upload_pdf_to_salesforce(sf, pdf_buffer, vendor_name_clean)
        else:
            logger.error("Failed to generate PDF from extracted text.")

        vendor_field_value = vendor_name_clean
        logger.info(f"Setting Vendor_Name__c field to: {vendor_field_value}")

        # Slicing already leaves shorter text untouched.
        extracted_text_truncated = extracted_text[:32768]
        flags_value = flags  # Can be extended to "Value1;Value2" for multi-select

        scorecard_data = {
            'Vendor_Name__c': vendor_field_value,
            'Extracted_Text_URL__c': pdf_url or "",
            'Score__c': score,
            'Category_Match__c': category,
            'Comments__c': comments,
            'Flags__c': flags_value,
            'Uploaded_File__c': extracted_text_truncated,
        }
        result = sf.Vendor_Scorecard__c.create(scorecard_data)
        if result and 'id' in result:
            logger.info(f"Record inserted successfully with ID: {result['id']}")
            return result

        logger.error("Failed to insert Vendor_Scorecard__c record.")
        return "Failed to insert record"
    except Exception as e:
        logger.error(f"Error inserting into Salesforce: {e}")
        return f"Error: {e}"
async def process_pdf_api(file: UploadFile = File(...)):
    """Process an uploaded PDF and return the grading result as JSON.

    The upload is written to a temporary ``.pdf`` file, handed to
    ``process_pdf``, and the temporary file is removed afterwards even if
    processing raises.

    Returns:
        A ``JSONResponse`` with keys ``extracted_text``, ``category``,
        ``score``, ``comments`` and ``flags``. An empty upload gives status
        400 and an unexpected exception gives status 500, each with an
        ``error`` key. Failures reported by ``process_pdf`` through its result
        tuple are returned with the default status.

    NOTE(review): no route decorator is visible on this function - confirm it
    is registered with ``app`` somewhere, otherwise the endpoint is unreachable.
    NOTE(review): ``process_pdf`` is synchronous, so it occupies the event
    loop for the duration of the call.
    """
    try:
        contents = await file.read()
        if not contents:
            logger.error("Uploaded file is empty")
            return JSONResponse(content={"error": "Uploaded file is empty"}, status_code=400)

        # delete=False so the path outlives the handle; leaving the `with`
        # block flushes and closes it before process_pdf reopens the file.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_file:
            temp_file.write(contents)

        try:
            extracted_text, category, score, comments, flags = process_pdf(temp_file.name)
        finally:
            os.unlink(temp_file.name)

        return JSONResponse(content={
            "extracted_text": extracted_text,
            "category": category,
            "score": score,
            "comments": comments,
            "flags": flags
        })
    except Exception as e:
        logger.error(f"Error processing the file via API: {e}")
        return JSONResponse(content={"error": str(e)}, status_code=500)
def gradio_interface(pdf_file):
    """Gradio callback: run ``process_pdf`` on the uploaded file.

    Args:
        pdf_file: The upload as delivered by the ``gr.File`` input. This may
            be ``None`` (nothing uploaded), a filesystem path, or a file-like
            wrapper exposing the path as ``.name``.

    Returns:
        The 5-tuple from ``process_pdf``, or an error tuple when no file was
        uploaded.
    """
    if pdf_file is None:
        return "No file uploaded", "Error", 0, "Error", "Error"
    # Accept both delivery forms: a plain path, or an object with `.name`.
    if isinstance(pdf_file, (str, os.PathLike)):
        pdf_path = pdf_file
    else:
        pdf_path = pdf_file.name
    return process_pdf(pdf_path)
# Gradio UI: one PDF upload in, five result fields out. The outputs are listed
# in the same order as the tuple returned by gradio_interface().
# NOTE(review): live=True presumably starts processing (and with it the
# Salesforce insert) as soon as a file is chosen - confirm this is intended.
gr_interface = gr.Interface(
    fn=gradio_interface,
    inputs=gr.File(label="Upload PDF Document"),
    outputs=[
        gr.Textbox(label="Extracted Text"),
        gr.Textbox(label="Category Match"),
        gr.Number(label="Score"),
        gr.Textbox(label="Comments"),
        gr.Textbox(label="Flags"),
    ],
    live=True,
)
if __name__ == "__main__":
    import threading

    # Imported before anything is started, so a missing uvicorn fails fast
    # instead of leaving a half-started process.
    import uvicorn

    # Run the Gradio UI in a background thread while uvicorn serves the
    # FastAPI app on the main thread. The thread is a daemon so the process
    # exits when uvicorn stops instead of being kept alive by Gradio.
    threading.Thread(target=gr_interface.launch, daemon=True).start()

    uvicorn.run(app, host="0.0.0.0", port=8000)