Spaces:
Running
Running
import base64
import html
import os
import queue
import tempfile
import threading
from datetime import datetime
from pathlib import Path

import gradio as gr
import numpy as np
from dotenv import load_dotenv

from modules.claim_processing import generate_claim_report_pdf
from modules.image_analysis import pil_to_base64_dict, analyze_damage_image
from modules.incident_processing import process_transcript_description
from modules.transcription import FireworksTranscription
# Load environment settings before the UI is built; the interface later reads
# FIREWORKS_API_KEY via os.getenv to pre-fill the API key field.
load_dotenv()
# Directory one level above the folder containing this file; the logo under
# "assets/" is resolved relative to it.
_FILE_PATH = Path(__file__).parents[1]
class ClaimsAssistantApp:
    """Gradio application that guides a user through filing a car insurance claim.

    The workflow has three steps: analyze an uploaded damage photo, transcribe
    and process a spoken incident description, and generate a PDF claim report.
    The result of each step is stored on the instance so later steps can use it.

    NOTE: the handlers built in ``create_interface`` close over ``self``, so this
    state is shared by every browser session served by the same instance.
    """

    # Sample rate (Hz) that audio is resampled to before being sent for transcription.
    _TARGET_SAMPLE_RATE = 16000
    # Peak amplitude (on a -1..1 scale) below which a chunk is treated as silence.
    _SILENCE_THRESHOLD = 0.01
    # Shown in the PDF viewer until a report has been generated.
    _PDF_PLACEHOLDER_HTML = "<p style='text-align: center; color: gray;'>PDF report will appear here after generation</p>"

    def __init__(self):
        """Initialise empty workflow state."""
        # Step 1 / step 2 results.
        self.damage_analysis = None
        self.incident_data = None
        # Live transcription text, written by the transcription callback and
        # read by the UI handlers; guarded by ``transcription_lock``.
        self.live_transcription = ""
        self.transcription_lock = threading.Lock()
        self.is_recording = False
        self.transcription_service = None
        self.audio_queue = queue.Queue()
        # Step 3 results.
        self.final_report_pdf = None
        self.claim_reference = ""
        self.pdf_temp_path = None

    @staticmethod
    def format_function_calls_display(incident_data):
        """Render the AI function calls recorded in *incident_data* as an HTML card.

        Declared as a static method so that both
        ``self.format_function_calls_display(data)`` and
        ``ClaimsAssistantApp.format_function_calls_display(data)`` work.

        Args:
            incident_data: Mapping that may contain ``function_calls_made`` (a
                list of dicts with ``status``, ``function_name`` and
                ``message``) and ``external_data_retrieved`` (a mapping from
                function name to a result dict).

        Returns:
            A ``(html, visible)`` tuple. ``("", False)`` when there is nothing
            to show, otherwise the HTML string and ``True``.
        """
        if not incident_data:
            return "", False
        function_calls = incident_data.get("function_calls_made") or []
        if not function_calls:
            return "", False
        external_data = incident_data.get("external_data_retrieved") or {}

        def esc(value):
            # These values come from model output and external lookups, so they
            # are escaped before being interpolated into HTML.
            return html.escape(str(value))

        parts = [
            """
            <div style="background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
            color: white; padding: 20px; border-radius: 12px; margin: 15px 0;">
            <h3 style="margin-top: 0; display: flex; align-items: center;">
            <span style="margin-right: 10px;">π§</span>
            AI Function Calls Executed
            </h3>
            <p style="margin-bottom: 15px; opacity: 0.9;">
            The AI automatically gathered additional context by calling external functions:
            </p>
            """
        ]

        for index, call in enumerate(function_calls, 1):
            status = str(call.get("status", ""))
            function_name = str(call.get("function_name", ""))
            succeeded = status == "success"
            status_icon = "β " if succeeded else "β"
            parts.append(
                f"""
                <div style="background: rgba(255,255,255,0.1); padding: 15px; border-radius: 8px; margin: 10px 0;">
                <h4 style="margin: 0 0 10px 0;">
                {status_icon} {index}. {esc(function_name.replace('_', ' ').title())}
                </h4>
                <p style="margin: 5px 0; opacity: 0.8; font-size: 14px;">
                Status: {esc(status.title())} - {esc(call.get('message', ''))}
                </p>
                """
            )

            result = external_data.get(function_name) if succeeded else None
            if isinstance(result, dict):
                if function_name == "weather_lookup":
                    parts.append(
                        f"""
                        <div style="margin: 10px 0; padding: 10px; background: rgba(255,255,255,0.1); border-radius: 5px;">
                        <strong>Weather Conditions:</strong><br/>
                        π‘οΈ Temperature: {esc(result.get('temperature', 'N/A'))}<br/>
                        βοΈ Conditions: {esc(result.get('conditions', 'N/A'))}<br/>
                        ποΈ Visibility: {esc(result.get('visibility', 'N/A'))}<br/>
                        π§οΈ Precipitation: {esc(result.get('precipitation', 'N/A'))}
                        </div>
                        """
                    )
                elif function_name == "driver_record_check":
                    parts.append(
                        f"""
                        <div style="margin: 10px 0; padding: 10px; background: rgba(255,255,255,0.1); border-radius: 5px;">
                        <strong>Driver Record:</strong><br/>
                        π License: {esc(result.get('license_status', 'N/A'))}<br/>
                        π‘οΈ Insurance: {esc(result.get('insurance_status', 'N/A'))}<br/>
                        π Risk Level: {esc(result.get('risk_assessment', 'N/A'))}<br/>
                        π Previous Claims: {esc(result.get('previous_claims', 0))}
                        </div>
                        """
                    )
            parts.append("</div>")

        parts.append(
            """
            <div style="margin-top: 15px; padding: 10px; background: rgba(255,255,255,0.1); border-radius: 5px;">
            <small style="opacity: 0.8;">
            π‘ This additional context helps provide more accurate claim assessment and risk evaluation.
            </small>
            </div>
            </div>
            """
        )
        return "".join(parts), True

    @staticmethod
    def _prepare_audio_chunk(sample_rate, audio_data):
        """Convert one microphone chunk to 16 kHz mono 16-bit PCM bytes.

        Args:
            sample_rate: Sample rate of *audio_data* in Hz.
            audio_data: Samples as a NumPy array (or array-like); either 1-D
                or 2-D with channels on axis 1.

        Returns:
            The PCM bytes, or ``None`` when the chunk is empty or its peak
            amplitude is below the silence threshold.
        """
        if not isinstance(audio_data, np.ndarray):
            audio_data = np.array(audio_data, dtype=np.float32)
        if audio_data.size == 0:
            # np.max raises on an empty array; there is nothing to send anyway.
            return None

        # Normalise integer PCM to floats on a -1..1 scale.
        if audio_data.dtype == np.int16:
            audio_data = audio_data.astype(np.float32) / 32768.0
        elif audio_data.dtype == np.int32:
            audio_data = audio_data.astype(np.float32) / 2147483648.0
        elif audio_data.dtype != np.float32:
            audio_data = audio_data.astype(np.float32)

        # Skip chunks that are too quiet to contain speech.
        if np.max(np.abs(audio_data)) < ClaimsAssistantApp._SILENCE_THRESHOLD:
            return None

        # Down-mix multi-channel audio to mono.
        if audio_data.ndim > 1:
            audio_data = np.mean(audio_data, axis=1)

        # Linear-interpolation resample to the target rate.
        target_rate = ClaimsAssistantApp._TARGET_SAMPLE_RATE
        if sample_rate != target_rate:
            new_length = int(len(audio_data) * (target_rate / sample_rate))
            if new_length > 0:
                audio_data = np.interp(
                    np.linspace(0, len(audio_data) - 1, new_length),
                    np.arange(len(audio_data)),
                    audio_data,
                )

        # Clip first so out-of-range floats saturate instead of wrapping
        # around when cast to int16.
        clipped = np.clip(audio_data, -1.0, 1.0)
        return (clipped * 32767).astype(np.int16).tobytes()

    def create_interface(self):
        """Build the Gradio Blocks UI and wire up its event handlers.

        Returns:
            The assembled ``gr.Blocks`` demo (not launched).
        """
        with gr.Blocks(title="Scout Claims", theme=gr.themes.Soft()) as demo:
            # Header
            with gr.Row():
                with gr.Column():
                    gr.Markdown("# π Scout | AI Claims Assistant π")
                    gr.Markdown(
                        "*Automated Insurance Claims Processing with AI Function Calling*"
                    )

            with gr.Row():
                # Sidebar (API Key)
                with gr.Column(scale=1):
                    gr.Markdown("### Powered by:")
                    gr.Image(
                        value=str(_FILE_PATH / "assets/fireworks_logo.png"),
                        height=30,
                        width=100,
                        show_label=False,
                        show_download_button=False,
                        container=False,
                        show_fullscreen_button=False,
                        show_share_button=False,
                    )
                    gr.Markdown("## βοΈ Configuration")
                    val = os.getenv("FIREWORKS_API_KEY", "")
                    api_key_box = gr.Textbox(
                        label="Fireworks AI API Key",
                        type="password",
                        placeholder="Enter your Fireworks AI API key",
                        value=val,
                        info="Required for AI processing",
                    )
                    gr.Markdown("## π Instructions")
                    gr.Markdown(
                        """
                        **Step 1:** Upload car damage photo(s) \n
                        **Step 2:** Use microphone to describe incident \n
                        **Step 3:** Generate and review claim report \n
                        """
                    )

                # Main Content Area
                with gr.Column(scale=3):
                    # Step 1: Upload Image
                    gr.Markdown("## π· Step 1: Upload Damage Photos π·")
                    with gr.Row():
                        image_input = gr.Image(
                            label="Car Damage Photo", type="pil", height=300
                        )
                        with gr.Column():
                            analyze_btn = gr.Button(
                                "π Analyze Damage", variant="primary"
                            )
                            damage_status = gr.Textbox(
                                label="Analysis Status",
                                value="Ready to analyze damage",
                                interactive=False,
                                lines=2,
                            )
                    # Damage Analysis Results
                    damage_results = gr.JSON(
                        label="Damage Analysis Results", visible=False
                    )
                    gr.Markdown("---")

                    # Step 2: Incident Description with Live Streaming
                    gr.Markdown("## π€ Step 2: Describe the Incident π€")
                    with gr.Accordion(
                        "π‘ What to Include in Your Recording", open=True
                    ):
                        gr.Markdown(
                            """
                            **Please describe the following when you record:**
                            π **When & Where:**
                            - Date and time of the accident
                            - Street address or intersection
                            π₯ **Who Was Involved:**
                            - Other driver's name and contact info
                            - Vehicle details (make, model, color, license plate)
                            - Any witnesses
                            π **What Happened:**
                            - How the accident occurred
                            - Who was at fault and why
                            - Weather and road conditions
                            π₯ **Injuries & Damage:**
                            - Anyone hurt? How seriously?
                            - How severe is the vehicle damage?
                            """
                        )
                    with gr.Row():
                        # Direct audio input - no toggle button needed
                        with gr.Column():
                            audio_input = gr.Audio(
                                label="π΅ Record Incident Description",
                                sources=["microphone"],
                                streaming=True,
                                format="wav",
                                show_download_button=False,
                            )
                            transcription_display = gr.Textbox(
                                label="Live Transcription",
                                placeholder="Click the 'Record' button above to start recording...",
                                lines=8,
                                interactive=False,
                                autoscroll=True,
                            )
                    process_incident_btn = gr.Button(
                        "π Process Incident", variant="primary"
                    )
                    incident_status = gr.Textbox(
                        label="Processing Status",
                        value="Record audio first to process incident",
                        interactive=False,
                        lines=2,
                    )
                    # Function calls display
                    function_calls_display = gr.HTML(
                        label="AI Function Calls", visible=False
                    )
                    # Incident Processing Results
                    incident_results = gr.JSON(
                        label="Incident Processing Results", visible=False
                    )
                    gr.Markdown("---")

                    # Step 3: Generate Claim Report
                    gr.Markdown("## π Step 3: Generate Claim Report π")
                    generate_report_btn = gr.Button(
                        "π Generate Claim Report", variant="primary", size="lg"
                    )
                    report_status = gr.Textbox(
                        label="Report Generation Status",
                        value="Complete steps 1 and 2 to generate report",
                        interactive=False,
                        lines=2,
                    )
                    # Final Report Display (PDF)
                    with gr.Accordion(
                        "π Generated Claim Report (PDF)", open=False
                    ) as report_accordion:
                        # PDF viewer using an HTML iframe
                        pdf_viewer = gr.HTML(
                            value=self._PDF_PLACEHOLDER_HTML,
                            label="Claim Report PDF",
                        )
                        with gr.Row():
                            download_btn = gr.DownloadButton(
                                "πΎ Download PDF Report", visible=False
                            )
                            submit_btn = gr.Button(
                                "β Submit Claim", variant="stop", visible=False
                            )

            # ------------------------------------------------------------------
            # Event handlers
            #
            # Several handlers are generators so they can show a "please wait"
            # status first. In a generator only *yielded* values reach the UI,
            # so validation failures are yielded and followed by a bare return.
            # ------------------------------------------------------------------

            def read_transcription():
                """Return the current live transcription under the lock."""
                with self.transcription_lock:
                    return self.live_transcription

            def handle_damage_analysis(image, api_key):
                """Analyze the uploaded photo; yields (status, results update)."""
                if image is None:
                    yield (
                        "β Please upload an image first",
                        gr.update(visible=False),
                    )
                    return
                if not (api_key or "").strip():
                    yield (
                        "β Please enter your Fireworks AI API key first",
                        gr.update(visible=False),
                    )
                    return

                # Update status to show processing
                yield (
                    "π Analyzing damage... Please wait",
                    gr.update(visible=False),
                )
                try:
                    image_dict = pil_to_base64_dict(image)
                    self.damage_analysis = analyze_damage_image(
                        image=image_dict, api_key=api_key
                    )
                except Exception as e:  # UI boundary: report instead of crashing
                    yield (
                        f"β Error analyzing damage: {str(e)}",
                        gr.update(visible=False),
                    )
                    return
                yield (
                    "β Damage analysis completed successfully!",
                    gr.update(value=self.damage_analysis, visible=True),
                )

            def live_transcription_callback(text):
                """Callback for live transcription updates."""
                with self.transcription_lock:
                    self.live_transcription = text

            def initialize_transcription_service(api_key):
                """Create and connect the transcription service on first audio.

                Returns a truthy value when the service is ready to use.
                """
                if not (api_key or "").strip():
                    return False
                if not self.transcription_service:
                    self.transcription_service = FireworksTranscription(api_key)
                    self.transcription_service.set_callback(
                        live_transcription_callback
                    )
                if self.is_recording:
                    return True
                with self.transcription_lock:
                    self.live_transcription = ""
                # Mark as recording only when the connection succeeded, so a
                # failed attempt is retried (and reported) on the next chunk.
                connected = bool(self.transcription_service._connect())
                self.is_recording = connected
                return connected

            def process_audio_stream(audio_tuple, api_key):
                """Forward one streamed audio chunk; return the current transcript."""
                if not audio_tuple:
                    return read_transcription()
                # Initialize transcription service if needed
                if not self.is_recording and not initialize_transcription_service(
                    api_key
                ):
                    return "β Failed to initialize transcription service. Check your API key."
                try:
                    sample_rate, audio_data = audio_tuple
                    audio_bytes = self._prepare_audio_chunk(sample_rate, audio_data)
                    service = self.transcription_service
                    if audio_bytes and service and service.is_connected:
                        service._send_audio_chunk(audio_bytes)
                except Exception as e:  # best effort: keep the stream alive
                    print(f"Error processing audio stream: {e}")
                # Return current transcription
                return read_transcription()

            def handle_incident_processing(api_key):
                """Turn the transcript into structured incident data.

                Yields (status, function-calls HTML update, results update).
                """
                transcript = read_transcription()
                if not transcript.strip():
                    yield (
                        "β No transcription available. Please record audio first.",
                        gr.update(visible=False),
                        gr.update(visible=False),
                    )
                    return
                if not (api_key or "").strip():
                    yield (
                        "β Please enter your Fireworks AI API key first",
                        gr.update(visible=False),
                        gr.update(visible=False),
                    )
                    return

                # Update status
                yield (
                    "π Processing incident data ... Please wait",
                    gr.update(visible=False),
                    gr.update(visible=False),
                )
                try:
                    incident_analysis = process_transcript_description(
                        transcript=transcript, api_key=api_key
                    )
                    # Convert the model to a dict for JSON display
                    self.incident_data = incident_analysis.model_dump()
                    function_calls_html, show_calls = (
                        self.format_function_calls_display(self.incident_data)
                    )
                except Exception as e:  # UI boundary: report instead of crashing
                    yield (
                        f"β Error processing incident: {str(e)}",
                        gr.update(visible=False),
                        gr.update(visible=False),
                    )
                    return

                # Status message depends on whether any function calls were made
                if show_calls:
                    call_count = len(
                        self.incident_data.get("function_calls_made") or []
                    )
                    status_message = f"β Incident processing completed with {call_count} AI function calls!"
                else:
                    status_message = "β Incident processing completed successfully!"
                yield (
                    status_message,
                    gr.update(value=function_calls_html, visible=show_calls),
                    gr.update(value=self.incident_data, visible=True),
                )

            def cleanup_temp_files():
                """Remove the previously written temporary PDF, if any."""
                if self.pdf_temp_path and os.path.exists(self.pdf_temp_path):
                    try:
                        os.remove(self.pdf_temp_path)
                    except OSError as e:
                        print(f"Error deleting temporary PDF file: {e}")

            def handle_report_generation(api_key):
                """Generate the claim report PDF.

                Yields (status, viewer HTML, download-button update,
                submit-button update, accordion update).
                """
                if not self.damage_analysis or not self.incident_data:
                    yield (
                        "β Please complete damage analysis and incident processing first",
                        self._PDF_PLACEHOLDER_HTML,
                        gr.update(visible=False),
                        gr.update(visible=False),
                        gr.update(open=False),
                    )
                    return
                if not (api_key or "").strip():
                    yield (
                        "β Please enter your Fireworks AI API key first",
                        self._PDF_PLACEHOLDER_HTML,
                        gr.update(visible=False),
                        gr.update(visible=False),
                        gr.update(open=False),
                    )
                    return

                # Show processing status
                yield (
                    "π Generating comprehensive PDF claim report... Please wait",
                    self._PDF_PLACEHOLDER_HTML,
                    gr.update(visible=False),
                    gr.update(visible=False),
                    gr.update(open=False),
                )
                try:
                    # Generate the PDF report
                    self.final_report_pdf = generate_claim_report_pdf(
                        damage_analysis=self.damage_analysis,
                        incident_data=self.incident_data,
                    )
                    # Timestamp-based claim reference, also used as the file name
                    self.claim_reference = datetime.now().strftime(
                        "CLM-%Y%m%d-%H%M%S"
                    )
                    # Replace the previous temp file with the new report
                    cleanup_temp_files()
                    self.pdf_temp_path = os.path.join(
                        tempfile.gettempdir(), f"{self.claim_reference}.pdf"
                    )
                    with open(self.pdf_temp_path, "wb") as f:
                        f.write(self.final_report_pdf)

                    # Embed the PDF in the page as a base64 data URI
                    pdf_base64 = base64.b64encode(self.final_report_pdf).decode(
                        "utf-8"
                    )
                    pdf_viewer_html = f"""
                    <div style="text-align: center; margin: 20px 0;">
                    <h3 style="color: #2563eb;">π Insurance Claim Report - {self.claim_reference}</h3>
                    <iframe
                    src="data:application/pdf;base64,{pdf_base64}"
                    width="100%"
                    height="800px"
                    style="border: 2px solid #e5e7eb; border-radius: 8px; box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);">
                    <p>Your browser does not support PDF viewing.
                    <a href="data:application/pdf;base64,{pdf_base64}" download="{self.claim_reference}.pdf">
                    Click here to download the PDF
                    </a></p>
                    </iframe>
                    <p style="margin-top: 15px; color: #6b7280; font-size: 14px;">
                    π Professional PDF report generated successfully! Use the download button below to save.
                    </p>
                    </div>
                    """
                except Exception as e:  # UI boundary: report instead of crashing
                    yield (
                        f"β Error generating PDF report: {str(e)}",
                        "<p style='text-align: center; color: red;'>Error generating PDF report</p>",
                        gr.update(visible=False),
                        gr.update(visible=False),
                        gr.update(open=False),
                    )
                    return
                yield (
                    "β Professional PDF claim report generated successfully!",
                    pdf_viewer_html,
                    gr.update(visible=True, value=self.pdf_temp_path),
                    gr.update(visible=True),
                    gr.update(open=True),
                )

            def handle_claim_submission():
                """Handle final claim submission."""
                if not self.final_report_pdf:
                    return "β No report available to submit"
                return f"π Claim submitted successfully! Reference: {self.claim_reference}"

            # Wire up the events
            analyze_btn.click(
                fn=handle_damage_analysis,
                inputs=[image_input, api_key_box],
                outputs=[damage_status, damage_results],
            )
            # Handle streaming audio for live transcription
            audio_input.stream(
                fn=process_audio_stream,
                inputs=[audio_input, api_key_box],
                outputs=[transcription_display],
                show_progress="hidden",
            )
            process_incident_btn.click(
                fn=handle_incident_processing,
                inputs=[api_key_box],
                outputs=[incident_status, function_calls_display, incident_results],
            )
            generate_report_btn.click(
                fn=handle_report_generation,
                inputs=[api_key_box],
                outputs=[
                    report_status,
                    pdf_viewer,
                    download_btn,
                    submit_btn,
                    report_accordion,
                ],
            )
            submit_btn.click(fn=handle_claim_submission, outputs=[report_status])
            # No-op page-load hook. It performs no cleanup; the temporary PDF
            # is removed when the next report is generated.
            demo.load(lambda: None)
        return demo
def create_claims_app():
    """Build a new ClaimsAssistantApp and return its Gradio interface."""
    return ClaimsAssistantApp().create_interface()
# Create and launch the demo when this file is run as a script.
if __name__ == "__main__":
    print("Starting AI Claims Assistant Demo with Function Calling")
    # Build the Blocks app; the name `demo` is kept at module level.
    demo = create_claims_app()
    # share=True asks Gradio to also create a public share link.
    demo.launch(share=True)