|
import glob |
|
|
|
import gradio as gr |
|
import gspread |
|
import numpy as np |
|
import pandas as pd |
|
import tqdm |
|
from google.cloud import documentai_v1 as documentai |
|
from google.oauth2 import service_account |
|
|
|
credentials = None |
|
|
|
|
|
def process_document_form_parser(credentials_file, images_folder): |
|
"""Processes a document using the Form Parser.""" |
|
|
|
global credentials |
|
|
|
credentials = service_account.Credentials.from_service_account_file( |
|
credentials_file, |
|
scopes=[ |
|
"https://www.googleapis.com/auth/cloud-platform", |
|
"https://www.googleapis.com/auth/spreadsheets", |
|
"https://www.googleapis.com/auth/drive", |
|
], |
|
) |
|
|
|
project_id = "cru-ocr" |
|
location = "us" |
|
processor_id = "26630ebbc76345a1" |
|
|
|
image_paths = glob.glob(f"{images_folder}/*.pdf")[:3] |
|
|
|
all_fields = { |
|
"Name": np.empty(len(image_paths), dtype=object), |
|
"Phone": np.empty(len(image_paths), dtype=object), |
|
"Email": np.empty(len(image_paths), dtype=object), |
|
"Cadet": np.empty(len(image_paths), dtype=object), |
|
"Greek or Going Greek": np.empty(len(image_paths), dtype=object), |
|
"Transfer Student": np.empty(len(image_paths), dtype=object), |
|
"Military Veteran": np.empty(len(image_paths), dtype=object), |
|
"International Student": np.empty(len(image_paths), dtype=object), |
|
"Res Hall": np.empty(len(image_paths), dtype=object), |
|
"Room #": np.empty(len(image_paths), dtype=object), |
|
"Off Campus": np.empty(len(image_paths), dtype=object), |
|
"Fr": np.empty(len(image_paths), dtype=object), |
|
"So": np.empty(len(image_paths), dtype=object), |
|
"Jr": np.empty(len(image_paths), dtype=object), |
|
"Sr": np.empty(len(image_paths), dtype=object), |
|
"Grad Student": np.empty(len(image_paths), dtype=object), |
|
"Male": np.empty(len(image_paths), dtype=object), |
|
"Female": np.empty(len(image_paths), dtype=object), |
|
"Non Binary": np.empty(len(image_paths), dtype=object), |
|
} |
|
|
|
client = documentai.DocumentProcessorServiceClient(credentials=credentials) |
|
name = client.processor_path(project_id, location, processor_id) |
|
|
|
for file_idx, file_path in enumerate(tqdm.tqdm(image_paths)): |
|
with open(file_path, "rb") as image_file: |
|
image_content = image_file.read() |
|
|
|
raw_document = documentai.RawDocument( |
|
content=image_content, mime_type="application/pdf" |
|
) |
|
request = documentai.ProcessRequest(name=name, raw_document=raw_document) |
|
result = client.process_document(request=request) |
|
document = result.document |
|
|
|
for page in document.pages: |
|
for form_field in page.form_fields: |
|
field_name = ( |
|
form_field.field_name.text_anchor.content |
|
if form_field.field_name |
|
else "Unnamed Field" |
|
) |
|
field_value = ( |
|
form_field.field_value.text_anchor.content |
|
if form_field.field_value |
|
else "No Value" |
|
) |
|
|
|
field_name = field_name.strip().replace(":", "") |
|
field_value = field_value.strip().replace(":", "") |
|
|
|
if field_name == "Name" and "\n" in field_value: |
|
field_value = " ".join(field_value.split("\n")[1:]) |
|
|
|
|
|
if field_name in all_fields.keys(): |
|
if field_name == "Email": |
|
|
|
field_value = field_value.replace("ut.edu", "vt.edu") |
|
|
|
|
|
field_value = field_value.lower() |
|
|
|
|
|
field_value = field_value.replace(" ", "") |
|
|
|
if field_name == "Phone": |
|
|
|
field_value = "".join(filter(str.isdigit, field_value)) |
|
|
|
|
|
if field_value == "β": |
|
field_value = "Yes" |
|
|
|
|
|
|
|
all_fields[field_name][file_idx] = field_value |
|
else: |
|
print( |
|
f"Unused field name: {field_name}, field value: {field_value}" |
|
) |
|
|
|
df = pd.DataFrame(all_fields, columns=all_fields.keys()) |
|
df["Year"] = df.apply(condense_year, axis=1) |
|
df = df.drop(columns=["Fr", "So", "Jr", "Sr", "Grad Student"]) |
|
df = df.replace({"β": "", None: ""}) |
|
|
|
return df |
|
|
|
|
|
def condense_year(row): |
|
""" |
|
Handles logic to condense year values in the DataFrame row. |
|
""" |
|
years = ["Fr", "So", "Jr", "Sr", "Grad Student"] |
|
year_vals = [row[year] for year in years] |
|
|
|
|
|
if "Yes" in year_vals: |
|
return years[year_vals.index("Yes")] |
|
|
|
|
|
elif year_vals.count("β") == 4: |
|
for val in year_vals: |
|
if not val or "β" not in val: |
|
return years[year_vals.index(val)] |
|
return "" |
|
|
|
|
|
def upload_to_google_sheets(df): |
|
"""Uploads the edited DataFrame to a Google Sheet.""" |
|
|
|
global credentials |
|
|
|
spreadsheet_name = "Cru Connect Cards" |
|
worksheet_name = "Sheet1" |
|
|
|
|
|
gc = gspread.authorize(credentials) |
|
|
|
|
|
try: |
|
spreadsheet = gc.open(spreadsheet_name) |
|
except gspread.SpreadsheetNotFound: |
|
spreadsheet = gc.create(spreadsheet_name) |
|
|
|
|
|
try: |
|
worksheet = spreadsheet.worksheet(worksheet_name) |
|
except gspread.WorksheetNotFound: |
|
worksheet = spreadsheet.add_worksheet( |
|
title=worksheet_name, rows="100", cols="20" |
|
) |
|
|
|
|
|
worksheet.clear() |
|
|
|
|
|
worksheet.update([df.columns.values.tolist()] + df.values.tolist()) |
|
|
|
return f"Data uploaded successfully to {spreadsheet_name} - {worksheet_name}." |
|
|
|
|
|
|
|
def gradio_interface(credentials_file, images_folder): |
|
return process_document_form_parser(credentials_file.name, images_folder) |
|
|
|
|
|
def upload_handler(df_data): |
|
df = pd.DataFrame(df_data) |
|
result = upload_to_google_sheets(df) |
|
return result |
|
|
|
|
|
with gr.Blocks() as iface: |
|
gr.Markdown("# Document Processing and Upload to Google Sheets") |
|
|
|
credentials_file = gr.File( |
|
label="Credentials File" |
|
) |
|
images_folder_path = gr.Textbox(label="Path to Images Folder", value="images_v2") |
|
process_button = gr.Button("Process Documents") |
|
|
|
output_dataframe = gr.Dataframe(label="Output", interactive=True) |
|
upload_button = gr.Button("Upload to Google Sheets") |
|
upload_status = gr.Textbox(label="Upload Status") |
|
|
|
process_button.click( |
|
fn=gradio_interface, |
|
inputs=[credentials_file, images_folder_path], |
|
outputs=[output_dataframe], |
|
) |
|
|
|
upload_button.click( |
|
fn=upload_handler, inputs=[output_dataframe], outputs=[upload_status] |
|
) |
|
|
|
iface.launch() |
|
|