"""Jay's Mobile Wash call center.

Combines a Gradio admin UI with Flask webhooks for Twilio voice/SMS. State is
kept in a local SQLite database; Gemini and DeepSeek are used for generated
content.
"""

import io
import os
import re
import sqlite3
import threading
from datetime import datetime, timezone

import google.generativeai as genai
import gradio as gr
import requests
from flask import Flask, request
from PIL import Image
from twilio.rest import Client
from twilio.twiml.messaging_response import MessagingResponse
from twilio.twiml.voice_response import VoiceResponse

# Custom CSS to make the UI look beautiful
custom_css = """
.gradio-container { font-family: 'Arial', sans-serif !important; max-width: 1200px !important; margin: 0 auto !important; }
h1 { color: #3a0ca3 !important; font-size: 32px !important; text-align: center !important; margin-bottom: 24px !important; text-shadow: 1px 1px 2px rgba(0,0,0,0.1) !important; }
h2 { color: #4361ee !important; font-size: 24px !important; border-bottom: 2px solid #4361ee !important; padding-bottom: 8px !important; margin-top: 20px !important; }
h3 { color: #4cc9f0 !important; font-size: 20px !important; margin-top: 16px !important; }
.tabs { margin-top: 20px !important; border-radius: 12px !important; box-shadow: 0 4px 12px rgba(0,0,0,0.1) !important; }
button { border-radius: 8px !important; font-weight: bold !important; transform: translateY(0) !important; transition: all 0.2s !important; margin: 5px !important; padding: 8px 16px !important; }
button:hover { transform: translateY(-2px) !important; box-shadow: 0 4px 8px rgba(0,0,0,0.1) !important; }
button[data-testid*="primary"] { background: linear-gradient(135deg, #4361ee, #3a0ca3) !important; color: white !important; border: none !important; }
.gradio-row { margin: 15px 0 !important; }
.gradio-slider { margin-bottom: 20px !important; }
.gradio-markdown p { margin: 10px 0 !important; }
.gradio-tab-panel { padding: 15px !important; background-color: #f8f9fa !important; border-radius: 0 0 12px 12px !important; }
.status-box { padding: 15px !important; border-radius: 10px !important; box-shadow: 0 2px 8px rgba(0,0,0,0.1) !important; margin-bottom: 15px !important; background-color: white !important; }
.status-header { display: flex !important; align-items: center !important; margin-bottom: 10px !important; }
.status-emoji { font-size: 24px !important; margin-right: 10px !important; }
.logo-placeholder { text-align: center !important; font-size: 48px !important; margin: 10px auto 20px auto !important; background: linear-gradient(135deg, #4cc9f0, #3a0ca3) !important; color: white !important; width: 96px !important; height: 96px !important; line-height: 96px !important; border-radius: 50% !important; box-shadow: 0 4px 8px rgba(0,0,0,0.2) !important; }
"""

# API CREDENTIALS
# Read from the environment only. Secrets and personal phone numbers must not
# be committed as fallback defaults; when a value is missing the corresponding
# client initialisation below reports the failure instead.
DEEPSEEK_API_KEY = os.getenv('DEEPSEEK_API_KEY', '')
GEMINI_API_KEY = os.getenv('GEMINI_API_KEY', '')
TWILIO_ACCOUNT_SID = os.getenv('TWILIO_ACCOUNT_SID', '')
TWILIO_AUTH_TOKEN = os.getenv('TWILIO_AUTH_TOKEN', '')
TWILIO_PHONE_NUMBER = os.getenv('TWILIO_PHONE_NUMBER', '')
YOUR_PERSONAL_NUMBER = os.getenv('YOUR_PHONE_NUMBER', '')

print(f"===== Application Startup at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} =====")
print("🚀 Starting Jay's Call Center...")
print(f"📞 Business: {TWILIO_PHONE_NUMBER}")
print(f"📱 Personal: {YOUR_PERSONAL_NUMBER}")

# Initialize clients
try:
    twilio_client = Client(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN)
    TWILIO_STATUS = "✅ Connected"
    print("✅ Twilio client initialized")
except Exception as e:
    twilio_client = None
    TWILIO_STATUS = f"❌ Error: {e}"
    print(f"❌ Twilio error: {e}")

try:
    genai.configure(api_key=GEMINI_API_KEY)
    gemini_model = genai.GenerativeModel('gemini-pro')
    gemini_vision_model = genai.GenerativeModel('gemini-pro-vision')
    GEMINI_STATUS = "✅ Connected"
    print("✅ Gemini initialized")
except Exception as e:
    gemini_model = None
    gemini_vision_model = None
    GEMINI_STATUS = f"❌ Error: {e}"
    print(f"❌ Gemini error: {e}")


