from pathlib import Path

import base64
import os
import queue
import tempfile
import threading

import gradio as gr
import numpy as np
from dotenv import load_dotenv

from modules.image_analysis import pil_to_base64_dict, analyze_damage_image
from modules.transcription import FireworksTranscription
from modules.incident_processing import process_transcript_description
from modules.claim_processing import generate_claim_report_pdf

load_dotenv()

# Repository root (one level above the directory containing this file).
_FILE_PATH = Path(__file__).parents[1]


class ClaimsAssistantApp:
    """Gradio application that walks a user through an insurance claim:
    image damage analysis, live audio transcription, structured incident
    processing (with AI function calling), and PDF report generation.
    """

    def __init__(self):
        # Result of the vision damage analysis (None until analysis runs).
        self.damage_analysis = None
        # Structured incident data built from the transcript (None until run).
        self.incident_data = None
        # Rolling transcript text, written by the transcription callback.
        self.live_transcription = ""
        # Guards reads/writes of live_transcription across threads.
        self.transcription_lock = threading.Lock()
        self.is_recording = False
        self.transcription_service = None
        # NOTE(review): audio_queue appears unused in the surviving code — confirm.
        self.audio_queue = queue.Queue()
        # Generated PDF bytes, claim reference string, and temp-file path.
        self.final_report_pdf = None
        self.claim_reference = ""
        self.pdf_temp_path = None

    @staticmethod
    def format_function_calls_display(incident_data):
        """Format function calls and external data for display.

        Returns (html, show) where ``show`` tells the caller whether the
        function-calls panel should be made visible.
        """
        if not incident_data or "function_calls_made" not in incident_data:
            return "", False

        function_calls = incident_data.get("function_calls_made", [])
        external_data = incident_data.get("external_data_retrieved", {})

        if not function_calls:
            return "", False

        # NOTE(review): the HTML markup inside these string literals was
        # stripped when this file was mangled; only the visible text survived.
        # Restore the original markup from version control if available.
        display_html = """
The AI automatically gathered additional context by calling external functions:
"""

        for i, call in enumerate(function_calls, 1):
            # i / status_icon were consumed by the lost HTML markup.
            status_icon = "✅" if call["status"] == "success" else "❌"
            function_name = call["function_name"]

            display_html += f"""Status: {call['status'].title()} - {call['message']}
"""

            if call["status"] == "success" and function_name in external_data:
                result = external_data[function_name]
                if function_name == "weather_lookup":
                    # NOTE(review): the body of this f-string (weather summary
                    # markup) and the remainder of this method — including any
                    # branches for other function names — were truncated in
                    # this copy. Minimal reconstruction; confirm against the
                    # original source.
                    display_html += f"""{result}
"""

        # NOTE(review): reconstructed return — callers unpack (html, show_calls).
        return display_html, True
