Spaces:
Sleeping
Sleeping
File size: 14,862 Bytes
db33631 e2c450b db33631 e2c450b 3767c30 db33631 ded79c9 db33631 ded79c9 db33631 ded79c9 db33631 ded79c9 db33631 ded79c9 db33631 ded79c9 db33631 ded79c9 e2c450b ded79c9 db33631 ded79c9 e2c450b ded79c9 e2c450b 12eb018 ded79c9 12eb018 ded79c9 e2c450b ded79c9 e2c450b ded79c9 3767c30 ded79c9 3767c30 ded79c9 3767c30 ded79c9 db33631 ded79c9 12eb018 ded79c9 12eb018 e2c450b 12eb018 db33631 12eb018 db33631 12eb018 ded79c9 db33631 12eb018 e2c450b 12eb018 db33631 e2c450b 12eb018 db33631 ded79c9 db33631 ded79c9 db33631 ded79c9 db33631 3767c30 e2c450b 3767c30 db33631 ded79c9 db33631 3767c30 e2c450b db33631 ded79c9 db33631 12eb018 db33631 12eb018 db33631 ded79c9 db33631 |
|
import os
import re
import json
import tempfile
import gradio as gr
from paddleocr import PaddleOCR
import fitz # PyMuPDF
from simple_salesforce import Salesforce
from dotenv import load_dotenv
import logging
from fastapi import FastAPI, UploadFile, File
from fastapi.responses import JSONResponse
import time
import base64
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
from io import BytesIO
# Configure logging at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Read settings from a local .env file into the process environment.
load_dotenv()

# Salesforce login credentials; each one is None when its variable is unset.
SF_USERNAME = os.environ.get('SF_USERNAME')
SF_PASSWORD = os.environ.get('SF_PASSWORD')
SF_SECURITY_TOKEN = os.environ.get('SF_SECURITY_TOKEN')
# Build the shared PaddleOCR engine (English model, use_angle_cls=True to
# handle text orientation). If construction fails, `ocr` is set to None and
# code that uses it must check for that first.
try:
    ocr = PaddleOCR(use_angle_cls=True, lang='en')
except Exception as init_error:
    logger.error(f"Failed to initialize PaddleOCR: {str(init_error)}")
    ocr = None
else:
    logger.info("PaddleOCR initialized successfully")
# Labels that are searched for in the extracted document text; each label that
# is absent counts as one missing value.
required_values = [
    "Vendor Name",
    "Tax Identification Number (TIN)",
    "Address",
    "Certification Details",
    "Contract Terms",
    "Payment Terms",
    "Signature",
]

# Valid picklist values for Category_Match__c.
VALID_CATEGORIES = [
    'Compliant',
    'Partially Compliant',
    'Non-Compliant',
    'Not Applicable',
]

# Possible values for Flags__c (multi-select picklist).
VALID_FLAGS = [
    'Compliant',
    'Partially Compliant',
    'Non-Compliant',
    'Not Applicable',
]
# FastAPI application object; HTTP routes in this module are registered on it
# with @app decorators, and it is served by uvicorn when run as a script.
app = FastAPI()
# PDF generation helper
def generate_pdf_from_text(text, vendor_name):
    """Render plain text into an in-memory, letter-sized PDF.

    The text is split on newlines and written one line per PDF text line,
    starting a new page whenever the cursor drops below the bottom margin.

    Args:
        text: The text to render.
        vendor_name: Accepted for interface compatibility; not used while
            rendering.

    Returns:
        A BytesIO positioned at offset 0 that holds the PDF, or None if
        generation fails (the error is logged).
    """
    margin = 40
    try:
        pdf_buffer = BytesIO()
        pdf = canvas.Canvas(pdf_buffer, pagesize=letter)
        _, page_height = letter
        text_object = pdf.beginText(margin, page_height - margin)
        for line in text.split('\n'):
            # Without a page break here, every line past the first page is
            # drawn below the page edge and never appears in the PDF.
            if text_object.getY() < margin:
                pdf.drawText(text_object)
                pdf.showPage()
                text_object = pdf.beginText(margin, page_height - margin)
            text_object.textLine(line)
        pdf.drawText(text_object)
        pdf.showPage()
        pdf.save()
        pdf_buffer.seek(0)
        return pdf_buffer
    except Exception as e:
        logger.error(f"Error generating PDF: {e}")
        return None
