# Hugging Face Spaces page banner (not part of the program): "Running on CPU Upgrade"
import mimetypes | |
import re | |
import zipfile | |
import gradio as gr | |
import pandas as pd | |
from google.cloud import documentai_v1 as documentai | |
from utils import ( | |
ALL_FIELDS_COMBINED, | |
CREDENTIALS, | |
LOCATION, | |
PROCESSOR_ID, | |
PROJECT_ID, | |
upload_to_google_sheets, | |
) | |
def upload_and_process_next(df_data, parsed_documents, current_idx):
    """Upload the card currently shown in the table, then advance to the next one.

    The two-column frame is pivoted into a single-row frame: column 0 supplies
    the headers and column 1 supplies the row values. That row is handed to
    ``upload_to_google_sheets`` and whatever it returns becomes the status
    message passed on to ``process_next``.
    """
    cells = df_data.values
    headers = cells[:, 0]
    single_row = cells[:, 1].reshape(1, -1)
    sheet_row = pd.DataFrame(single_row, columns=headers)
    upload_status = upload_to_google_sheets(sheet_row)
    return process_next(upload_status, parsed_documents, current_idx)
def skip_and_process_next(parsed_documents, current_idx):
    """Advance to the next card without uploading, reporting "Skipped" as the status."""
    status = "Skipped"
    return process_next(status, parsed_documents, current_idx)
def process_next(message, parsed_documents, current_idx):
    """Move to the card after ``current_idx`` and build the handler outputs.

    Returns a list laid out as
    ``[status, dataframe, parsed_documents, new_index, *widget values]``.
    When no card is left, the status is replaced by a fixed notice, the table
    is emptied and the 28 widgets receive no-op updates.
    """
    next_idx = current_idx + 1

    # Guard clause: nothing left to show.
    if next_idx >= len(parsed_documents):
        untouched_widgets = [gr.update() for _ in range(28)]
        return [
            "No more connect cards to process!",
            pd.DataFrame(),
            parsed_documents,
            next_idx,
        ] + untouched_widgets

    card_df = parsed_documents[next_idx]
    # Widget values in the order the UI expects them
    widget_values = extract_ui_values_from_dataframe(card_df)
    return [message, card_df, parsed_documents, next_idx] + widget_values
def extract_ui_values_from_dataframe(df):
    """Turn an Attribute/Value dataframe into the ordered list of widget values.

    Text widgets receive the stored value (``""`` when the attribute is
    missing); checkbox widgets receive ``True`` only when the stored value is
    exactly ``"Yes"``. The order matches the ``all_inputs`` list.
    """
    lookup = dict(zip(df["Attribute"], df["Value"]))

    # (attribute name, is_checkbox) in the same order as all_inputs
    layout = (
        ("Name", False),  # name_input
        ("Phone", False),  # phone_input
        ("Email", False),  # email_input
        ("Cadet", True),  # cadet_cb
        ("Greek", True),  # greek_cb
        ("Transfer", True),  # transfer_cb
        ("Military", True),  # military_cb
        ("International", True),  # intl_cb
        ("Res Hall", False),  # res_hall_input
        ("Room #", False),  # room_input
        ("Off Campus", True),  # off_campus_cb
        ("Fr", True),  # fr_cb
        ("So", True),  # so_cb
        ("Jr", True),  # jr_cb
        ("Sr", True),  # sr_cb
        ("Grad Student", True),  # grad_cb
        ("Male", True),  # male_cb
        ("Female", True),  # female_cb
        ("Non-binary", True),  # nonbinary_cb
        # Survey answer checkboxes
        ("Spiritual Survey Yes", True),  # ss_yes_cb
        ("Spiritual Survey No", True),  # ss_no_cb
        ("Spiritual Survey Maybe", True),  # ss_maybe_cb
        ("Social Event Yes", True),  # se_yes_cb
        ("Social Event No", True),  # se_no_cb
        ("Social Event Maybe", True),  # se_maybe_cb
        ("Small Group Yes", True),  # sg_yes_cb
        ("Small Group No", True),  # sg_no_cb
        ("Small Group Maybe", True),  # sg_maybe_cb
    )

    widget_values = []
    for attribute, is_checkbox in layout:
        stored = lookup.get(attribute, "")
        widget_values.append(stored == "Yes" if is_checkbox else stored)
    return widget_values
