|
import os |
|
from paddleocr import PaddleOCR |
|
from PIL import Image |
|
import gradio as gr |
|
import re |
|
from simple_salesforce import Salesforce |
|
import pandas as pd |
|
|
|
|
|
# Readable attribute label -> name of the Salesforce field that stores it.
ATTRIBUTE_MAPPING = {
    "Name": "Patient_Name__c",
    "Age": "Age__c",
    "Gender": "Gender__c",
    "Phone Number": "Phone_Number__c",
}

# Row order used when the extracted attributes are laid out in the table.
ATTRIBUTE_ORDER = [
    "Name",
    "Age",
    "Gender",
    "Phone Number",
]

# Gender word matched in the OCR text -> value stored for the Gender attribute.
GENDER_MAPPING = {
    "Male": "Male",
    "Female": "Female",
    "Other": "Others",
}
|
|
|
|
|
# Salesforce login details come from the process environment; each one is
# None when its variable is not set.
SALESFORCE_USERNAME = os.environ.get("SALESFORCE_USERNAME")
SALESFORCE_PASSWORD = os.environ.get("SALESFORCE_PASSWORD")
SALESFORCE_SECURITY_TOKEN = os.environ.get("SALESFORCE_SECURITY_TOKEN")

# Startup diagnostics: the username is echoed as-is, while the password and
# security token are only reported as present ("Yes") or absent ("No").
print(f"Using Salesforce credentials - Username: {SALESFORCE_USERNAME}")
print(f"Password set: {'Yes' if SALESFORCE_PASSWORD else 'No'}")
print(f"Security token set: {'Yes' if SALESFORCE_SECURITY_TOKEN else 'No'}")
|
|
|
|
|
# Shared OCR engine, constructed once when the module is imported and reused
# by every extraction call (English language, use_angle_cls enabled).
ocr = PaddleOCR(
    lang='en',
    use_angle_cls=True,
)
|
|
|
|
|
def extract_text(image):
    """Run OCR on ``image`` and return the recognized text.

    Args:
        image: Image to scan; handed unchanged to ``ocr.ocr``. ``None`` is
            accepted and treated as "nothing to read".

    Returns:
        The recognized text fragments joined with newlines, or an empty
        string when there is no image or no text was detected.
    """
    if image is None:
        return ""

    result = ocr.ocr(image)
    # PaddleOCR reports a page without detections as None (the overall result
    # is then ``[None]``), so guard before iterating instead of crashing.
    if not result or not result[0]:
        return ""

    # For each detection, index [1][0] holds the recognized string.
    return "\n".join(detection[1][0] for detection in result[0])
|
|
|
|
|
def clean_extracted_text(text):
    """Normalize line endings and whitespace in OCR output.

    ``\\r\\n`` and lone ``\\r`` become ``\\n``; every line is then stripped
    and each run of whitespace inside it is collapsed to a single space.
    The number of lines is preserved (empty lines stay as empty strings).
    """
    unix_text = text.replace('\r\n', '\n').replace('\r', '\n')
    return '\n'.join(
        re.sub(r'\s+', ' ', raw_line.strip())
        for raw_line in unix_text.split('\n')
    )
|
|
|
|
|
def extract_attributes(extracted_text):
    """Pull the patient attributes out of raw OCR text.

    The text is normalized with ``clean_extracted_text`` and scanned line by
    line. Each line is tested against the Name, Age, Gender and Phone Number
    patterns in that order and contributes at most one attribute (the first
    pattern that matches). If several lines match the same attribute, the
    last one wins.

    Args:
        extracted_text: Newline-separated text produced by OCR.

    Returns:
        dict mapping the readable attribute name to its stripped value. Only
        attributes that were found are present. The gender value is
        translated through ``GENDER_MAPPING``.
    """
    cleaned_text = clean_extracted_text(extracted_text)
    # NOTE(review): these prints write the scanned patient data to stdout.
    print(f"Raw extracted text: '{extracted_text}'")
    print(f"Cleaned extracted text: '{cleaned_text}'")

    # Compiled once per call rather than looked up again for every line.
    patterns = {
        "Name": re.compile(r"Name\s*[:\-]?\s*([\w\s\-\.\',]+)", re.IGNORECASE),
        "Age": re.compile(r"Age\s*[:\-]?\s*(\d{1,3})", re.IGNORECASE),
        "Gender": re.compile(r"Gender\s*[:\-]?\s*(Male|Female|Other)", re.IGNORECASE),
        "Phone Number": re.compile(
            r"(?:(?:Phone Number)|Phone|Mobile|Phonenumber)\s*[:\-]?\s*(?:\+91)?([6-9]\d{9})",
            re.IGNORECASE,
        ),
    }

    attributes = {}
    for line in cleaned_text.split('\n'):
        for readable_attr, pattern in patterns.items():
            match = pattern.search(line)
            if match is None:
                continue
            attributes[readable_attr] = match.group(1).strip()
            print(f"Extracted {readable_attr}: '{attributes[readable_attr]}' from line: '{line}'")
            # Only the first matching pattern is applied to a line.
            break

    if "Gender" in attributes:
        # The pattern matches case-insensitively ("male", "OTHER", ...) while
        # the GENDER_MAPPING keys are capitalized; bring the match to the same
        # casing first so the lookup cannot silently miss.
        gender = attributes["Gender"].capitalize()
        attributes["Gender"] = GENDER_MAPPING.get(gender, gender)

    return attributes