# Upload PDF to Salesforce ContentVersion and get its download URL
def upload_pdf_to_salesforce(sf, pdf_buffer, vendor_name):
    """Create a Salesforce ContentVersion from a PDF buffer.

    Args:
        sf: Salesforce client; must expose ``ContentVersion.create`` and
            ``sf_instance``.
        pdf_buffer: Buffer whose ``getvalue()`` returns the PDF bytes.
        vendor_name: Used as the prefix of the generated file name.

    Returns:
        A download URL built from the instance host and the id of the created
        ContentVersion, or None if anything fails (the error is logged).
    """
    try:
        pdf_base64 = base64.b64encode(pdf_buffer.getvalue()).decode('utf-8')
        # The current epoch second keeps repeated uploads for one vendor apart.
        file_name = f"{vendor_name}_ExtractedText_{int(time.time())}.pdf"
        payload = {
            "Title": file_name,
            "PathOnClient": file_name,
            "VersionData": pdf_base64,
        }
        created = sf.ContentVersion.create(payload)
        version_id = created['id']
        file_url = f"https://{sf.sf_instance}/sfc/servlet.shepherd/version/download/{version_id}"
        logger.info(f"PDF uploaded to Salesforce: {file_url}")
        return file_url
    except Exception as upload_error:
        logger.error(f"Error uploading PDF to Salesforce: {upload_error}")
        return None
def _error_result(message):
    """Build the 5-tuple that process_pdf returns when processing fails."""
    return message, "Error", 0, "Error", "Error"


def _render_page_to_png(page, page_num, out_dir):
    """Render one PDF page to a PNG in out_dir; return its path or None."""
    try:
        # Render at 2x zoom without an alpha channel.
        zoom = 2
        mat = fitz.Matrix(zoom, zoom)
        pix = page.get_pixmap(matrix=mat, alpha=False)
        page_path = os.path.join(out_dir, f"page_{page_num + 1}.png")
        pix.save(page_path)
        logger.info(f"Processing page {page_num + 1} saved at {page_path}.")
        return page_path
    except Exception as e:
        logger.error(f"Error creating pixmap for page {page_num + 1}: {e}")
    # Second attempt with the default matrix (no zoom).
    try:
        pix = page.get_pixmap(alpha=False)
        page_path = os.path.join(out_dir, f"page_{page_num + 1}_fallback.png")
        pix.save(page_path)
        logger.info(f"Fallback processing succeeded for page {page_num + 1} at {page_path}.")
        return page_path
    except Exception as e2:
        logger.error(f"Fallback failed for page {page_num + 1}: {e2}")
        return None


def _extract_text_layer(pdf_document, num_pages):
    """Return the text PyMuPDF reads directly from every page (no OCR)."""
    parts = []
    for page_num in range(num_pages):
        page_text = pdf_document.load_page(page_num).get_text("text")
        if page_text:
            parts.append(page_text + "\n")
            logger.info(f"Fallback text extracted from page {page_num + 1} using PyMuPDF.")
    return "".join(parts)


def _classify_missing_count(missing_count):
    """Map the number of missing values to (category, score, comments, flags)."""
    if missing_count == 0:
        return 'Compliant', 100, 'Document contains all required values.', 'Compliant'
    if missing_count == 1:
        return ('Partially Compliant', 85,
                'Document is missing one required value.', 'Partially Compliant')
    if missing_count == 2:
        return 'Non-Compliant', 60, 'Document is missing two required values.', 'Non-Compliant'
    return ('Not Applicable', 40,
            'Document is missing three or more required values.', 'Not Applicable')


