Smart_Asha_Form / app.py
ParulPandey's picture
Update app.py
1e259cc verified
import sys
import platform
import re
import os
import json
import time
import hashlib
import pickle
import warnings
import gradio as gr
import pandas as pd
import torch
from transformers import AutoProcessor, AutoModelForImageTextToText, TextStreamer
from PIL import Image
import torchaudio
import numpy as np
import soundfile as sf
from dotenv import load_dotenv
import psutil
import gc
import tempfile
import requests
from huggingface_hub import HfApi
# --- Configuration ---
# Hugging Face model identifier passed to the processor/model `from_pretrained` calls.
GEMMA_MODEL_ID = "google/gemma-3n-E4B-it"
# Pickle file backing the image-hash -> extracted-field-list cache.
CACHE_FILE = "image_cache.pkl"
# Pickle file backing the audio-hash -> transcription cache.
AUDIO_CACHE_FILE = "audio_cache.pkl"
# --- Cache Loading ---
def load_cache(file_path):
    """Load a pickled cache dictionary from disk.

    Args:
        file_path: Path of the pickle file to read.

    Returns:
        The unpickled dictionary, or an empty dictionary when the file is
        missing, cannot be unpickled, or does not contain a dictionary.
    """
    # NOTE(security): pickle executes arbitrary code while loading. Only point
    # this at cache files written by this application, never at uploaded data.
    if not os.path.exists(file_path):
        return {}
    try:
        with open(file_path, "rb") as f:
            cached = pickle.load(f)
    except Exception as e:
        # Keep this print as it's an important warning about a failed operation.
        print(f"[Cache] Could not load cache from {file_path}: {e}")
        return {}
    # Callers use the result with `in` and item assignment, so anything that is
    # not a dict (e.g. a stale file from another tool) is treated as no cache.
    if not isinstance(cached, dict):
        print(f"[Cache] Ignoring cache at {file_path}: expected dict, got {type(cached).__name__}")
        return {}
    return cached
# Warm both in-memory caches from their on-disk pickles.
image_cache, audio_cache = load_cache(CACHE_FILE), load_cache(AUDIO_CACHE_FILE)
# --- Hugging Face Token & Network Check ---
# Read a local .env file (if any) into the environment before looking up the token.
load_dotenv()
hf_token = os.environ.get("HF_TOKEN")
if not hf_token:
    warnings.warn("HF_TOKEN not found in .env file. This may cause issues if models aren't cached.")
def check_network_connectivity():
    """Check whether the Hugging Face website can be reached.

    Returns:
        True when an HTTP GET to https://huggingface.co completes within five
        seconds, False when the request fails for any transport-level reason.
    """
    try:
        requests.get("https://huggingface.co", timeout=5)
    except requests.RequestException:
        # RequestException is the base class of ConnectionError, Timeout
        # (including read timeouts), SSL and proxy errors; any of them means
        # the hub is unreachable, so report "offline" instead of crashing.
        return False
    return True
# --- Global Model Variables ---
# Both handles stay None until load_model_with_fallback() assigns them.
processor = model = None
# Inference is pinned to the CPU device.
device = torch.device("cpu")
def load_model_with_fallback():
    """Load the Gemma processor and model into the module-level globals.

    If the model files appear to be in the default Hugging Face cache and the
    hub is unreachable, loading is restricted to local files. Otherwise the
    files are fetched/validated through the hub as usual.

    Returns:
        True when both `processor` and `model` are ready to use (including the
        case where they were already loaded), False when loading failed. On
        failure the globals are left exactly as they were.
    """
    global processor, model, device
    # Already loaded: nothing to do. Explicit None checks avoid relying on the
    # truthiness of a model object.
    if model is not None and processor is not None and model != "cached":
        return True
    print("Loading model on CPU...")
    cache_dir = os.path.expanduser("~/.cache/huggingface/hub")
    model_cache_path = os.path.join(cache_dir, f"models--{GEMMA_MODEL_ID.replace('/', '--')}")
    is_cached = os.path.exists(model_cache_path)
    has_network = check_network_connectivity()
    use_offline = is_cached and not has_network
    if use_offline:
        print("Network unavailable. Using offline mode with cached model.")
        os.environ["TRANSFORMERS_OFFLINE"] = "1"
    try:
        # Load into locals first so a failure half-way through (processor OK,
        # model not) never leaves the globals in an inconsistent state.
        loaded_processor = AutoProcessor.from_pretrained(
            GEMMA_MODEL_ID,
            token=hf_token,
            local_files_only=use_offline,
        )
        loaded_model = AutoModelForImageTextToText.from_pretrained(
            GEMMA_MODEL_ID,
            torch_dtype=torch.float32,
            low_cpu_mem_usage=True,
            token=hf_token,
            local_files_only=use_offline,
        ).to(device).eval()
    except Exception as e:
        print(f"CRITICAL: Model loading failed: {e}", file=sys.stderr)
        return False
    processor, model = loaded_processor, loaded_model
    print("Gemma model loaded successfully.")
    return True