def create_sample_data():
    """Build a blank Attribute/Value dataframe with one row per known field.

    Every entry of ``ALL_FIELDS_COMBINED`` becomes a row whose ``Value`` is
    the empty string.
    """
    blank_rows = [{"Attribute": field, "Value": ""} for field in ALL_FIELDS_COMBINED]
    return pd.DataFrame(blank_rows)
def update_dataframe(*args):
    """Rebuild the Attribute/Value dataframe from the current widget values.

    The positional values are paired with ``ALL_FIELDS_COMBINED`` in order;
    pairing stops at the shorter of the two sequences.
    """
    rows = []
    for field, widget_value in zip(ALL_FIELDS_COMBINED, args):
        rows.append({"Attribute": field, "Value": widget_value})
    return pd.DataFrame(rows)
def process_document_form_parser(zip_file):
    """Parse every file of an uploaded zip with Document AI, streaming UI updates.

    This is a generator: each ``yield`` is one set of outputs laid out as
    ``[dataframe, parsed_documents, current_idx, *widget values]``.

    * No upload, or an archive with no usable documents: one reset update is
      yielded (blank table, empty document list, index ``-1``, blank widgets).
    * First parsed document: its table and widget values are yielded and the
      index becomes ``0``.
    * Every later document: only the growing ``parsed_documents`` list is
      pushed; all other outputs are no-op updates so user edits are kept.

    Args:
        zip_file: The uploaded file object (``None`` when the upload is
            cleared); passed to ``extract_raw_documents_from_zip_file``.
    """

    def reset_outputs():
        # Derive the blank widget values from the blank table so text boxes
        # get "" and checkboxes get False in the real widget order.
        blank_df = create_sample_data()
        return [blank_df, [], -1] + extract_ui_values_from_dataframe(blank_df)

    # A generator's `return value` is never delivered to the consumer, so the
    # reset must be yielded before returning.
    if zip_file is None:
        yield reset_outputs()
        return

    raw_documents = extract_raw_documents_from_zip_file(zip_file)
    if not raw_documents:
        yield reset_outputs()
        return

    # Initialize state
    parsed_documents = []
    widget_count = 0

    client = documentai.DocumentProcessorServiceClient(credentials=CREDENTIALS)
    name = client.processor_path(PROJECT_ID, LOCATION, PROCESSOR_ID)

    # Process each document individually
    for i, raw_document in enumerate(raw_documents):
        # This is the slow operation - process one document at a time
        request = documentai.ProcessRequest(name=name, raw_document=raw_document)
        result = client.process_document(request=request)

        # Extract dataframe from the processed document
        df = extract_dataframe_from_document(result.document)
        parsed_documents.append(df)

        if i == 0:
            # Show the first card as soon as it is ready
            ui_values = extract_ui_values_from_dataframe(df)
            widget_count = len(ui_values)
            yield [df, parsed_documents, 0] + ui_values
        else:
            # Later cards: no-update signals so user changes are not overwritten
            yield [gr.update(), parsed_documents, gr.update()] + [
                gr.update() for _ in range(widget_count)
            ]
def _clean_email(raw_value):
    """Normalize an OCR'd e-mail string, then apply the vt.edu domain corrections."""
    # Normalize first so the domain fix also catches "UT.EDU", "ut,edu", "ut. edu".
    cleaned = raw_value.lower().replace(" ", "").replace(",", ".")
    # Presumably OCR misreads of "vt.edu" -- confirm before reusing for other domains.
    for misread_domain in ("ut.edu", "it.edu"):
        cleaned = cleaned.replace(misread_domain, "vt.edu")
    return cleaned


def _survey_prefix(y_coord):
    """Map a checkbox's normalized y position to its survey row prefix, or None."""
    # ~0.75 -> spiritual survey
    # ~0.83 -> social events
    # ~0.89 -> small group
    if 0.70 < y_coord < 0.80:
        return "Spiritual Survey "
    if 0.80 < y_coord < 0.88:
        return "Social Event "
    if 0.88 < y_coord < 0.95:
        return "Small Group "
    return None


