# Source: Hugging Face Space "Jayscallcenter", file app.py
# Author avatar caption: jjmandog
# Commit message: Update app.py
# Commit: 8e3b1da (verified)
import gradio as gr
import os
import sqlite3
import requests
import re
from datetime import datetime
from flask import Flask, request
from twilio.rest import Client
from twilio.twiml.voice_response import VoiceResponse
from twilio.twiml.messaging_response import MessagingResponse
import google.generativeai as genai
import threading
from PIL import Image
import io
# Custom CSS for the Gradio interface. The string is passed to gr.Blocks(css=...)
# below; every rule uses !important so it wins over the theme's own styles.
custom_css = """
.gradio-container {
font-family: 'Arial', sans-serif !important;
max-width: 1200px !important;
margin: 0 auto !important;
}
h1 {
color: #3a0ca3 !important;
font-size: 32px !important;
text-align: center !important;
margin-bottom: 24px !important;
text-shadow: 1px 1px 2px rgba(0,0,0,0.1) !important;
}
h2 {
color: #4361ee !important;
font-size: 24px !important;
border-bottom: 2px solid #4361ee !important;
padding-bottom: 8px !important;
margin-top: 20px !important;
}
h3 {
color: #4cc9f0 !important;
font-size: 20px !important;
margin-top: 16px !important;
}
.tabs {
margin-top: 20px !important;
border-radius: 12px !important;
box-shadow: 0 4px 12px rgba(0,0,0,0.1) !important;
}
button {
border-radius: 8px !important;
font-weight: bold !important;
transform: translateY(0) !important;
transition: all 0.2s !important;
margin: 5px !important;
padding: 8px 16px !important;
}
button:hover {
transform: translateY(-2px) !important;
box-shadow: 0 4px 8px rgba(0,0,0,0.1) !important;
}
button[data-testid*="primary"] {
background: linear-gradient(135deg, #4361ee, #3a0ca3) !important;
color: white !important;
border: none !important;
}
.gradio-row {
margin: 15px 0 !important;
}
.gradio-slider {
margin-bottom: 20px !important;
}
.gradio-markdown p {
margin: 10px 0 !important;
}
.gradio-tab-panel {
padding: 15px !important;
background-color: #f8f9fa !important;
border-radius: 0 0 12px 12px !important;
}
.status-box {
padding: 15px !important;
border-radius: 10px !important;
box-shadow: 0 2px 8px rgba(0,0,0,0.1) !important;
margin-bottom: 15px !important;
background-color: white !important;
}
.status-header {
display: flex !important;
align-items: center !important;
margin-bottom: 10px !important;
}
.status-emoji {
font-size: 24px !important;
margin-right: 10px !important;
}
.logo-placeholder {
text-align: center !important;
font-size: 48px !important;
margin: 10px auto 20px auto !important;
background: linear-gradient(135deg, #4cc9f0, #3a0ca3) !important;
color: white !important;
width: 96px !important;
height: 96px !important;
line-height: 96px !important;
border-radius: 50% !important;
box-shadow: 0 4px 8px rgba(0,0,0,0.2) !important;
}
"""
# API CREDENTIALS
# Secrets come from the environment only. Real keys and tokens must never be
# committed as fallback defaults: anything that has been in source control has
# to be treated as leaked and rotated. A missing secret becomes "".
DEEPSEEK_API_KEY = os.getenv('DEEPSEEK_API_KEY', '')
GEMINI_API_KEY = os.getenv('GEMINI_API_KEY', '')
TWILIO_ACCOUNT_SID = os.getenv('TWILIO_ACCOUNT_SID', '')
TWILIO_AUTH_TOKEN = os.getenv('TWILIO_AUTH_TOKEN', '')
TWILIO_PHONE_NUMBER = os.getenv('TWILIO_PHONE_NUMBER', '+18557059842')
# NOTE: the environment variable is named YOUR_PHONE_NUMBER, not YOUR_PERSONAL_NUMBER.
YOUR_PERSONAL_NUMBER = os.getenv('YOUR_PHONE_NUMBER', '+15622289429')
print(f"===== Application Startup at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} =====")
print("🚀 Starting Jay's Call Center...")
print(f"📞 Business: {TWILIO_PHONE_NUMBER}")
print(f"📱 Personal: {YOUR_PERSONAL_NUMBER}")

# Initialize clients
# Each client is optional. On failure the handle is set to None and the
# matching *_STATUS string records the error; the status tab renders these
# strings and colours its boxes by looking for the word "Connected".
try:
    twilio_client = Client(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN)
    TWILIO_STATUS = "✅ Connected"
    print("✅ Twilio client initialized")
except Exception as e:
    twilio_client = None
    TWILIO_STATUS = f"❌ Error: {e}"
    print(f"❌ Twilio error: {e}")

try:
    genai.configure(api_key=GEMINI_API_KEY)
    # NOTE(review): confirm these model names are still served by the API.
    gemini_model = genai.GenerativeModel('gemini-pro')
    gemini_vision_model = genai.GenerativeModel('gemini-pro-vision')
    GEMINI_STATUS = "✅ Connected"
    print("✅ Gemini initialized")
except Exception as e:
    gemini_model = None
    gemini_vision_model = None
    GEMINI_STATUS = f"❌ Error: {e}"
    print(f"❌ Gemini error: {e}")