# --- Utility Functions ---
class TokenStreamHandler(TextStreamer):
    """Streamer that reports generation progress through a callback.

    `on_finalized_text` is overridden so that it only notifies the callback;
    this class itself never prints the streamed text.
    """

    def __init__(self, tokenizer, progress_callback=None):
        """Store the optional callback and start the put() counter at zero."""
        super().__init__(tokenizer)
        self.tokens_generated = 0
        self.progress_callback = progress_callback

    def put(self, value):
        """Forward `value` to the base streamer and report estimated progress."""
        super().put(value)
        self.tokens_generated += 1
        notify = self.progress_callback
        if not notify:
            return
        # Map the number of put() calls onto the 0.5-0.9 band of the progress
        # bar, treating 200 calls as the full band and capping at 0.9.
        estimate = 0.5 + (self.tokens_generated / 200) * 0.4
        notify(min(estimate, 0.9))

    def on_finalized_text(self, text: str, stream_end: bool = False):
        """Report 0.95 progress once the stream ends; the text is not used."""
        if not stream_end:
            return
        if self.progress_callback:
            self.progress_callback(0.95)
def get_image_hash(image_path):
    """Return the MD5 hex digest of a file's contents, used as a cache key.

    Args:
        image_path: Path to the image file; may be None or empty.

    Returns:
        The hex digest string, or None when no path is given or the path does
        not refer to an existing regular file (e.g. it is a directory).
    """
    # isfile (rather than exists) keeps a directory path from reaching open().
    if not image_path or not os.path.isfile(image_path):
        return None
    digest = hashlib.md5()
    with open(image_path, "rb") as f:
        # Hash in 1 MiB chunks so large photos are not held in memory at once.
        for chunk in iter(lambda: f.read(1 << 20), b""):
            digest.update(chunk)
    return digest.hexdigest()
def get_audio_hash(audio_data):
    """Return an MD5 hex digest identifying a `(sample_rate, samples)` pair.

    The digest covers the raw bytes of the sample array followed by the
    sample rate rendered as text. Anything that is not a 2-tuple yields None.
    """
    is_pair = isinstance(audio_data, tuple) and len(audio_data) == 2
    if not is_pair:
        return None
    sample_rate, samples = audio_data
    digest = hashlib.md5()
    digest.update(samples.tobytes())
    digest.update(str(sample_rate).encode())
    return digest.hexdigest()
def save_to_csv(data, csv_path="output/forms.csv"):
    """Append form data to a CSV file in long format (one row per field).

    Args:
        data: Mapping of field name to entered value.
        csv_path: Destination CSV path. Parent directories are created when
            the path contains any; a bare filename is written to the current
            working directory.

    Raises:
        gr.Error: If the directory cannot be created or the file cannot be written.
    """
    columns = ["Timestamp", "Field_Name", "Field_Value"]
    try:
        parent_dir = os.path.dirname(csv_path)
        # dirname is "" for a bare filename, and os.makedirs("") raises.
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        # One timestamp per submission so all rows of a form share it even if
        # the clock ticks over while the rows are being built.
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        rows_to_save = [
            {"Timestamp": timestamp, "Field_Name": field, "Field_Value": value}
            for field, value in data.items()
        ]
        # Explicit columns keep the header correct even for an empty payload,
        # so a later append never lands in a header-less file.
        df = pd.DataFrame(rows_to_save, columns=columns)
        header = not os.path.exists(csv_path)
        df.to_csv(csv_path, mode="a", header=header, index=False)
        # Keep this print as it confirms a critical user action succeeded.
        print(f"[CSV] Data for {len(rows_to_save)} fields saved to {csv_path}")
    except Exception as e:
        raise gr.Error(f"Failed to save to CSV: {e}") from e
# =============================================================================
# CORE AI PROCESSING FUNCTIONS - Main logic for form field extraction
# =============================================================================
def _flatten_field_json(json_obj):
    """Recursively flatten a nested JSON object into one {field_name: value} dict."""
    flat = {}
    for key, value in json_obj.items():
        if isinstance(value, dict) and "type" in value:
            # {"Name": {"type": "text"}} -> field "Name"
            flat[key] = value["type"]
        elif isinstance(value, dict):
            # A grouping object: its children are the actual fields.
            flat.update(_flatten_field_json(value))
        else:
            flat[key] = value
    return flat


