# neerajkalyank's picture
# Update app.py
# e2c450b verified
import os
import re
import json
import tempfile
import gradio as gr
from paddleocr import PaddleOCR
import fitz # PyMuPDF
from simple_salesforce import Salesforce
from dotenv import load_dotenv
import logging
from fastapi import FastAPI, UploadFile, File
from fastapi.responses import JSONResponse
import time
import base64
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
from io import BytesIO
# Logging: configure the root logger at INFO and keep a module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load settings from a .env file before reading the credentials below.
load_dotenv()

# Salesforce credentials; each is None when its environment variable is unset.
SF_USERNAME = os.environ.get('SF_USERNAME')
SF_PASSWORD = os.environ.get('SF_PASSWORD')
SF_SECURITY_TOKEN = os.environ.get('SF_SECURITY_TOKEN')

# Shared PaddleOCR engine (English model, angle classification enabled).
# If construction fails, `ocr` is left as None so callers can detect that
# OCR is unavailable instead of the module failing to import.
try:
    ocr = PaddleOCR(use_angle_cls=True, lang='en')
    logger.info("PaddleOCR initialized successfully")
except Exception as e:
    logger.error(f"Failed to initialize PaddleOCR: {str(e)}")
    ocr = None
# Section labels a vendor document is checked for; a label that does not
# appear in the extracted text is reported as missing.
required_values = [
    "Vendor Name",
    "Tax Identification Number (TIN)",
    "Address",
    "Certification Details",
    "Contract Terms",
    "Payment Terms",
    "Signature",
]

# Picklist values for Category_Match__c.
VALID_CATEGORIES = [
    'Compliant',
    'Partially Compliant',
    'Non-Compliant',
    'Not Applicable',
]

# Values for the Flags__c multi-select picklist: the same labels, held in a
# separate list object so the two can be changed independently.
VALID_FLAGS = list(VALID_CATEGORIES)
# FastAPI application instance; the /process_pdf/ route in this file is
# registered on it, and it is served by uvicorn when the file runs as a script.
app = FastAPI()
# PDF generation helper
def generate_pdf_from_text(text, vendor_name):
    """Render plain text into an in-memory, letter-sized PDF.

    Each input line becomes one line of PDF text. When the text cursor drops
    below the bottom margin, the current page is flushed and writing continues
    on a new page, so long documents are no longer cut off after page one.

    Args:
        text: Text to render; it is split on newline characters.
        vendor_name: Not used by this function; kept so existing callers
            continue to work unchanged.

    Returns:
        A BytesIO rewound to offset 0 that holds the PDF, or None when
        generation fails (the error is logged).
    """
    margin = 40
    try:
        pdf_buffer = BytesIO()
        c = canvas.Canvas(pdf_buffer, pagesize=letter)
        _, height = letter
        text_object = c.beginText(margin, height - margin)
        for line in text.split('\n'):
            # Cursor is past the bottom margin: emit this page and start a
            # fresh text object at the top of the next one.
            if text_object.getY() < margin:
                c.drawText(text_object)
                c.showPage()
                text_object = c.beginText(margin, height - margin)
            text_object.textLine(line)
        c.drawText(text_object)
        c.showPage()
        c.save()
        pdf_buffer.seek(0)
        return pdf_buffer
    except Exception as e:
        logger.error(f"Error generating PDF: {e}")
        return None
# Upload PDF to Salesforce as a ContentVersion and return its download URL
def upload_pdf_to_salesforce(sf, pdf_buffer, vendor_name):
    """Create a Salesforce ContentVersion from a PDF buffer.

    Args:
        sf: Salesforce client; its ``ContentVersion.create`` and
            ``sf_instance`` members are used.
        pdf_buffer: Buffer whose ``getvalue()`` yields the PDF bytes.
        vendor_name: Used as the prefix of the generated file name.

    Returns:
        The download URL built from the new ContentVersion id, or None if
        any step fails (the error is logged).
    """
    try:
        # File name carries a Unix timestamp: "<vendor>_ExtractedText_<ts>.pdf".
        timestamp = int(time.time())
        file_name = f"{vendor_name}_ExtractedText_{timestamp}.pdf"

        # The PDF bytes are sent base64-encoded as text.
        raw_pdf = pdf_buffer.getvalue()
        encoded_pdf = base64.b64encode(raw_pdf).decode('utf-8')

        content_version = sf.ContentVersion.create({
            "Title": file_name,
            "PathOnClient": file_name,
            "VersionData": encoded_pdf,
        })
        file_url = f"https://{sf.sf_instance}/sfc/servlet.shepherd/version/download/{content_version['id']}"
        logger.info(f"PDF uploaded to Salesforce: {file_url}")
    except Exception as e:
        logger.error(f"Error uploading PDF to Salesforce: {e}")
        return None
    return file_url