# Initialize database
def init_database():
    """Create the SQLite schema if needed, seed default settings, return the connection.

    The connection is opened with ``check_same_thread=False`` because it is
    shared by the Flask and Gradio handlers in this module.
    """
    conn = sqlite3.connect('jays_call_center.db', check_same_thread=False)
    cursor = conn.cursor()

    # Smart calls table
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS smart_calls (
            id INTEGER PRIMARY KEY,
            call_sid TEXT UNIQUE,
            from_number TEXT,
            to_number TEXT,
            call_status TEXT,
            language_selected TEXT,
            button_pressed TEXT,
            ai_conversation TEXT,
            customer_intent TEXT,
            spam_score REAL,
            duration_before_answer INTEGER,
            total_duration INTEGER,
            forwarded_to_jay BOOLEAN,
            jay_answered BOOLEAN,
            ai_handled_alone BOOLEAN,
            timestamp TEXT,
            recording_url TEXT
        )
    ''')

    # handle_recording_done() writes recording_url; databases created before
    # the column existed need it added explicitly.
    cursor.execute("PRAGMA table_info(smart_calls)")
    call_columns = {row[1] for row in cursor.fetchall()}
    if 'recording_url' not in call_columns:
        cursor.execute("ALTER TABLE smart_calls ADD COLUMN recording_url TEXT")

    # Smart SMS
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS smart_sms (
            id INTEGER PRIMARY KEY,
            message_sid TEXT UNIQUE,
            from_number TEXT,
            to_number TEXT,
            body TEXT,
            spam_score REAL,
            timestamp TEXT
        )
    ''')

    # File training
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS file_training (
            id INTEGER PRIMARY KEY,
            training_id TEXT UNIQUE,
            file_type TEXT,
            file_path TEXT,
            website_url TEXT,
            learning_score REAL,
            timestamp TEXT
        )
    ''')

    # AI Learning analytics
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS ai_learning_analytics (
            id INTEGER PRIMARY KEY,
            learning_type TEXT,
            ai_model_used TEXT,
            accuracy_score REAL,
            learning_input TEXT,
            timestamp TEXT
        )
    ''')

    # Settings with duration controls
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS complete_settings (
            id INTEGER PRIMARY KEY,
            category TEXT,
            setting_name TEXT,
            setting_value TEXT,
            backend_access BOOLEAN,
            last_updated TEXT
        )
    ''')

    # INSERT OR IGNORE in load_settings() only skips existing rows when a
    # uniqueness constraint exists. Drop duplicates left by earlier start-ups
    # (keeping the oldest row per key), then enforce one row per setting.
    cursor.execute('''
        DELETE FROM complete_settings
        WHERE id NOT IN (
            SELECT MIN(id) FROM complete_settings
            GROUP BY category, setting_name
        )
    ''')
    cursor.execute('''
        CREATE UNIQUE INDEX IF NOT EXISTS idx_complete_settings_key
        ON complete_settings (category, setting_name)
    ''')

    conn.commit()
    load_settings(cursor, conn)
    return conn


def load_settings(cursor, conn):
    """Insert the default settings (7-second ring time) that are not stored yet.

    Args:
        cursor: Cursor on the application database.
        conn: The connection owning ``cursor``; committed after the inserts.
    """
    settings = [
        ("call_handling", "ring_duration_seconds", "7", True),
        ("call_handling", "ai_takeover_after_seconds", "10", True),
        ("call_handling", "max_ai_conversation_minutes", "5", True),
        ("spam_protection", "spam_threshold", "0.7", True),
        ("voice_menu", "english_greeting", "Hey! It's Jay's Mobile Wash! Press 1 for English, 2 for Spanish.", True),
        ("voice_menu", "spanish_greeting", "¡Hola! Es Jay's Mobile Wash! Presiona 1 para inglés, 2 para español.", True),
        ("notifications", "sms_jay_on_call", "true", True),
        ("notifications", "sms_jay_on_ai_takeover", "true", True),
    ]

    now = datetime.now().isoformat()
    cursor.executemany(
        """
        INSERT OR IGNORE INTO complete_settings
        (category, setting_name, setting_value, backend_access, last_updated)
        VALUES (?, ?, ?, ?, ?)
        """,
        [(category, name, value, backend, now) for category, name, value, backend in settings],
    )
    conn.commit()


# Initialize database
db_conn = init_database()


def get_setting(category: str, name: str, default: str = "") -> str:
    """Return the stored value of a setting, or ``default`` if missing or on error."""
    try:
        cursor = db_conn.cursor()
        cursor.execute(
            "SELECT setting_value FROM complete_settings WHERE category = ? AND setting_name = ?",
            (category, name)
        )
        result = cursor.fetchone()
        return result[0] if result else default
    except Exception as e:
        print(f"Error getting setting {category}.{name}: {e}")
        return default


def update_call_duration_settings_complete(ring_time, ai_takeover_time, max_ai_time):
    """Persist the three call-duration settings and return a status message.

    Args:
        ring_time: Seconds Jay's phone rings before the AI takes over.
        ai_takeover_time: Seconds before the AI handles the call.
        max_ai_time: Maximum AI conversation length in minutes.

    Returns:
        A confirmation message, or an error message if the update failed.
    """
    new_values = {
        'ring_duration_seconds': ring_time,
        'ai_takeover_after_seconds': ai_takeover_time,
        'max_ai_conversation_minutes': max_ai_time,
    }
    try:
        cursor = db_conn.cursor()
        for setting_name, value in new_values.items():
            cursor.execute(
                """
                UPDATE complete_settings
                SET setting_value = ?, last_updated = ?
                WHERE category = 'call_handling' AND setting_name = ?
                """,
                (str(value), datetime.now().isoformat(), setting_name),
            )
        db_conn.commit()

        return f"""
✅ Call duration settings updated:
⏰ Ring time: {ring_time} seconds
🤖 AI takeover: {ai_takeover_time} seconds
⏱️ Max AI conversation: {max_ai_time} minutes
"""
    except Exception as e:
        return f"""
❌ Error updating settings: {e}
"""


# DeepSeek API integration
def call_deepseek_api(prompt: str, context: str = "") -> str:
    """Send ``prompt`` to the DeepSeek chat endpoint and return the reply text.

    Returns a fixed fallback string when the API answers with a non-200 status
    or the request raises.
    """
    try:
        headers = {
            'Authorization': f'Bearer {DEEPSEEK_API_KEY}',
            'Content-Type': 'application/json'
        }

        data = {
            "model": "deepseek-chat",
            "messages": [
                {"role": "system", "content": f"You are Jay's Mobile Wash AI assistant. Context: {context}"},
                {"role": "user", "content": prompt}
            ],
            "max_tokens": 500,
            "temperature": 0.7,
        }

        response = requests.post(
            "https://api.deepseek.com/v1/chat/completions",
            headers=headers,
            json=data,
            timeout=30
        )

        if response.status_code == 200:
            result = response.json()
            return result['choices'][0]['message']['content']
        return "DeepSeek API temporarily unavailable"
    except Exception as e:
        print(f"DeepSeek API call failed: {e}")
        return "DeepSeek API error"


# Spam detection
def detect_spam_complete(phone_number: str, message_content: str = "") -> float:
    """Score how spam-like a caller/message is, from 0.0 to 1.0.

    Each spam keyword found in ``message_content`` adds 0.2; a number starting
    with ``+1800`` or ``+1888`` adds 0.3. A missing phone number is treated as
    an empty string so the keyword score is still returned.
    """
    try:
        spam_indicators = [
            "debt", "collector", "payment", "overdue", "free", "winner",
            "prize", "lottery", "warranty", "solar", "loan", "mortgage"
        ]

        content_lower = message_content.lower() if message_content else ""
        number = phone_number or ""
        spam_score = 0.0

        # Check spam keywords
        for indicator in spam_indicators:
            if indicator in content_lower:
                spam_score += 0.2

        # Phone number patterns
        if number.startswith(('+1800', '+1888')):
            spam_score += 0.3

        return min(spam_score, 1.0)
    except Exception as e:
        print(f"Spam detection error: {e}")
        return 0.0


# Creative pitch generation
def generate_creative_pitch_complete(language: str = "english") -> str:
    """Return a short sales pitch in ``language`` ("english" or "spanish").

    Gemini is tried first when available; if it is missing or fails, a fixed
    fallback pitch is returned.
    """
    try:
        base_services = """
        Services & Pricing:
        - Basic Wash: $25 (exterior wash, tire shine)
        - Premium Wash: $40 (exterior + interior vacuum)
        - Full Detail: $75 (complete detailing with wax)
        """

        if language == "spanish":
            prompt = f"Create a creative Spanish pitch for Jay's Mobile Wash. Make it professional but enthusiastic. Keep under 30 seconds when spoken. {base_services}"
        else:
            prompt = f"Create a creative English pitch for Jay's Mobile Wash. Make it professional but enthusiastic. Keep under 30 seconds when spoken. {base_services}"

        # Try Gemini first
        if gemini_model:
            try:
                response = gemini_model.generate_content(prompt)
                return response.text
            except Exception as e:
                # Best effort: fall through to the canned pitch below.
                print(f"Gemini pitch generation failed: {e}")

        # Fallback pitch
        if language == "spanish":
            return "¡Ofrecemos servicios increíbles de lavado móvil! Lavado básico por $25, premium por $40, y detallado completo por $75. ¡Vamos a tu ubicación!"
        return "We offer amazing mobile car detailing that comes right to you! Basic Wash $25, Premium $40, Full Detail $75. What service interests you today?"
    except Exception as e:
        print(f"Pitch generation error: {e}")
        return "We offer mobile car detailing! Basic Wash $25, Premium $40, Full Detail $75. What interests you?"


# File training functions
def upload_file_training_complete(file, file_type):
    """Record an uploaded file as a training session and return a status message.

    Args:
        file: The uploaded file from ``gr.File`` (an object with ``.name`` or a
            path string), or ``None`` when nothing was uploaded.
        file_type: One of "text", "image" or "document".
    """
    if file is None:
        return """
❌ Please upload a file first
"""

    try:
        # Accept both an object exposing .name and a plain path string.
        file_path = getattr(file, "name", file)
        file_name = os.path.basename(file_path)

        # Generate a unique training ID
        training_id = f"train_{datetime.now().strftime('%Y%m%d%H%M%S')}"

        # Read file content.
        # NOTE: file_content is not persisted; only metadata is stored below.
        if file_type == "text":
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                file_content = f.read()
        elif file_type == "image":
            # For images, just note we're processing it
            file_content = "[Image content processed]"
            with Image.open(file_path) as img:
                # Process with Gemini Vision if available
                if gemini_vision_model:
                    try:
                        response = gemini_vision_model.generate_content(img)
                        image_analysis = response.text
                        file_content = image_analysis[:500]
                    except Exception as e:
                        print(f"Vision model error: {e}")
        else:
            file_content = "[File content processed]"

        # Store in database
        cursor = db_conn.cursor()
        cursor.execute("""
            INSERT INTO file_training
            (training_id, file_type, file_path, learning_score, timestamp)
            VALUES (?, ?, ?, ?, ?)
        """, (training_id, file_type, file_name, 0.85, datetime.now().isoformat()))

        # Log the learning
        cursor.execute("""
            INSERT INTO ai_learning_analytics
            (learning_type, ai_model_used, accuracy_score, learning_input, timestamp)
            VALUES (?, ?, ?, ?, ?)
        """, (f"{file_type}_training", "gemini", 0.85, file_name, datetime.now().isoformat()))

        db_conn.commit()

        return f"""

✅ File Training Complete!

📂 Type:{file_type}
📄 File:{file_name}
🧠 Learning score:85%
⏱️ Timestamp:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
    except Exception as e:
        return f"""
❌ File training error: {e}
"""


# Website training
def train_from_website_url_complete(website_url):
    """Record a website as a training session and return a status message.

    The page is fetched with a 10-second timeout; if the request fails a
    placeholder preview is used and the session is still recorded.
    """
    if not website_url or not website_url.startswith('http'):
        return """
❌ Please enter a valid website URL (starting with http:// or https://)
"""

    try:
        # Generate training ID
        training_id = f"web_{datetime.now().strftime('%Y%m%d%H%M%S')}"

        # Simulate website scraping
        try:
            response = requests.get(website_url, timeout=10)
            content_preview = response.text[:500] + "..."
        except requests.RequestException:
            content_preview = "[Website content retrieval simulated]"

        # Store in database
        cursor = db_conn.cursor()
        cursor.execute("""
            INSERT INTO file_training
            (training_id, file_type, website_url, learning_score, timestamp)
            VALUES (?, ?, ?, ?, ?)
        """, (training_id, "website", website_url, 0.78, datetime.now().isoformat()))

        # Log the learning
        cursor.execute("""
            INSERT INTO ai_learning_analytics
            (learning_type, ai_model_used, accuracy_score, learning_input, timestamp)
            VALUES (?, ?, ?, ?, ?)
        """, ("website_training", "deepseek", 0.78, website_url, datetime.now().isoformat()))

        db_conn.commit()

        return f"""

✅ Website Training Complete!

🌐 URL:{website_url}
📊 Content analyzed:~{len(content_preview)} characters
🧠 Learning score:78%
⏱️ Timestamp:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
    except Exception as e:
        return f"""
❌ Website training error: {e}
"""


# Backend access
def get_complete_backend_access_complete():
    """Build a text report of call/SMS/training counts and backend settings.

    Returns:
        The report, or an error message if a database query fails.
    """
    try:
        cursor = db_conn.cursor()

        # Get ALL comprehensive statistics
        cursor.execute("SELECT COUNT(*) FROM smart_calls")
        total_calls = cursor.fetchone()[0]

        cursor.execute("SELECT COUNT(*) FROM smart_calls WHERE jay_answered = 1")
        jay_answered_calls = cursor.fetchone()[0]

        cursor.execute("SELECT COUNT(*) FROM smart_calls WHERE ai_handled_alone = 1")
        ai_handled_calls = cursor.fetchone()[0]

        cursor.execute("SELECT COUNT(*) FROM smart_sms")
        total_sms = cursor.fetchone()[0]

        cursor.execute("SELECT COUNT(*) FROM file_training")
        training_sessions = cursor.fetchone()[0]

        cursor.execute("SELECT COUNT(*) FROM ai_learning_analytics")
        learning_sessions = cursor.fetchone()[0]

        # Get backend settings, ordered so each category forms one contiguous group.
        cursor.execute(
            "SELECT category, setting_name, setting_value FROM complete_settings "
            "WHERE backend_access = 1 ORDER BY category, setting_name"
        )
        settings = cursor.fetchall()

        # max(total_calls, 1) avoids dividing by zero before any call is logged.
        parts = [f"""

🔧 BACKEND ACCESS ANALYTICS

📊 CALL STATISTICS

Total Calls: {total_calls}
Jay Answered: {jay_answered_calls} ({(jay_answered_calls/max(total_calls,1)*100):.1f}%)
AI Handled: {ai_handled_calls} ({(ai_handled_calls/max(total_calls,1)*100):.1f}%)
SMS Messages: {total_sms}

🧠 AI LEARNING

Training Sessions: {training_sessions}
Learning Sessions: {learning_sessions}

⚙️ BACKEND SETTINGS

"""]

        current_category = ""
        for category, name, value in settings:
            if category != current_category:
                # Start a new category section with its heading.
                parts.append(f"""

{category.upper()}

""")
                current_category = category
            parts.append(f"{name}: {value}\n")

        parts.append("\n")
        return "".join(parts)
    except Exception as e:
        return f"""
❌ Backend access error: {e}
"""


# FLASK WEBHOOK APP
flask_app = Flask(__name__)


@flask_app.route('/')
def home():
    """Home page showing system status"""
    return f"""

🚀 Jay's Call Center System

✅ Flask Server Running

📞 Business Phone: {TWILIO_PHONE_NUMBER}

📱 Jay's Phone: {YOUR_PERSONAL_NUMBER}

⏰ Current Time: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}

"""


@flask_app.route('/voice/webhook', methods=['POST'])
def handle_voice_call():
    """Handle an incoming call: block spam, log it, notify Jay, play the language menu."""
    try:
        from_number = request.form.get('From')
        call_sid = request.form.get('CallSid')

        print(f"📞 Call from {from_number} - SID: {call_sid}")

        # Spam detection
        spam_score = detect_spam_complete(from_number)
        spam_threshold = float(get_setting("spam_protection", "spam_threshold", "0.7"))

        if spam_score > spam_threshold:
            print(f"🚫 SPAM BLOCKED: {spam_score:.2f}")
            response = VoiceResponse()
            response.say("This number is not accepting calls at this time.")
            response.hangup()
            return str(response)

        # Log legitimate call. OR IGNORE keeps a repeated delivery of the same
        # CallSid from failing on the UNIQUE constraint.
        cursor = db_conn.cursor()
        cursor.execute("""
            INSERT OR IGNORE INTO smart_calls
            (call_sid, from_number, to_number, call_status, spam_score, timestamp)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (call_sid, from_number, TWILIO_PHONE_NUMBER, 'language_menu', spam_score, datetime.now().isoformat()))
        db_conn.commit()

        # Notify Jay via SMS
        if get_setting("notifications", "sms_jay_on_call", "true") == "true" and twilio_client:
            try:
                twilio_client.messages.create(
                    body=f"📞 CALL from {from_number}",
                    from_=TWILIO_PHONE_NUMBER,
                    to=YOUR_PERSONAL_NUMBER
                )
            except Exception as e:
                print(f"SMS notification error: {e}")

        # Create language menu
        response = VoiceResponse()
        english_greeting = get_setting("voice_menu", "english_greeting", "Hey! It's Jay's Mobile Wash! Press 1 for English, 2 for Spanish.")

        gather = response.gather(
            num_digits=1,
            action='/language-selected',
            method='POST',
            timeout=8
        )
        gather.say(english_greeting, voice='alice')

        # If no selection
        response.say("I didn't receive your selection. Connecting you in English.")
        response.redirect('/english-menu')

        return str(response)
    except Exception as e:
        print(f"Voice webhook error: {e}")
        response = VoiceResponse()
        response.say("Sorry, we're experiencing technical difficulties.")
        return str(response)


@flask_app.route('/language-selected', methods=['POST'])
def handle_language_selection():
    """Store the chosen language and ring Jay for the configured duration (7 s default)."""
    try:
        digits = request.form.get('Digits')
        call_sid = request.form.get('CallSid')

        # Update database. Only "2" selects Spanish; anything else is English.
        cursor = db_conn.cursor()
        language = "spanish" if digits == "2" else "english"

        cursor.execute("""
            UPDATE smart_calls
            SET language_selected = ?, button_pressed = ?
            WHERE call_sid = ?
        """, (language, digits, call_sid))
        db_conn.commit()

        response = VoiceResponse()

        # Get ring duration (7 seconds default). Parse via float so a stored
        # value such as "7.0" is accepted.
        ring_duration = int(float(get_setting("call_handling", "ring_duration_seconds", "7")))

        if language == "spanish":
            response.say("Conectándote con Jay ahora...", voice='alice', language='es')
        else:
            response.say("Connecting you with Jay right now...", voice='alice')

        # Try to reach Jay first
        dial = response.dial(
            timeout=ring_duration,
            action='/call-result',
            method='POST'
        )
        dial.number(YOUR_PERSONAL_NUMBER)

        # If Jay doesn't answer, AI takes over
        response.redirect(f'/ai-takeover?language={language}&call_sid={call_sid}')

        return str(response)
    except Exception as e:
        print(f"Language selection error: {e}")
        response = VoiceResponse()
        response.say("Sorry, we're experiencing technical difficulties.")
        return str(response)


@flask_app.route('/call-result', methods=['POST'])
def handle_call_result():
    """Handle the dial outcome: finish if Jay answered, otherwise hand over to the AI."""
    try:
        call_status = request.form.get('DialCallStatus')
        call_sid = request.form.get('CallSid')

        cursor = db_conn.cursor()

        if call_status == 'completed':
            # Jay answered
            cursor.execute("""
                UPDATE smart_calls
                SET jay_answered = 1, call_status = 'jay_handled'
                WHERE call_sid = ?
            """, (call_sid,))
            db_conn.commit()

            response = VoiceResponse()
            response.say("Call completed. Goodbye.")
            return str(response)

        # Jay didn't answer
        cursor.execute("SELECT language_selected FROM smart_calls WHERE call_sid = ?", (call_sid,))
        result = cursor.fetchone()
        language = result[0] if result else "english"

        response = VoiceResponse()
        response.redirect(f'/ai-takeover?language={language}&call_sid={call_sid}')
        return str(response)
    except Exception as e:
        print(f"Call result error: {e}")
        response = VoiceResponse()
        response.say("Sorry, we're experiencing technical difficulties.")
        return str(response)


@flask_app.route('/ai-takeover', methods=['GET', 'POST'])
def handle_ai_takeover():
    """Play the AI pitch in the caller's language and record a voicemail."""
    try:
        language = request.args.get('language', 'english')
        call_sid = request.args.get('call_sid', '')

        # Update database
        cursor = db_conn.cursor()
        cursor.execute("""
            UPDATE smart_calls
            SET ai_handled_alone = 1, call_status = 'ai_takeover'
            WHERE call_sid = ?
        """, (call_sid,))
        db_conn.commit()

        # Generate creative pitch
        creative_pitch = generate_creative_pitch_complete(language)

        response = VoiceResponse()

        if language == "spanish":
            response.say("Hola! Este es el asistente de Jay's Mobile Wash.", voice='alice', language='es-US')
            response.say(creative_pitch, voice='alice', language='es-US')
            response.say("Por favor deja tu número y Jay te devolverá la llamada pronto.", voice='alice', language='es-US')
        else:
            response.say("Hi! This is Jay's Mobile Wash assistant.", voice='alice')
            response.say(creative_pitch, voice='alice')
            response.say("Please leave your number and Jay will call you back soon.", voice='alice')

        response.record(
            action='/recording-done',
            max_length=60,
            play_beep=True
        )

        return str(response)
    except Exception as e:
        print(f"AI takeover error: {e}")
        response = VoiceResponse()
        response.say("Sorry, we're experiencing technical difficulties.")
        return str(response)


@flask_app.route('/recording-done', methods=['POST'])
def handle_recording_done():
    """Store the voicemail recording URL and text it to Jay."""
    try:
        call_sid = request.form.get('CallSid')
        recording_url = request.form.get('RecordingUrl')

        # Update database
        if recording_url:
            cursor = db_conn.cursor()
            cursor.execute("""
                UPDATE smart_calls
                SET recording_url = ?
                WHERE call_sid = ?
            """, (recording_url, call_sid))
            db_conn.commit()

            # Notify Jay with recording URL
            if twilio_client:
                try:
                    twilio_client.messages.create(
                        body=f"📝 New voicemail from call {call_sid}: {recording_url}",
                        from_=TWILIO_PHONE_NUMBER,
                        to=YOUR_PERSONAL_NUMBER
                    )
                except Exception as e:
                    print(f"Recording notification error: {e}")

        response = VoiceResponse()
        response.say("Thank you! Jay will get back to you soon. Goodbye.")
        return str(response)
    except Exception as e:
        print(f"Recording completion error: {e}")
        response = VoiceResponse()
        response.say("Thank you for your message. Goodbye.")
        return str(response)


@flask_app.route('/sms/webhook', methods=['POST'])
def handle_sms():
    """Log an incoming SMS, forward it to Jay and send an auto-reply."""
    try:
        from_number = request.form.get('From')
        message_body = request.form.get('Body', '')
        message_sid = request.form.get('MessageSid')

        print(f"📱 SMS from {from_number}: {message_body[:20]}...")

        # Spam detection
        spam_score = detect_spam_complete(from_number, message_body)

        # Store in database. OR IGNORE keeps a repeated delivery of the same
        # MessageSid from failing on the UNIQUE constraint.
        cursor = db_conn.cursor()
        cursor.execute("""
            INSERT OR IGNORE INTO smart_sms
            (message_sid, from_number, to_number, body, spam_score, timestamp)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (message_sid, from_number, TWILIO_PHONE_NUMBER, message_body, spam_score, datetime.now().isoformat()))
        db_conn.commit()

        # Forward to Jay
        if twilio_client:
            try:
                twilio_client.messages.create(
                    body=f"SMS from {from_number}:\n\n{message_body}",
                    from_=TWILIO_PHONE_NUMBER,
                    to=YOUR_PERSONAL_NUMBER
                )
            except Exception as e:
                print(f"SMS forwarding error: {e}")

        response = MessagingResponse()
        response.message("Thanks for contacting Jay's Mobile Wash! Jay will get back to you shortly.")
        return str(response)
    except Exception as e:
        print(f"SMS handling error: {e}")
        response = MessagingResponse()
        response.message("Thanks for your message. We'll respond soon.")
        return str(response)


# GRADIO INTERFACE - ENHANCED FOR GRADIO 5.34.0 WITH BEAUTIFUL UI
with gr.Blocks(
    title="Jay's Call Center",
    theme=gr.themes.Soft(primary_hue="indigo"),
    css=custom_css
) as demo:
    gr.Markdown("""
# 🚀 Jay's Mobile Wash Call Center System

**Full-Featured Professional Call Center with AI Capabilities**
""")

    with gr.Tabs() as tabs:
        # System Status Tab
        with gr.Tab(label="📊 System Status"):
            gr.Markdown("## 📊 System Status")

            # Use a logo emoji instead of external image
            gr.Markdown("""
🚗
""")

            with gr.Row():
                with gr.Column():
                    # Colour reflecting the connection state; not referenced
                    # by the markup below.
                    status_style = "#d4edda" if "Connected" in TWILIO_STATUS else "#f8d7da"
                    gr.Markdown(f"""
📞

Twilio Status

{TWILIO_STATUS}

Business Phone: {TWILIO_PHONE_NUMBER}

Personal Phone: {YOUR_PERSONAL_NUMBER}

""")

                with gr.Column():
                    # Colour reflecting the connection state; not referenced
                    # by the markup below.
                    gemini_style = "#d4edda" if "Connected" in GEMINI_STATUS else "#f8d7da"
                    gr.Markdown(f"""
🤖

AI Status

{GEMINI_STATUS}

Using: Gemini + DeepSeek

""")

            status_refresh = gr.Button("🔄 Refresh Status", variant="primary", size="lg")
            status_output = gr.HTML()

            status_refresh.click(
                fn=get_complete_backend_access_complete,
                outputs=status_output,
            )

        # Call Duration Settings Tab
        with gr.Tab(label="⏰ Call Duration Settings"):
            gr.Markdown("## ⏰ Call Duration Settings")

            with gr.Row():
                with gr.Column():
                    ring_time_slider = gr.Slider(
                        minimum=3,
                        maximum=30,
                        value=7,
                        step=1,
                        label="⏰ Ring Duration (seconds)",
                        info="How long Jay's phone rings before AI takes over"
                    )

                with gr.Column():
                    ai_takeover_time_slider = gr.Slider(
                        minimum=5,
                        maximum=60,
                        value=10,
                        step=1,
                        label="🤖 AI Takeover After (seconds)",
                        info="Total time before AI completely handles call"
                    )

                with gr.Column():
                    max_ai_time_slider = gr.Slider(
                        minimum=1,
                        maximum=15,
                        value=5,
                        step=1,
                        label="⏱️ Max AI Conversation (minutes)",
                        info="Maximum duration for AI-only customer interaction"
                    )

            update_call_settings_button = gr.Button("✅ Update Call Duration Settings", variant="primary", size="lg")
            call_settings_output = gr.HTML()

            update_call_settings_button.click(
                fn=update_call_duration_settings_complete,
                inputs=[ring_time_slider, ai_takeover_time_slider, max_ai_time_slider],
                outputs=call_settings_output,
            )

        # File/Photo Training Tab
        with gr.Tab(label="📁 File/Photo Training"):
            gr.Markdown("## 📁 File/Photo Training")

            with gr.Row():
                with gr.Column():
                    file_upload = gr.File(label="Upload File/Photo for Training")

                with gr.Column():
                    file_type = gr.Dropdown(
                        choices=["text", "image", "document"],
                        value="text",
                        label="File Type",
                        info="Select the type of file you're uploading"
                    )

            train_file_button = gr.Button("🧠 Train AI from File", variant="primary", size="lg")
            file_train_output = gr.HTML()

            train_file_button.click(
                fn=upload_file_training_complete,
                inputs=[file_upload, file_type],
                outputs=file_train_output,
            )

        # Website Training Tab
        with gr.Tab(label="🌐 Website Training"):
            gr.Markdown("## 🌐 Website Training")

            website_url = gr.Textbox(
                label="Website URL",
                placeholder="https://example.com/car-detailing",
                info="Enter website URL to train AI from"
            )

            train_website_button = gr.Button("🧠 Train AI from Website", variant="primary", size="lg")
            website_train_output = gr.HTML()

            train_website_button.click(
                fn=train_from_website_url_complete,
                inputs=website_url,
                outputs=website_train_output,
            )

        # Backend Access Tab
        with gr.Tab(label="🔧 Backend Access"):
            gr.Markdown("## 🔧 Backend Access")

            backend_access_button = gr.Button("🔐 Access Backend", variant="primary", size="lg")
            backend_output = gr.HTML()

            backend_access_button.click(
                fn=get_complete_backend_access_complete,
                outputs=backend_output,
            )


# MODIFIED FOR HUGGING FACE SPACES WITH FLASK INTEGRATION
if __name__ == "__main__":
    # Check if running on Hugging Face Spaces
    if os.environ.get('SPACE_ID'):
        print("🤗 Running on Hugging Face Spaces")
        print("✅ Setting up Flask routes for webhooks")

        # NOTE(review): this registration looks non-functional and should be
        # confirmed against the Gradio/FastAPI docs: `demo.app` may not exist
        # before launch(), `middleware` is applied as a bare decorator so
        # add_flask_routes() does not appear to be invoked, `rule` is
        # late-bound inside _wrapper, and the Flask views would run without a
        # Flask request context. Mounting flask_app through a WSGI adapter is
        # the usual approach. The try/except only ensures the UI still starts.
        try:
            # Create a wrapper for Flask routes in Gradio
            @demo.app.middleware
            def add_flask_routes(app):
                # Add all Flask routes to the Gradio app
                for rule in flask_app.url_map.iter_rules():
                    path = str(rule)
                    methods = list(rule.methods)

                    # Skip OPTIONS method which is added by default
                    if "OPTIONS" in methods:
                        methods.remove("OPTIONS")

                    # Map each Flask route to the Gradio app
                    @app.route(path, methods=methods)
                    async def _wrapper(*args, **kwargs):
                        # Get the Flask view function
                        flask_view = flask_app.view_functions[rule.endpoint]
                        # Call it and return the response
                        return flask_view(*args, **kwargs)

                return app
        except Exception as e:
            print(f"⚠️ Could not register webhook routes: {e}")

        # Launch the Gradio app
        demo.launch(server_name="0.0.0.0")
    else:
        # Start Flask server in a thread
        flask_thread = threading.Thread(
            target=flask_app.run,
            kwargs={"debug": False, "host": "0.0.0.0", "port": 7860},
        )
        flask_thread.daemon = True
        flask_thread.start()
        print("✅ Flask webhook server started!")

        # Start Gradio server
        demo.launch(server_port=7861, share=True)