# Hugging Face Spaces page residue captured with this file ("Spaces: Sleeping"); not part of the program.
import html
import io
import os
import re
import sqlite3
import threading
from datetime import datetime

import google.generativeai as genai
import gradio as gr
import requests
from flask import Flask, request
from PIL import Image
from twilio.rest import Client
from twilio.twiml.messaging_response import MessagingResponse
from twilio.twiml.voice_response import VoiceResponse
# Custom CSS to make the UI look beautiful.
# Passed to gr.Blocks(css=...) below. Every declaration carries !important so it
# wins over the theme's own rules. The string must contain nothing but CSS:
# stray characters after a ';' make the browser drop the following declaration.
custom_css = """
.gradio-container {
    font-family: 'Arial', sans-serif !important;
    max-width: 1200px !important;
    margin: 0 auto !important;
}
h1 {
    color: #3a0ca3 !important;
    font-size: 32px !important;
    text-align: center !important;
    margin-bottom: 24px !important;
    text-shadow: 1px 1px 2px rgba(0,0,0,0.1) !important;
}
h2 {
    color: #4361ee !important;
    font-size: 24px !important;
    border-bottom: 2px solid #4361ee !important;
    padding-bottom: 8px !important;
    margin-top: 20px !important;
}
h3 {
    color: #4cc9f0 !important;
    font-size: 20px !important;
    margin-top: 16px !important;
}
.tabs {
    margin-top: 20px !important;
    border-radius: 12px !important;
    box-shadow: 0 4px 12px rgba(0,0,0,0.1) !important;
}
button {
    border-radius: 8px !important;
    font-weight: bold !important;
    transform: translateY(0) !important;
    transition: all 0.2s !important;
    margin: 5px !important;
    padding: 8px 16px !important;
}
button:hover {
    transform: translateY(-2px) !important;
    box-shadow: 0 4px 8px rgba(0,0,0,0.1) !important;
}
button[data-testid*="primary"] {
    background: linear-gradient(135deg, #4361ee, #3a0ca3) !important;
    color: white !important;
    border: none !important;
}
.gradio-row {
    margin: 15px 0 !important;
}
.gradio-slider {
    margin-bottom: 20px !important;
}
.gradio-markdown p {
    margin: 10px 0 !important;
}
.gradio-tab-panel {
    padding: 15px !important;
    background-color: #f8f9fa !important;
    border-radius: 0 0 12px 12px !important;
}
.status-box {
    padding: 15px !important;
    border-radius: 10px !important;
    box-shadow: 0 2px 8px rgba(0,0,0,0.1) !important;
    margin-bottom: 15px !important;
    background-color: white !important;
}
.status-header {
    display: flex !important;
    align-items: center !important;
    margin-bottom: 10px !important;
}
.status-emoji {
    font-size: 24px !important;
    margin-right: 10px !important;
}
.logo-placeholder {
    text-align: center !important;
    font-size: 48px !important;
    margin: 10px auto 20px auto !important;
    background: linear-gradient(135deg, #4cc9f0, #3a0ca3) !important;
    color: white !important;
    width: 96px !important;
    height: 96px !important;
    line-height: 96px !important;
    border-radius: 50% !important;
    box-shadow: 0 4px 8px rgba(0,0,0,0.2) !important;
}
"""
# API CREDENTIALS
# Secrets and phone numbers are read from the environment only. Real keys,
# tokens and personal numbers must never be committed as fallback defaults:
# anything that was ever hard-coded here should be treated as leaked and rotated.
DEEPSEEK_API_KEY = os.getenv('DEEPSEEK_API_KEY', '')
GEMINI_API_KEY = os.getenv('GEMINI_API_KEY', '')
TWILIO_ACCOUNT_SID = os.getenv('TWILIO_ACCOUNT_SID', '')
TWILIO_AUTH_TOKEN = os.getenv('TWILIO_AUTH_TOKEN', '')
TWILIO_PHONE_NUMBER = os.getenv('TWILIO_PHONE_NUMBER', '')
# The environment variable is named YOUR_PHONE_NUMBER (kept for compatibility
# with existing deployments) although the constant is YOUR_PERSONAL_NUMBER.
YOUR_PERSONAL_NUMBER = os.getenv('YOUR_PHONE_NUMBER', '')

# Report missing configuration by name only; values are never printed.
_missing_env = [
    env_name
    for env_name, env_value in (
        ('DEEPSEEK_API_KEY', DEEPSEEK_API_KEY),
        ('GEMINI_API_KEY', GEMINI_API_KEY),
        ('TWILIO_ACCOUNT_SID', TWILIO_ACCOUNT_SID),
        ('TWILIO_AUTH_TOKEN', TWILIO_AUTH_TOKEN),
        ('TWILIO_PHONE_NUMBER', TWILIO_PHONE_NUMBER),
        ('YOUR_PHONE_NUMBER', YOUR_PERSONAL_NUMBER),
    )
    if not env_value
]
if _missing_env:
    print(f"Missing environment variables: {', '.join(_missing_env)}")

print(f"===== Application Startup at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} =====")
print(f"π Starting Jay's Call Center...")
print(f"π Business: {TWILIO_PHONE_NUMBER}")
print(f"π± Personal: {YOUR_PERSONAL_NUMBER}")
# Initialize clients
# Build the Twilio REST client. On any failure the client is left as None so
# later code can skip SMS sends, and the error text is kept for the status tab.
try:
    twilio_client = Client(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN)
except Exception as exc:
    twilio_client = None
    TWILIO_STATUS = f"β Error: {exc}"
    print(f"β Twilio error: {exc}")
else:
    TWILIO_STATUS = "β Connected"
    print("β Twilio client initialized")
# Configure Gemini and create the text and vision models. If any step fails,
# both model handles are reset to None and the error is kept for the status tab.
try:
    genai.configure(api_key=GEMINI_API_KEY)
    gemini_model = genai.GenerativeModel('gemini-pro')
    gemini_vision_model = genai.GenerativeModel('gemini-pro-vision')
except Exception as exc:
    gemini_model = None
    gemini_vision_model = None
    GEMINI_STATUS = f"β Error: {exc}"
    print(f"β Gemini error: {exc}")
else:
    GEMINI_STATUS = "β Connected"
    print("β Gemini initialized")