|
|
|
|
|
def filter_valid_attributes(attributes, valid_fields, mapping=None):
    """Translate readable attribute labels to field names, keeping valid ones.

    Args:
        attributes: dict of readable label -> value.
        valid_fields: Container of field names accepted by the target object.
        mapping: Optional dict of readable label -> field name. Defaults to
            the module-level ``ATTRIBUTE_MAPPING``.

    Returns:
        dict of field name -> value for every attribute whose label is known
        to ``mapping`` and whose field name is in ``valid_fields``. Labels
        missing from ``mapping`` (for example rows a user added or renamed in
        the editable table) are skipped instead of raising ``KeyError``.
    """
    if mapping is None:
        mapping = ATTRIBUTE_MAPPING

    filtered = {}
    for label, value in attributes.items():
        field_name = mapping.get(label)
        if field_name is not None and field_name in valid_fields:
            filtered[field_name] = value
    return filtered
|
|
|
|
|
def _is_blank(value):
    """Return True for values that carry no data: None, NaN or blank text."""
    if value is None:
        return True
    # NaN is the only float that is not equal to itself.
    if isinstance(value, float) and value != value:
        return True
    return isinstance(value, str) and not value.strip()


def interact_with_salesforce(attributes):
    """Create a ``Patient_Registration__c`` record from ``attributes``.

    Steps: check that the credentials are configured, log in, describe the
    object to learn its field names and picklist values, map and filter the
    attributes, validate them, and create the record.

    Args:
        attributes: dict of readable attribute label -> value, e.g. the rows
            of the editable table. Blank values (None, NaN, empty or
            whitespace-only strings) are treated as "not provided" and are
            not sent.

    Returns:
        A status message: the new record id on success, or a description of
        the failure. Exceptions raised along the way are caught and reported
        in the returned message rather than propagated.
    """
    try:
        if not all([SALESFORCE_USERNAME, SALESFORCE_PASSWORD, SALESFORCE_SECURITY_TOKEN]):
            raise ValueError("One or more Salesforce credentials are missing. Check environment variables.")

        sf = Salesforce(
            username=SALESFORCE_USERNAME,
            password=SALESFORCE_PASSWORD,
            security_token=SALESFORCE_SECURITY_TOKEN,
            domain="login",
            version="60.0",
        )
        print(f"Successfully connected to Salesforce as {SALESFORCE_USERNAME}")

        object_name = "Patient_Registration__c"
        # Attribute access on the client yields the handle for that object.
        sf_object = getattr(sf, object_name)

        schema = sf_object.describe()
        valid_fields = {field["name"] for field in schema["fields"]}
        print(f"Valid fields for {object_name}: {valid_fields}")

        field_details = {
            field["name"]: {
                "createable": field["createable"],
                "required": not field["nillable"] and not field["defaultedOnCreate"],
                "picklist_values": (
                    [entry["value"] for entry in field["picklistValues"]]
                    if field.get("picklistValues")
                    else None
                ),
            }
            for field in schema["fields"]
        }
        print(f"Field details: {field_details}")

        # Attributes that were not extracted arrive as empty strings; drop
        # them (and trim the rest) so optional fields left blank do not trip
        # the integer conversion or the picklist check below.
        filtered_attributes = {
            name: value.strip() if isinstance(value, str) else value
            for name, value in filter_valid_attributes(attributes, valid_fields).items()
            if not _is_blank(value)
        }

        if not filtered_attributes.get("Patient_Name__c"):
            raise ValueError("Patient_Name__c is required but was not provided.")

        print(f"Attributes being sent to Salesforce: {filtered_attributes}")

        if "Age__c" in filtered_attributes:
            raw_age = filtered_attributes["Age__c"]
            try:
                filtered_attributes["Age__c"] = int(raw_age)
            except (TypeError, ValueError):
                raise ValueError(f"Age__c must be a whole number, got '{raw_age}'.") from None

        if "Gender__c" in filtered_attributes:
            gender_values = field_details.get("Gender__c", {}).get("picklist_values", [])
            # Validate only when the schema actually lists picklist values.
            if gender_values and filtered_attributes["Gender__c"] not in gender_values:
                raise ValueError(f"Invalid value for Gender__c: '{filtered_attributes['Gender__c']}'. Allowed values: {gender_values}")

        result = sf_object.create(filtered_attributes)
        return f"✅ Successfully created Patient Registration record with ID: {result['id']}."

    except Exception as e:
        # UI boundary: report the failure as text instead of raising.
        return f"❌ Error interacting with Salesforce: {e}"