def extract_dataframe_from_document(document):
    """Convert a processed Document AI document into an Attribute/Value dataframe.

    Every field of ``ALL_FIELDS_COMBINED`` starts out as ``""``. Form fields
    whose cleaned name is one of those fields overwrite the value (e-mail and
    phone are normalized, a ticked box becomes ``"Yes"``). Fields named
    ``Yes`` / ``No`` / ``Maybe`` are assigned to a survey row by the vertical
    position of their label. Anything else is reported with ``print``.

    Returns:
        A dataframe with one row per entry of ``ALL_FIELDS_COMBINED``, in
        that order.
    """
    # Initialize with empty values for ALL fields
    result = {field: "" for field in ALL_FIELDS_COMBINED}

    for page in document.pages:
        for form_field in page.form_fields:
            field_name = (
                form_field.field_name.text_anchor.content
                if form_field.field_name
                else "Unnamed Field"
            )
            field_value = (
                form_field.field_value.text_anchor.content
                if form_field.field_value
                else "No Value"
            )
            field_name = field_name.strip().replace(":", "")
            field_value = field_value.strip().replace(":", "")

            # A multi-line name value: drop the first line, join the rest
            if field_name == "Name" and "\n" in field_value:
                field_value = " ".join(field_value.split("\n")[1:])

            if field_name in ALL_FIELDS_COMBINED:
                if field_name == "Email":
                    field_value = _clean_email(field_value)
                if field_name == "Phone":
                    # Remove non-numeric characters from phone numbers
                    field_value = "".join(filter(str.isdigit, field_value))
                # Parse checkboxes
                if field_value == "☑":
                    field_value = "Yes"
                result[field_name] = field_value
            elif field_name in ["Yes", "No", "Maybe"]:
                y_coord = form_field.field_name.bounding_poly.normalized_vertices.pb[
                    0
                ].y
                prefix = _survey_prefix(y_coord)
                if prefix is None:
                    # Outside every known row: report it instead of storing a
                    # stray "Yes"/"No"/"Maybe" key that never reaches the table.
                    print(f"Checkbox outside known survey rows: {field_name}, y: {y_coord}")
                    continue
                result[prefix + field_name] = "Yes" if field_value == "☑" else "No"
            else:
                print(f"Unused field name: {field_name}, field value: {field_value}")

    # Look values up by attribute rather than relying on dict ordering
    return pd.DataFrame(
        [{"Attribute": attr, "Value": result[attr]} for attr in ALL_FIELDS_COMBINED]
    )
def sort_key(filename):
    """Return a ``(timestamp, copy_number)`` key for scanner file names.

    Names starting with ``Scanned_<8 digits>-<4 digits>`` and an optional
    ``(<n>)`` suffix before ``.pdf`` sort by timestamp, then by ``n`` (0 when
    there are no parentheses). Any other name falls back to ``(filename, 0)``.
    """
    parsed = re.match(r"Scanned_(\d{8}-\d{4})(?:\((\d+)\))?\.pdf", filename)
    if parsed is None:
        return (filename, 0)  # fallback

    stamp, copy_number = parsed.groups()
    if copy_number:
        return (stamp, int(copy_number))
    return (stamp, 0)  # 0 for files without parentheses
def extract_raw_documents_from_zip_file(zip_file):
    """Read the files of an uploaded zip archive into Document AI raw documents.

    Members are visited in ``sort_key`` order. Directory entries are skipped,
    and so are members whose MIME type cannot be guessed from the file name
    (each such skip is reported with ``print``).

    Args:
        zip_file: Object whose ``name`` attribute is the path of the archive.

    Returns:
        A list of ``documentai.RawDocument``, one per usable member.
    """
    raw_documents = []
    with zipfile.ZipFile(zip_file.name, "r") as archive:
        # namelist() also reports folders; they carry no document content.
        member_names = [
            info.filename for info in archive.infolist() if not info.is_dir()
        ]
        for filename in sorted(member_names, key=sort_key):
            mime_type = mimetypes.guess_type(filename)[0]
            if mime_type is None:
                # NOTE(review): assumes Document AI cannot process a document
                # without a MIME type -- confirm against the API.
                print(f"Skipping archive member with unknown file type: {filename}")
                continue
            raw_documents.append(
                documentai.RawDocument(
                    content=archive.read(filename), mime_type=mime_type
                )
            )
    return raw_documents