", label="Claim Report PDF", ) with gr.Row(): download_btn = gr.DownloadButton( "💾 Download PDF Report", visible=False ) submit_btn = gr.Button( "✅ Submit Claim", variant="stop", visible=False ) # Event Handlers def handle_damage_analysis(image, api_key): if image is None: return ( "❌ Please upload an image first", gr.update(visible=False), ) if not api_key.strip(): return ( "❌ Please enter your Fireworks AI API key first", gr.update(visible=False), ) try: # Update status to show processing yield ( "🔄 Analyzing damage... Please wait", gr.update(visible=False), ) image_dict = pil_to_base64_dict(image) self.damage_analysis = analyze_damage_image( image=image_dict, api_key=api_key ) yield ( "✅ Damage analysis completed successfully!", gr.update(value=self.damage_analysis, visible=True), ) return None except Exception as e: yield ( f"❌ Error analyzing damage: {str(e)}", gr.update(visible=False), ) return None def live_transcription_callback(text): """Callback for live transcription updates""" with self.transcription_lock: self.live_transcription = text def initialize_transcription_service(api_key): """Initialize transcription service when audio starts""" if not api_key.strip(): return False if not self.transcription_service: self.transcription_service = FireworksTranscription(api_key) self.transcription_service.set_callback(live_transcription_callback) if not self.is_recording: self.is_recording = True self.live_transcription = "" return self.transcription_service._connect() return True def process_audio_stream(audio_tuple, api_key): """Process incoming audio stream for live transcription""" if not audio_tuple: with self.transcription_lock: return self.live_transcription # Initialize transcription service if needed if not self.is_recording: if not initialize_transcription_service(api_key): return "❌ Failed to initialize transcription service. Check your API key." 
try: sample_rate, audio_data = audio_tuple # Convert audio data to proper format if not isinstance(audio_data, np.ndarray): audio_data = np.array(audio_data, dtype=np.float32) if audio_data.dtype != np.float32: if audio_data.dtype == np.int16: audio_data = audio_data.astype(np.float32) / 32768.0 elif audio_data.dtype == np.int32: audio_data = audio_data.astype(np.float32) / 2147483648.0 else: audio_data = audio_data.astype(np.float32) # Skip if audio is too quiet if np.max(np.abs(audio_data)) < 0.01: with self.transcription_lock: return self.live_transcription # Convert to mono if stereo if len(audio_data.shape) > 1: audio_data = np.mean(audio_data, axis=1) # Resample to 16kHz if needed if sample_rate != 16000: ratio = 16000 / sample_rate new_length = int(len(audio_data) * ratio) if new_length > 0: audio_data = np.interp( np.linspace(0, len(audio_data) - 1, new_length), np.arange(len(audio_data)), audio_data, ) # Convert to bytes and send to transcription service audio_bytes = (audio_data * 32767).astype(np.int16).tobytes() if ( self.transcription_service and self.transcription_service.is_connected ): self.transcription_service._send_audio_chunk(audio_bytes) except Exception as e: print(f"Error processing audio stream: {e}") # Return current transcription with self.transcription_lock: return self.live_transcription def handle_incident_processing(api_key): """Process the recorded transcription into structured incident data with function calling""" if not self.live_transcription.strip(): return ( "❌ No transcription available. Please record audio first.", gr.update(visible=False), gr.update(visible=False), ) if not api_key.strip(): return ( "❌ Please enter your Fireworks AI API key first", gr.update(visible=False), gr.update(visible=False), ) try: # Update status yield ( "🔄 Processing incident data ... 
Please wait", gr.update(visible=False), gr.update(visible=False), ) # Use enhanced Fireworks processing with function calling incident_analysis = process_transcript_description( transcript=self.live_transcription, api_key=api_key ) # Convert Pydantic model to dict for JSON display self.incident_data = incident_analysis.model_dump() # Format function calls for display function_calls_html, show_calls = ( self.format_function_calls_display(self.incident_data) ) # Update status message based on function calls if show_calls: status_message = f"✅ Incident processing completed with {len(self.incident_data.get('function_calls_made', []))} AI function calls!" else: status_message = ( "✅ Incident processing completed successfully!" ) yield ( status_message, gr.update(value=function_calls_html, visible=show_calls), gr.update(value=self.incident_data, visible=True), ) return None except Exception as e: yield ( f"❌ Error processing incident: {str(e)}", gr.update(visible=False), gr.update(visible=False), ) return None def handle_report_generation(api_key): """Generate comprehensive claim report as PDF using AI""" if not self.damage_analysis or not self.incident_data: return ( "❌ Please complete damage analysis and incident processing first", "PDF report will appear here after generation
", gr.update(visible=False), gr.update(visible=False), gr.update(open=False), ) if not api_key.strip(): return ( "❌ Please enter your Fireworks AI API key first", "PDF report will appear here after generation
", gr.update(visible=False), gr.update(visible=False), gr.update(open=False), ) try: # Show processing status yield ( "🔄 Generating comprehensive PDF claim report... Please wait", "PDF report will appear here after generation
", gr.update(visible=False), gr.update(visible=False), gr.update(open=False), ) # Generate the PDF report self.final_report_pdf = generate_claim_report_pdf( damage_analysis=self.damage_analysis, incident_data=self.incident_data, ) # Extract claim reference for download filename from datetime import datetime timestamp = datetime.now() self.claim_reference = f"CLM-{timestamp.strftime('%Y%m%d')}-{timestamp.strftime('%H%M%S')}" # Save PDF to temporary file for viewing and downloading if self.pdf_temp_path and os.path.exists(self.pdf_temp_path): os.remove(self.pdf_temp_path) temp_dir = tempfile.gettempdir() self.pdf_temp_path = os.path.join( temp_dir, f"{self.claim_reference}.pdf" ) with open(self.pdf_temp_path, "wb") as f: f.write(self.final_report_pdf) # Create PDF viewer HTML pdf_base64 = base64.b64encode(self.final_report_pdf).decode("utf-8") pdf_viewer_html = f"""📄 Professional PDF report generated successfully! Use the download button below to save.
Error generating PDF report
", gr.update(visible=False), gr.update(visible=False), gr.update(open=False), ) return None def handle_claim_submission(): """Handle final claim submission""" if not self.final_report_pdf: return "❌ No report available to submit" return f"🎉 Claim submitted successfully! Reference: {self.claim_reference}" def cleanup_temp_files(): """Clean up temporary PDF files""" if self.pdf_temp_path and os.path.exists(self.pdf_temp_path): try: os.remove(self.pdf_temp_path) except Exception as e: print(f"Error deleting temporary PDF file: {e}") pass # Wire up the events analyze_btn.click( fn=handle_damage_analysis, inputs=[image_input, api_key], outputs=[damage_status, damage_results], ) # Handle streaming audio for live transcription audio_input.stream( fn=process_audio_stream, inputs=[audio_input, api_key], outputs=[transcription_display], show_progress="hidden", ) # Updated to include function calls display process_incident_btn.click( fn=handle_incident_processing, inputs=[api_key], outputs=[incident_status, function_calls_display, incident_results], ) generate_report_btn.click( fn=handle_report_generation, inputs=[api_key], outputs=[ report_status, pdf_viewer, download_btn, submit_btn, report_accordion, ], ) submit_btn.click(fn=handle_claim_submission, outputs=[report_status]) # Clean up on app close demo.load(lambda: None) return demo def create_claims_app(): """Factory function to create the claims assistant app""" app = ClaimsAssistantApp() return app.create_interface() # Create and launch the demo if __name__ == "__main__": print("Starting AI Claims Assistant Demo with Function Calling") demo = create_claims_app() demo.launch(share=True)