|
|
|
|
|
def process_image(image):
    """Run OCR on an uploaded image and tabulate the extracted attributes.

    Args:
        image: The uploaded image, or ``None`` when nothing was uploaded.

    Returns:
        A 3-tuple ``(text, table, result)``:
        * ``text`` - the OCR text prefixed with "Extracted Text:", or a
          message explaining why nothing was extracted;
        * ``table`` - a DataFrame with columns "Attribute" and "Value", one
          row per entry of ``ATTRIBUTE_ORDER`` (missing values are empty
          strings), or ``None`` when nothing was extracted;
        * ``result`` - always ``None``.
    """
    # Without an upload there is nothing to scan; answer with a message
    # instead of handing None to the OCR engine.
    if image is None:
        return "No image provided. Please upload an image first.", None, None

    extracted_text = extract_text(image)
    if not extracted_text:
        return "No text detected in the image.", None, None

    attributes = extract_attributes(extracted_text)

    # Fixed row order; attributes that were not found get an empty value.
    rows = [(attr, attributes.get(attr, "")) for attr in ATTRIBUTE_ORDER]
    df = pd.DataFrame(rows, columns=["Attribute", "Value"])
    return f"Extracted Text:\n{extracted_text}", df, None
|
|
|
|
|
def export_to_salesforce(edited_df):
    """Send the (possibly edited) attribute table to Salesforce.

    Args:
        edited_df: Table with an "Attribute" and a "Value" column, as
            produced by ``process_image``. ``None`` or a table without those
            columns is rejected with an explanatory message.

    Returns:
        The status message from ``interact_with_salesforce``, or an error
        message when the table is unusable or an exception occurs.
    """
    # Before any extraction has run there is no usable table; say so clearly
    # instead of surfacing a bare KeyError/TypeError text.
    columns = getattr(edited_df, "columns", ())
    if "Attribute" not in columns or "Value" not in columns:
        return (
            "❌ Error exporting to Salesforce: the table must have "
            "'Attribute' and 'Value' columns. Extract attributes from an image first."
        )

    try:
        edited_attributes = dict(zip(edited_df["Attribute"], edited_df["Value"]))
        return interact_with_salesforce(edited_attributes)
    except Exception as e:
        # UI boundary: report the failure as text instead of raising.
        return f"❌ Error exporting to Salesforce: {e}"
|
|
|
|
|
def app():
    """Build the Gradio interface and return it without launching it.

    The interface has a single tab with an image input, an extract button,
    a text box for the OCR output, an editable attribute table, an OK button
    and a result text box.

    Returns:
        The ``gr.Blocks`` object holding the widgets and their click handlers.
    """
    with gr.Blocks() as demo:
        with gr.Tab("π₯ OCR Processing"):
            with gr.Row():
                # NOTE(review): the nesting of these widgets under the Row is
                # reconstructed (original indentation unavailable) - confirm
                # against the intended layout.
                image_input = gr.Image(type="numpy", label="π Upload Image")
                extract_button = gr.Button("Extract Text and Attributes")
                extracted_text_output = gr.Text(label="π Extracted Image Data")
                editable_df_output = gr.Dataframe(label="βοΈ Edit Attributes (Key-Value Pairs)", interactive=True)
                ok_button = gr.Button("OK")
                result_output = gr.Text(label="π Result")

            # Extract: run process_image on the uploaded image and fill the
            # text box and the editable table; the third output receives the
            # None that process_image returns for the result box.
            extract_button.click(
                fn=process_image,
                inputs=[image_input],
                outputs=[extracted_text_output, editable_df_output, result_output]
            )
            # OK: pass the current table contents to export_to_salesforce and
            # show the message it returns.
            ok_button.click(
                fn=export_to_salesforce,
                inputs=[editable_df_output],
                outputs=[result_output]
            )

    return demo
|
|
|
if __name__ == "__main__":
    # Script entry point: build the interface, then start serving it.
    # NOTE(review): share=True asks Gradio for a publicly reachable link -
    # confirm that is intended for an app that handles patient data.
    demo = app()
    demo.launch(share=True)