def process_pdf(pdf_file_path):
    """OCR a PDF, score it against the required values and record the result.

    Steps: check that the path exists and the file starts with ``%PDF``;
    render each page to a PNG in a temporary directory and run PaddleOCR on
    it; if OCR yields no text, fall back to PyMuPDF's own text extraction;
    derive the vendor name and the missing required values; map the missing
    count to a category/score; insert a record through
    ``insert_into_salesforce``.

    Args:
        pdf_file_path: Path of the PDF file on disk.

    Returns:
        ``(extracted_text, category, score, comments, flags)`` on success.
        On failure, ``(message, "Error", 0, "Error", "Error")``. Exceptions
        are logged and reported through that tuple rather than raised.
    """
    pdf_document = None
    try:
        if not pdf_file_path or not os.path.exists(pdf_file_path):
            logger.error("No valid file path provided or file does not exist")
            return _error_result("No valid file provided")

        # Validate the PDF magic bytes before handing the file to PyMuPDF.
        try:
            with open(pdf_file_path, 'rb') as f:
                if not f.read(4).startswith(b'%PDF'):
                    logger.error("Uploaded file is not a valid PDF")
                    return _error_result("Invalid PDF file")
        except Exception as e:
            logger.error(f"Error reading file: {e}")
            return _error_result(f"Error reading file: {e}")

        with tempfile.TemporaryDirectory() as path:
            logger.info(f"Temporary directory created at {path}")
            try:
                pdf_document = fitz.open(pdf_file_path)
                num_pages = pdf_document.page_count
                logger.info(f"PDF has {num_pages} pages.")
            except Exception as e:
                logger.error(f"Failed to open PDF with fitz: {e}")
                return _error_result(f"Failed to open PDF: {e}")

            page_texts = []
            for page_num in range(num_pages):
                page = pdf_document.load_page(page_num)
                page_path = _render_page_to_png(page, page_num, path)
                if page_path is None:
                    continue
                if ocr is None:
                    logger.error("PaddleOCR not initialized")
                    return _error_result("OCR engine not initialized")
                try:
                    # Called without cls=True; the original code notes that
                    # passing it caused an error.
                    result = ocr.ocr(page_path)
                    if result and result[0]:
                        page_text = "\n".join(
                            [line[1][0] for line in result[0] if line[1][0]]
                        ) + "\n"
                        page_texts.append(page_text)
                        logger.info(
                            f"Extracted text from page {page_num + 1} "
                            f"(first 200 chars): {page_text[:200]}..."
                        )
                    else:
                        logger.warning(f"No text extracted from page {page_num + 1}")
                except Exception as e:
                    logger.error(f"Error performing OCR on page {page_num + 1}: {e}")
                    continue

            extracted_text = "".join(page_texts)
            logger.info(f"Full extracted text (first 200 chars): {extracted_text[:200]}...")

            if not extracted_text.strip():
                logger.error("No text extracted from PDF")
                # Fallback: read the text directly from the PDF with PyMuPDF.
                try:
                    extracted_text += _extract_text_layer(pdf_document, num_pages)
                    if not extracted_text.strip():
                        return _error_result("No text extracted from PDF even after fallback")
                except Exception as e:
                    logger.error(f"Error in fallback text extraction: {e}")
                    return _error_result("No text extracted from PDF even after fallback")

        vendor_name = extract_vendor_name(extracted_text)
        logger.info(f"Extracted Vendor Name: {vendor_name}")
        missing_values = analyze_document(extracted_text)
        missing_count = len(missing_values)
        logger.info(f"Missing values: {missing_values}")

        category, score, comments, flags = _classify_missing_count(missing_count)

        insert_result = insert_into_salesforce(
            vendor_name, extracted_text, category, score, comments, flags
        )
        logger.info(f"Salesforce Insert Result: {insert_result}")
        return extracted_text, category, score, comments, flags
    except Exception as e:
        logger.error(f"Error processing PDF: {e}")
        return f"Error: {e}", "Error", 0, "Error", "Error"
    finally:
        # Always release the PyMuPDF document; it was previously never closed,
        # which leaked the handle on every call.
        if pdf_document is not None:
            pdf_document.close()
def extract_vendor_name(text):
    """Pick a vendor identifier out of free text.

    Label patterns are tried in priority order ("Vendor Name", "Vendor",
    "Company Name", "Supplier", "Name"), case-insensitively. For each pattern
    the first occurrence whose captured value is non-empty after stripping is
    returned. If no label yields a value, the first email address in the text
    is returned instead.

    Args:
        text: Text to search; None is treated as an empty string.

    Returns:
        The vendor name, or an email address, or "Unknown Vendor" when neither
        is found.
    """
    text = text or ""
    patterns = [
        r"Vendor Name[:\s]*([^\n]+)",
        r"Vendor[:\s]*([^\n]+)",
        r"Company Name[:\s]*([^\n]+)",
        r"Supplier[:\s]*([^\n]+)",
        r"Name[:\s]*([^\n]+)"
    ]
    for pattern in patterns:
        for match in re.finditer(pattern, text, re.IGNORECASE):
            vendor_name = match.group(1).strip()
            # A label followed only by spaces captures whitespace; skip it so
            # an empty vendor name is never returned.
            if vendor_name:
                logger.info(f"Vendor name extracted: {vendor_name}")
                return vendor_name
    logger.warning("No vendor name found in text, attempting to extract email")
    email_match = re.search(r"[\w\.-]+@[\w\.-]+\.\w+", text)
    if email_match:
        email = email_match.group(0)
        logger.info(f"Email extracted: {email}")
        return email
    logger.warning("No email found in text")
    return "Unknown Vendor"