# Initialize database
def init_database():
    """Open the SQLite database, ensure its schema, and seed default settings.

    Creates ``jays_call_center.db`` in the working directory if needed. The
    connection is opened with ``check_same_thread=False`` so it can be used
    from more than one thread.

    Returns:
        The open ``sqlite3.Connection``.
    """
    conn = sqlite3.connect('jays_call_center.db', check_same_thread=False)
    cursor = conn.cursor()
    # Smart calls table (recording_url is written by the /recording-done webhook)
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS smart_calls (
            id INTEGER PRIMARY KEY,
            call_sid TEXT UNIQUE,
            from_number TEXT,
            to_number TEXT,
            call_status TEXT,
            language_selected TEXT,
            button_pressed TEXT,
            ai_conversation TEXT,
            customer_intent TEXT,
            spam_score REAL,
            duration_before_answer INTEGER,
            total_duration INTEGER,
            forwarded_to_jay BOOLEAN,
            jay_answered BOOLEAN,
            ai_handled_alone BOOLEAN,
            recording_url TEXT,
            timestamp TEXT
        )
    ''')
    # Smart SMS
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS smart_sms (
            id INTEGER PRIMARY KEY,
            message_sid TEXT UNIQUE,
            from_number TEXT,
            to_number TEXT,
            body TEXT,
            spam_score REAL,
            timestamp TEXT
        )
    ''')
    # File training
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS file_training (
            id INTEGER PRIMARY KEY,
            training_id TEXT UNIQUE,
            file_type TEXT,
            file_path TEXT,
            website_url TEXT,
            learning_score REAL,
            timestamp TEXT
        )
    ''')
    # AI Learning analytics
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS ai_learning_analytics (
            id INTEGER PRIMARY KEY,
            learning_type TEXT,
            ai_model_used TEXT,
            accuracy_score REAL,
            learning_input TEXT,
            timestamp TEXT
        )
    ''')
    # Settings with duration controls
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS complete_settings (
            id INTEGER PRIMARY KEY,
            category TEXT,
            setting_name TEXT,
            setting_value TEXT,
            backend_access BOOLEAN,
            last_updated TEXT
        )
    ''')
    _migrate_schema(cursor)
    conn.commit()
    load_settings(cursor, conn)
    return conn


def _migrate_schema(cursor):
    """Upgrade a database created by an older version of this file in place."""
    # Older databases were created without the recording_url column.
    cursor.execute("PRAGMA table_info(smart_calls)")
    existing_columns = {row[1] for row in cursor.fetchall()}
    if 'recording_url' not in existing_columns:
        cursor.execute("ALTER TABLE smart_calls ADD COLUMN recording_url TEXT")
    # Without a unique key, INSERT OR IGNORE never ignored anything, so every
    # start-up appended another copy of each default. Keep the oldest row per
    # setting (the one get_setting's fetchone() has been returning), then
    # enforce uniqueness so the seeding below becomes idempotent.
    cursor.execute("""
        DELETE FROM complete_settings
        WHERE id NOT IN (
            SELECT MIN(id) FROM complete_settings
            GROUP BY category, setting_name
        )
    """)
    cursor.execute("""
        CREATE UNIQUE INDEX IF NOT EXISTS idx_complete_settings_key
        ON complete_settings (category, setting_name)
    """)


def load_settings(cursor, conn):
    """Seed default settings (7-second ring time) without overwriting saved values.

    Args:
        cursor: Cursor on the application database.
        conn: The connection owning ``cursor``; committed after seeding.
    """
    settings = [
        ("call_handling", "ring_duration_seconds", "7", True),
        ("call_handling", "ai_takeover_after_seconds", "10", True),
        ("call_handling", "max_ai_conversation_minutes", "5", True),
        ("spam_protection", "spam_threshold", "0.7", True),
        ("voice_menu", "english_greeting", "Hey! It's Jay's Mobile Wash! Press 1 for English, 2 for Spanish.", True),
        ("voice_menu", "spanish_greeting", "¡Hola! Es Jay's Mobile Wash! Presiona 1 para inglés, 2 para español.", True),
        ("notifications", "sms_jay_on_call", "true", True),
        ("notifications", "sms_jay_on_ai_takeover", "true", True),
    ]
    now = datetime.now().isoformat()
    # OR IGNORE skips rows that already exist; this relies on the unique
    # (category, setting_name) index created during schema setup.
    cursor.executemany("""
        INSERT OR IGNORE INTO complete_settings
        (category, setting_name, setting_value, backend_access, last_updated)
        VALUES (?, ?, ?, ?, ?)
    """, [(category, name, value, backend, now) for category, name, value, backend in settings])
    conn.commit()
# Initialize database
# Module-level connection used by the functions below; init_database() opens
# it with check_same_thread=False.
db_conn = init_database()
def get_setting(category: str, name: str, default: str = "") -> str:
    """Fetch one setting value from the ``complete_settings`` table.

    Args:
        category: Value matched against the ``category`` column.
        name: Value matched against the ``setting_name`` column.
        default: Returned when no row matches or the lookup fails.

    Returns:
        The stored ``setting_value`` of the first matching row, else ``default``.
    """
    query = (
        "SELECT setting_value FROM complete_settings "
        "WHERE category = ? AND setting_name = ?"
    )
    try:
        row = db_conn.cursor().execute(query, (category, name)).fetchone()
    except Exception as e:
        # Any database problem is logged and treated as "setting not found".
        print(f"Error getting setting {category}.{name}: {e}")
        return default
    if row:
        return row[0]
    return default
def update_call_duration_settings_complete(ring_time, ai_takeover_time, max_ai_time):
    """Persist the three call-duration settings and return an HTML status box.

    Values are normalised to whole numbers before being stored, so a float
    such as ``7.0`` is saved as ``"7"`` and remains parseable with ``int()``.

    Args:
        ring_time: Seconds Jay's phone rings.
        ai_takeover_time: Seconds before the AI takes over.
        max_ai_time: Maximum AI conversation length in minutes.

    Returns:
        An HTML snippet: a green confirmation on success, a red error box if
        a value is not numeric or the database update fails.
    """
    try:
        ring_time = int(round(float(ring_time)))
        ai_takeover_time = int(round(float(ai_takeover_time)))
        max_ai_time = int(round(float(max_ai_time)))
        now = datetime.now().isoformat()
        cursor = db_conn.cursor()
        cursor.executemany("""
            UPDATE complete_settings SET setting_value = ?, last_updated = ?
            WHERE category = 'call_handling' AND setting_name = ?
        """, [
            (str(ring_time), now, 'ring_duration_seconds'),
            (str(ai_takeover_time), now, 'ai_takeover_after_seconds'),
            (str(max_ai_time), now, 'max_ai_conversation_minutes'),
        ])
        db_conn.commit()
        return f"<div style='padding: 15px; background-color: #d4edda; border-radius: 8px; margin: 10px 0;'>✅ Call duration settings updated:<br>⏰ Ring time: <b>{ring_time} seconds</b><br>🤖 AI takeover: <b>{ai_takeover_time} seconds</b><br>⏱️ Max AI conversation: <b>{max_ai_time} minutes</b></div>"
    except Exception as e:
        # Discard any half-applied update so a later commit on the shared
        # connection cannot persist it.
        try:
            db_conn.rollback()
        except Exception as rollback_error:
            print(f"Rollback after settings error failed: {rollback_error}")
        return f"<div style='padding: 15px; background-color: #f8d7da; border-radius: 8px; margin: 10px 0;'>❌ Error updating settings: {e}</div>"
# DeepSeek API integration
def call_deepseek_api(prompt: str, context: str = "") -> str:
    """Send ``prompt`` to the DeepSeek chat-completions endpoint.

    Args:
        prompt: Text sent as the user message.
        context: Extra text appended to the system message.

    Returns:
        The content of the first returned choice on HTTP 200; otherwise one of
        two fixed placeholder strings (non-200 status, or any exception).
    """
    try:
        auth_headers = {
            'Authorization': f'Bearer {DEEPSEEK_API_KEY}',
            'Content-Type': 'application/json'
        }
        system_message = {
            "role": "system",
            "content": f"You are Jay's Mobile Wash AI assistant. Context: {context}",
        }
        user_message = {"role": "user", "content": prompt}
        payload = {
            "model": "deepseek-chat",
            "messages": [system_message, user_message],
            "max_tokens": 500,
            "temperature": 0.7,
        }
        reply = requests.post(
            "https://api.deepseek.com/v1/chat/completions",
            headers=auth_headers,
            json=payload,
            timeout=30
        )
        if reply.status_code != 200:
            return "DeepSeek API temporarily unavailable"
        return reply.json()['choices'][0]['message']['content']
    except Exception as e:
        # Network errors and malformed responses both end up here.
        print(f"DeepSeek API call failed: {e}")
        return "DeepSeek API error"
# Spam detection
def detect_spam_complete(phone_number: str, message_content: str = "") -> float:
    """Score how spam-like a caller or message is, from 0.0 to 1.0.

    Each spam keyword found in ``message_content`` (case-insensitive substring
    match) adds 0.2; a number starting with ``+1800`` or ``+1888`` adds 0.3.
    The total is capped at 1.0.

    Args:
        phone_number: Caller/sender number; ``None`` or empty is treated as
            "no number" and contributes nothing.
        message_content: Message text; may be empty or ``None``.

    Returns:
        The spam score, or 0.0 if scoring fails unexpectedly.
    """
    try:
        spam_indicators = [
            "debt", "collector", "payment", "overdue",
            "free", "winner", "prize", "lottery",
            "warranty", "solar", "loan", "mortgage"
        ]
        content_lower = message_content.lower() if message_content else ""
        spam_score = 0.0
        # Check spam keywords
        for indicator in spam_indicators:
            if indicator in content_lower:
                spam_score += 0.2
        # Phone number patterns. A missing number must not throw away the
        # keyword score accumulated above.
        if (phone_number or "").startswith(("+1800", "+1888")):
            spam_score += 0.3
        return min(spam_score, 1.0)
    except Exception as e:
        print(f"Spam detection error: {e}")
        return 0.0
# Creative pitch generation
def generate_creative_pitch_complete(language: str = "english") -> str:
    """Return a short spoken sales pitch for Jay's Mobile Wash.

    Asks the Gemini model for a pitch when one is configured; if the model is
    unavailable, fails, or returns no text, a fixed pitch is returned instead.

    Args:
        language: ``"spanish"`` selects Spanish; anything else means English.

    Returns:
        The pitch text. Never raises.
    """
    try:
        base_services = """
Services & Pricing:
- Basic Wash: $25 (exterior wash, tire shine)
- Premium Wash: $40 (exterior + interior vacuum)
- Full Detail: $75 (complete detailing with wax)
"""
        if language == "spanish":
            prompt = f"Create a creative Spanish pitch for Jay's Mobile Wash. Make it professional but enthusiastic. Keep under 30 seconds when spoken. {base_services}"
        else:
            prompt = f"Create a creative English pitch for Jay's Mobile Wash. Make it professional but enthusiastic. Keep under 30 seconds when spoken. {base_services}"
        # Try Gemini first
        if gemini_model:
            try:
                pitch = gemini_model.generate_content(prompt).text
                # An empty reply would give the caller silence; use the fallback.
                if pitch and pitch.strip():
                    return pitch
            except Exception as e:
                # Best effort: log the failure and use the canned pitch below.
                print(f"Gemini pitch generation failed: {e}")
        # Fallback pitch
        if language == "spanish":
            return "¡Ofrecemos servicios increíbles de lavado móvil! Lavado básico por $25, premium por $40, y detallado completo por $75. ¡Vamos a tu ubicación!"
        else:
            return "We offer amazing mobile car detailing that comes right to you! Basic Wash $25, Premium $40, Full Detail $75. What service interests you today?"
    except Exception as e:
        print(f"Pitch generation error: {e}")
        return "We offer mobile car detailing! Basic Wash $25, Premium $40, Full Detail $75. What interests you?"
# File training functions
def upload_file_training_complete(file, file_type):
    """Record an uploaded file as a training session and return an HTML status box.

    Text files are read and images are opened to confirm they are usable; image
    files are also sent to the Gemini vision model when it is configured. Only
    the file name, type and a fixed score are stored; the file's content and
    the vision reply are not persisted.

    Args:
        file: The upload from the Gradio File component: either a path string
            or an object exposing the path as ``.name``. ``None`` means no upload.
        file_type: ``"text"``, ``"image"`` or anything else (generic document).

    Returns:
        An HTML snippet describing success or the error encountered.
    """
    if file is None:
        return "<div style='padding: 15px; background-color: #f8d7da; border-radius: 8px; margin: 10px 0;'>❌ Please upload a file first</div>"
    try:
        # Accept both a plain path and a file-like wrapper with ``.name``.
        file_path = file if isinstance(file, (str, os.PathLike)) else file.name
        file_name = os.path.basename(file_path)
        # Microsecond resolution: training_id is UNIQUE, and a per-second id
        # made a second upload within the same second fail.
        training_id = f"train_{datetime.now().strftime('%Y%m%d%H%M%S%f')}"
        if file_type == "text":
            # Read once so an unreadable file is reported as an error.
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                f.read()
        elif file_type == "image":
            with Image.open(file_path) as img:
                # Process with Gemini Vision if available
                if gemini_vision_model:
                    try:
                        gemini_vision_model.generate_content(img)
                    except Exception as e:
                        print(f"Vision model error: {e}")
        now = datetime.now()
        # Store in database
        cursor = db_conn.cursor()
        cursor.execute("""
            INSERT INTO file_training
            (training_id, file_type, file_path, learning_score, timestamp)
            VALUES (?, ?, ?, ?, ?)
        """, (training_id, file_type, file_name, 0.85, now.isoformat()))
        # Log the learning
        cursor.execute("""
            INSERT INTO ai_learning_analytics
            (learning_type, ai_model_used, accuracy_score, learning_input, timestamp)
            VALUES (?, ?, ?, ?, ?)
        """, (f"{file_type}_training", "gemini", 0.85, file_name, now.isoformat()))
        db_conn.commit()
        return f"""
<div style='padding: 15px; background-color: #d4edda; border-radius: 8px; margin: 10px 0;'>
<h3 style='margin-top: 0; color: #155724;'>✅ File Training Complete!</h3>
<table style='width: 100%; border-collapse: collapse;'>
<tr><td style='padding: 5px; font-weight: bold;'>📂 Type:</td><td>{file_type}</td></tr>
<tr><td style='padding: 5px; font-weight: bold;'>📄 File:</td><td>{file_name}</td></tr>
<tr><td style='padding: 5px; font-weight: bold;'>🧠 Learning score:</td><td>85%</td></tr>
<tr><td style='padding: 5px; font-weight: bold;'>⏱️ Timestamp:</td><td>{now.strftime('%Y-%m-%d %H:%M:%S')}</td></tr>
</table>
</div>
"""
    except Exception as e:
        return f"<div style='padding: 15px; background-color: #f8d7da; border-radius: 8px; margin: 10px 0;'>❌ File training error: {e}</div>"
# Website training
def train_from_website_url_complete(website_url):
    """Record a website as a training session and return an HTML status box.

    The page is fetched once to measure its size; a failed fetch is logged and
    does not abort the training record. The page content is not persisted.

    Args:
        website_url: URL entered by the user; must start with ``http://`` or
            ``https://`` (surrounding whitespace is ignored).

    Returns:
        An HTML snippet describing success or the error encountered.
    """
    url = website_url.strip() if isinstance(website_url, str) else ""
    if not url.lower().startswith(('http://', 'https://')):
        return "<div style='padding: 15px; background-color: #f8d7da; border-radius: 8px; margin: 10px 0;'>❌ Please enter a valid website URL (starting with http:// or https://)</div>"
    try:
        # Microsecond resolution keeps the UNIQUE training_id from colliding.
        training_id = f"web_{datetime.now().strftime('%Y%m%d%H%M%S%f')}"
        try:
            response = requests.get(url, timeout=10)
            content_length = len(response.text)
        except requests.RequestException as e:
            print(f"Website fetch failed for {url}: {e}")
            content_length = 0
        now = datetime.now()
        # Store in database
        cursor = db_conn.cursor()
        cursor.execute("""
            INSERT INTO file_training
            (training_id, file_type, website_url, learning_score, timestamp)
            VALUES (?, ?, ?, ?, ?)
        """, (training_id, "website", url, 0.78, now.isoformat()))
        # Log the learning
        cursor.execute("""
            INSERT INTO ai_learning_analytics
            (learning_type, ai_model_used, accuracy_score, learning_input, timestamp)
            VALUES (?, ?, ?, ?, ?)
        """, ("website_training", "deepseek", 0.78, url, now.isoformat()))
        db_conn.commit()
        return f"""
<div style='padding: 15px; background-color: #d4edda; border-radius: 8px; margin: 10px 0;'>
<h3 style='margin-top: 0; color: #155724;'>✅ Website Training Complete!</h3>
<table style='width: 100%; border-collapse: collapse;'>
<tr><td style='padding: 5px; font-weight: bold;'>🌐 URL:</td><td>{url}</td></tr>
<tr><td style='padding: 5px; font-weight: bold;'>📊 Content analyzed:</td><td>~{content_length} characters</td></tr>
<tr><td style='padding: 5px; font-weight: bold;'>🧠 Learning score:</td><td>78%</td></tr>
<tr><td style='padding: 5px; font-weight: bold;'>⏱️ Timestamp:</td><td>{now.strftime('%Y-%m-%d %H:%M:%S')}</td></tr>
</table>
</div>
"""
    except Exception as e:
        return f"<div style='padding: 15px; background-color: #f8d7da; border-radius: 8px; margin: 10px 0;'>❌ Website training error: {e}</div>"
# Backend access
def get_complete_backend_access_complete():
    """Build the HTML analytics panel: call/SMS/training counts plus settings.

    Settings flagged ``backend_access = 1`` are listed grouped by category.
    Rows are grouped explicitly, so each category gets exactly one heading
    even if its rows are not returned next to each other; categories appear
    in the order they are first seen.

    Returns:
        The panel as an HTML string, or a red error box if a query fails.
    """
    try:
        cursor = db_conn.cursor()

        def count(sql):
            """Run a single-value COUNT query and return the number."""
            cursor.execute(sql)
            return cursor.fetchone()[0]

        total_calls = count("SELECT COUNT(*) FROM smart_calls")
        jay_answered_calls = count("SELECT COUNT(*) FROM smart_calls WHERE jay_answered = 1")
        ai_handled_calls = count("SELECT COUNT(*) FROM smart_calls WHERE ai_handled_alone = 1")
        total_sms = count("SELECT COUNT(*) FROM smart_sms")
        training_sessions = count("SELECT COUNT(*) FROM file_training")
        learning_sessions = count("SELECT COUNT(*) FROM ai_learning_analytics")
        # Get backend settings
        cursor.execute("SELECT category, setting_name, setting_value FROM complete_settings WHERE backend_access = 1")
        settings_by_category = {}
        for category, name, value in cursor.fetchall():
            settings_by_category.setdefault(category, []).append((name, value))
        # max(total_calls, 1) avoids dividing by zero before any call is logged.
        parts = [f"""
<div style="background-color: #f8f9fa; border-radius: 12px; padding: 20px; box-shadow: 0 4px 6px rgba(0,0,0,0.1);">
<h2 style="color: #3a0ca3; text-align: center; margin-bottom: 20px;">🔧 BACKEND ACCESS ANALYTICS</h2>
<div style="background-color: white; border-radius: 8px; padding: 15px; margin-bottom: 20px; box-shadow: 0 2px 4px rgba(0,0,0,0.05);">
<h3 style="color: #4361ee; border-bottom: 2px solid #4361ee; padding-bottom: 8px;">📊 CALL STATISTICS</h3>
<table style="width: 100%; border-collapse: collapse;">
<tr>
<td style="padding: 8px; font-weight: bold;">Total Calls:</td>
<td style="padding: 8px;">{total_calls}</td>
</tr>
<tr>
<td style="padding: 8px; font-weight: bold;">Jay Answered:</td>
<td style="padding: 8px;">{jay_answered_calls} ({(jay_answered_calls/max(total_calls,1)*100):.1f}%)</td>
</tr>
<tr>
<td style="padding: 8px; font-weight: bold;">AI Handled:</td>
<td style="padding: 8px;">{ai_handled_calls} ({(ai_handled_calls/max(total_calls,1)*100):.1f}%)</td>
</tr>
<tr>
<td style="padding: 8px; font-weight: bold;">SMS Messages:</td>
<td style="padding: 8px;">{total_sms}</td>
</tr>
</table>
</div>
<div style="background-color: white; border-radius: 8px; padding: 15px; margin-bottom: 20px; box-shadow: 0 2px 4px rgba(0,0,0,0.05);">
<h3 style="color: #4361ee; border-bottom: 2px solid #4361ee; padding-bottom: 8px;">🧠 AI LEARNING</h3>
<table style="width: 100%; border-collapse: collapse;">
<tr>
<td style="padding: 8px; font-weight: bold;">Training Sessions:</td>
<td style="padding: 8px;">{training_sessions}</td>
</tr>
<tr>
<td style="padding: 8px; font-weight: bold;">Learning Sessions:</td>
<td style="padding: 8px;">{learning_sessions}</td>
</tr>
</table>
</div>
<div style="background-color: white; border-radius: 8px; padding: 15px; box-shadow: 0 2px 4px rgba(0,0,0,0.05);">
<h3 style="color: #4361ee; border-bottom: 2px solid #4361ee; padding-bottom: 8px;">⚙️ BACKEND SETTINGS</h3>
"""]
        for category, rows in settings_by_category.items():
            parts.append(f"""
<div style="margin-top: 15px;">
<h4 style="color: #4cc9f0; margin: 10px 0;">{category.upper()}</h4>
<table style="width: 100%; border-collapse: collapse;">
""")
            for name, value in rows:
                parts.append(f"""
<tr>
<td style="padding: 8px; font-weight: bold;">{name}:</td>
<td style="padding: 8px;">{value}</td>
</tr>
""")
            parts.append("</table></div>")
        parts.append("""
</div>
</div>
""")
        return "".join(parts)
    except Exception as e:
        return f"<div style='padding: 15px; background-color: #f8d7da; border-radius: 8px; margin: 10px 0;'>❌ Backend access error: {e}</div>"
# FLASK WEBHOOK APP
# Flask application object; the voice and SMS webhook routes below are
# registered on it via @flask_app.route.
flask_app = Flask(__name__)
@flask_app.route('/')
def home():
    """Home page showing system status.

    Returns:
        A small HTML page with the configured phone numbers and the current
        time in UTC.
    """
    from datetime import timezone

    # The page labels the time "UTC", so take the time in UTC rather than the
    # server's local zone.
    now_utc = datetime.now(timezone.utc)
    return f"""
<h1>🚀 Jay's Call Center System</h1>
<p>✅ Flask Server Running</p>
<p>📞 Business Phone: {TWILIO_PHONE_NUMBER}</p>
<p>📱 Jay's Phone: {YOUR_PERSONAL_NUMBER}</p>
<p>⏰ Current Time: {now_utc.strftime('%Y-%m-%d %H:%M:%S UTC')}</p>
"""
@flask_app.route('/voice/webhook', methods=['POST'])
def handle_voice_call():
    """Voice call entry point: spam check, logging, SMS notice, language menu.

    Reads ``From`` and ``CallSid`` from the posted form. Calls scoring above
    the configured spam threshold are rejected with a short message; all
    others are logged and offered the language menu.

    Returns:
        TwiML as a string. On any unexpected error a spoken apology is returned.
    """
    try:
        from_number = request.form.get('From')
        call_sid = request.form.get('CallSid')
        print(f"📞 Call from {from_number} - SID: {call_sid}")
        # Spam detection
        spam_score = detect_spam_complete(from_number)
        spam_threshold = float(get_setting("spam_protection", "spam_threshold", "0.7"))
        if spam_score > spam_threshold:
            print(f"🚫 SPAM BLOCKED: {spam_score:.2f}")
            response = VoiceResponse()
            response.say("This number is not accepting calls at this time.")
            response.hangup()
            return str(response)
        # Log legitimate call. call_sid is UNIQUE, so OR IGNORE keeps a
        # repeated request for the same call from failing the whole webhook.
        cursor = db_conn.cursor()
        cursor.execute("""
            INSERT OR IGNORE INTO smart_calls
            (call_sid, from_number, to_number, call_status, spam_score, timestamp)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (call_sid, from_number, TWILIO_PHONE_NUMBER, 'language_menu', spam_score, datetime.now().isoformat()))
        db_conn.commit()
        # Notify Jay via SMS
        if get_setting("notifications", "sms_jay_on_call", "true") == "true" and twilio_client:
            try:
                twilio_client.messages.create(
                    body=f"📞 CALL from {from_number}",
                    from_=TWILIO_PHONE_NUMBER,
                    to=YOUR_PERSONAL_NUMBER
                )
            except Exception as e:
                print(f"SMS notification error: {e}")
        # Create language menu
        response = VoiceResponse()
        english_greeting = get_setting("voice_menu", "english_greeting", "Hey! It's Jay's Mobile Wash! Press 1 for English, 2 for Spanish.")
        gather = response.gather(
            num_digits=1,
            action='/language-selected',
            method='POST',
            timeout=8
        )
        gather.say(english_greeting, voice='alice')
        # If no selection: /language-selected treats a missing digit as
        # English. (The previous target, /english-menu, is not a route in
        # this app.)
        response.say("I didn't receive your selection. Connecting you in English.")
        response.redirect('/language-selected')
        return str(response)
    except Exception as e:
        print(f"Voice webhook error: {e}")
        response = VoiceResponse()
        response.say("Sorry, we're experiencing technical difficulties.")
        return str(response)
@flask_app.route('/language-selected', methods=['POST'])
def handle_language_selection():
    """Store the caller's language choice, then ring Jay for the configured time.

    ``Digits`` of ``"2"`` selects Spanish; ``"1"``, anything else, or a
    missing digit selects English.

    Returns:
        TwiML as a string. On any unexpected error a spoken apology is returned.
    """
    try:
        digits = request.form.get('Digits')
        call_sid = request.form.get('CallSid')
        # Update database
        cursor = db_conn.cursor()
        language = "spanish" if digits == "2" else "english"
        cursor.execute("""
            UPDATE smart_calls
            SET language_selected = ?, button_pressed = ?
            WHERE call_sid = ?
        """, (language, digits, call_sid))
        db_conn.commit()
        response = VoiceResponse()
        # Get ring duration (7 seconds default). The stored value is text and
        # may be missing or non-integer; a bad value must not fail the call.
        try:
            ring_duration = int(float(get_setting("call_handling", "ring_duration_seconds", "7")))
        except (TypeError, ValueError):
            ring_duration = 7
        if language == "spanish":
            response.say("Conectándote con Jay ahora...", voice='alice', language='es')
        else:
            response.say("Connecting you with Jay right now...", voice='alice')
        # Try to reach Jay first
        dial = response.dial(
            timeout=ring_duration,
            action='/call-result',
            method='POST'
        )
        dial.number(YOUR_PERSONAL_NUMBER)
        # If Jay doesn't answer, AI takes over.
        # NOTE(review): with an action URL on <Dial>, Twilio is expected to
        # continue at /call-result, which makes this redirect a backstop only
        # -- confirm against the TwiML <Dial> documentation.
        response.redirect(f'/ai-takeover?language={language}&call_sid={call_sid}')
        return str(response)
    except Exception as e:
        print(f"Language selection error: {e}")
        response = VoiceResponse()
        response.say("Sorry, we're experiencing technical difficulties.")
        return str(response)
@flask_app.route('/call-result', methods=['POST'])
def handle_call_result():
    """Handle the outcome of the forwarded call to Jay.

    A ``DialCallStatus`` of ``completed`` marks the call as answered by Jay;
    any other status hands the caller to ``/ai-takeover`` in the language
    stored for the call.

    Returns:
        TwiML as a string. On any unexpected error a spoken apology is returned.
    """
    try:
        call_status = request.form.get('DialCallStatus')
        call_sid = request.form.get('CallSid')
        cursor = db_conn.cursor()
        response = VoiceResponse()
        if call_status == 'completed':
            # Jay answered
            cursor.execute("""
                UPDATE smart_calls
                SET jay_answered = 1, call_status = 'jay_handled'
                WHERE call_sid = ?
            """, (call_sid,))
            db_conn.commit()
            response.say("Call completed. Goodbye.")
            return str(response)
        # Jay didn't answer
        cursor.execute("SELECT language_selected FROM smart_calls WHERE call_sid = ?", (call_sid,))
        result = cursor.fetchone()
        # The column can be NULL (no row, or no choice recorded); fall back to
        # English rather than putting "None" in the URL.
        language = (result[0] if result else None) or "english"
        response.redirect(f'/ai-takeover?language={language}&call_sid={call_sid}')
        return str(response)
    except Exception as e:
        print(f"Call result error: {e}")
        response = VoiceResponse()
        response.say("Sorry, we're experiencing technical difficulties.")
        return str(response)
@flask_app.route('/ai-takeover', methods=['GET', 'POST'])
def handle_ai_takeover():
    """Play the assistant greeting and pitch, then record a voicemail.

    Reads ``language`` and ``call_sid`` from the query string, marks the call
    as AI-handled, speaks the generated pitch in the chosen language and
    records up to 60 seconds, posting the result to ``/recording-done``.

    Returns:
        TwiML as a string. On any unexpected error a spoken apology is returned.
    """
    try:
        language = request.args.get('language', 'english')
        call_sid = request.args.get('call_sid', '')
        # Update database
        cursor = db_conn.cursor()
        cursor.execute("""
            UPDATE smart_calls
            SET ai_handled_alone = 1, call_status = 'ai_takeover'
            WHERE call_sid = ?
        """, (call_sid,))
        db_conn.commit()
        # Generate creative pitch
        creative_pitch = generate_creative_pitch_complete(language)
        response = VoiceResponse()
        if language == "spanish":
            response.say("Hola! Este es el asistente de Jay's Mobile Wash.", voice='alice', language='es-US')
            response.say(creative_pitch, voice='alice', language='es-US')
            response.say("Por favor deja tu número y Jay te devolverá la llamada pronto.", voice='alice', language='es-US')
        else:
            response.say("Hi! This is Jay's Mobile Wash assistant.", voice='alice')
            response.say(creative_pitch, voice='alice')
            response.say("Please leave your number and Jay will call you back soon.", voice='alice')
        response.record(
            action='/recording-done',
            max_length=60,
            play_beep=True
        )
        return str(response)
    except Exception as e:
        print(f"AI takeover error: {e}")
        response = VoiceResponse()
        response.say("Sorry, we're experiencing technical difficulties.")
        return str(response)
@flask_app.route('/recording-done', methods=['POST'])
def handle_recording_done():
    """Store the voicemail URL for the call and text it to Jay.

    Reads ``CallSid`` and ``RecordingUrl`` from the posted form. Saving the
    URL and sending the SMS are independent best-effort steps: a failure in
    one is logged and does not prevent the other.

    Returns:
        TwiML as a string thanking the caller.
    """
    try:
        call_sid = request.form.get('CallSid')
        recording_url = request.form.get('RecordingUrl')
        if recording_url:
            # Update database. A storage problem must not stop the
            # notification below, or the voicemail would go unnoticed.
            try:
                cursor = db_conn.cursor()
                cursor.execute("""
                    UPDATE smart_calls
                    SET recording_url = ?
                    WHERE call_sid = ?
                """, (recording_url, call_sid))
                db_conn.commit()
            except sqlite3.Error as e:
                print(f"Recording database update error: {e}")
            # Notify Jay with recording URL
            if twilio_client:
                try:
                    twilio_client.messages.create(
                        body=f"📝 New voicemail from call {call_sid}: {recording_url}",
                        from_=TWILIO_PHONE_NUMBER,
                        to=YOUR_PERSONAL_NUMBER
                    )
                except Exception as e:
                    print(f"Recording notification error: {e}")
        response = VoiceResponse()
        response.say("Thank you! Jay will get back to you soon. Goodbye.")
        return str(response)
    except Exception as e:
        print(f"Recording completion error: {e}")
        response = VoiceResponse()
        response.say("Thank you for your message. Goodbye.")
        return str(response)
@flask_app.route('/sms/webhook', methods=['POST'])
def handle_sms():
    """Log an incoming SMS, forward it to Jay, and send an auto-reply.

    Every message is stored with its spam score. Messages scoring above the
    configured ``spam_threshold`` (the same setting the voice webhook uses)
    are stored only: they are neither forwarded nor answered.

    Returns:
        Messaging TwiML as a string.
    """
    try:
        from_number = request.form.get('From')
        message_body = request.form.get('Body', '')
        message_sid = request.form.get('MessageSid')
        print(f"📱 SMS from {from_number}: {message_body[:20]}...")
        # Spam detection
        spam_score = detect_spam_complete(from_number, message_body)
        # Store in database. message_sid is UNIQUE, so OR IGNORE tolerates a
        # repeated delivery; a storage failure must not block forwarding.
        try:
            cursor = db_conn.cursor()
            cursor.execute("""
                INSERT OR IGNORE INTO smart_sms
                (message_sid, from_number, to_number, body, spam_score, timestamp)
                VALUES (?, ?, ?, ?, ?, ?)
            """, (message_sid, from_number, TWILIO_PHONE_NUMBER, message_body, spam_score, datetime.now().isoformat()))
            db_conn.commit()
        except sqlite3.Error as e:
            print(f"SMS database error: {e}")
        try:
            spam_threshold = float(get_setting("spam_protection", "spam_threshold", "0.7"))
        except (TypeError, ValueError):
            spam_threshold = 0.7
        if spam_score > spam_threshold:
            print(f"🚫 SPAM SMS IGNORED: {spam_score:.2f}")
            # An empty response sends no reply to the sender.
            return str(MessagingResponse())
        # Forward to Jay
        if twilio_client:
            try:
                twilio_client.messages.create(
                    body=f"SMS from {from_number}:\n\n{message_body}",
                    from_=TWILIO_PHONE_NUMBER,
                    to=YOUR_PERSONAL_NUMBER
                )
            except Exception as e:
                print(f"SMS forwarding error: {e}")
        response = MessagingResponse()
        response.message("Thanks for contacting Jay's Mobile Wash! Jay will get back to you shortly.")
        return str(response)
    except Exception as e:
        print(f"SMS handling error: {e}")
        response = MessagingResponse()
        response.message("Thanks for your message. We'll respond soon.")
        return str(response)
# GRADIO INTERFACE - ENHANCED FOR GRADIO 5.34.0 WITH BEAUTIFUL UI
def _saved_call_setting(name, default, low, high):
    """Return the persisted ``call_handling`` setting as an int within [low, high].

    Falls back to ``default`` when the stored value is missing or not numeric,
    so a bad row cannot stop the interface from being built.
    """
    try:
        value = int(float(get_setting("call_handling", name, str(default))))
    except (TypeError, ValueError):
        return default
    return max(low, min(high, value))


# Layout: one tab per feature. Each button is wired to one of the functions
# above and renders the returned HTML in the gr.HTML component beside it.
with gr.Blocks(
    title="Jay's Call Center",
    theme=gr.themes.Soft(primary_hue="indigo"),
    css=custom_css
) as demo:
    gr.Markdown("""
# 🚀 Jay's Mobile Wash Call Center System
**Full-Featured Professional Call Center with AI Capabilities**
""")
    with gr.Tabs() as tabs:
        # System Status Tab
        with gr.Tab(label="📊 System Status"):
            gr.Markdown("## 📊 System Status")
            # Use a logo emoji instead of external image
            gr.Markdown("""<div class='logo-placeholder'>🚗</div>""")
            with gr.Row():
                with gr.Column():
                    # Green box when the client connected at start-up, red otherwise.
                    status_style = "#d4edda" if "Connected" in TWILIO_STATUS else "#f8d7da"
                    gr.Markdown(f"""
<div style="padding: 15px; background: {status_style}; border-radius: 10px; margin-bottom: 15px;">
<div style="display: flex; align-items: center;">
<span style="font-size: 24px; margin-right: 10px;">📞</span>
<h3 style="margin: 0;">Twilio Status</h3>
</div>
<p><b>{TWILIO_STATUS}</b></p>
<p>Business Phone: {TWILIO_PHONE_NUMBER}</p>
<p>Personal Phone: {YOUR_PERSONAL_NUMBER}</p>
</div>
""")
                with gr.Column():
                    gemini_style = "#d4edda" if "Connected" in GEMINI_STATUS else "#f8d7da"
                    gr.Markdown(f"""
<div style="padding: 15px; background: {gemini_style}; border-radius: 10px; margin-bottom: 15px;">
<div style="display: flex; align-items: center;">
<span style="font-size: 24px; margin-right: 10px;">🤖</span>
<h3 style="margin: 0;">AI Status</h3>
</div>
<p><b>{GEMINI_STATUS}</b></p>
<p>Using: Gemini + DeepSeek</p>
</div>
""")
            status_refresh = gr.Button("🔄 Refresh Status", variant="primary", size="lg")
            status_output = gr.HTML()
            status_refresh.click(
                fn=get_complete_backend_access_complete,
                outputs=status_output,
            )
        # Call Duration Settings Tab
        with gr.Tab(label="⏰ Call Duration Settings"):
            gr.Markdown("## ⏰ Call Duration Settings")
            # Sliders start from the values stored in the database, so the tab
            # shows what is in effect and "Update" does not silently reset
            # saved settings to the built-in defaults.
            with gr.Row():
                with gr.Column():
                    ring_time_slider = gr.Slider(
                        minimum=3,
                        maximum=30,
                        value=_saved_call_setting("ring_duration_seconds", 7, 3, 30),
                        step=1,
                        label="⏰ Ring Duration (seconds)",
                        info="How long Jay's phone rings before AI takes over"
                    )
                with gr.Column():
                    ai_takeover_time_slider = gr.Slider(
                        minimum=5,
                        maximum=60,
                        value=_saved_call_setting("ai_takeover_after_seconds", 10, 5, 60),
                        step=1,
                        label="🤖 AI Takeover After (seconds)",
                        info="Total time before AI completely handles call"
                    )
                with gr.Column():
                    max_ai_time_slider = gr.Slider(
                        minimum=1,
                        maximum=15,
                        value=_saved_call_setting("max_ai_conversation_minutes", 5, 1, 15),
                        step=1,
                        label="⏱️ Max AI Conversation (minutes)",
                        info="Maximum duration for AI-only customer interaction"
                    )
            update_call_settings_button = gr.Button("✅ Update Call Duration Settings", variant="primary", size="lg")
            call_settings_output = gr.HTML()
            update_call_settings_button.click(
                fn=update_call_duration_settings_complete,
                inputs=[ring_time_slider, ai_takeover_time_slider, max_ai_time_slider],
                outputs=call_settings_output,
            )
        # File/Photo Training Tab
        with gr.Tab(label="📁 File/Photo Training"):
            gr.Markdown("## 📁 File/Photo Training")
            with gr.Row():
                with gr.Column():
                    file_upload = gr.File(label="Upload File/Photo for Training")
                with gr.Column():
                    file_type = gr.Dropdown(
                        choices=["text", "image", "document"],
                        value="text",
                        label="File Type",
                        info="Select the type of file you're uploading"
                    )
            train_file_button = gr.Button("🧠 Train AI from File", variant="primary", size="lg")
            file_train_output = gr.HTML()
            train_file_button.click(
                fn=upload_file_training_complete,
                inputs=[file_upload, file_type],
                outputs=file_train_output,
            )
        # Website Training Tab
        with gr.Tab(label="🌐 Website Training"):
            gr.Markdown("## 🌐 Website Training")
            website_url = gr.Textbox(
                label="Website URL",
                placeholder="https://example.com/car-detailing",
                info="Enter website URL to train AI from"
            )
            train_website_button = gr.Button("🧠 Train AI from Website", variant="primary", size="lg")
            website_train_output = gr.HTML()
            train_website_button.click(
                fn=train_from_website_url_complete,
                inputs=website_url,
                outputs=website_train_output,
            )
        # Backend Access Tab
        with gr.Tab(label="🔧 Backend Access"):
            gr.Markdown("## 🔧 Backend Access")
            backend_access_button = gr.Button("🔍 Access Backend", variant="primary", size="lg")
            backend_output = gr.HTML()
            backend_access_button.click(
                fn=get_complete_backend_access_complete,
                outputs=backend_output,
            )
# MODIFIED FOR HUGGING FACE SPACES WITH FLASK INTEGRATION
# Entry point. Two modes, chosen by the SPACE_ID environment variable:
#   * set     -> try to attach the Flask webhook routes to the Gradio app and
#                launch Gradio alone, bound to 0.0.0.0;
#   * not set -> run Flask on port 7860 in a daemon thread and Gradio on port
#                7861 with a public share link.
if __name__ == "__main__":
# Check if running on Hugging Face Spaces
if os.environ.get('SPACE_ID'):
print("πŸ€— Running on Hugging Face Spaces")
print("βœ… Setting up Flask routes for webhooks")
# Create a wrapper for Flask routes in Gradio
# NOTE(review): this branch looks unlikely to work as written -- verify on a Space:
#   - demo.app is read before demo.launch(); presumably the attribute only
#     exists after launch -- TODO confirm against the Gradio Blocks docs.
#   - .middleware is used as a bare decorator on a function that takes the
#     app; presumably the framework expects a different signature -- confirm.
#   - _wrapper closes over the loop variable `rule`, so every registered
#     wrapper resolves the endpoint of the LAST rule when it is finally called.
#   - the Flask views read flask.request; calling them directly, outside a
#     Flask request, presumably fails -- confirm.
@demo.app.middleware
def add_flask_routes(app):
# Add all Flask routes to the Gradio app
for rule in flask_app.url_map.iter_rules():
path = str(rule)
methods = list(rule.methods)
# Skip OPTIONS method which is added by default
if "OPTIONS" in methods:
methods.remove("OPTIONS")
# Map each Flask route to the Gradio app
@app.route(path, methods=methods)
async def _wrapper(*args, **kwargs):
# Get the Flask view function
flask_view = flask_app.view_functions[rule.endpoint]
# Call it and return the response
return flask_view(*args, **kwargs)
return app
# Launch the Gradio app which now includes Flask routes
demo.launch(server_name="0.0.0.0")
else:
# Start Flask server in a thread
# Daemon thread: it does not keep the process alive once the main thread exits.
flask_thread = threading.Thread(target=flask_app.run, kwargs={"debug": False, "host": "0.0.0.0", "port": 7860})
flask_thread.daemon = True
flask_thread.start()
print("βœ… Flask webhook server started!")
# Start Gradio server
demo.launch(server_port=7861, share=True)