def _error_result(message):
    """Build the 5-tuple that process_pdf returns on failure."""
    return message, "Error", 0, "Error", "Error"


def _render_page_to_png(page, page_num, out_dir):
    """Save one page as a PNG in out_dir; return its path, or None on failure."""
    try:
        # First attempt: render at 2x zoom without an alpha channel.
        zoom = 2
        mat = fitz.Matrix(zoom, zoom)
        pix = page.get_pixmap(matrix=mat, alpha=False)
        page_path = os.path.join(out_dir, f"page_{page_num + 1}.png")
        pix.save(page_path)
        logger.info(f"Processing page {page_num + 1} saved at {page_path}.")
        return page_path
    except Exception as e:
        logger.error(f"Error creating pixmap for page {page_num + 1}: {e}")
    try:
        # Second attempt: default resolution.
        pix = page.get_pixmap(alpha=False)
        page_path = os.path.join(out_dir, f"page_{page_num + 1}_fallback.png")
        pix.save(page_path)
        logger.info(f"Fallback processing succeeded for page {page_num + 1} at {page_path}.")
        return page_path
    except Exception as e2:
        logger.error(f"Fallback failed for page {page_num + 1}: {e2}")
        return None


def _ocr_document(pdf_document, num_pages, work_dir):
    """OCR every page; return the joined text, or None if the OCR engine is missing."""
    parts = []
    for page_num in range(num_pages):
        page = pdf_document.load_page(page_num)
        page_path = _render_page_to_png(page, page_num, work_dir)
        if page_path is None:
            # Neither rendering attempt worked; skip this page.
            continue
        if ocr is None:
            logger.error("PaddleOCR not initialized")
            return None
        try:
            result = ocr.ocr(page_path)
            if result and result[0]:
                # Each OCR entry carries its text at [1][0]; blank texts are dropped.
                page_text = "\n".join([line[1][0] for line in result[0] if line[1][0]]) + "\n"
                parts.append(page_text)
                logger.info(f"Extracted text from page {page_num + 1} (first 200 chars): {page_text[:200]}...")
            else:
                logger.warning(f"No text extracted from page {page_num + 1}")
        except Exception as e:
            logger.error(f"Error performing OCR on page {page_num + 1}: {e}")
            continue
    return "".join(parts)


def _extract_text_layer(pdf_document, num_pages):
    """Return the text PyMuPDF reads directly from the pages, without OCR."""
    parts = []
    for page_num in range(num_pages):
        page_text = pdf_document.load_page(page_num).get_text("text")
        if page_text:
            parts.append(page_text + "\n")
            logger.info(f"Fallback text extracted from page {page_num + 1} using PyMuPDF.")
    return "".join(parts)


def _grade_missing_count(missing_count):
    """Map the number of missing required values to (category, score, comments, flags)."""
    if missing_count == 0:
        return 'Compliant', 100, 'Document contains all required values.', 'Compliant'
    if missing_count == 1:
        return 'Partially Compliant', 85, 'Document is missing one required value.', 'Partially Compliant'
    if missing_count == 2:
        return 'Non-Compliant', 60, 'Document is missing two required values.', 'Non-Compliant'
    return 'Not Applicable', 40, 'Document is missing three or more required values.', 'Not Applicable'