def analyze_document(document_text):
    """Return the required values that do not appear in the document text.

    Matching is a case-insensitive substring test of each entry of the
    module-level ``required_values`` list against the text.

    Args:
        document_text: Text to inspect; None is treated as an empty string,
            so every required value is reported missing.

    Returns:
        A list of the missing entries, in ``required_values`` order.
    """
    # Normalise the text once instead of once per required value.
    haystack = (document_text or "").lower().strip()
    return [value for value in required_values if value.lower() not in haystack]
def _escape_soql_literal(value):
    """Escape backslashes and single quotes for a SOQL string literal."""
    return value.replace("\\", "\\\\").replace("'", "\\'")


def insert_into_salesforce(vendor_name_or_email, extracted_text, category, score, comments, flags):
    """Create a Vendor_Scorecard__c record for a processed document.

    Logs in to Salesforce, looks the vendor up in Vendor__c (by Email__c when
    the identifier starts like an email address, otherwise by Name), uploads
    a PDF of the extracted text, and creates the scorecard record.

    Args:
        vendor_name_or_email: Vendor name or email address taken from the
            document.
        extracted_text: Full extracted text; stored truncated to 32768
            characters in Uploaded_File__c.
        category: Value for Category_Match__c.
        score: Value for Score__c.
        comments: Value for Comments__c.
        flags: Value for Flags__c.

    Returns:
        The create() result dict when it contains an 'id'; otherwise the
        string "Failed to insert record". On any exception, the string
        "Error: <message>" (the error is logged, not raised).
    """
    try:
        sf = Salesforce(username=SF_USERNAME, password=SF_PASSWORD, security_token=SF_SECURITY_TOKEN)
        logger.info("Salesforce authentication successful.")
        vendor_id = None
        vendor_name_clean = vendor_name_or_email.strip()
        logger.info(f"Querying Salesforce for Vendor: {vendor_name_clean}")

        # The value comes from OCR'd document text, so it is escaped before it
        # goes into a query. Only the query literal is escaped: the name used
        # for the file name and the stored field stays unmodified.
        lookup_value = _escape_soql_literal(vendor_name_clean)
        is_email = bool(re.match(r"[\w\.-]+@[\w\.-]+\.\w+", vendor_name_clean))
        if is_email:
            query = f"SELECT Id, Name FROM Vendor__c WHERE Email__c = '{lookup_value}' LIMIT 1"
            vendor_record = sf.query(query)
            if vendor_record['totalSize'] > 0:
                vendor_id = vendor_record['records'][0]['Id']
                vendor_name_clean = vendor_record['records'][0]['Name']
                logger.info(f"Vendor found by email with ID: {vendor_id}, Name: {vendor_name_clean}")
            else:
                logger.warning(f"Vendor with email '{vendor_name_clean}' not found!")
        else:
            query = f"SELECT Id FROM Vendor__c WHERE Name = '{lookup_value}' LIMIT 1"
            vendor_record = sf.query(query)
            if vendor_record['totalSize'] > 0:
                vendor_id = vendor_record['records'][0]['Id']
                logger.info(f"Vendor found by name with ID: {vendor_id}")
            else:
                logger.warning(f"Vendor '{vendor_name_clean}' not found!")

        pdf_buffer = generate_pdf_from_text(extracted_text, vendor_name_clean)
        pdf_url = None
        if pdf_buffer:
            pdf_url = upload_pdf_to_salesforce(sf, pdf_buffer, vendor_name_clean)
        else:
            logger.error("Failed to generate PDF from extracted text.")

        vendor_field_value = vendor_name_clean
        logger.info(f"Setting Vendor_Name__c field to: {vendor_field_value}")
        # Slicing already leaves shorter text unchanged.
        extracted_text_truncated = extracted_text[:32768]
        flags_value = flags  # Can be extended to "Value1;Value2" for multi-select
        scorecard_data = {
            'Vendor_Name__c': vendor_field_value,
            'Extracted_Text_URL__c': pdf_url or "",
            'Score__c': score,
            'Category_Match__c': category,
            'Comments__c': comments,
            'Flags__c': flags_value,
            'Uploaded_File__c': extracted_text_truncated
        }
        result = sf.Vendor_Scorecard__c.create(scorecard_data)
        if result and 'id' in result:
            logger.info(f"Record inserted successfully with ID: {result['id']}")
            return result
        logger.error("Failed to insert Vendor_Scorecard__c record.")
        return "Failed to insert record"
    except Exception as e:
        logger.error(f"Error inserting into Salesforce: {e}")
        return f"Error: {e}"