def _parse_field_names(text):
    """Pull field names out of generated text: JSON object first, line patterns as fallback."""
    print("[Parser] Analyzing AI response for field data...")
    fields = []
    json_match = re.search(r"{.*}", text, re.DOTALL)
    if json_match:
        json_str = json_match.group(0)
        print(f"[Parser] Found JSON structure: {json_str[:100]}...")
        try:
            parsed_json = json.loads(json_str)
            print(f"[Parser] JSON parsed successfully, {len(parsed_json)} field definitions found")
            fields = list(_flatten_field_json(parsed_json).keys())
            print(f"[Parser] Extracted {len(fields)} fields: {fields}")
        except Exception as e:
            print(f"[Parser] JSON parsing failed: {e}")
    else:
        print("[Parser] No JSON structure detected in output")
    if fields:
        return fields

    # Fallback extraction: use line patterns when no usable JSON was produced.
    print("[Fallback] Using regex-based field extraction...")
    lines = [line.strip() for line in text.split('\n') if line.strip()]
    # NOTE(review): the doubled ':' in the character class / rstrip below may
    # originally have been ASCII + full-width colon; confirm against the repo.
    numbered_pattern = re.compile(r'^\s*(\d+)[\.|\)]?\s*(.+?)(?:\s*[::](.*))?$')
    fallback_fields = []
    for i, line in enumerate(lines):
        lowered = line.lower()
        # Skip instructional text echoed by the model.
        if "extract" in lowered or "json" in lowered or "example" in lowered:
            continue
        field_name = None
        match = numbered_pattern.match(line)
        if match:
            field_name = match.group(2).strip().rstrip('::')
        elif ":" in line:
            field_name = line.split(":", 1)[0].strip()
        # Quality filter for reasonable field names.
        if field_name and 2 < len(field_name) < 50:
            fallback_fields.append(field_name)
            print(f"[Fallback] Line {i+1}: '{line}' -> '{field_name}'")
    print(f"[Fallback] Extracted {len(fallback_fields)} fields using pattern matching")
    return fallback_fields