def process_pdf(pdf_file_path):
    """Extract text from a PDF, grade it, and record the result in Salesforce.

    Steps: check the path and the ``%PDF`` header, render each page to a PNG
    in a temporary directory, OCR the images, fall back to PyMuPDF's direct
    text extraction if OCR produced nothing, then derive the vendor name and
    missing required values and pass everything to insert_into_salesforce.

    Args:
        pdf_file_path: Path of the PDF file on disk.

    Returns:
        A 5-tuple ``(extracted_text, category, score, comments, flags)``.
        On any failure the tuple is ``(message, "Error", 0, "Error", "Error")``;
        this function does not raise.
    """
    try:
        if not pdf_file_path or not os.path.exists(pdf_file_path):
            logger.error("No valid file path provided or file does not exist")
            return _error_result("No valid file provided")

        # Validate the file by its magic bytes before handing it to fitz.
        try:
            with open(pdf_file_path, 'rb') as f:
                if not f.read(4).startswith(b'%PDF'):
                    logger.error("Uploaded file is not a valid PDF")
                    return _error_result("Invalid PDF file")
        except Exception as e:
            logger.error(f"Error reading file: {e}")
            return _error_result(f"Error reading file: {e}")

        with tempfile.TemporaryDirectory() as path:
            logger.info(f"Temporary directory created at {path}")
            try:
                pdf_document = fitz.open(pdf_file_path)
                num_pages = pdf_document.page_count
                logger.info(f"PDF has {num_pages} pages.")
            except Exception as e:
                logger.error(f"Failed to open PDF with fitz: {e}")
                return _error_result(f"Failed to open PDF: {e}")

            # The document handle is released on every exit path below.
            try:
                extracted_text = _ocr_document(pdf_document, num_pages, path)
                if extracted_text is None:
                    return _error_result("OCR engine not initialized")
                logger.info(f"Full extracted text (first 200 chars): {extracted_text[:200]}...")

                if not extracted_text.strip():
                    logger.error("No text extracted from PDF")
                    # OCR found nothing: try the PDF's own text layer.
                    try:
                        extracted_text += _extract_text_layer(pdf_document, num_pages)
                        if not extracted_text.strip():
                            return _error_result("No text extracted from PDF even after fallback")
                    except Exception as e:
                        logger.error(f"Error in fallback text extraction: {e}")
                        return _error_result("No text extracted from PDF even after fallback")
            finally:
                pdf_document.close()

        vendor_name = extract_vendor_name(extracted_text)
        logger.info(f"Extracted Vendor Name: {vendor_name}")
        missing_values = analyze_document(extracted_text)
        missing_count = len(missing_values)
        logger.info(f"Missing values: {missing_values}")

        category, score, comments, flags = _grade_missing_count(missing_count)

        insert_result = insert_into_salesforce(vendor_name, extracted_text, category, score, comments, flags)
        logger.info(f"Salesforce Insert Result: {insert_result}")
        return extracted_text, category, score, comments, flags
    except Exception as e:
        logger.error(f"Error processing PDF: {e}")
        return _error_result(f"Error: {e}")
def extract_vendor_name(text):
    """Find a vendor identifier in extracted document text.

    Label patterns are tried in priority order ("Vendor Name", "Vendor",
    "Company Name", "Supplier", "Name"), case-insensitively. The value is the
    rest of the line after the label. A capture that is empty after stripping
    is skipped and the search continues, so a label with nothing after it no
    longer yields an empty vendor name.

    Args:
        text: Extracted document text.

    Returns:
        The first non-empty labelled value; otherwise the first email address
        found in the text; otherwise the string "Unknown Vendor".
    """
    patterns = [
        r"Vendor Name[:\s]*([^\n]+)",
        r"Vendor[:\s]*([^\n]+)",
        r"Company Name[:\s]*([^\n]+)",
        r"Supplier[:\s]*([^\n]+)",
        r"Name[:\s]*([^\n]+)"
    ]
    for pattern in patterns:
        for match in re.finditer(pattern, text, re.IGNORECASE):
            vendor_name = match.group(1).strip()
            if not vendor_name:
                # Backtracking can leave only whitespace in the capture;
                # treat that as "no value" and keep looking.
                continue
            logger.info(f"Vendor name extracted: {vendor_name}")
            return vendor_name
    logger.warning("No vendor name found in text, attempting to extract email")
    email_match = re.search(r"[\w\.-]+@[\w\.-]+\.\w+", text)
    if email_match:
        email = email_match.group(0)
        logger.info(f"Email extracted: {email}")
        return email
    logger.warning("No email found in text")
    return "Unknown Vendor"