@app.post("/process_pdf/")
async def process_pdf_api(file: UploadFile = File(...)):
    """Handle a PDF upload: save it to a temp file, process it, return JSON.

    Args:
        file: The uploaded file from the multipart request.

    Returns:
        A JSONResponse with extracted_text, category, score, comments and
        flags from process_pdf; status 400 with an "error" key when the
        upload is empty; status 500 with an "error" key on any exception.
    """
    temp_path = None
    try:
        contents = await file.read()
        if not contents:
            logger.error("Uploaded file is empty")
            return JSONResponse(content={"error": "Uploaded file is empty"}, status_code=400)

        # delete=False so the file outlives the with-block and process_pdf can
        # reopen it by path; it is removed in the finally clause below.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_file:
            temp_path = temp_file.name
            temp_file.write(contents)

        extracted_text, category, score, comments, flags = process_pdf(temp_path)
        return JSONResponse(content={
            "extracted_text": extracted_text,
            "category": category,
            "score": score,
            "comments": comments,
            "flags": flags
        })
    except Exception as e:
        logger.error(f"Error processing the file via API: {e}")
        return JSONResponse(content={"error": str(e)}, status_code=500)
    finally:
        # Remove the temp file on every path, including failures, so uploads
        # do not pile up on disk.
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temporary file {temp_path}: {cleanup_error}")
def gradio_interface(pdf_file):
    """Gradio callback: run process_pdf on the uploaded file.

    Args:
        pdf_file: The upload passed in by Gradio. May be None (nothing
            uploaded), a path given as ``str``/``os.PathLike``, or an object
            whose ``name`` attribute is the file path.

    Returns:
        The 5-tuple from process_pdf, or an error tuple when no file was
        uploaded.
    """
    if pdf_file is None:
        return "No file uploaded", "Error", 0, "Error", "Error"
    # NOTE(review): which form Gradio passes depends on its version and on the
    # File component's `type` setting. Accept both rather than assume `.name`.
    if isinstance(pdf_file, (str, os.PathLike)):
        pdf_path = os.fspath(pdf_file)
    else:
        pdf_path = pdf_file.name
    return process_pdf(pdf_path)
# Gradio UI definition: one PDF upload as input, and five output widgets that
# line up with the 5-tuple returned by gradio_interface
# (text, category, score, comments, flags).
gr_interface = gr.Interface(
    fn=gradio_interface,
    inputs=gr.File(label="Upload PDF Document"),
    outputs=[
        gr.Textbox(label="Extracted Text"),
        gr.Textbox(label="Category Match"),
        gr.Number(label="Score"),
        gr.Textbox(label="Comments"),
        gr.Textbox(label="Flags"),
    ],
    live=True,
)
if __name__ == "__main__":
    # Script entry point: start the Gradio UI in a background thread and serve
    # the FastAPI app with uvicorn on port 8000 in the main thread.
    import threading

    import uvicorn

    def run_gradio():
        """Launch the Gradio interface (runs in the background thread)."""
        gr_interface.launch()

    # daemon=True so this thread cannot keep the process alive after
    # uvicorn.run() returns (for example on Ctrl+C).
    # NOTE(review): launch() is expected to block when called off the main
    # thread - confirm for the Gradio version in use.
    threading.Thread(target=run_gradio, daemon=True).start()

    uvicorn.run(app, host="0.0.0.0", port=8000)