def extract_fields_from_image(image_path, progress=None):
    """Extract form field labels from a form photo using the Gemma model.

    Steps: check the on-disk cache by image hash, load and downscale the
    image, prompt the vision-language model, decode only the newly generated
    tokens, parse field names from the response, and cache the result.

    Args:
        image_path (str): Path to the uploaded form image.
        progress (callable): Optional progress callback, called as
            `progress(fraction, desc=...)`.

    Returns:
        list: Clean list of extracted field names (e.g. ["Name", "Age",
        "Village"]); an empty list when the image cannot be opened or no
        fields are found.

    Raises:
        gr.Error: When the model cannot be loaded or is not loaded yet.
    """
    global model, processor
    start_time = time.time()
    # Console logging for monitoring the extraction pipeline.
    print(f"\n{'='*60}")
    print(f"[EXTRACTION START] Processing: {os.path.basename(image_path) if image_path else 'None'}")
    print(f"[EXTRACTION START] Timestamp: {time.strftime('%H:%M:%S')}")
    print("[EXTRACTION START] Using Gemma 3n 4B Vision-Language Model")
    print(f"{'='*60}")
    # Lazy loading: load the model only when first needed.
    if model == "cached":
        print("[Model] Loading Gemma 3n 4B model on first use...")
        if not load_model_with_fallback():
            # Raise instead of returning the message as if it were a field name.
            raise gr.Error("Failed to load model on first use.")
    if progress:
        progress(0.05, desc="Starting image processing...")
    print("[Status] AI extraction pipeline initialized")

    # Smart caching: check whether this exact image was processed before.
    print("[Cache] Checking for previously processed results...")
    image_hash = get_image_hash(image_path)
    print(f"[Cache] Image hash: {image_hash[:12] if image_hash else 'None'}...")
    if image_hash in image_cache:
        cached_fields = image_cache[image_hash]
        elapsed = time.time() - start_time
        print(f"[Cache] CACHE HIT! Found {len(cached_fields)} pre-extracted fields in {elapsed:.2f}s")
        print(f"[Cache] Cached fields: {cached_fields}")
        if progress:
            for p in [0.3, 0.6, 0.9, 1.0]:
                progress(p, desc="Using cached results...")
                time.sleep(0.05)
        return cached_fields
    print("[Cache] New image detected - proceeding with fresh extraction...")

    # A fresh extraction needs the model; fail with a readable message rather
    # than a TypeError from calling a None processor.
    if model is None or processor is None:
        raise gr.Error("Model is not loaded yet. Please wait for loading to finish and try again.")

    # Image preprocessing: load and validate the uploaded form image.
    print("[Image] Loading and validating uploaded form...")
    try:
        image = Image.open(image_path).convert("RGB")
        print(f"[Image] Successfully loaded! Original dimensions: {image.size}")
    except Exception as e:
        print(f"[Image] Failed to load image: {e}")
        return []
    if progress:
        progress(0.15, desc="Optimizing image for processing...")
    # Smart resizing: balance quality vs speed for CPU processing.
    max_size = 1024
    if image.width > max_size or image.height > max_size:
        print(f"[Image] Resizing from {image.size} to fit {max_size}px...")
        image.thumbnail((max_size, max_size), Image.Resampling.BICUBIC)
        print(f"[Image] Optimized to: {image.size}")
    else:
        print(f"[Image] Size acceptable, keeping: {image.size}")
    if progress:
        progress(0.25, desc="Preparing prompt...")

    # Prompt for multilingual form extraction.
    prompt = (
        "<image_soft_token> Extract ALL possible form field labels from this image. "
        "Return ONLY a JSON object where each key is a field name and each value is 'text'. "
        "For Hindi/Devanagari forms, preserve the original script. "
        "Do NOT include any introductory or concluding text. "
        "If a field is partially visible, still include it. "
        "Sample output: {\"ग्राम\": \"text\", \"उपकेन्द्र\": \"text\", \"आयु\": \"text\", \"लिंग\": \"text\", ...}"
    )
    print(f"[Prompt] Using engineered prompt: {prompt[:80]}...")
    if progress:
        progress(0.35, desc="Tokenizing inputs...")

    # Tokenization: convert image and text to model input format.
    print("[Tokenizer] Converting image and text to neural network format...")
    tokenize_start = time.time()
    inputs = processor(images=image, text=prompt, return_tensors="pt")
    inputs = {k: v.to(model.device) if hasattr(v, 'to') else v for k, v in inputs.items()}
    tokenize_time = time.time() - tokenize_start
    print(f"[Tokenizer] Tokenization completed in {tokenize_time:.2f}s")
    print(f"[Tokenizer] Input shapes: {', '.join([f'{k}: {v.shape}' for k, v in inputs.items() if hasattr(v, 'shape')])}")
    if progress:
        progress(0.50, desc="Running AI model (CPU processing)...")

    # Model inference.
    print("[Model] Starting Gemma 3n 4B inference...")
    print("[Model] CPU Processing: Expected time 30-60 seconds")
    generation_start = time.time()
    prompt_len = inputs["input_ids"].shape[-1]
    with torch.no_grad():
        # Greedy decoding: sampling-only (temperature) and beam-only
        # (length_penalty, early_stopping) options do not apply and are omitted.
        outputs = model.generate(
            **inputs,
            max_new_tokens=256,      # Optimized for CPU speed
            do_sample=False,         # Deterministic output
            num_beams=1,             # Single beam for efficiency
            repetition_penalty=1.1,  # Prevent repetition
            pad_token_id=processor.tokenizer.eos_token_id,
        )
    generation_time = time.time() - generation_start
    print(f"[Model] Inference complete! Processing time: {generation_time:.2f}s")

    # Output decoding: generate() returns prompt + continuation. Decode only
    # the continuation, otherwise the sample JSON inside the prompt is picked
    # up by the JSON search and corrupts the parse.
    print("[Decoder] Converting model output to text...")
    decode_start = time.time()
    generated_ids = outputs[:, prompt_len:]
    text = processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
    decode_time = time.time() - decode_start
    print(f"[Decoder] Decoding completed in {decode_time:.3f}s")
    print(f"[Output] Raw AI response preview: {text[:200]}...")

    fields = _parse_field_names(text)

    # Final processing: clean up results.
    clean_fields = [f.strip() for f in fields if f.strip()]
    total_time = time.time() - start_time
    print(f"\n[RESULT] Successfully extracted {len(clean_fields)} form fields:")
    for i, field in enumerate(clean_fields, 1):
        print(f"[RESULT] {i}. {field}")

    # Cache results for future use; a failed write must not discard the
    # extraction that just succeeded.
    if image_hash and clean_fields:
        print(f"[Cache] Saving {len(clean_fields)} extracted fields to cache...")
        image_cache[image_hash] = clean_fields
        try:
            with open(CACHE_FILE, "wb") as f:
                pickle.dump(image_cache, f)
            print("[Cache] Results cached successfully")
        except OSError as e:
            print(f"[Cache] Could not write cache to {CACHE_FILE}: {e}")

    # Performance summary.
    print(f"\n{'='*60}")
    print(f"[EXTRACTION COMPLETE] Total time: {total_time:.2f}s")
    print(f"[EXTRACTION COMPLETE] Status: {'SUCCESS' if len(clean_fields) > 0 else 'NO FIELDS FOUND'}")
    print("[EXTRACTION COMPLETE] Ready for digital form filling")
    print(f"{'='*60}\n")
    if progress:
        progress(1.0, desc="Extraction complete")
    return clean_fields