# Initialize database
def init_database(db_path: str = 'jays_call_center.db') -> sqlite3.Connection:
    """Open the SQLite database, create any missing tables and seed settings.

    Args:
        db_path: Database file to open. Defaults to the application's
            'jays_call_center.db' in the working directory.

    Returns:
        The open connection. It is created with check_same_thread=False
        because the same connection object is used by the Gradio callbacks
        and the Flask webhook handlers.
    """
    conn = sqlite3.connect(db_path, check_same_thread=False)
    cursor = conn.cursor()

    # Smart calls table
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS smart_calls (
            id INTEGER PRIMARY KEY,
            call_sid TEXT UNIQUE,
            from_number TEXT,
            to_number TEXT,
            call_status TEXT,
            language_selected TEXT,
            button_pressed TEXT,
            ai_conversation TEXT,
            customer_intent TEXT,
            spam_score REAL,
            duration_before_answer INTEGER,
            total_duration INTEGER,
            forwarded_to_jay BOOLEAN,
            jay_answered BOOLEAN,
            ai_handled_alone BOOLEAN,
            recording_url TEXT,
            timestamp TEXT
        )
    ''')
    # handle_recording_done() writes smart_calls.recording_url. Databases
    # created before that column existed are not touched by CREATE TABLE
    # IF NOT EXISTS, so add the column to them explicitly.
    cursor.execute("PRAGMA table_info(smart_calls)")
    existing_columns = {row[1] for row in cursor.fetchall()}
    if 'recording_url' not in existing_columns:
        cursor.execute("ALTER TABLE smart_calls ADD COLUMN recording_url TEXT")

    # Smart SMS
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS smart_sms (
            id INTEGER PRIMARY KEY,
            message_sid TEXT UNIQUE,
            from_number TEXT,
            to_number TEXT,
            body TEXT,
            spam_score REAL,
            timestamp TEXT
        )
    ''')

    # File training
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS file_training (
            id INTEGER PRIMARY KEY,
            training_id TEXT UNIQUE,
            file_type TEXT,
            file_path TEXT,
            website_url TEXT,
            learning_score REAL,
            timestamp TEXT
        )
    ''')

    # AI Learning analytics
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS ai_learning_analytics (
            id INTEGER PRIMARY KEY,
            learning_type TEXT,
            ai_model_used TEXT,
            accuracy_score REAL,
            learning_input TEXT,
            timestamp TEXT
        )
    ''')

    # Settings with duration controls
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS complete_settings (
            id INTEGER PRIMARY KEY,
            category TEXT,
            setting_name TEXT,
            setting_value TEXT,
            backend_access BOOLEAN,
            last_updated TEXT
        )
    ''')

    conn.commit()
    load_settings(cursor, conn)
    return conn
def load_settings(cursor, conn):
    """Seed the default settings (7-second ring time) without duplicating rows.

    complete_settings has no UNIQUE constraint on (category, setting_name),
    so ``INSERT OR IGNORE`` never ignores anything and would add a second copy
    of every default on each start. Each default is therefore inserted only
    when no row with the same category and name exists; values an operator
    has already changed are left untouched.

    Args:
        cursor: Cursor on the application database.
        conn: Connection owning ``cursor``; committed once after seeding.
    """
    settings = [
        ("call_handling", "ring_duration_seconds", "7", True),
        ("call_handling", "ai_takeover_after_seconds", "10", True),
        ("call_handling", "max_ai_conversation_minutes", "5", True),
        ("spam_protection", "spam_threshold", "0.7", True),
        ("voice_menu", "english_greeting", "Hey! It's Jay's Mobile Wash! Press 1 for English, 2 for Spanish.", True),
        # Accented characters restored; the previous literal was mis-encoded.
        ("voice_menu", "spanish_greeting", "¡Hola! Es Jay's Mobile Wash! Presiona 1 para inglés, 2 para español.", True),
        ("notifications", "sms_jay_on_call", "true", True),
        ("notifications", "sms_jay_on_ai_takeover", "true", True),
    ]
    for category, name, value, backend in settings:
        cursor.execute("""
            INSERT INTO complete_settings
            (category, setting_name, setting_value, backend_access, last_updated)
            SELECT ?, ?, ?, ?, ?
            WHERE NOT EXISTS (
                SELECT 1 FROM complete_settings
                WHERE category = ? AND setting_name = ?
            )
        """, (category, name, value, backend, datetime.now().isoformat(), category, name))
    conn.commit()
# Initialize database
# Module-level connection created at import time; the settings helpers, the
# training functions and the Flask webhook handlers below all use this object.
db_conn = init_database()
def get_setting(category: str, name: str, default: str = "") -> str:
    """Look up one value in complete_settings.

    Returns the stored setting_value for (category, name), or ``default``
    when no row matches or the lookup raises; failures are printed.
    """
    query = "SELECT setting_value FROM complete_settings WHERE category = ? AND setting_name = ?"
    try:
        row = db_conn.cursor().execute(query, (category, name)).fetchone()
    except Exception as exc:
        print(f"Error getting setting {category}.{name}: {exc}")
        return default
    if row:
        return row[0]
    return default
def update_call_duration_settings_complete(ring_time, ai_takeover_time, max_ai_time):
    """Persist the three call-duration settings and return an HTML status box.

    The values come from Gradio sliders and may arrive as floats. They are
    normalised to whole numbers before being stored, because the call
    handlers read them back with ``int(...)`` and ``int("7.0")`` raises.

    Args:
        ring_time: Seconds Jay's phone rings before the AI takes over.
        ai_takeover_time: Seconds before the AI handles the call.
        max_ai_time: Maximum AI conversation length in minutes.

    Returns:
        An HTML snippet: a green confirmation on success, or a red error box
        if a value is not numeric or the database update fails.
    """
    try:
        ring_time, ai_takeover_time, max_ai_time = (
            int(round(float(value)))
            for value in (ring_time, ai_takeover_time, max_ai_time)
        )
        updates = (
            ("ring_duration_seconds", ring_time),
            ("ai_takeover_after_seconds", ai_takeover_time),
            ("max_ai_conversation_minutes", max_ai_time),
        )
        cursor = db_conn.cursor()
        for setting_name, value in updates:
            cursor.execute("""
                UPDATE complete_settings SET setting_value = ?, last_updated = ?
                WHERE category = 'call_handling' AND setting_name = ?
            """, (str(value), datetime.now().isoformat(), setting_name))
        db_conn.commit()
        return f"<div style='padding: 15px; background-color: #d4edda; border-radius: 8px; margin: 10px 0;'>β Call duration settings updated:<br>β° Ring time: <b>{ring_time} seconds</b><br>π€ AI takeover: <b>{ai_takeover_time} seconds</b><br>β±οΈ Max AI conversation: <b>{max_ai_time} minutes</b></div>"
    except Exception as e:
        return f"<div style='padding: 15px; background-color: #f8d7da; border-radius: 8px; margin: 10px 0;'>β Error updating settings: {e}</div>"
# DeepSeek API integration
def call_deepseek_api(prompt: str, context: str = "") -> str:
    """Send one chat-completion request to DeepSeek and return the reply text.

    ``context`` is embedded in the system message and ``prompt`` is the user
    message. A non-200 response yields "DeepSeek API temporarily unavailable";
    any exception is printed and yields "DeepSeek API error".
    """
    try:
        auth_headers = {
            'Authorization': f'Bearer {DEEPSEEK_API_KEY}',
            'Content-Type': 'application/json'
        }
        system_message = {"role": "system", "content": f"You are Jay's Mobile Wash AI assistant. Context: {context}"}
        user_message = {"role": "user", "content": prompt}
        payload = {
            "model": "deepseek-chat",
            "messages": [system_message, user_message],
            "max_tokens": 500,
            "temperature": 0.7,
        }
        reply = requests.post(
            "https://api.deepseek.com/v1/chat/completions",
            headers=auth_headers,
            json=payload,
            timeout=30
        )
        if reply.status_code != 200:
            return "DeepSeek API temporarily unavailable"
        return reply.json()['choices'][0]['message']['content']
    except Exception as exc:
        print(f"DeepSeek API call failed: {exc}")
        return "DeepSeek API error"
# Spam detection
# Keywords that each add 0.2 to the score when found in the message text.
_SPAM_KEYWORDS = (
    "debt", "collector", "payment", "overdue",
    "free", "winner", "prize", "lottery",
    "warranty", "solar", "loan", "mortgage",
)
# Caller-ID prefixes (+1800 / +1888) that add 0.3 to the score.
_SPAM_NUMBER_PREFIX = re.compile(r'^\+1(?:800|888)')


def detect_spam_complete(phone_number: str, message_content: str = "") -> float:
    """Score how spam-like a call or SMS is, from 0.0 to 1.0.

    The score is 0.2 per keyword found in ``message_content``
    (case-insensitive substring match) plus 0.3 when ``phone_number`` starts
    with +1800 or +1888, capped at 1.0.

    A missing or non-string ``phone_number`` only skips the prefix check.
    Previously it raised inside the function and the keyword score that had
    already been computed was thrown away in favour of 0.0.

    Args:
        phone_number: Caller/sender number in E.164 form, or None.
        message_content: SMS body; empty for voice calls.

    Returns:
        The spam score as a float in [0.0, 1.0].
    """
    content_lower = message_content.lower() if isinstance(message_content, str) else ""
    spam_score = sum((0.2 for keyword in _SPAM_KEYWORDS if keyword in content_lower), 0.0)
    if isinstance(phone_number, str) and _SPAM_NUMBER_PREFIX.match(phone_number):
        spam_score += 0.3
    return min(spam_score, 1.0)
# Creative pitch generation
def generate_creative_pitch_complete(language: str = "english") -> str:
    """Return a short sales pitch to be spoken to a caller.

    Gemini is asked for a pitch when the model is available. If the model is
    missing, raises, or returns empty text, a fixed pitch in the requested
    language is returned instead, so the caller always hears something.

    Args:
        language: "spanish" for a Spanish pitch; anything else means English.

    Returns:
        The pitch text (never empty).
    """
    base_services = """
    Services & Pricing:
    - Basic Wash: $25 (exterior wash, tire shine)
    - Premium Wash: $40 (exterior + interior vacuum)
    - Full Detail: $75 (complete detailing with wax)
    """
    pitch_language = "Spanish" if language == "spanish" else "English"
    prompt = f"Create a creative {pitch_language} pitch for Jay's Mobile Wash. Make it professional but enthusiastic. Keep under 30 seconds when spoken. {base_services}"

    # Try Gemini first. Failures are logged rather than silently swallowed.
    try:
        if gemini_model:
            response = gemini_model.generate_content(prompt)
            pitch = (response.text or "").strip()
            if pitch:
                return pitch
    except Exception as e:
        print(f"Pitch generation error, using fallback pitch: {e}")

    # Fallback pitch. The Spanish text is read aloud by text-to-speech, so its
    # accented characters must be real characters, not mis-encoded bytes.
    if language == "spanish":
        return "¡Ofrecemos servicios increíbles de lavado móvil! Lavado básico por $25, premium por $40, y detallado completo por $75. ¡Vamos a tu ubicación!"
    return "We offer amazing mobile car detailing that comes right to you! Basic Wash $25, Premium $40, Full Detail $75. What service interests you today?"
# File training functions
def upload_file_training_complete(file, file_type):
    """Record an uploaded file as a training session and return an HTML status box.

    Args:
        file: Value delivered by the gr.File input: either an object with a
            ``name`` attribute holding the path, or the path itself. None
            means nothing was uploaded.
        file_type: "text", "image" or "document" (from the dropdown).

    Returns:
        An HTML snippet: a green summary on success, a red box when no file
        was given or when anything fails. File names and error text are
        HTML-escaped because the uploader controls them.
    """
    if file is None:
        return "<div style='padding: 15px; background-color: #f8d7da; border-radius: 8px; margin: 10px 0;'>β Please upload a file first</div>"
    try:
        # Accept both a tempfile-like object and a plain path string.
        file_path = getattr(file, "name", file)
        file_name = os.path.basename(file_path)
        # Microsecond resolution: training_id is UNIQUE, and a per-second ID
        # makes two uploads within the same second fail.
        training_id = f"train_{datetime.now().strftime('%Y%m%d%H%M%S%f')}"

        # Read file content.
        # NOTE: the extracted content is not persisted anywhere yet; reading
        # it still validates that the upload is readable as the chosen type.
        if file_type == "text":
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                file_content = f.read()
        elif file_type == "image":
            file_content = "[Image content processed]"
            # Context manager closes the underlying file handle.
            with Image.open(file_path) as img:
                # Process with Gemini Vision if available
                if gemini_vision_model:
                    try:
                        response = gemini_vision_model.generate_content(img)
                        file_content = response.text[:500]
                    except Exception as e:
                        print(f"Vision model error: {e}")
        else:
            file_content = "[File content processed]"

        # Store in database
        cursor = db_conn.cursor()
        cursor.execute("""
            INSERT INTO file_training
            (training_id, file_type, file_path, learning_score, timestamp)
            VALUES (?, ?, ?, ?, ?)
        """, (training_id, file_type, file_name, 0.85, datetime.now().isoformat()))
        # Log the learning
        cursor.execute("""
            INSERT INTO ai_learning_analytics
            (learning_type, ai_model_used, accuracy_score, learning_input, timestamp)
            VALUES (?, ?, ?, ?, ?)
        """, (f"{file_type}_training", "gemini", 0.85, file_name, datetime.now().isoformat()))
        db_conn.commit()

        safe_type = html.escape(str(file_type))
        safe_name = html.escape(file_name)
        return f"""
        <div style='padding: 15px; background-color: #d4edda; border-radius: 8px; margin: 10px 0;'>
        <h3 style='margin-top: 0; color: #155724;'>β File Training Complete!</h3>
        <table style='width: 100%; border-collapse: collapse;'>
        <tr><td style='padding: 5px; font-weight: bold;'>π Type:</td><td>{safe_type}</td></tr>
        <tr><td style='padding: 5px; font-weight: bold;'>π File:</td><td>{safe_name}</td></tr>
        <tr><td style='padding: 5px; font-weight: bold;'>π§ Learning score:</td><td>85%</td></tr>
        <tr><td style='padding: 5px; font-weight: bold;'>β±οΈ Timestamp:</td><td>{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</td></tr>
        </table>
        </div>
        """
    except Exception as e:
        return f"<div style='padding: 15px; background-color: #f8d7da; border-radius: 8px; margin: 10px 0;'>β File training error: {html.escape(str(e))}</div>"
# Website training
def train_from_website_url_complete(website_url):
    """Record a website as a training session and return an HTML status box.

    Args:
        website_url: URL typed into the UI. Only http:// and https:// URLs
            are accepted; surrounding whitespace is ignored.

    Returns:
        An HTML snippet: a green summary on success, a red box for an invalid
        URL or when storing the session fails. The URL and error text are
        HTML-escaped because they are user-controlled.
    """
    url = website_url.strip() if isinstance(website_url, str) else ""
    # A bare startswith('http') would also accept schemes such as "httpfoo://".
    if not url.startswith(('http://', 'https://')):
        return "<div style='padding: 15px; background-color: #f8d7da; border-radius: 8px; margin: 10px 0;'>β Please enter a valid website URL (starting with http:// or https://)</div>"
    try:
        # Microsecond resolution: training_id is UNIQUE, and a per-second ID
        # makes two submissions within the same second fail.
        training_id = f"web_{datetime.now().strftime('%Y%m%d%H%M%S%f')}"

        # Fetch a preview of the page. A failed download is not fatal: the
        # session is still recorded with a placeholder preview.
        try:
            response = requests.get(url, timeout=10)
            content_preview = response.text[:500] + "..."
        except requests.RequestException as e:
            print(f"Website fetch error: {e}")
            content_preview = "[Website content retrieval simulated]"

        # Store in database
        cursor = db_conn.cursor()
        cursor.execute("""
            INSERT INTO file_training
            (training_id, file_type, website_url, learning_score, timestamp)
            VALUES (?, ?, ?, ?, ?)
        """, (training_id, "website", url, 0.78, datetime.now().isoformat()))
        # Log the learning
        cursor.execute("""
            INSERT INTO ai_learning_analytics
            (learning_type, ai_model_used, accuracy_score, learning_input, timestamp)
            VALUES (?, ?, ?, ?, ?)
        """, ("website_training", "deepseek", 0.78, url, datetime.now().isoformat()))
        db_conn.commit()

        return f"""
        <div style='padding: 15px; background-color: #d4edda; border-radius: 8px; margin: 10px 0;'>
        <h3 style='margin-top: 0; color: #155724;'>β Website Training Complete!</h3>
        <table style='width: 100%; border-collapse: collapse;'>
        <tr><td style='padding: 5px; font-weight: bold;'>π URL:</td><td>{html.escape(url)}</td></tr>
        <tr><td style='padding: 5px; font-weight: bold;'>π Content analyzed:</td><td>~{len(content_preview)} characters</td></tr>
        <tr><td style='padding: 5px; font-weight: bold;'>π§ Learning score:</td><td>78%</td></tr>
        <tr><td style='padding: 5px; font-weight: bold;'>β±οΈ Timestamp:</td><td>{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</td></tr>
        </table>
        </div>
        """
    except Exception as e:
        return f"<div style='padding: 15px; background-color: #f8d7da; border-radius: 8px; margin: 10px 0;'>β Website training error: {html.escape(str(e))}</div>"
# Backend access
def get_complete_backend_access_complete():
    """Build the backend analytics panel as an HTML string.

    Shows row counts for calls, SMS and training tables, then every setting
    flagged backend_access = 1, grouped under a heading each time the
    category changes. Any exception is returned as a red error box.
    """
    try:
        cursor = db_conn.cursor()

        def count_rows(sql):
            # Run a COUNT(*) query and return the single number it yields.
            cursor.execute(sql)
            return cursor.fetchone()[0]

        total_calls = count_rows("SELECT COUNT(*) FROM smart_calls")
        jay_answered_calls = count_rows("SELECT COUNT(*) FROM smart_calls WHERE jay_answered = 1")
        ai_handled_calls = count_rows("SELECT COUNT(*) FROM smart_calls WHERE ai_handled_alone = 1")
        total_sms = count_rows("SELECT COUNT(*) FROM smart_sms")
        training_sessions = count_rows("SELECT COUNT(*) FROM file_training")
        learning_sessions = count_rows("SELECT COUNT(*) FROM ai_learning_analytics")

        cursor.execute("SELECT category, setting_name, setting_value FROM complete_settings WHERE backend_access = 1")
        backend_settings = cursor.fetchall()

        # max(..., 1) keeps the percentages defined when there are no calls yet.
        call_base = max(total_calls, 1)
        jay_share = jay_answered_calls / call_base * 100
        ai_share = ai_handled_calls / call_base * 100

        fragments = [f"""
        <div style="background-color: #f8f9fa; border-radius: 12px; padding: 20px; box-shadow: 0 4px 6px rgba(0,0,0,0.1);">
        <h2 style="color: #3a0ca3; text-align: center; margin-bottom: 20px;">π§ BACKEND ACCESS ANALYTICS</h2>
        <div style="background-color: white; border-radius: 8px; padding: 15px; margin-bottom: 20px; box-shadow: 0 2px 4px rgba(0,0,0,0.05);">
        <h3 style="color: #4361ee; border-bottom: 2px solid #4361ee; padding-bottom: 8px;">π CALL STATISTICS</h3>
        <table style="width: 100%; border-collapse: collapse;">
        <tr>
        <td style="padding: 8px; font-weight: bold;">Total Calls:</td>
        <td style="padding: 8px;">{total_calls}</td>
        </tr>
        <tr>
        <td style="padding: 8px; font-weight: bold;">Jay Answered:</td>
        <td style="padding: 8px;">{jay_answered_calls} ({jay_share:.1f}%)</td>
        </tr>
        <tr>
        <td style="padding: 8px; font-weight: bold;">AI Handled:</td>
        <td style="padding: 8px;">{ai_handled_calls} ({ai_share:.1f}%)</td>
        </tr>
        <tr>
        <td style="padding: 8px; font-weight: bold;">SMS Messages:</td>
        <td style="padding: 8px;">{total_sms}</td>
        </tr>
        </table>
        </div>
        <div style="background-color: white; border-radius: 8px; padding: 15px; margin-bottom: 20px; box-shadow: 0 2px 4px rgba(0,0,0,0.05);">
        <h3 style="color: #4361ee; border-bottom: 2px solid #4361ee; padding-bottom: 8px;">π§ AI LEARNING</h3>
        <table style="width: 100%; border-collapse: collapse;">
        <tr>
        <td style="padding: 8px; font-weight: bold;">Training Sessions:</td>
        <td style="padding: 8px;">{training_sessions}</td>
        </tr>
        <tr>
        <td style="padding: 8px; font-weight: bold;">Learning Sessions:</td>
        <td style="padding: 8px;">{learning_sessions}</td>
        </tr>
        </table>
        </div>
        <div style="background-color: white; border-radius: 8px; padding: 15px; box-shadow: 0 2px 4px rgba(0,0,0,0.05);">
        <h3 style="color: #4361ee; border-bottom: 2px solid #4361ee; padding-bottom: 8px;">βοΈ BACKEND SETTINGS</h3>
        """]

        open_category = ""
        for category, name, value in backend_settings:
            if category != open_category:
                # Close the previous category's table before opening a new one.
                if open_category != "":
                    fragments.append("</table></div>")
                fragments.append(f"""
        <div style="margin-top: 15px;">
        <h4 style="color: #4cc9f0; margin: 10px 0;">{category.upper()}</h4>
        <table style="width: 100%; border-collapse: collapse;">
        """)
                open_category = category
            fragments.append(f"""
        <tr>
        <td style="padding: 8px; font-weight: bold;">{name}:</td>
        <td style="padding: 8px;">{value}</td>
        </tr>
        """)
        if open_category != "":
            fragments.append("</table></div>")
        fragments.append("""
        </div>
        </div>
        """)
        return "".join(fragments)
    except Exception as exc:
        return f"<div style='padding: 15px; background-color: #f8d7da; border-radius: 8px; margin: 10px 0;'>β Backend access error: {exc}</div>"
# FLASK WEBHOOK APP
# Flask application object for the webhook view functions defined below; it
# is run in a background thread by the __main__ block when not on Spaces.
flask_app = Flask(__name__)
# Without a route decorator this view is never registered on flask_app.
@flask_app.route('/')
def home():
    """Home page showing system status"""
    # Local import keeps this block self-contained; the timestamp is labelled
    # "UTC", so it must actually be taken in UTC rather than local time.
    from datetime import timezone

    return f"""
    <h1>π Jay's Call Center System</h1>
    <p>β Flask Server Running</p>
    <p>π Business Phone: {TWILIO_PHONE_NUMBER}</p>
    <p>π± Jay's Phone: {YOUR_PERSONAL_NUMBER}</p>
    <p>β° Current Time: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}</p>
    """
# Without a route decorator this view is never registered on flask_app.
# NOTE(review): the '/voice' path is an assumption - it must match the Voice
# webhook URL configured for the Twilio number; confirm before deploying.
@flask_app.route('/voice', methods=['POST'])
def handle_voice_call():
    """Entry point for an incoming call: screen for spam, log it, play the language menu.

    Reads ``From`` and ``CallSid`` from the posted form. Calls whose spam
    score exceeds the configured threshold hear a short refusal and are hung
    up. Other calls are logged in smart_calls, Jay is optionally notified by
    SMS, and the caller is asked to press 1 (English) or 2 (Spanish).

    Returns:
        TwiML as a string. On any unexpected error a generic apology is spoken.
    """
    try:
        from_number = request.form.get('From')
        call_sid = request.form.get('CallSid')
        print(f"π Call from {from_number} - SID: {call_sid}")

        # Spam detection
        spam_score = detect_spam_complete(from_number)
        spam_threshold = float(get_setting("spam_protection", "spam_threshold", "0.7"))
        if spam_score > spam_threshold:
            print(f"π« SPAM BLOCKED: {spam_score:.2f}")
            response = VoiceResponse()
            response.say("This number is not accepting calls at this time.")
            response.hangup()
            return str(response)

        # Log legitimate call. OR IGNORE: call_sid is UNIQUE, and a repeated
        # webhook for the same call must not abort the call with an error.
        cursor = db_conn.cursor()
        cursor.execute("""
            INSERT OR IGNORE INTO smart_calls
            (call_sid, from_number, to_number, call_status, spam_score, timestamp)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (call_sid, from_number, TWILIO_PHONE_NUMBER, 'language_menu', spam_score, datetime.now().isoformat()))
        db_conn.commit()

        # Notify Jay via SMS (best effort: a failed SMS must not drop the call)
        if get_setting("notifications", "sms_jay_on_call", "true") == "true" and twilio_client:
            try:
                twilio_client.messages.create(
                    body=f"π CALL from {from_number}",
                    from_=TWILIO_PHONE_NUMBER,
                    to=YOUR_PERSONAL_NUMBER
                )
            except Exception as e:
                print(f"SMS notification error: {e}")

        # Create language menu
        response = VoiceResponse()
        english_greeting = get_setting("voice_menu", "english_greeting", "Hey! It's Jay's Mobile Wash! Press 1 for English, 2 for Spanish.")
        gather = response.gather(
            num_digits=1,
            action='/language-selected',
            method='POST',
            timeout=8
        )
        gather.say(english_greeting, voice='alice')

        # If no selection: there is no '/english-menu' handler in this file,
        # so continue through the language handler, which treats a missing
        # digit as English.
        response.say("I didn't receive your selection. Connecting you in English.")
        response.redirect('/language-selected', method='POST')
        return str(response)
    except Exception as e:
        print(f"Voice webhook error: {e}")
        response = VoiceResponse()
        response.say("Sorry, we're experiencing technical difficulties.")
        return str(response)
# Without a route decorator this view is never registered on flask_app; the
# path is the one the language menu's <Gather action> points at.
@flask_app.route('/language-selected', methods=['POST'])
def handle_language_selection():
    """Store the caller's language choice and ring Jay (7-second ring by default).

    ``Digits`` "2" selects Spanish; "1", anything else, or no digit selects
    English. The choice is saved on the smart_calls row, the caller hears a
    short "connecting" message, and Jay's number is dialled for the
    configured ring duration with '/call-result' as the dial action.

    Returns:
        TwiML as a string. On any unexpected error a generic apology is spoken.
    """
    try:
        digits = request.form.get('Digits')
        call_sid = request.form.get('CallSid')

        # Update database
        language = "spanish" if digits == "2" else "english"
        cursor = db_conn.cursor()
        cursor.execute("""
            UPDATE smart_calls
            SET language_selected = ?, button_pressed = ?
            WHERE call_sid = ?
        """, (language, digits, call_sid))
        db_conn.commit()

        response = VoiceResponse()

        # Get ring duration (7 seconds default). The stored value may have
        # been written as "7.0", which int() alone rejects; an unreadable
        # value falls back to the default instead of failing the call.
        try:
            ring_duration = int(float(get_setting("call_handling", "ring_duration_seconds", "7")))
        except (TypeError, ValueError):
            ring_duration = 7

        if language == "spanish":
            response.say("Conectándote con Jay ahora...", voice='alice', language='es')
        else:
            response.say("Connecting you with Jay right now...", voice='alice')

        # Try to reach Jay first
        dial = response.dial(
            timeout=ring_duration,
            action='/call-result',
            method='POST'
        )
        dial.number(YOUR_PERSONAL_NUMBER)

        # If Jay doesn't answer, AI takes over.
        # NOTE(review): with a Dial `action` set, the call is expected to
        # continue at /call-result, which issues the same redirect; this verb
        # is kept as a fallback - confirm against the TwiML <Dial> docs.
        response.redirect(f'/ai-takeover?language={language}&call_sid={call_sid}')
        return str(response)
    except Exception as e:
        print(f"Language selection error: {e}")
        response = VoiceResponse()
        response.say("Sorry, we're experiencing technical difficulties.")
        return str(response)
# Without a route decorator this view is never registered on flask_app; the
# path is the one the <Dial action> in handle_language_selection points at.
@flask_app.route('/call-result', methods=['POST'])
def handle_call_result():
    """Handle the outcome of forwarding the call to Jay.

    When ``DialCallStatus`` is 'completed' the call is marked as answered by
    Jay and the caller hears a goodbye. For any other status the caller is
    redirected to the AI takeover in the language stored for this call
    (English when none is stored).

    Returns:
        TwiML as a string. On any unexpected error a generic apology is spoken.
    """
    try:
        call_status = request.form.get('DialCallStatus')
        call_sid = request.form.get('CallSid')
        cursor = db_conn.cursor()
        response = VoiceResponse()

        if call_status == 'completed':
            # Jay answered
            cursor.execute("""
                UPDATE smart_calls
                SET jay_answered = 1, call_status = 'jay_handled'
                WHERE call_sid = ?
            """, (call_sid,))
            db_conn.commit()
            response.say("Call completed. Goodbye.")
            return str(response)

        # Jay didn't answer
        cursor.execute("SELECT language_selected FROM smart_calls WHERE call_sid = ?", (call_sid,))
        result = cursor.fetchone()
        language = result[0] if result else "english"
        response.redirect(f'/ai-takeover?language={language}&call_sid={call_sid}')
        return str(response)
    except Exception as e:
        print(f"Call result error: {e}")
        response = VoiceResponse()
        response.say("Sorry, we're experiencing technical difficulties.")
        return str(response)
# Without a route decorator this view is never registered on flask_app. The
# handler reads its inputs from the query string, so GET and POST are both
# accepted for the redirects that lead here.
@flask_app.route('/ai-takeover', methods=['GET', 'POST'])
def handle_ai_takeover():
    """Speak the assistant greeting and pitch, then record a voicemail.

    Reads ``language`` and ``call_sid`` from the query string, marks the call
    as handled by the AI, speaks a generated pitch in the caller's language
    and records up to 60 seconds, posting the result to '/recording-done'.

    Returns:
        TwiML as a string. On any unexpected error a generic apology is spoken.
    """
    try:
        language = request.args.get('language', 'english')
        call_sid = request.args.get('call_sid', '')

        # Update database
        cursor = db_conn.cursor()
        cursor.execute("""
            UPDATE smart_calls
            SET ai_handled_alone = 1, call_status = 'ai_takeover'
            WHERE call_sid = ?
        """, (call_sid,))
        db_conn.commit()

        # Generate creative pitch
        creative_pitch = generate_creative_pitch_complete(language)

        response = VoiceResponse()
        if language == "spanish":
            # These sentences are read aloud by text-to-speech, so accented
            # characters must be real characters, not mis-encoded bytes.
            response.say("Hola! Este es el asistente de Jay's Mobile Wash.", voice='alice', language='es-US')
            response.say(creative_pitch, voice='alice', language='es-US')
            response.say("Por favor deja tu número y Jay te devolverá la llamada pronto.", voice='alice', language='es-US')
        else:
            response.say("Hi! This is Jay's Mobile Wash assistant.", voice='alice')
            response.say(creative_pitch, voice='alice')
            response.say("Please leave your number and Jay will call you back soon.", voice='alice')

        response.record(
            action='/recording-done',
            max_length=60,
            play_beep=True
        )
        return str(response)
    except Exception as e:
        print(f"AI takeover error: {e}")
        response = VoiceResponse()
        response.say("Sorry, we're experiencing technical difficulties.")
        return str(response)
# Without a route decorator this view is never registered on flask_app; the
# path is the one the <Record action> in handle_ai_takeover points at.
@flask_app.route('/recording-done', methods=['POST'])
def handle_recording_done():
    """Store the voicemail URL, notify Jay by SMS and say goodbye.

    Reads ``CallSid`` and ``RecordingUrl`` from the posted form. Storing the
    URL and sending the SMS are independent best-effort steps: a database
    failure (for example an older database without the recording_url column)
    is logged and must not stop Jay from being told about the voicemail.

    Returns:
        TwiML as a string thanking the caller.
    """
    try:
        call_sid = request.form.get('CallSid')
        recording_url = request.form.get('RecordingUrl')

        if recording_url:
            # Update database
            try:
                cursor = db_conn.cursor()
                cursor.execute("""
                    UPDATE smart_calls
                    SET recording_url = ?
                    WHERE call_sid = ?
                """, (recording_url, call_sid))
                db_conn.commit()
            except sqlite3.Error as e:
                print(f"Recording database error: {e}")

            # Notify Jay with recording URL
            if twilio_client:
                try:
                    twilio_client.messages.create(
                        body=f"π New voicemail from call {call_sid}: {recording_url}",
                        from_=TWILIO_PHONE_NUMBER,
                        to=YOUR_PERSONAL_NUMBER
                    )
                except Exception as e:
                    print(f"Recording notification error: {e}")

        response = VoiceResponse()
        response.say("Thank you! Jay will get back to you soon. Goodbye.")
        return str(response)
    except Exception as e:
        print(f"Recording completion error: {e}")
        response = VoiceResponse()
        response.say("Thank you for your message. Goodbye.")
        return str(response)
# Without a route decorator this view is never registered on flask_app.
# NOTE(review): the '/sms' path is an assumption - it must match the Messaging
# webhook URL configured for the Twilio number; confirm before deploying.
@flask_app.route('/sms', methods=['POST'])
def handle_sms():
    """Handle an incoming SMS: score it, store it, forward it to Jay, auto-reply.

    Reads ``From``, ``Body`` and ``MessageSid`` from the posted form. Every
    message is stored with its spam score. Messages scoring above the
    configured spam threshold (the same setting the voice webhook uses) are
    neither forwarded nor answered; all others are forwarded to Jay and get
    a thank-you reply.

    Returns:
        TwiML as a string (an empty response for spam).
    """
    try:
        from_number = request.form.get('From')
        message_body = request.form.get('Body', '')
        message_sid = request.form.get('MessageSid')
        print(f"π± SMS from {from_number}: {message_body[:20]}...")

        # Spam detection
        spam_score = detect_spam_complete(from_number, message_body)
        spam_threshold = float(get_setting("spam_protection", "spam_threshold", "0.7"))

        # Store in database. OR IGNORE: message_sid is UNIQUE, and a repeated
        # webhook for the same message must not abort the handler.
        cursor = db_conn.cursor()
        cursor.execute("""
            INSERT OR IGNORE INTO smart_sms
            (message_sid, from_number, to_number, body, spam_score, timestamp)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (message_sid, from_number, TWILIO_PHONE_NUMBER, message_body, spam_score, datetime.now().isoformat()))
        db_conn.commit()

        response = MessagingResponse()
        if spam_score > spam_threshold:
            # Stored above for review, but not forwarded and not answered.
            print(f"SPAM SMS not forwarded: {spam_score:.2f}")
            return str(response)

        # Forward to Jay (best effort)
        if twilio_client:
            try:
                twilio_client.messages.create(
                    body=f"SMS from {from_number}:\n\n{message_body}",
                    from_=TWILIO_PHONE_NUMBER,
                    to=YOUR_PERSONAL_NUMBER
                )
            except Exception as e:
                print(f"SMS forwarding error: {e}")

        response.message("Thanks for contacting Jay's Mobile Wash! Jay will get back to you shortly.")
        return str(response)
    except Exception as e:
        print(f"SMS handling error: {e}")
        response = MessagingResponse()
        response.message("Thanks for your message. We'll respond soon.")
        return str(response)
# GRADIO INTERFACE - ENHANCED FOR GRADIO 5.34.0 WITH BEAUTIFUL UI
def _saved_duration(setting_name: str, fallback: int, low: int, high: int) -> int:
    """Return a persisted call_handling setting as an int clamped to [low, high].

    Used for the sliders' initial values so the UI shows what is stored
    instead of fixed defaults. An unreadable value yields ``fallback``.
    """
    try:
        value = int(float(get_setting("call_handling", setting_name, str(fallback))))
    except (TypeError, ValueError):
        value = fallback
    return max(low, min(high, value))


with gr.Blocks(
    title="Jay's Call Center",
    theme=gr.themes.Soft(primary_hue="indigo"),
    css=custom_css
) as demo:
    gr.Markdown("""
    # π Jay's Mobile Wash Call Center System
    **Full-Featured Professional Call Center with AI Capabilities**
    """)
    with gr.Tabs() as tabs:
        # System Status Tab
        with gr.Tab(label="π System Status"):
            gr.Markdown("## π System Status")
            # Use a logo emoji instead of external image
            gr.Markdown("""<div class='logo-placeholder'>π</div>""")
            with gr.Row():
                with gr.Column():
                    # Green card when the client connected at startup, red otherwise.
                    status_style = "#d4edda" if "Connected" in TWILIO_STATUS else "#f8d7da"
                    gr.Markdown(f"""
                    <div style="padding: 15px; background: {status_style}; border-radius: 10px; margin-bottom: 15px;">
                    <div style="display: flex; align-items: center;">
                    <span style="font-size: 24px; margin-right: 10px;">π</span>
                    <h3 style="margin: 0;">Twilio Status</h3>
                    </div>
                    <p><b>{TWILIO_STATUS}</b></p>
                    <p>Business Phone: {TWILIO_PHONE_NUMBER}</p>
                    <p>Personal Phone: {YOUR_PERSONAL_NUMBER}</p>
                    </div>
                    """)
                with gr.Column():
                    gemini_style = "#d4edda" if "Connected" in GEMINI_STATUS else "#f8d7da"
                    gr.Markdown(f"""
                    <div style="padding: 15px; background: {gemini_style}; border-radius: 10px; margin-bottom: 15px;">
                    <div style="display: flex; align-items: center;">
                    <span style="font-size: 24px; margin-right: 10px;">π€</span>
                    <h3 style="margin: 0;">AI Status</h3>
                    </div>
                    <p><b>{GEMINI_STATUS}</b></p>
                    <p>Using: Gemini + DeepSeek</p>
                    </div>
                    """)
            status_refresh = gr.Button("π Refresh Status", variant="primary", size="lg")
            status_output = gr.HTML()
            status_refresh.click(
                fn=get_complete_backend_access_complete,
                outputs=status_output,
            )

        # Call Duration Settings Tab
        with gr.Tab(label="β° Call Duration Settings"):
            gr.Markdown("## β° Call Duration Settings")
            with gr.Row():
                with gr.Column():
                    ring_time_slider = gr.Slider(
                        minimum=3,
                        maximum=30,
                        # Start from the persisted value, not a fixed 7.
                        value=_saved_duration("ring_duration_seconds", 7, 3, 30),
                        step=1,
                        label="β° Ring Duration (seconds)",
                        info="How long Jay's phone rings before AI takes over"
                    )
                with gr.Column():
                    ai_takeover_time_slider = gr.Slider(
                        minimum=5,
                        maximum=60,
                        value=_saved_duration("ai_takeover_after_seconds", 10, 5, 60),
                        step=1,
                        label="π€ AI Takeover After (seconds)",
                        info="Total time before AI completely handles call"
                    )
                with gr.Column():
                    max_ai_time_slider = gr.Slider(
                        minimum=1,
                        maximum=15,
                        value=_saved_duration("max_ai_conversation_minutes", 5, 1, 15),
                        step=1,
                        label="β±οΈ Max AI Conversation (minutes)",
                        info="Maximum duration for AI-only customer interaction"
                    )
            update_call_settings_button = gr.Button("β Update Call Duration Settings", variant="primary", size="lg")
            call_settings_output = gr.HTML()
            update_call_settings_button.click(
                fn=update_call_duration_settings_complete,
                inputs=[ring_time_slider, ai_takeover_time_slider, max_ai_time_slider],
                outputs=call_settings_output,
            )

        # File/Photo Training Tab
        with gr.Tab(label="π File/Photo Training"):
            gr.Markdown("## π File/Photo Training")
            with gr.Row():
                with gr.Column():
                    file_upload = gr.File(label="Upload File/Photo for Training")
                with gr.Column():
                    file_type = gr.Dropdown(
                        choices=["text", "image", "document"],
                        value="text",
                        label="File Type",
                        info="Select the type of file you're uploading"
                    )
            train_file_button = gr.Button("π§ Train AI from File", variant="primary", size="lg")
            file_train_output = gr.HTML()
            train_file_button.click(
                fn=upload_file_training_complete,
                inputs=[file_upload, file_type],
                outputs=file_train_output,
            )

        # Website Training Tab
        with gr.Tab(label="π Website Training"):
            gr.Markdown("## π Website Training")
            website_url = gr.Textbox(
                label="Website URL",
                placeholder="https://example.com/car-detailing",
                info="Enter website URL to train AI from"
            )
            train_website_button = gr.Button("π§ Train AI from Website", variant="primary", size="lg")
            website_train_output = gr.HTML()
            train_website_button.click(
                fn=train_from_website_url_complete,
                inputs=website_url,
                outputs=website_train_output,
            )

        # Backend Access Tab
        with gr.Tab(label="π§ Backend Access"):
            gr.Markdown("## π§ Backend Access")
            backend_access_button = gr.Button("π Access Backend", variant="primary", size="lg")
            backend_output = gr.HTML()
            backend_access_button.click(
                fn=get_complete_backend_access_complete,
                outputs=backend_output,
            )
# MODIFIED FOR HUGGING FACE SPACES WITH FLASK INTEGRATION
# Entry point. On Hugging Face Spaces (SPACE_ID set) only the Gradio app is
# launched; otherwise Flask runs in a daemon thread on port 7860 and Gradio
# is launched on port 7861 with a public share link.
if __name__ == "__main__":
    # Check if running on Hugging Face Spaces
    if os.environ.get('SPACE_ID'):
        print("π€ Running on Hugging Face Spaces")
        print("β Setting up Flask routes for webhooks")
        # Create a wrapper for Flask routes in Gradio
        # NOTE(review): add_flask_routes is defined but never called, and its
        # loop only defines _wrapper without attaching it to `app`, so no
        # Flask webhook route is served in this branch despite the message
        # printed above.
        def add_flask_routes(app):
            # Add all Flask routes to the Gradio app
            for rule in flask_app.url_map.iter_rules():
                path = str(rule)
                methods = list(rule.methods)
                # Skip OPTIONS method which is added by default
                if "OPTIONS" in methods:
                    methods.remove("OPTIONS")
                # Map each Flask route to the Gradio app
                # NOTE(review): _wrapper closes over the loop variable `rule`,
                # so if these wrappers were ever registered, every one would
                # see the last rule; bind `rule` per iteration before use.
                async def _wrapper(*args, **kwargs):
                    # Get the Flask view function
                    flask_view = flask_app.view_functions[rule.endpoint]
                    # Call it and return the response
                    return flask_view(*args, **kwargs)
            return app
        # Launch the Gradio app which now includes Flask routes
        # NOTE(review): nothing above modifies `demo`; see the notes on
        # add_flask_routes.
        demo.launch(server_name="0.0.0.0")
    else:
        # Start Flask server in a thread
        # Daemon thread: it does not keep the process alive on its own.
        flask_thread = threading.Thread(target=flask_app.run, kwargs={"debug": False, "host": "0.0.0.0", "port": 7860})
        flask_thread.daemon = True
        flask_thread.start()
        print("β Flask webhook server started!")
        # Start Gradio server
        demo.launch(server_port=7861, share=True)