# Build the Gradio app; the CSS absolutely positions each widget over the card image
with gr.Blocks(
    title="Connect Card Editor",
    css="""
    .card-container {
        display: inline-block !important;
        width: 600px !important;
    }
    .upload-images-file {
        position: absolute !important;
        top: 800px !important;
        height: 100px !important;
        width: 600px !important;
    }
    .card-image {
        position: absolute !important;
        top: 0 !important;
        left: 0 !important;
        width: 600px !important;
        z-index: 1 !important;
    }
    .overlay-input {
        position: absolute !important;
        z-index: 10 !important;
        border: 1px solid #ccc !important;
        border-radius: 3px !important;
        font-size: 12px !important;
    }
    .overlay-checkbox {
        position: absolute !important;
        z-index: 10 !important;
        border-radius: 3px !important;
        padding: 2px !important;
    }
    /* Position text inputs */
    .name-input { top: 100px !important; left: 100px !important; width: 450px !important; }
    .phone-input { top: 190px !important; left: 100px !important; width: 450px !important; }
    .email-input { top: 240px !important; left: 100px !important; width: 450px !important; }
    .res-hall-input { top: 410px !important; left: 110px !important; width: 300px !important; }
    .room-input { top: 410px !important; left: 515px !important; width: 75px !important; }
    /* Position checkboxes */
    .male-cb { top: 16px !important; left: 449px !important; width: fit-content !important; }
    .female-cb { top: 43px !important; left: 449px !important; width: fit-content !important; }
    .nonbinary-cb { top: 71px !important; left: 449px !important; width: fit-content !important; }
    .fr-cb { top: 160px !important; left: 100px !important; width: fit-content !important; }
    .so-cb { top: 160px !important; left: 175px !important; width: fit-content !important; }
    .jr-cb { top: 160px !important; left: 256px !important; width: fit-content !important; }
    .sr-cb { top: 160px !important; left: 332px !important; width: fit-content !important; }
    .grad-cb { top: 160px !important; left: 410px !important; width: fit-content !important; }
    .cadet-cb { top: 339px !important; left: 27px !important; width: fit-content !important; }
    .greek-cb { top: 339px !important; left: 137px !important; width: fit-content !important; }
    .transfer-cb { top: 339px !important; left: 395px !important; width: fit-content !important; }
    .military-cb { top: 379px !important; left: 27px !important; width: fit-content !important; }
    .intl-cb { top: 379px !important; left: 224px !important; width: fit-content !important; }
    .off-campus-cb { top: 473px !important; left: 124px !important; width: fit-content !important; }
    /* Position manual (no document AI) checkboxes */
    .ss-yes-cb { top: 598px !important; left: 319px !important; width: fit-content !important; }
    .ss-no-cb { top: 598px !important; left: 398px !important; width: fit-content !important; }
    .ss-maybe-cb { top: 598px !important; left: 475px !important; width: fit-content !important; }
    .se-yes-cb { top: 660px !important; left: 319px !important; width: fit-content !important; }
    .se-no-cb { top: 660px !important; left: 398px !important; width: fit-content !important; }
    .se-maybe-cb { top: 660px !important; left: 475px !important; width: fit-content !important; }
    .sg-yes-cb { top: 710px !important; left: 319px !important; width: fit-content !important; }
    .sg-no-cb { top: 710px !important; left: 398px !important; width: fit-content !important; }
    .sg-maybe-cb { top: 710px !important; left: 475px !important; width: fit-content !important; }
    """,
) as demo:

    def _overlay_checkbox(css_class):
        """Create an unlabeled checkbox carrying the shared overlay class plus its own."""
        return gr.Checkbox(
            label="",
            elem_classes=["overlay-checkbox", css_class],
            container=False,
        )

    def _overlay_textbox(css_class, **extra):
        """Create an unlabeled textbox carrying the shared overlay class plus its own."""
        return gr.Textbox(
            placeholder="",
            elem_classes=["overlay-input", css_class],
            show_label=False,
            container=False,
            **extra,
        )

    gr.Markdown("# Connect Card Editor with Overlaid Components")

    # Per-session state: the parsed cards and the index of the card on screen
    parsed_documents_state = gr.State([])
    current_idx_state = gr.State(-1)

    with gr.Row():
        with gr.Column(scale=3, elem_classes=["card-container"]):
            # Blank card shown underneath the overlaid widgets
            card_image = gr.Image(
                value="./blank_connection_card.jpg",
                elem_classes=["card-image"],
                interactive=False,
                show_label=False,
            )

            male_cb = _overlay_checkbox("male-cb")
            female_cb = _overlay_checkbox("female-cb")
            nonbinary_cb = _overlay_checkbox("nonbinary-cb")

            name_input = _overlay_textbox("name-input")

            fr_cb = _overlay_checkbox("fr-cb")
            so_cb = _overlay_checkbox("so-cb")
            jr_cb = _overlay_checkbox("jr-cb")
            sr_cb = _overlay_checkbox("sr-cb")
            grad_cb = _overlay_checkbox("grad-cb")

            phone_input = _overlay_textbox("phone-input")
            email_input = _overlay_textbox("email-input")

            cadet_cb = _overlay_checkbox("cadet-cb")
            greek_cb = _overlay_checkbox("greek-cb")
            transfer_cb = _overlay_checkbox("transfer-cb")
            military_cb = _overlay_checkbox("military-cb")
            intl_cb = _overlay_checkbox("intl-cb")

            res_hall_input = _overlay_textbox("res-hall-input")
            room_input = _overlay_textbox("room-input", min_width=50)
            off_campus_cb = _overlay_checkbox("off-campus-cb")

            # Survey answer checkboxes (Yes / No / Maybe per row)
            ss_yes_cb = _overlay_checkbox("ss-yes-cb")
            ss_no_cb = _overlay_checkbox("ss-no-cb")
            ss_maybe_cb = _overlay_checkbox("ss-maybe-cb")
            se_yes_cb = _overlay_checkbox("se-yes-cb")
            se_no_cb = _overlay_checkbox("se-no-cb")
            se_maybe_cb = _overlay_checkbox("se-maybe-cb")
            sg_yes_cb = _overlay_checkbox("sg-yes-cb")
            sg_no_cb = _overlay_checkbox("sg-no-cb")
            sg_maybe_cb = _overlay_checkbox("sg-maybe-cb")

        with gr.Column(scale=2):
            # Read-only table of the current card plus the action buttons
            output_df = gr.Dataframe(
                value=create_sample_data(),
                label="",
                interactive=False,
                column_widths=[1, 1],
            )
            upload_to_sheets_button = gr.Button("Upload and process next")
            skip_upload_button = gr.Button("Skip and process next")
            upload_to_sheets_status = gr.Textbox(label="Upload Status")

        with gr.Column(scale=1):
            zipfile_upload = gr.File(
                label="Upload zipfile of images", file_types=[".zip"]
            )

    # Widgets in the same order as extract_ui_values_from_dataframe returns values
    all_inputs = [
        name_input,
        phone_input,
        email_input,
        cadet_cb,
        greek_cb,
        transfer_cb,
        military_cb,
        intl_cb,
        res_hall_input,
        room_input,
        off_campus_cb,
        fr_cb,
        so_cb,
        jr_cb,
        sr_cb,
        grad_cb,
        male_cb,
        female_cb,
        nonbinary_cb,
        ss_yes_cb,
        ss_no_cb,
        ss_maybe_cb,
        se_yes_cb,
        se_no_cb,
        se_maybe_cb,
        sg_yes_cb,
        sg_no_cb,
        sg_maybe_cb,
    ]

    # Outputs shared by both "next card" buttons
    advance_outputs = [
        upload_to_sheets_status,
        output_df,
        parsed_documents_state,
        current_idx_state,
    ] + all_inputs

    # A new upload starts parsing and streams results into the UI
    zipfile_upload.change(
        fn=process_document_form_parser,
        inputs=[zipfile_upload],
        outputs=[output_df, parsed_documents_state, current_idx_state] + all_inputs,
    )

    upload_to_sheets_button.click(
        fn=upload_and_process_next,
        inputs=[output_df, parsed_documents_state, current_idx_state],
        outputs=advance_outputs,
    )

    skip_upload_button.click(
        fn=skip_and_process_next,
        inputs=[parsed_documents_state, current_idx_state],
        outputs=advance_outputs,
    )

    # Any widget change rebuilds the table from the full set of widget values
    for widget in all_inputs:
        widget.change(fn=update_dataframe, inputs=all_inputs, outputs=[output_df])
# Start the Gradio server only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()