def analyze_document(document_text):
    """List the required labels that do not occur in the document text.

    The comparison is a case-insensitive substring test against each entry
    of ``required_values``.

    Args:
        document_text: Extracted document text.

    Returns:
        The missing labels, in the order they appear in ``required_values``.
    """
    haystack = document_text.lower().strip()
    return [label for label in required_values if label.lower() not in haystack]
def _escape_soql_literal(value):
    """Escape backslashes and single quotes for a single-quoted SOQL literal."""
    return value.replace("\\", "\\\\").replace("'", "\\'")


def insert_into_salesforce(vendor_name_or_email, extracted_text, category, score, comments, flags):
    """Create a Vendor_Scorecard__c record for a processed document.

    Looks up a matching Vendor__c by email or by name, renders the extracted
    text to a PDF and uploads it, then inserts the scorecard record.

    The identifier comes from OCR of an uploaded file, so it is escaped before
    being placed in a SOQL string literal. The unescaped value is what gets
    stored in Vendor_Name__c and used for the uploaded file name.

    Args:
        vendor_name_or_email: Vendor name or email address from the document.
        extracted_text: Full extracted text; at most the first 32768
            characters are stored in Uploaded_File__c.
        category: Value for Category_Match__c.
        score: Value for Score__c.
        comments: Value for Comments__c.
        flags: Value for Flags__c.

    Returns:
        The create() result when it contains an 'id'; otherwise the string
        "Failed to insert record"; or "Error: <message>" if an exception
        occurred. This function does not raise.
    """
    try:
        sf = Salesforce(username=SF_USERNAME, password=SF_PASSWORD, security_token=SF_SECURITY_TOKEN)
        logger.info("Salesforce authentication successful.")
        # vendor_id is looked up and logged only; the record stores the name.
        vendor_id = None
        vendor_name_clean = vendor_name_or_email.strip()
        logger.info(f"Querying Salesforce for Vendor: {vendor_name_clean}")
        soql_value = _escape_soql_literal(vendor_name_clean)
        is_email = bool(re.match(r"[\w\.-]+@[\w\.-]+\.\w+", vendor_name_clean))
        if is_email:
            query = f"SELECT Id, Name FROM Vendor__c WHERE Email__c = '{soql_value}' LIMIT 1"
            vendor_record = sf.query(query)
            if vendor_record['totalSize'] > 0:
                vendor_id = vendor_record['records'][0]['Id']
                # Prefer the vendor's stored name over the raw email.
                vendor_name_clean = vendor_record['records'][0]['Name']
                logger.info(f"Vendor found by email with ID: {vendor_id}, Name: {vendor_name_clean}")
            else:
                logger.warning(f"Vendor with email '{vendor_name_clean}' not found!")
        else:
            query = f"SELECT Id FROM Vendor__c WHERE Name = '{soql_value}' LIMIT 1"
            vendor_record = sf.query(query)
            if vendor_record['totalSize'] > 0:
                vendor_id = vendor_record['records'][0]['Id']
                logger.info(f"Vendor found by name with ID: {vendor_id}")
            else:
                logger.warning(f"Vendor '{vendor_name_clean}' not found!")

        pdf_buffer = generate_pdf_from_text(extracted_text, vendor_name_clean)
        pdf_url = None
        if pdf_buffer:
            pdf_url = upload_pdf_to_salesforce(sf, pdf_buffer, vendor_name_clean)
        else:
            logger.error("Failed to generate PDF from extracted text.")

        vendor_field_value = vendor_name_clean
        logger.info(f"Setting Vendor_Name__c field to: {vendor_field_value}")
        # Slicing is a no-op for shorter strings, so no length check is needed.
        extracted_text_truncated = extracted_text[:32768]
        flags_value = flags  # Can be extended to "Value1;Value2" for multi-select
        scorecard_data = {
            'Vendor_Name__c': vendor_field_value,
            'Extracted_Text_URL__c': pdf_url or "",
            'Score__c': score,
            'Category_Match__c': category,
            'Comments__c': comments,
            'Flags__c': flags_value,
            'Uploaded_File__c': extracted_text_truncated
        }
        result = sf.Vendor_Scorecard__c.create(scorecard_data)
        if result and 'id' in result:
            logger.info(f"Record inserted successfully with ID: {result['id']}")
            return result
        logger.error("Failed to insert Vendor_Scorecard__c record.")
        return "Failed to insert record"
    except Exception as e:
        logger.error(f"Error inserting into Salesforce: {e}")
        return f"Error: {e}"