def transcribe_audio(audio_data, progress=gr.Progress()):
    """Transcribe Hindi speech to Devanagari text with the Gemma model.

    Args:
        audio_data: Either a path to an audio file or a
            `(sample_rate, samples)` tuple.
        progress: Progress callback, called as `progress(fraction, desc=...)`.

    Returns:
        str: The transcription, "Audio not clear." when the model produced no
        text, or a message starting with "[Error]" when the input or model is
        unusable. Successful results for tuple input are cached by content hash.
    """
    global model, processor, audio_cache
    if progress:
        progress(0.05, desc="Starting audio transcription...")
    # Validate the input before touching the model or the cache.
    if audio_data is None:
        return "[Error] No audio provided."
    audio_hash = get_audio_hash(audio_data)
    if audio_hash and audio_hash in audio_cache:
        cached_result = audio_cache[audio_hash]
        if progress:
            progress(1.0, desc="Transcription complete!")
        return cached_result
    if model == "cached":
        if not load_model_with_fallback():
            return "[Error] Failed to load Gemma model on first use."
    if model is None or processor is None:
        return "[Error] Gemma model or processor not loaded properly."

    if isinstance(audio_data, str):
        audio_np_array, sr = sf.read(audio_data)
    elif isinstance(audio_data, tuple) and len(audio_data) == 2:
        sr, audio_np_array = audio_data
    else:
        return "[Error] Invalid audio input format."
    if audio_np_array is None or len(audio_np_array) == 0:
        return "[Error] Empty audio data."

    # Work in float32 from here on: np.abs on int16 input overflows for
    # -32768, which would corrupt the peak used for normalization.
    audio_np_array = np.asarray(audio_np_array, dtype=np.float32)
    if audio_np_array.ndim > 1:
        # Mix channels down to mono (samples are along axis 0).
        audio_np_array = np.mean(audio_np_array, axis=1)
    peak = float(np.max(np.abs(audio_np_array)))
    if peak > 0:
        audio_np_array = audio_np_array / (peak + 1e-9)

    target_sr = 16000
    if sr != target_sr:
        new_length = int(len(audio_np_array) * target_sr / sr)
        if new_length < 1:
            return "[Error] Empty audio data."
        # Linear-interpolation resample. The query grid ends at the last valid
        # sample index (len - 1), matching the range of the source positions.
        audio_np_array = np.interp(
            np.linspace(0, len(audio_np_array) - 1, new_length),
            np.arange(len(audio_np_array)),
            audio_np_array,
        ).astype(np.float32)
        sr = target_sr

    prompt = "Transcribe this audio in Hindi (Devanagari script)."
    messages = [
        {
            "role": "user",
            "content": [
                {"type": "audio", "audio": audio_np_array},
                {"type": "text", "text": prompt},
            ],
        }
    ]
    if progress:
        progress(0.2, desc="Tokenizing audio and prompt...")
    input_dict = processor.apply_chat_template(
        messages,
        add_generation_prompt=True,
        tokenize=True,
        return_dict=True,
        return_tensors="pt",
        sampling_rate=sr,
    )
    final_inputs_for_model = {k: v.to(model.device) for k, v in input_dict.items()}
    prompt_len = final_inputs_for_model["input_ids"].shape[-1]
    if progress:
        progress(0.6, desc="Generating transcription...")
    with torch.inference_mode():
        predicted_ids = model.generate(
            **final_inputs_for_model,
            max_new_tokens=32,  # Lowered for faster short-form transcription
            do_sample=False,
            num_beams=1,
        )
    if progress:
        progress(0.9, desc="Decoding transcription...")
    # generate() returns prompt + continuation; decode only the continuation.
    # This removes the chat-template role markers and the prompt at the source,
    # so no words need to be stripped out of the spoken content afterwards.
    new_token_ids = predicted_ids[:, prompt_len:]
    transcription = processor.batch_decode(
        new_token_ids,
        skip_special_tokens=True,
    )[0]
    # Remove surrounding whitespace and leading/trailing punctuation.
    transcription = transcription.strip(' :\n\t-')
    if not transcription:
        # Do not cache the placeholder: a retry of the same clip may succeed.
        if progress:
            progress(1.0, desc="Transcription complete!")
        return "Audio not clear."
    if audio_hash:
        audio_cache[audio_hash] = transcription
        try:
            with open(AUDIO_CACHE_FILE, "wb") as f:
                pickle.dump(audio_cache, f)
        except OSError as e:
            # The transcription is still valid even if it cannot be persisted.
            print(f"[Cache] Could not write cache to {AUDIO_CACHE_FILE}: {e}")
    if progress:
        progress(1.0, desc="Transcription complete!")
    return transcription
# --- Gradio UI ---
def main_ui():
    """Build the Gradio Blocks interface and wire its callbacks.

    Layout: an About tab (with a model-status box), an Upload tab (image
    input, extract button, sample gallery) and a Fill Form tab holding up to
    MAX_FIELDS text/audio rows plus Submit and Start New Form buttons.

    Returns:
        gr.Blocks: The assembled (not yet launched) application.
    """
    extract_label = "Extract Fields | फ़ील्ड निकालें"
    submit_label = "Submit Form | फॉर्म सबमिट करें"
    custom_css = """
/* Apple-like system font stack */
.gradio-container {
font-family: -apple-system, BlinkMacSystemFont, 'San Francisco', 'Segoe UI', 'Roboto', 'Helvetica Neue', Arial, 'Noto Sans', sans-serif;
background: #fff !important; /* white background */
}
.skyblue-btn button {
background: #87ceeb !important;
color: #1565c0 !important;
border: none !important;
font-weight: 600 !important;
transition: background 0.2s;
}
.skyblue-btn button:disabled {
background: #b3e5fc !important;
color: #90a4ae !important;
}
.skyblue-btn button:hover:not(:disabled) {
background: #4fc3f7 !important;
color: #0d47a1 !important;
}
.asha-title {
font-family: -apple-system, BlinkMacSystemFont, 'San Francisco', 'Segoe UI', 'Roboto', 'Helvetica Neue', Arial, 'Noto Sans', sans-serif;
font-size: 2.2rem;
font-weight: 700;
color: #1565c0; /* deeper skyblue for title */
letter-spacing: 0.5px;
margin-bottom: 0.5em;
text-align: center;
}
.asha-subtitle {
font-family: -apple-system, BlinkMacSystemFont, 'San Francisco', 'Segoe UI', 'Roboto', 'Helvetica Neue', Arial, 'Noto Sans', sans-serif;
font-size: 1.2rem;
color: #1976d2;
margin-bottom: 1.2em;
text-align: center;
}
.asha-section { background: #e3f2fd; border-radius: 14px; box-shadow: 0 2px 12px #b3e5fc; padding: 2.2em 2em 1.5em 2em; margin-bottom: 1.5em; }
.asha-list { font-size: 1.08rem; color: #263238; margin-bottom: 1.2em; }
.asha-list li { margin-bottom: 0.5em; }
.asha-note { color: #01579b; font-size: 1.05rem; font-style: italic; margin-top: 1em; }
.tab-disabled { opacity: 0.5 !important; pointer-events: none !important; }
.tab-enabled { opacity: 1 !important; }
.sample-img { border: 2px solid #b3e5fc; border-radius: 8px; margin: 4px; max-width: 120px; cursor: pointer; transition: border 0.2s; }
.sample-img:hover { border: 2px solid #0288d1; }
.cancel-audio-x {
display: inline-block;
color: #d32f2f;
font-size: 1.3em;
font-weight: bold;
cursor: pointer;
margin-left: 0.5em;
vertical-align: middle;
user-select: none;
transition: color 0.2s;
}
.cancel-audio-x:hover {
color: #b71c1c;
}
/* Icon-style button for cancel audio */
.cancel-audio-x-btn button {
background: transparent !important;
border: none !important;
box-shadow: none !important;
color: #d32f2f !important;
font-size: 1.3em !important;
font-weight: bold !important;
padding: 0 0.3em !important;
margin-left: 0.5em !important;
vertical-align: middle !important;
cursor: pointer !important;
min-width: 1.7em !important;
min-height: 1.7em !important;
border-radius: 50% !important;
transition: color 0.2s !important;
}
.cancel-audio-x-btn button:hover {
color: #b71c1c !important;
background: #fbe9e7 !important;
}
"""
    with gr.Blocks(theme=gr.themes.Soft(), css=custom_css) as demo:
        fields_state = gr.State([])
        extraction_in_progress = gr.State(False)
        MAX_FIELDS = 15
        # --- Tab Gating State ---
        image_uploaded = gr.State(False)
        fields_extracted = gr.State(False)

        def on_audio_change(audio_data, progress=gr.Progress()):
            # The change event also fires when a recording is cleared; leave
            # the text box alone then instead of writing an error into it.
            if audio_data is None:
                return gr.update()
            return transcribe_audio(audio_data, progress)

        # --- Tab UI ---
        with gr.Tabs() as tabs:
            # --- About Tab ---
            with gr.TabItem("1. About & Demo Info", id=0):
                with gr.Column(elem_classes=["asha-section"]):
                    gr.Markdown(
                        """
# ON DEVICE SMART ASHA Form
### **Digitizing rural health, empowering ASHA workers with AI.**
This application is designed to help **ASHA (Accredited Social Health Activist) workers**—the backbone of India's rural healthcare system—quickly digitize handwritten forms and transcribe Hindi voice input. ASHA workers are often the first point of contact for healthcare in villages, but their work is slowed by manual paperwork and language barriers. This tool streamlines their workflow, making data entry faster, more accurate, and accessible even for those more comfortable with Hindi speech than typing.
- **Image-based field extraction:** Upload a photo of an ASHA form and the app will automatically detect and extract all field labels, ready for digital entry.
- **Hindi voice transcription:** Fill any field by speaking in Hindi (Devanagari script) for instant, accurate transcription.
- **Data export:** All submitted data is saved in a CSV for further use or analysis.
## Powered by Gemma-3n-4B
This demo runs on the **Gemma 3n 4B model**, a lightweight yet capable model for on-device AI tasks like image-to-text and Hindi speech transcription.
## On-device CPU Loading
Since this application runs entirely **on-device using CPU**, the model takes some time to **load the first time**. After the initial load, processing is smooth and does not require an internet connection.
## Why this matters
ASHA workers serve over 900 million people in rural India, often with limited digital literacy and resources. By making form digitization and voice transcription seamless, this app saves time, reduces errors, and helps bring rural health data into the digital age—empowering both workers and the communities they serve.
*Note:*
While you are reading this, the **Gemma 3n model** is being loaded in the background to ensure a smooth and fast demo experience. Please explore each step—the workflow is strictly gated for demo clarity. All features are designed for real-world usability and hackathon evaluation.
""",
                        elem_id="asha_about"
                    )
                model_status = gr.Textbox(value="Loading model in background...", interactive=False, show_label=False, visible=True)
            # --- Upload Tab ---
            with gr.TabItem("2. Upload Image | छवि अपलोड करें", id=1):
                with gr.Row():
                    with gr.Column(scale=2):
                        image_input = gr.Image(type="filepath", label="Upload Form Image | फॉर्म की छवि अपलोड करें", height=350, sources=["upload"])  # Only allow file upload
                        with gr.Row():
                            extract_btn = gr.Button(extract_label, variant="secondary", elem_classes=["skyblue-btn"])
                        gr.Markdown("**Or try a sample image | या नमूना छवि आज़माएँ:**", elem_id="sample-image-label")
                        with gr.Row():
                            sample_gallery = gr.Gallery(
                                value=[
                                    "samples/sample_form_2.png",
                                ],
                                label=None,
                                show_label=False,
                                elem_id="sample-gallery",
                                height=110,
                                columns=[3],
                                object_fit="contain",
                                allow_preview=True,
                                interactive=True,
                                elem_classes=["sample-img"]
                            )
            # --- Fill Form Tab ---
            with gr.TabItem("3. Fill Form | फॉर्म भरें", id=2):
                form_placeholder = gr.Markdown("""Please extract fields from Step 2 first |कृपया पहले चरण 2 से फ़ील्ड निकालें।""", visible=True)
                with gr.Column(visible=False) as form_container:
                    text_inputs, audio_inputs, field_rows = [], [], []
                    for i in range(MAX_FIELDS):
                        with gr.Row(visible=False) as row:
                            text_input = gr.Textbox(interactive=True, label="Enter value | मान दर्ज करें")
                            audio_input = gr.Audio(sources=["microphone"], type="numpy", label="🎤 Speak to fill | बोलें और भरें", streaming=False)
                        text_inputs.append(text_input)
                        audio_inputs.append(audio_input)
                        field_rows.append(row)
                        audio_input.change(fn=on_audio_change, inputs=audio_input, outputs=text_input, show_progress="full")
                with gr.Row(visible=False) as action_row:
                    submit_btn = gr.Button(submit_label, variant="primary")
                    new_form_btn = gr.Button("Start New Form | नया फॉर्म शुरू करें")

        # --- Model Preload on App Start (Tab 1) ---
        def preload_model():
            ok = load_model_with_fallback()
            return gr.update(value="Model loaded and ready!" if ok else "Model failed to load. Please check setup.")

        # Schedule model loading as soon as the app page loads (Tab 1 is shown).
        demo.load(fn=preload_model, inputs=None, outputs=model_status, queue=False)

        # --- UI Callback Functions ---
        def clear_cache_and_reset():
            # Not wired to any control yet; kept for an optional "clear cache" button.
            global image_cache, audio_cache
            image_cache.clear()
            audio_cache.clear()
            if os.path.exists(CACHE_FILE):
                os.remove(CACHE_FILE)
            if os.path.exists(AUDIO_CACHE_FILE):
                os.remove(AUDIO_CACHE_FILE)
            return "All caches cleared successfully!"

        def on_sample_click(evt: gr.SelectData):
            # evt.value can be a dict or a string path
            value = evt.value
            if isinstance(value, dict) and "image" in value and "path" in value["image"]:
                value = value["image"]["path"]
            return gr.update(value=value), True

        # Output order shared by on_extract and start_new_form. submit_btn is
        # last so both can restore it after a previous submission disabled it.
        extract_outputs = (
            [extract_btn, form_placeholder, form_container, fields_state, action_row, extraction_in_progress]
            + field_rows
            + text_inputs
            + [submit_btn]
        )
        N = len(extract_outputs)

        def on_extract(img_path):
            def fill_outputs(updates):
                # Pad with no-op updates so every yield matches extract_outputs.
                if len(updates) < N:
                    updates += [gr.update()] * (N - len(updates))
                return updates[:N]

            def status_only(button_update, in_progress):
                # Change only the extract button and the in-progress flag; the
                # form area keeps whatever state it had, so a failure never
                # reveals an empty form with a Submit button.
                return fill_outputs([
                    button_update,
                    gr.update(),
                    gr.update(),
                    gr.update(),
                    gr.update(),
                    in_progress,
                ])

            print(f"[on_extract] Called with img_path: {img_path}")
            if not img_path:
                print("[on_extract] No image path provided.")
                yield status_only(
                    gr.update(value="Please upload an image first. | कृपया पहले छवि अपलोड करें।", interactive=True, variant="secondary"),
                    False,
                )
                return
            yield status_only(
                gr.update(value="Extracting... | निकाल रहे हैं...", interactive=False, variant="secondary", elem_classes=["skyblue-btn"]),
                True,
            )
            try:
                fields = extract_fields_from_image(img_path)
                print(f"[on_extract] extract_fields_from_image returned: {fields}")
            except Exception as e:
                # Catch everything (not just gr.Error): any uncaught failure
                # would leave the button stuck on "Extracting..." and disabled.
                print(f"[on_extract] Exception during extraction: {e}")
                yield status_only(
                    gr.update(value=str(e), interactive=True, variant="stop"),
                    False,
                )
                return
            if not fields:
                print("[on_extract] No fields found after extraction.")
                yield status_only(
                    gr.update(value="No fields found. | कोई फ़ील्ड नहीं मिली।", interactive=True, variant="stop"),
                    False,
                )
                return
            num_fields = min(len(fields), MAX_FIELDS)
            print(f"[on_extract] num_fields to show: {num_fields}")
            row_updates = [gr.update(visible=i < num_fields) for i in range(MAX_FIELDS)]
            text_updates = [gr.update(label=fields[i] if i < num_fields else "") for i in range(MAX_FIELDS)]
            result = [
                gr.update(value="Extracted! | निकाला गया!", interactive=False, variant="secondary", elem_classes=["skyblue-btn"]),
                gr.update(visible=False),
                gr.update(visible=True),
                fields,  # plain value: this is what gets stored in fields_state
                gr.update(visible=True),
                False,  # extraction_in_progress
            ] + row_updates + text_updates + [
                gr.update(value=submit_label, variant="primary", interactive=True),
            ]
            print(f"[on_extract] Yielding result with {len(result)} outputs.")
            yield fill_outputs(result)

        def submit_form(*values):
            fields = values[-1]
            text_values = values[:MAX_FIELDS]
            # Defensive: unwrap if a State-like object was stored instead of a list.
            if hasattr(fields, 'value'):
                fields = fields.value
            if not fields:
                return gr.update(value="Error: No fields to submit.", variant="stop", interactive=False), gr.update(visible=True)
            data = dict(zip(fields, text_values[:len(fields)]))
            try:
                save_to_csv(data)
                return gr.update(value="Submitted! | सबमिट हो गया", variant="success", interactive=False), gr.update(visible=True)
            except gr.Error as e:
                return gr.update(value=str(e), variant="stop", interactive=False), gr.update(visible=True)

        # --- Start New Form Logic ---
        def start_new_form():
            # Reset all form fields and UI state, in extract_outputs order.
            return [
                gr.update(value=extract_label, interactive=True, variant="secondary"),  # extract_btn
                gr.update(visible=True),   # form_placeholder
                gr.update(visible=False),  # form_container
                [],                        # fields_state (plain value)
                gr.update(visible=False),  # action_row
                False,                     # extraction_in_progress
            ] + [
                gr.update(visible=False) for _ in range(MAX_FIELDS)
            ] + [
                gr.update(value="", label="") for _ in range(MAX_FIELDS)
            ] + [
                gr.update(value=submit_label, variant="primary", interactive=True),  # submit_btn
            ]

        # --- Button/Callback Wiring ---
        sample_gallery.select(fn=on_sample_click, inputs=None, outputs=[image_input, image_uploaded])
        extract_btn.click(
            fn=on_extract,
            inputs=image_input,
            outputs=extract_outputs,
            show_progress="full",
            concurrency_limit=1
        )
        submit_btn.click(
            fn=submit_form,
            inputs=text_inputs + [fields_state],
            outputs=[submit_btn, action_row],
            show_progress="full",
            concurrency_limit=1
        )
        new_form_btn.click(
            fn=start_new_form,
            inputs=None,
            outputs=extract_outputs,
            concurrency_limit=1
        )
    return demo
# --- App Launch Block ---
if __name__ == "__main__":
    # Build the interface, enable queuing, then serve on every network
    # interface at port 7860 without creating a public share link.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
    }
    ui = main_ui()
    ui.queue().launch(**launch_options)