@app.post("/process_pdf/")
async def process_pdf_api(file: UploadFile = File(...)):
    """Process an uploaded PDF and return the result as JSON.

    The upload is written to a temporary ``.pdf`` file, passed to
    process_pdf, and the temporary file is removed afterwards on every
    path, including when an exception is raised.

    Args:
        file: The uploaded file.

    Returns:
        A JSONResponse with ``extracted_text``, ``category``, ``score``,
        ``comments`` and ``flags``; status 400 with an ``error`` field when
        the upload is empty; status 500 with an ``error`` field when an
        exception occurs.
    """
    temp_path = None
    try:
        contents = await file.read()
        if not contents:
            logger.error("Uploaded file is empty")
            return JSONResponse(content={"error": "Uploaded file is empty"}, status_code=400)

        # delete=False so the file survives the `with`; leaving the block
        # flushes and closes it before process_pdf reopens it by name.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_file:
            temp_path = temp_file.name
            temp_file.write(contents)

        extracted_text, category, score, comments, flags = process_pdf(temp_path)
        return JSONResponse(content={
            "extracted_text": extracted_text,
            "category": category,
            "score": score,
            "comments": comments,
            "flags": flags
        })
    except Exception as e:
        logger.error(f"Error processing the file via API: {e}")
        return JSONResponse(content={"error": str(e)}, status_code=500)
    finally:
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError as cleanup_error:
                # Best effort: a leftover temp file must not change the response.
                logger.warning(f"Could not remove temporary file {temp_path}: {cleanup_error}")
def gradio_interface(pdf_file):
    """Gradio callback: run process_pdf on the uploaded file.

    Args:
        pdf_file: The uploaded file object (its ``name`` attribute is passed
            to process_pdf as the path), or None when nothing was uploaded.

    Returns:
        The 5-tuple from process_pdf, or an error 5-tuple when no file was
        provided.
    """
    if pdf_file is not None:
        return process_pdf(pdf_file.name)
    return "No file uploaded", "Error", 0, "Error", "Error"
# Gradio UI: a single PDF upload feeds gradio_interface, and its five return
# values fill the output widgets below in order (text, category, score,
# comments, flags).
# NOTE(review): live=True presumably re-runs the callback whenever the input
# changes, and each run also writes to Salesforce - confirm this is intended.
gr_interface = gr.Interface(
    fn=gradio_interface,
    inputs=gr.File(label="Upload PDF Document"),
    outputs=[
        gr.Textbox(label="Extracted Text"),
        gr.Textbox(label="Category Match"),
        gr.Number(label="Score"),
        gr.Textbox(label="Comments"),
        gr.Textbox(label="Flags")
    ],
    live=True
)
if __name__ == "__main__":
    # Script entry point: serve the Gradio UI from a background thread and
    # the FastAPI app with uvicorn on port 8000 in the main thread.
    import threading

    import uvicorn

    def run_gradio():
        """Launch the Gradio interface (runs in the background thread)."""
        gr_interface.launch()

    # Daemon thread: once uvicorn returns (e.g. on Ctrl+C) the interpreter
    # can exit instead of waiting on the Gradio thread.
    threading.Thread(target=run_gradio, daemon=True).start()
    uvicorn.run(app, host="0.0.0.0", port=8000)