#!/usr/bin/env python3
"""
πŸš— JAY'S MOBILE WASH - COMPLETE AI SYSTEM WITH IPHONE FORWARDING
Maximum effort implementation with all features and iPhone call forwarding!
Built for HuggingFace Spaces deployment.
"""
import os
import json
import requests
import sqlite3
import pandas as pd
from datetime import datetime, timedelta, timezone
from flask import Flask, render_template, request, jsonify, redirect, url_for, session, flash, send_file, make_response
from werkzeug.security import generate_password_hash, check_password_hash
from functools import wraps
import logging
import threading
import time
import uuid
from signalwire.rest import Client
from signalwire.voice_response import VoiceResponse
from signalwire.messaging_response import MessagingResponse
import speech_recognition as sr
import pyttsx3
from textblob import TextBlob
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
import pickle
import asyncio
import aiohttp
from collections import defaultdict, deque
import re
from urllib.parse import quote_plus, unquote
import hashlib
import hmac
import base64
import io
import wave
import matplotlib
matplotlib.use('Agg')  # Use non-interactive backend for HuggingFace
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.backends.backend_agg import FigureCanvasAgg
import plotly.graph_objs as go
import plotly.express as px
from plotly.utils import PlotlyJSONEncoder
import concurrent.futures
import multiprocessing
from threading import Lock, Thread
import schedule
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import phonenumbers
from phonenumbers import geocoder, carrier
import geocoder as geo
from geopy.distance import geodesic
import folium
from folium import plugins
from werkzeug.utils import secure_filename
from PIL import Image, ImageDraw, ImageFont
import qrcode
from reportlab.pdfgen import canvas
from reportlab.lib.pagesizes import letter, A4
from reportlab.lib import colors
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch
import csv
from io import StringIO, BytesIO
import zipfile
import tempfile
import shutil
import subprocess
import sys
from pathlib import Path
import yaml
import toml
from flask_socketio import SocketIO, emit, join_room, leave_room
from flask_cors import CORS
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Union, Any
import click
from click import echo, style
import dotenv
from dotenv import load_dotenv, find_dotenv
import structlog
import rich
from rich.console import Console
from rich.table import Table  # NOTE: shadows reportlab's Table imported above
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.live import Live
from rich.panel import Panel
from rich.layout import Layout
from rich.text import Text
from rich.markdown import Markdown

# Rich console created up front: the configuration-validation messages later in this
# module print through it before the "RICH CONSOLE SETUP" section runs.
console = Console()

import platform
import socket
import psutil
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
import joblib
import torch
from transformers import pipeline, AutoTokenizer, AutoModel
import instructor
from pydantic import BaseModel, Field
import streamlit as st
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from 
fastapi.security import HTTPBearer from fastapi.responses import HTMLResponse, FileResponse, StreamingResponse import uvicorn from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, Boolean, Text, ForeignKey from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker, Session, relationship import alembic from pymongo import MongoClient import sentry_sdk from sentry_sdk.integrations.flask import FlaskIntegration import newrelic.agent import prometheus_client from prometheus_client import Counter, Histogram, Gauge, start_http_server import jaeger_client from jaeger_client import Config as JaegerConfig import datadog from datadog import initialize, statsd import rollbar import bugsnag from bugsnag.flask import handle_exceptions import marshmallow from marshmallow import Schema, fields, validate, ValidationError import attrs from attrs import define, field as attrs_field # ===== FLASK APP INITIALIZATION ===== app = Flask(__name__, template_folder='templates', static_folder='static', static_url_path='/static', instance_relative_config=True) # Enhanced Flask configuration for production deployment app.config.update({ 'SECRET_KEY': os.environ.get('SECRET_KEY', 'jays-mobile-wash-super-secret-production-key-2025'), 'DEBUG': os.environ.get('FLASK_DEBUG', 'False').lower() == 'true', 'TESTING': False, 'PROPAGATE_EXCEPTIONS': True, 'PRESERVE_CONTEXT_ON_EXCEPTION': None, 'SESSION_COOKIE_SECURE': True, 'SESSION_COOKIE_HTTPONLY': True, 'SESSION_COOKIE_SAMESITE': 'Lax', 'PERMANENT_SESSION_LIFETIME': timedelta(hours=24), 'MAX_CONTENT_LENGTH': 32 * 1024 * 1024, # 32MB max file upload 'JSON_SORT_KEYS': False, 'JSONIFY_PRETTYPRINT_REGULAR': True, 'TEMPLATES_AUTO_RELOAD': True, 'SEND_FILE_MAX_AGE_DEFAULT': timedelta(hours=1), 'PREFERRED_URL_SCHEME': 'https', 'APPLICATION_ROOT': '/', 'SERVER_NAME': None, 'USE_RELOADER': True, 'USE_DEBUGGER': False, 'USE_EVALEX': True, 'TRAP_HTTP_EXCEPTIONS': False, 'TRAP_BAD_REQUEST_ERRORS': None, 'JSON_AS_ASCII': True, 'JSONIFY_MIMETYPE': 'application/json' }) # Initialize extensions CORS(app, resources={ r"/api/*": {"origins": "*"}, r"/webhook/*": {"origins": "*"}, r"/voice/*": {"origins": "*"}, r"/sms/*": {"origins": "*"} }) socketio = SocketIO(app, cors_allowed_origins="*", async_mode='threading', logger=True, engineio_logger=True, ping_timeout=60, ping_interval=25) # ===== ENVIRONMENT CONFIGURATION ===== load_dotenv(find_dotenv()) @dataclass class EnvironmentConfig: """Complete environment configuration with validation""" # ===== SIGNALWIRE CONFIGURATION ===== SIGNALWIRE_PROJECT_ID: str = os.environ.get('SIGNALWIRE_PROJECT_ID', '') SIGNALWIRE_AUTH_TOKEN: str = os.environ.get('SIGNALWIRE_AUTH_TOKEN', '') SIGNALWIRE_SPACE_URL: str = os.environ.get('SIGNALWIRE_SPACE_URL', '') # ===== DEEPSEEK API ===== DEEPSEEK_API_KEY: str = os.environ.get('DEEPSEEK_API_KEY', '') # ===== HUGGINGFACE APIS - ALL 15 ===== HF_TRANSFORMERS_API_KEY: str = os.environ.get('HF_TRANSFORMERS_API_KEY', '') HF_SENTENCE_TRANSFORMERS_API_KEY: str = os.environ.get('HF_SENTENCE_TRANSFORMERS_API_KEY', '') HF_SUMMARY_API_KEY: str = os.environ.get('HF_SUMMARY_API_KEY', '') HF_GPT_NEO_API_KEY: str = os.environ.get('HF_GPT_NEO_API_KEY', '') HF_DISTILBERT_SENTIMENT_API_KEY: str = os.environ.get('HF_DISTILBERT_SENTIMENT_API_KEY', '') HF_WHISPER_API_KEY: str = os.environ.get('HF_WHISPER_API_KEY', '') HF_CLAP_API_KEY: str = os.environ.get('HF_CLAP_API_KEY', '') HF_WAV2VEC2_API_KEY: str = os.environ.get('HF_WAV2VEC2_API_KEY', '') HF_SPEECH_GENERATION_API_KEY: 
str = os.environ.get('HF_SPEECH_GENERATION_API_KEY', '') HF_KEYBERT_API_KEY: str = os.environ.get('HF_KEYBERT_API_KEY', '') HF_LLAMA_API_KEY: str = os.environ.get('HF_LLAMA_API_KEY', '') HF_BERT_BASE_API_KEY: str = os.environ.get('HF_BERT_BASE_API_KEY', '') HF_AUTO_TEMPLATE_API_KEY: str = os.environ.get('HF_AUTO_TEMPLATE_API_KEY', '') HF_MULTILINGUAL_API_KEY: str = os.environ.get('HF_MULTILINGUAL_API_KEY', '') HF_DEBERTA_API_KEY: str = os.environ.get('HF_DEBERTA_API_KEY', '') # ===== ADDITIONAL AI APIS ===== OPENAI_API_KEY: str = os.environ.get('OPENAI_API_KEY', '') ANTHROPIC_API_KEY: str = os.environ.get('ANTHROPIC_API_KEY', '') COHERE_API_KEY: str = os.environ.get('COHERE_API_KEY', '') GROQ_API_KEY: str = os.environ.get('GROQ_API_KEY', '') TOGETHER_API_KEY: str = os.environ.get('TOGETHER_API_KEY', '') REPLICATE_API_TOKEN: str = os.environ.get('REPLICATE_API_TOKEN', '') MISTRAL_API_KEY: str = os.environ.get('MISTRAL_API_KEY', '') GOOGLE_API_KEY: str = os.environ.get('GOOGLE_API_KEY', '') # ===== PHONE NUMBERS ===== JAY_PHONE: str = os.environ.get('JAY_PHONE', '+15622289429') # Jay's iPhone SIGNALWIRE_PHONE: str = os.environ.get('SIGNALWIRE_PHONE', '+17149278841') # AI number # ===== IPHONE FORWARDING SETTINGS ===== IPHONE_FORWARDING_ENABLED: bool = os.environ.get('IPHONE_FORWARDING_ENABLED', 'True').lower() == 'true' FORWARDING_DELAY_SECONDS: int = int(os.environ.get('FORWARDING_DELAY_SECONDS', '20')) FORWARDING_TYPE: str = os.environ.get('FORWARDING_TYPE', 'unanswered') # unanswered, busy, unreachable, all # ===== DATABASE CONFIGURATION ===== DATABASE_URL: str = os.environ.get('DATABASE_URL', 'sqlite:///jays_mobile_wash.db') REDIS_URL: str = os.environ.get('REDIS_URL', 'redis://localhost:6379/0') MONGODB_URL: str = os.environ.get('MONGODB_URL', 'mongodb://localhost:27017/jays_mobile_wash') # ===== CLOUD STORAGE ===== AWS_ACCESS_KEY_ID: str = os.environ.get('AWS_ACCESS_KEY_ID', '') AWS_SECRET_ACCESS_KEY: str = os.environ.get('AWS_SECRET_ACCESS_KEY', '') AWS_S3_BUCKET: str = os.environ.get('AWS_S3_BUCKET', '') AWS_REGION: str = os.environ.get('AWS_REGION', 'us-east-1') # ===== GOOGLE CLOUD ===== GOOGLE_CLOUD_PROJECT: str = os.environ.get('GOOGLE_CLOUD_PROJECT', '') GOOGLE_APPLICATION_CREDENTIALS: str = os.environ.get('GOOGLE_APPLICATION_CREDENTIALS', '') # ===== MONITORING AND ANALYTICS ===== SENTRY_DSN: str = os.environ.get('SENTRY_DSN', '') DATADOG_API_KEY: str = os.environ.get('DATADOG_API_KEY', '') NEWRELIC_LICENSE_KEY: str = os.environ.get('NEWRELIC_LICENSE_KEY', '') ROLLBAR_ACCESS_TOKEN: str = os.environ.get('ROLLBAR_ACCESS_TOKEN', '') BUGSNAG_API_KEY: str = os.environ.get('BUGSNAG_API_KEY', '') # ===== EMAIL CONFIGURATION ===== SMTP_SERVER: str = os.environ.get('SMTP_SERVER', 'smtp.gmail.com') SMTP_PORT: int = int(os.environ.get('SMTP_PORT', '587')) SMTP_USERNAME: str = os.environ.get('SMTP_USERNAME', '') SMTP_PASSWORD: str = os.environ.get('SMTP_PASSWORD', '') ADMIN_EMAIL: str = os.environ.get('ADMIN_EMAIL', 'jay@mobilewash.com') # ===== BUSINESS CONFIGURATION ===== BUSINESS_NAME: str = os.environ.get('BUSINESS_NAME', "Jay's Mobile Wash") BUSINESS_PHONE: str = os.environ.get('BUSINESS_PHONE', '+15622289429') BUSINESS_EMAIL: str = os.environ.get('BUSINESS_EMAIL', 'jay@mobilewash.com') BUSINESS_ADDRESS: str = os.environ.get('BUSINESS_ADDRESS', 'Los Angeles, CA') SERVICE_RADIUS_MILES: int = int(os.environ.get('SERVICE_RADIUS_MILES', '15')) # ===== BUSINESS HOURS ===== BUSINESS_HOURS_WEEKDAY: str = os.environ.get('BUSINESS_HOURS_WEEKDAY', '8:00 AM - 6:00 PM') BUSINESS_HOURS_SATURDAY: 
str = os.environ.get('BUSINESS_HOURS_SATURDAY', '8:00 AM - 6:00 PM') BUSINESS_HOURS_SUNDAY: str = os.environ.get('BUSINESS_HOURS_SUNDAY', '10:00 AM - 4:00 PM') # ===== PRICING CONFIGURATION ===== BASIC_WASH_PRICE: float = float(os.environ.get('BASIC_WASH_PRICE', '25.00')) PREMIUM_WASH_PRICE: float = float(os.environ.get('PREMIUM_WASH_PRICE', '45.00')) FULL_DETAIL_PRICE: float = float(os.environ.get('FULL_DETAIL_PRICE', '85.00')) CERAMIC_COATING_PRICE: float = float(os.environ.get('CERAMIC_COATING_PRICE', '150.00')) HEADLIGHT_RESTORATION_PRICE: float = float(os.environ.get('HEADLIGHT_RESTORATION_PRICE', '35.00')) # ===== SYSTEM PERFORMANCE ===== MAX_CONCURRENT_CALLS: int = int(os.environ.get('MAX_CONCURRENT_CALLS', '50')) MAX_CONCURRENT_SMS: int = int(os.environ.get('MAX_CONCURRENT_SMS', '100')) API_RATE_LIMIT_PER_MINUTE: int = int(os.environ.get('API_RATE_LIMIT_PER_MINUTE', '1000')) CACHE_TTL_SECONDS: int = int(os.environ.get('CACHE_TTL_SECONDS', '3600')) # ===== AI CONFIGURATION ===== AI_RESPONSE_TIMEOUT: int = int(os.environ.get('AI_RESPONSE_TIMEOUT', '30')) WHISPER_MODEL: str = os.environ.get('WHISPER_MODEL', 'openai/whisper-large-v3') SENTIMENT_MODEL: str = os.environ.get('SENTIMENT_MODEL', 'distilbert-base-uncased-finetuned-sst-2-english') LANGUAGE_MODEL: str = os.environ.get('LANGUAGE_MODEL', 'deepseek-chat') # ===== FEATURE FLAGS ===== ENABLE_VOICE_CLONING: bool = os.environ.get('ENABLE_VOICE_CLONING', 'False').lower() == 'true' ENABLE_MULTILINGUAL: bool = os.environ.get('ENABLE_MULTILINGUAL', 'True').lower() == 'true' ENABLE_REAL_TIME_LEARNING: bool = os.environ.get('ENABLE_REAL_TIME_LEARNING', 'True').lower() == 'true' ENABLE_PREDICTIVE_SCHEDULING: bool = os.environ.get('ENABLE_PREDICTIVE_SCHEDULING', 'True').lower() == 'true' ENABLE_DYNAMIC_PRICING: bool = os.environ.get('ENABLE_DYNAMIC_PRICING', 'False').lower() == 'true' ENABLE_WEATHER_INTEGRATION: bool = os.environ.get('ENABLE_WEATHER_INTEGRATION', 'True').lower() == 'true' ENABLE_LOCATION_TRACKING: bool = os.environ.get('ENABLE_LOCATION_TRACKING', 'True').lower() == 'true' ENABLE_CUSTOMER_SENTIMENT_ROUTING: bool = os.environ.get('ENABLE_CUSTOMER_SENTIMENT_ROUTING', 'True').lower() == 'true' # ===== SECURITY CONFIGURATION ===== JWT_SECRET_KEY: str = os.environ.get('JWT_SECRET_KEY', 'jwt-secret-key-change-in-production') WEBHOOK_SECRET: str = os.environ.get('WEBHOOK_SECRET', 'webhook-secret-key-change-in-production') API_KEY_ENCRYPTION_KEY: str = os.environ.get('API_KEY_ENCRYPTION_KEY', 'encryption-key-change-in-production') # ===== DEVELOPMENT SETTINGS ===== DEVELOPMENT_MODE: bool = os.environ.get('DEVELOPMENT_MODE', 'False').lower() == 'true' DEBUG_API_CALLS: bool = os.environ.get('DEBUG_API_CALLS', 'False').lower() == 'true' MOCK_EXTERNAL_APIs: bool = os.environ.get('MOCK_EXTERNAL_APIs', 'False').lower() == 'true' LOG_LEVEL: str = os.environ.get('LOG_LEVEL', 'INFO') def validate(self) -> bool: """Validate critical configuration parameters""" errors = [] # SignalWire validation if not self.SIGNALWIRE_PROJECT_ID: errors.append("SIGNALWIRE_PROJECT_ID is required") if not self.SIGNALWIRE_AUTH_TOKEN: errors.append("SIGNALWIRE_AUTH_TOKEN is required") if not self.SIGNALWIRE_SPACE_URL: errors.append("SIGNALWIRE_SPACE_URL is required") # Phone number validation if not self.JAY_PHONE: errors.append("JAY_PHONE is required") if not self.SIGNALWIRE_PHONE: errors.append("SIGNALWIRE_PHONE is required") # Business validation if self.FORWARDING_DELAY_SECONDS < 5 or self.FORWARDING_DELAY_SECONDS > 60: errors.append("FORWARDING_DELAY_SECONDS 
must be between 5 and 60") if errors: error_message = f"Configuration validation failed: {', '.join(errors)}" raise ValueError(error_message) return True def get_hf_api_keys(self) -> Dict[str, str]: """Get all HuggingFace API keys""" return { 'transformers': self.HF_TRANSFORMERS_API_KEY, 'sentence_transformers': self.HF_SENTENCE_TRANSFORMERS_API_KEY, 'summary': self.HF_SUMMARY_API_KEY, 'gpt_neo': self.HF_GPT_NEO_API_KEY, 'distilbert_sentiment': self.HF_DISTILBERT_SENTIMENT_API_KEY, 'whisper': self.HF_WHISPER_API_KEY, 'clap': self.HF_CLAP_API_KEY, 'wav2vec2': self.HF_WAV2VEC2_API_KEY, 'speech_generation': self.HF_SPEECH_GENERATION_API_KEY, 'keybert': self.HF_KEYBERT_API_KEY, 'llama': self.HF_LLAMA_API_KEY, 'bert_base': self.HF_BERT_BASE_API_KEY, 'auto_template': self.HF_AUTO_TEMPLATE_API_KEY, 'multilingual': self.HF_MULTILINGUAL_API_KEY, 'deberta': self.HF_DEBERTA_API_KEY } def get_service_pricing(self) -> Dict[str, Dict[str, Union[float, int, str]]]: """Get complete service pricing configuration""" return { 'basic_wash': { 'price': self.BASIC_WASH_PRICE, 'duration': 30, 'description': 'Exterior wash and dry', 'category': 'basic' }, 'premium_wash': { 'price': self.PREMIUM_WASH_PRICE, 'duration': 45, 'description': 'Exterior wash, wax, interior vacuum', 'category': 'premium' }, 'full_detail': { 'price': self.FULL_DETAIL_PRICE, 'duration': 90, 'description': 'Complete interior and exterior detailing', 'category': 'premium' }, 'ceramic_coating': { 'price': self.CERAMIC_COATING_PRICE, 'duration': 120, 'description': 'Premium ceramic protection coating', 'category': 'premium' }, 'headlight_restoration': { 'price': self.HEADLIGHT_RESTORATION_PRICE, 'duration': 30, 'description': 'Professional headlight cleaning and restoration', 'category': 'specialty' } } # Initialize configuration config = EnvironmentConfig() try: config.validate() console.print("βœ… Configuration validated successfully", style="green") except ValueError as e: console.print(f"❌ Configuration validation failed: {e}", style="red") sys.exit(1) # ===== SIGNALWIRE CLIENT INITIALIZATION ===== try: signalwire_client = Client( config.SIGNALWIRE_PROJECT_ID, config.SIGNALWIRE_AUTH_TOKEN, signalwire_space_url=config.SIGNALWIRE_SPACE_URL ) console.print("βœ… SignalWire client initialized successfully", style="green") except Exception as e: console.print(f"❌ Failed to initialize SignalWire client: {e}", style="red") signalwire_client = None # ===== LOGGING CONFIGURATION ===== log_level = getattr(logging, config.LOG_LEVEL.upper(), logging.INFO) logging.basicConfig( level=log_level, format='%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s', handlers=[ logging.FileHandler('jays_mobile_wash.log'), logging.StreamHandler(sys.stdout) ] ) # Create specialized loggers logger = logging.getLogger(__name__) call_logger = logging.getLogger('calls') sms_logger = logging.getLogger('sms') ai_logger = logging.getLogger('ai') training_logger = logging.getLogger('training') system_logger = logging.getLogger('system') forwarding_logger = logging.getLogger('forwarding') # ===== RICH CONSOLE SETUP ===== console = Console() # ===== MONITORING SETUP ===== if config.SENTRY_DSN: sentry_sdk.init( dsn=config.SENTRY_DSN, integrations=[FlaskIntegration()], traces_sample_rate=0.1, environment="production" if not app.debug else "development" ) if config.DATADOG_API_KEY: initialize(api_key=config.DATADOG_API_KEY) if config.BUGSNAG_API_KEY: bugsnag.configure(api_key=config.BUGSNAG_API_KEY) handle_exceptions(app) # ===== PROMETHEUS METRICS ===== 
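# A minimal sketch (hook names assumed; they are not defined elsewhere in this file) of how
# the request metrics declared just below are typically fed from Flask hooks:
#
#   @app.before_request
#   def _start_request_timer():
#       request._prom_start = time.time()
#
#   @app.after_request
#   def _record_request_metrics(response):
#       REQUEST_LATENCY.observe(time.time() - request._prom_start)
#       REQUEST_COUNT.labels(request.method, request.path, response.status_code).inc()
#       return response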
REQUEST_COUNT = Counter('http_requests_total', 'Total HTTP requests', ['method', 'endpoint', 'status']) REQUEST_LATENCY = Histogram('http_request_duration_seconds', 'HTTP request latency') CALL_COUNT = Counter('signalwire_calls_total', 'Total SignalWire calls', ['type', 'status']) SMS_COUNT = Counter('signalwire_sms_total', 'Total SignalWire SMS', ['status']) AI_PROCESSING_TIME = Histogram('ai_processing_seconds', 'AI processing time', ['api']) ACTIVE_CALLS = Gauge('active_calls', 'Number of active calls') ACTIVE_SMS = Gauge('active_sms', 'Number of active SMS conversations') # Start Prometheus metrics server if not config.DEVELOPMENT_MODE: try: start_http_server(8000) logger.info("Prometheus metrics server started on port 8000") except Exception as e: logger.warning(f"Failed to start Prometheus metrics server: {e}") # ===== THREAD-SAFE SYSTEM STATE ===== class SystemState: """Thread-safe system state management""" def __init__(self): self._lock = Lock() self._state = { # ===== CORE SYSTEM STATUS ===== 'ai_enabled': True, 'learning_enabled': True, 'iphone_forwarding_enabled': config.IPHONE_FORWARDING_ENABLED, 'forwarding_delay_seconds': config.FORWARDING_DELAY_SECONDS, 'emergency_mode': False, 'maintenance_mode': False, 'system_health': 'healthy', # ===== CALL MANAGEMENT ===== 'active_calls': {}, 'active_sms': {}, 'call_queue': deque(maxlen=100), 'sms_queue': deque(maxlen=500), # ===== DAILY STATISTICS ===== 'total_calls_today': 0, 'total_sms_today': 0, 'calls_forwarded_today': 0, 'jay_answered_calls': 0, 'ai_handled_calls': 0, 'ai_handled_sms': 0, 'revenue_today': 0.0, 'bookings_today': 0, # ===== PERFORMANCE METRICS ===== 'average_response_time': 0.0, 'customer_satisfaction_score': 4.9, 'ai_accuracy_score': 0.95, 'system_uptime_start': datetime.now(), 'last_health_check': datetime.now(), # ===== API HEALTH STATUS ===== 'api_health': { 'signalwire': {'status': 'unknown', 'last_check': None, 'response_time': 0.0}, 'deepseek': {'status': 'unknown', 'last_check': None, 'response_time': 0.0}, 'huggingface': {'status': 'unknown', 'last_check': None, 'response_time': 0.0}, }, # ===== TRAINING AND LEARNING ===== 'training_sessions_today': 0, 'corrections_applied_today': 0, 'learning_improvements': 0, 'knowledge_base_size': 0, # ===== IPHONE FORWARDING SPECIFIC ===== 'forwarding_rules': { 'unanswered_after_seconds': config.FORWARDING_DELAY_SECONDS, 'forward_on_busy': True, 'forward_on_unreachable': True, 'forward_all_calls': False, 'vip_numbers': [], 'blocked_numbers': [] }, # ===== REAL-TIME MONITORING ===== 'performance_metrics': { 'cpu_usage': 0.0, 'memory_usage': 0.0, 'disk_usage': 0.0, 'network_io': {'bytes_sent': 0, 'bytes_recv': 0}, 'database_connections': 0, 'active_sessions': 0, 'api_calls_per_minute': 0, 'cache_hit_rate': 0.0 }, # ===== FEATURE FLAGS ===== 'feature_flags': { 'whisper_enabled': bool(config.HF_WHISPER_API_KEY), 'deepseek_fallback': bool(config.DEEPSEEK_API_KEY), 'real_time_learning': config.ENABLE_REAL_TIME_LEARNING, 'advanced_analytics': True, 'multilingual_support': config.ENABLE_MULTILINGUAL, 'voice_cloning': config.ENABLE_VOICE_CLONING, 'sentiment_routing': config.ENABLE_CUSTOMER_SENTIMENT_ROUTING, 'predictive_scheduling': config.ENABLE_PREDICTIVE_SCHEDULING, 'dynamic_pricing': config.ENABLE_DYNAMIC_PRICING, 'weather_integration': config.ENABLE_WEATHER_INTEGRATION, 'location_tracking': config.ENABLE_LOCATION_TRACKING, 'iphone_forwarding': config.IPHONE_FORWARDING_ENABLED }, # ===== BUSINESS METRICS ===== 'business_metrics': { 'conversion_rate': 0.0, 
'average_booking_value': 0.0, 'customer_retention_rate': 0.0, 'service_completion_rate': 0.0, 'customer_lifetime_value': 0.0, 'profit_margin': 0.0, 'market_penetration': 0.0, 'brand_awareness': 0.0 } } def get(self, key: str = None) -> Any: """Get state value with thread safety""" with self._lock: if key is None: return self._state.copy() return self._state.get(key) def set(self, key: str, value: Any) -> None: """Set state value with thread safety""" with self._lock: self._state[key] = value def update(self, updates: Dict[str, Any]) -> None: """Update multiple state values with thread safety""" with self._lock: self._state.update(updates) def increment(self, key: str, amount: Union[int, float] = 1) -> None: """Increment numeric state value with thread safety""" with self._lock: current_value = self._state.get(key, 0) self._state[key] = current_value + amount def decrement(self, key: str, amount: Union[int, float] = 1) -> None: """Decrement numeric state value with thread safety""" with self._lock: current_value = self._state.get(key, 0) self._state[key] = max(0, current_value - amount) def add_to_queue(self, queue_name: str, item: Any) -> None: """Add item to queue with thread safety""" with self._lock: if queue_name in self._state and hasattr(self._state[queue_name], 'append'): self._state[queue_name].append(item) def get_metrics(self) -> Dict[str, Any]: """Get comprehensive system metrics""" with self._lock: uptime_seconds = (datetime.now() - self._state['system_uptime_start']).total_seconds() return { 'system_uptime_seconds': uptime_seconds, 'system_uptime_formatted': str(timedelta(seconds=int(uptime_seconds))), 'total_calls_today': self._state['total_calls_today'], 'total_sms_today': self._state['total_sms_today'], 'calls_forwarded_today': self._state['calls_forwarded_today'], 'active_calls': len(self._state['active_calls']), 'active_sms': len(self._state['active_sms']), 'jay_answer_rate': self._state['jay_answered_calls'] / max(self._state['total_calls_today'], 1) * 100, 'ai_success_rate': self._state['ai_handled_calls'] / max(self._state['total_calls_today'], 1) * 100, 'revenue_today': self._state['revenue_today'], 'bookings_today': self._state['bookings_today'], 'customer_satisfaction': self._state['customer_satisfaction_score'], 'ai_accuracy': self._state['ai_accuracy_score'], 'average_response_time': self._state['average_response_time'], 'system_health': self._state['system_health'], 'performance': self._state['performance_metrics'], 'api_health': self._state['api_health'], 'feature_flags': self._state['feature_flags'], 'forwarding_enabled': self._state['iphone_forwarding_enabled'], 'forwarding_delay': self._state['forwarding_delay_seconds'] } def update_performance_metrics(self) -> None: """Update real-time performance metrics""" try: with self._lock: # CPU and memory usage self._state['performance_metrics']['cpu_usage'] = psutil.cpu_percent(interval=1) self._state['performance_metrics']['memory_usage'] = psutil.virtual_memory().percent self._state['performance_metrics']['disk_usage'] = psutil.disk_usage('/').percent # Network I/O net_io = psutil.net_io_counters() self._state['performance_metrics']['network_io'] = { 'bytes_sent': net_io.bytes_sent, 'bytes_recv': net_io.bytes_recv } # Update Prometheus metrics ACTIVE_CALLS.set(len(self._state['active_calls'])) ACTIVE_SMS.set(len(self._state['active_sms'])) except Exception as e: logger.error(f"Error updating performance metrics: {e}") # Initialize global system state system_state = SystemState() # ===== DATABASE MODELS ===== Base 
= declarative_base() class Call(Base): """Enhanced call model with iPhone forwarding support""" __tablename__ = 'calls' id = Column(Integer, primary_key=True, index=True) call_sid = Column(String(50), unique=True, index=True, nullable=False) from_number = Column(String(20), index=True, nullable=False) to_number = Column(String(20), nullable=False) forwarded_from = Column(String(20)) # Original number if forwarded forwarding_reason = Column(String(20)) # unanswered, busy, unreachable start_time = Column(DateTime, default=datetime.utcnow, nullable=False) end_time = Column(DateTime) duration = Column(Integer) # Duration in seconds handled_by = Column(String(10), nullable=False) # 'jay', 'ai', 'forwarded' call_status = Column(String(20), nullable=False) # 'ringing', 'answered', 'completed', 'failed', 'forwarded' forwarding_status = Column(String(20)) # 'forwarded', 'not_forwarded', 'forwarding_failed' recording_url = Column(String(500)) transcription = Column(Text) transcription_confidence = Column(Float) customer_satisfaction = Column(Integer) # 1-5 rating ai_confidence_score = Column(Float) sentiment_score = Column(Float) intent_detected = Column(String(50)) escalated_to_human = Column(Boolean, default=False) booking_created = Column(Boolean, default=False) revenue_generated = Column(Float, default=0.0) notes = Column(Text) created_at = Column(DateTime, default=datetime.utcnow) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) def __repr__(self): return f"" class SMS(Base): """Enhanced SMS model with conversation threading""" __tablename__ = 'sms' id = Column(Integer, primary_key=True, index=True) message_sid = Column(String(50), unique=True, index=True, nullable=False) from_number = Column(String(20), index=True, nullable=False) to_number = Column(String(20), nullable=False) message_body = Column(Text, nullable=False) response_body = Column(Text) timestamp = Column(DateTime, default=datetime.utcnow, nullable=False) response_time = Column(Float) # Response time in seconds conversation_id = Column(String(50), index=True) # Link related messages message_sequence = Column(Integer, default=1) # Order in conversation customer_satisfaction = Column(Integer) # 1-5 rating sentiment_score = Column(Float) intent_detected = Column(String(50)) ai_confidence_score = Column(Float) handled_by = Column(String(10), default='ai') # 'ai', 'jay', 'escalated' escalated_to_human = Column(Boolean, default=False) booking_created = Column(Boolean, default=False) revenue_generated = Column(Float, default=0.0) language_detected = Column(String(10), default='en') contains_media = Column(Boolean, default=False) media_urls = Column(Text) # JSON array of media URLs customer_location = Column(String(100)) weather_conditions = Column(String(50)) created_at = Column(DateTime, default=datetime.utcnow) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) def __repr__(self): return f"" class LearningData(Base): """Enhanced learning data model""" __tablename__ = 'learning_data' id = Column(Integer, primary_key=True, index=True) interaction_type = Column(String(20), nullable=False) # 'call', 'sms', 'training', 'correction' customer_id = Column(String(50), index=True) customer_phone = Column(String(20), index=True) input_text = Column(Text, nullable=False) bot_response = Column(Text) human_correction = Column(Text) corrected_response = Column(Text) sentiment_score = Column(Float) intent_detected = Column(String(50)) confidence_score = Column(Float) timestamp = 
Column(DateTime, default=datetime.utcnow, nullable=False) approved = Column(Boolean, default=False) correction_applied = Column(Boolean, default=False) improvement_score = Column(Float) # Measure of learning improvement api_used = Column(String(50)) # Which API generated the response response_quality = Column(Float) # 0-1 quality score learning_category = Column(String(50)) # Category of learning feedback_type = Column(String(20)) # 'positive', 'negative', 'neutral' trainer_id = Column(String(50)) # Who provided the correction model_version = Column(String(20)) # Model version when learned context_data = Column(Text) # JSON context information created_at = Column(DateTime, default=datetime.utcnow) def __repr__(self): return f"" class TrainingSession(Base): """Enhanced training session model""" __tablename__ = 'training_sessions' id = Column(Integer, primary_key=True, index=True) session_id = Column(String(50), unique=True, index=True, nullable=False) session_type = Column(String(20), nullable=False) # 'sms_simulation', 'voice_training', 'correction' test_message = Column(Text, nullable=False) bot_response = Column(Text) corrected_response = Column(Text) expected_response = Column(Text) timestamp = Column(DateTime, default=datetime.utcnow, nullable=False) approved = Column(Boolean, default=False) correction_applied = Column(Boolean, default=False) trainer_feedback = Column(Text) quality_score = Column(Float) # 0-1 quality assessment learning_category = Column(String(50)) difficulty_level = Column(String(20)) # 'easy', 'medium', 'hard', 'expert' scenario_type = Column(String(50)) # 'pricing', 'scheduling', 'complaint', etc. success_rate = Column(Float) # Historical success rate for this type improvement_suggestions = Column(Text) model_performance = Column(Text) # JSON performance metrics training_data_source = Column(String(50)) # Source of training data validation_passed = Column(Boolean, default=False) created_at = Column(DateTime, default=datetime.utcnow) def __repr__(self): return f"" class BusinessData(Base): """Enhanced business data model""" __tablename__ = 'business_data' id = Column(Integer, primary_key=True, index=True) date = Column(DateTime, default=datetime.utcnow, nullable=False, index=True) service_type = Column(String(50), index=True, nullable=False) price = Column(Float, nullable=False) actual_price = Column(Float) # Price actually charged (may include discounts) quantity = Column(Integer, default=1) bookings_count = Column(Integer, default=0) completed_bookings = Column(Integer, default=0) cancelled_bookings = Column(Integer, default=0) revenue = Column(Float, default=0.0) profit = Column(Float, default=0.0) costs = Column(Float, default=0.0) customer_rating = Column(Float) service_duration = Column(Integer) # Actual duration in minutes location = Column(String(100)) location_lat = Column(Float) location_lng = Column(Float) weather_conditions = Column(String(50)) temperature = Column(Float) humidity = Column(Float) wind_speed = Column(Float) day_of_week = Column(Integer) # 0=Monday, 6=Sunday hour_of_day = Column(Integer) # 0-23 season = Column(String(20)) # 'spring', 'summer', 'fall', 'winter' promotional_code = Column(String(20)) discount_amount = Column(Float, default=0.0) referral_source = Column(String(50)) customer_type = Column(String(20)) # 'new', 'returning', 'vip' service_quality_score = Column(Float) employee_assigned = Column(String(50)) # 'jay', 'employee_1', etc. 
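    # Vehicle details, add-on services, and before/after photo URLs recorded for each completed job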
vehicle_type = Column(String(50)) vehicle_size = Column(String(20)) # 'compact', 'sedan', 'suv', 'truck' additional_services = Column(Text) # JSON array of add-on services before_photos = Column(Text) # JSON array of photo URLs after_photos = Column(Text) # JSON array of photo URLs created_at = Column(DateTime, default=datetime.utcnow) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) def __repr__(self): return f"" class Customer(Base): """Enhanced customer model with comprehensive tracking""" __tablename__ = 'customers' id = Column(Integer, primary_key=True, index=True) phone_number = Column(String(20), unique=True, index=True, nullable=False) normalized_phone = Column(String(20), index=True) # E.164 format name = Column(String(100)) email = Column(String(100)) address = Column(String(200)) city = Column(String(50)) state = Column(String(20)) zip_code = Column(String(10)) location_lat = Column(Float) location_lng = Column(Float) # Contact history first_contact = Column(DateTime, default=datetime.utcnow) last_contact = Column(DateTime, default=datetime.utcnow) total_calls = Column(Integer, default=0) total_sms = Column(Integer, default=0) total_interactions = Column(Integer, default=0) # Business metrics total_bookings = Column(Integer, default=0) completed_bookings = Column(Integer, default=0) cancelled_bookings = Column(Integer, default=0) total_spent = Column(Float, default=0.0) total_profit = Column(Float, default=0.0) average_rating = Column(Float, default=5.0) lifetime_value = Column(Float, default=0.0) # Preferences and behavior preferred_services = Column(Text) # JSON array preferred_times = Column(Text) # JSON object with time preferences preferred_locations = Column(Text) # JSON array of locations communication_preference = Column(String(20), default='sms') # 'sms', 'call', 'email' language_preference = Column(String(10), default='en') # Segmentation customer_segment = Column(String(50)) # 'new', 'regular', 'vip', 'at_risk', 'inactive' acquisition_channel = Column(String(50)) # 'organic', 'referral', 'social', 'ads' referral_source = Column(String(100)) marketing_consent = Column(Boolean, default=True) # Satisfaction and engagement satisfaction_score = Column(Float, default=5.0) engagement_score = Column(Float, default=1.0) churn_risk_score = Column(Float, default=0.0) loyalty_points = Column(Integer, default=0) # Vehicle information vehicle_info = Column(Text) # JSON object with vehicle details vehicle_type = Column(String(50)) vehicle_size = Column(String(20)) # AI interaction data ai_interaction_count = Column(Integer, default=0) human_interaction_count = Column(Integer, default=0) ai_satisfaction_score = Column(Float, default=5.0) preferred_interaction_type = Column(String(20)) # 'ai', 'human', 'mixed' # Behavioral data typical_booking_day = Column(String(20)) # 'monday', 'tuesday', etc. 
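    # Booking-cadence and price-sensitivity fields that feed the predictive-scheduling and churn-risk features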
typical_booking_time = Column(String(20)) # 'morning', 'afternoon', 'evening' seasonal_pattern = Column(String(50)) # Seasonal booking patterns price_sensitivity = Column(String(20)) # 'low', 'medium', 'high' service_frequency = Column(String(20)) # 'weekly', 'monthly', 'quarterly', 'yearly' # Special flags vip_customer = Column(Boolean, default=False) priority_customer = Column(Boolean, default=False) blocked_customer = Column(Boolean, default=False) test_customer = Column(Boolean, default=False) # Notes and tags notes = Column(Text) tags = Column(Text) # JSON array of tags # Timestamps created_at = Column(DateTime, default=datetime.utcnow) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) def __repr__(self): return f"" class SystemLog(Base): """System event logging""" __tablename__ = 'system_logs' id = Column(Integer, primary_key=True, index=True) timestamp = Column(DateTime, default=datetime.utcnow, nullable=False, index=True) log_level = Column(String(10), nullable=False, index=True) # 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL' logger_name = Column(String(50), nullable=False, index=True) message = Column(Text, nullable=False) function_name = Column(String(100)) line_number = Column(Integer) module_name = Column(String(100)) thread_id = Column(String(50)) process_id = Column(Integer) user_id = Column(String(50)) session_id = Column(String(50)) request_id = Column(String(50)) ip_address = Column(String(45)) user_agent = Column(String(500)) additional_data = Column(Text) # JSON additional context created_at = Column(DateTime, default=datetime.utcnow) def __repr__(self): return f"" # ===== DATABASE INITIALIZATION ===== def init_database(): """Initialize database with all tables and indexes""" try: # Create engine with enhanced configuration engine_config = { 'echo': config.DEBUG_API_CALLS, 'pool_size': 10, 'max_overflow': 20, 'pool_timeout': 30, 'pool_recycle': 3600, 'pool_pre_ping': True } if config.DATABASE_URL.startswith('sqlite'): engine_config.update({ 'connect_args': { 'check_same_thread': False, 'timeout': 30 } }) engine = create_engine(config.DATABASE_URL, **engine_config) # Create all tables Base.metadata.create_all(bind=engine) # Create additional indexes for performance with engine.connect() as conn: # Calls indexes conn.execute("CREATE INDEX IF NOT EXISTS idx_calls_forwarded_from ON calls(forwarded_from)") conn.execute("CREATE INDEX IF NOT EXISTS idx_calls_forwarding_reason ON calls(forwarding_reason)") conn.execute("CREATE INDEX IF NOT EXISTS idx_calls_handled_by_status ON calls(handled_by, call_status)") conn.execute("CREATE INDEX IF NOT EXISTS idx_calls_date_revenue ON calls(DATE(start_time), revenue_generated)") # SMS indexes conn.execute("CREATE INDEX IF NOT EXISTS idx_sms_conversation_sequence ON sms(conversation_id, message_sequence)") conn.execute("CREATE INDEX IF NOT EXISTS idx_sms_date_revenue ON sms(DATE(timestamp), revenue_generated)") conn.execute("CREATE INDEX IF NOT EXISTS idx_sms_language_sentiment ON sms(language_detected, sentiment_score)") # Customers indexes conn.execute("CREATE INDEX IF NOT EXISTS idx_customers_segment_value ON customers(customer_segment, lifetime_value)") conn.execute("CREATE INDEX IF NOT EXISTS idx_customers_location ON customers(location_lat, location_lng)") conn.execute("CREATE INDEX IF NOT EXISTS idx_customers_churn_risk ON customers(churn_risk_score)") # Business data indexes conn.execute("CREATE INDEX IF NOT EXISTS idx_business_data_date_service ON business_data(DATE(date), service_type)") 
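            # NOTE: on SQLAlchemy 1.4 (future mode) and 2.0, raw DDL strings must be wrapped
            # before execution, e.g. conn.execute(text("CREATE INDEX IF NOT EXISTS ...")) with
            # `from sqlalchemy import text`; the bare-string form used here only works on older 1.x APIs.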
conn.execute("CREATE INDEX IF NOT EXISTS idx_business_data_location_revenue ON business_data(location, revenue)") conn.execute("CREATE INDEX IF NOT EXISTS idx_business_data_weather_performance ON business_data(weather_conditions, service_quality_score)") # Learning data indexes conn.execute("CREATE INDEX IF NOT EXISTS idx_learning_data_category_quality ON learning_data(learning_category, response_quality)") conn.execute("CREATE INDEX IF NOT EXISTS idx_learning_data_api_confidence ON learning_data(api_used, confidence_score)") # System logs indexes conn.execute("CREATE INDEX IF NOT EXISTS idx_system_logs_level_time ON system_logs(log_level, timestamp)") conn.execute("CREATE INDEX IF NOT EXISTS idx_system_logs_logger_time ON system_logs(logger_name, timestamp)") conn.commit() logger.info(f"βœ… Database initialized successfully: {config.DATABASE_URL}") return engine except Exception as e: logger.error(f"❌ Database initialization failed: {e}") return None # Create database engine and session factory engine = init_database() if engine: SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) else: SessionLocal = None logger.error("❌ Database not available - running in limited mode") def get_db(): """Database dependency for request handling""" if SessionLocal is None: return None db = SessionLocal() try: yield db finally: db.close() # ===== ADVANCED API INTEGRATION SYSTEM ===== class AdvancedAPIProcessor: """Advanced API processing with comprehensive HuggingFace integration""" def __init__(self): self.api_endpoints = { 'transformers': 'https://api-inference.huggingface.co/models/distilbert-base-uncased', 'sentence_transformers': 'https://api-inference.huggingface.co/models/sentence-transformers/all-MiniLM-L6-v2', 'summary': 'https://api-inference.huggingface.co/models/facebook/bart-large-cnn', 'gpt_neo': 'https://api-inference.huggingface.co/models/EleutherAI/gpt-neo-2.7B', 'distilbert_sentiment': 'https://api-inference.huggingface.co/models/distilbert-base-uncased-finetuned-sst-2-english', 'whisper': f'https://api-inference.huggingface.co/models/{config.WHISPER_MODEL}', 'clap': 'https://api-inference.huggingface.co/models/laion/clap-htsat-unfused', 'wav2vec2': 'https://api-inference.huggingface.co/models/facebook/wav2vec2-base-960h', 'speech_generation': 'https://api-inference.huggingface.co/models/microsoft/speecht5_tts', 'keybert': 'https://api-inference.huggingface.co/models/distilbert-base-nli-mean-tokens', 'llama': 'https://api-inference.huggingface.co/models/meta-llama/Llama-2-7b-chat-hf', 'bert_base': 'https://api-inference.huggingface.co/models/bert-base-uncased', 'auto_template': 'https://api-inference.huggingface.co/models/microsoft/DialoGPT-medium', 'multilingual': 'https://api-inference.huggingface.co/models/distilbert-base-multilingual-cased', 'deberta': 'https://api-inference.huggingface.co/models/microsoft/deberta-v3-large' } self.api_keys = config.get_hf_api_keys() self.usage_stats = defaultdict(int) self.response_cache = {} self.rate_limits = defaultdict(lambda: {'count': 0, 'reset_time': datetime.now()}) self.health_status = defaultdict(lambda: {'status': 'unknown', 'last_check': None, 'response_time': 0.0}) self.performance_metrics = defaultdict(lambda: {'total_calls': 0, 'success_rate': 0.0, 'avg_response_time': 0.0}) # Initialize additional AI clients self.init_additional_ai_clients() # Start background health monitoring self.start_health_monitoring() # Initialize caching system self.init_caching_system() def init_additional_ai_clients(self): 
"""Initialize additional AI service clients""" self.clients = {} # OpenAI if config.OPENAI_API_KEY: try: import openai openai.api_key = config.OPENAI_API_KEY self.clients['openai'] = openai logger.info("βœ… OpenAI client initialized") except ImportError: logger.warning("OpenAI library not available") # Anthropic if config.ANTHROPIC_API_KEY: try: import anthropic self.clients['anthropic'] = anthropic.Anthropic(api_key=config.ANTHROPIC_API_KEY) logger.info("βœ… Anthropic client initialized") except ImportError: logger.warning("Anthropic library not available") # Cohere if config.COHERE_API_KEY: try: import cohere self.clients['cohere'] = cohere.Client(config.COHERE_API_KEY) logger.info("βœ… Cohere client initialized") except ImportError: logger.warning("Cohere library not available") # Groq if config.GROQ_API_KEY: try: from groq import Groq self.clients['groq'] = Groq(api_key=config.GROQ_API_KEY) logger.info("βœ… Groq client initialized") except ImportError: logger.warning("Groq library not available") # Together if config.TOGETHER_API_KEY: try: import together together.api_key = config.TOGETHER_API_KEY self.clients['together'] = together logger.info("βœ… Together client initialized") except ImportError: logger.warning("Together library not available") # Mistral if config.MISTRAL_API_KEY: try: from mistralai.client import MistralClient self.clients['mistral'] = MistralClient(api_key=config.MISTRAL_API_KEY) logger.info("βœ… Mistral client initialized") except ImportError: logger.warning("Mistral library not available") # Google Generative AI if config.GOOGLE_API_KEY: try: import google.generativeai as genai genai.configure(api_key=config.GOOGLE_API_KEY) self.clients['gemini'] = genai logger.info("βœ… Google Generative AI client initialized") except ImportError: logger.warning("Google Generative AI library not available") logger.info(f"Initialized {len(self.clients)} additional AI clients") def init_caching_system(self): """Initialize response caching system""" try: if config.REDIS_URL: import redis self.redis_client = redis.Redis.from_url(config.REDIS_URL, decode_responses=True) self.redis_client.ping() logger.info("βœ… Redis caching system initialized") else: self.redis_client = None logger.info("πŸ“ Using in-memory caching (Redis not available)") except Exception as e: logger.warning(f"Redis connection failed, using in-memory cache: {e}") self.redis_client = None def start_health_monitoring(self): """Start background health monitoring for all APIs""" def monitor_health(): while True: try: self.check_all_api_health() system_state.set('last_health_check', datetime.now()) time.sleep(300) # Check every 5 minutes except Exception as e: logger.error(f"Health monitoring error: {e}") time.sleep(60) # Retry after 1 minute on error health_thread = Thread(target=monitor_health, daemon=True) health_thread.start() logger.info("βœ… API health monitoring started") def check_rate_limit(self, api_name: str) -> bool: """Check if API rate limit is exceeded""" now = datetime.now() rate_info = self.rate_limits[api_name] # Reset counter if a minute has passed if (now - rate_info['reset_time']).seconds >= 60: rate_info['count'] = 0 rate_info['reset_time'] = now # Check if rate limit exceeded if rate_info['count'] >= config.API_RATE_LIMIT_PER_MINUTE: logger.warning(f"Rate limit exceeded for {api_name}") return False rate_info['count'] += 1 return True def get_cache_key(self, api_name: str, input_data: str) -> str: """Generate cache key for API responses""" data_hash = hashlib.md5(input_data.encode()).hexdigest() 
return f"api_cache:{api_name}:{data_hash}" def get_cached_response(self, api_name: str, input_data: str) -> Optional[Dict]: """Get cached API response""" try: cache_key = self.get_cache_key(api_name, input_data) if self.redis_client: cached = self.redis_client.get(cache_key) if cached: return json.loads(cached) else: # In-memory cache if cache_key in self.response_cache: cached_data = self.response_cache[cache_key] if (datetime.now() - cached_data['timestamp']).seconds < config.CACHE_TTL_SECONDS: return cached_data['response'] else: del self.response_cache[cache_key] return None except Exception as e: logger.error(f"Cache retrieval error: {e}") return None def cache_response(self, api_name: str, input_data: str, response: Dict) -> None: """Cache API response""" try: cache_key = self.get_cache_key(api_name, input_data) if self.redis_client: self.redis_client.setex( cache_key, config.CACHE_TTL_SECONDS, json.dumps(response) ) else: # In-memory cache with size limit if len(self.response_cache) > 1000: # Remove oldest entries oldest_keys = sorted( self.response_cache.keys(), key=lambda k: self.response_cache[k]['timestamp'] )[:100] for key in oldest_keys: del self.response_cache[key] self.response_cache[cache_key] = { 'response': response, 'timestamp': datetime.now() } except Exception as e: logger.error(f"Cache storage error: {e}") def process_audio_with_whisper(self, audio_data: bytes, context: Dict = None) -> Dict: """🎀 WHISPER: Enhanced speech-to-text with iPhone forwarding context""" start_time = time.time() try: if not self.check_rate_limit('whisper'): return {"error": "Rate limit exceeded for Whisper API", "success": False} api_key = self.api_keys.get('whisper') if not api_key: return {"error": "Whisper API not configured", "success": False} # Check cache first cache_key_data = f"whisper_{len(audio_data)}_{hashlib.md5(audio_data).hexdigest()[:8]}" cached_response = self.get_cached_response('whisper', cache_key_data) if cached_response: ai_logger.info("Whisper response served from cache") return cached_response headers = {"Authorization": f"Bearer {api_key}"} # Enhanced audio preprocessing processed_audio = self.preprocess_audio(audio_data) response = requests.post( self.api_endpoints['whisper'], headers=headers, data=processed_audio, timeout=config.AI_RESPONSE_TIMEOUT ) response_time = time.time() - start_time # Update performance metrics self.performance_metrics['whisper']['total_calls'] += 1 self.performance_metrics['whisper']['avg_response_time'] = ( (self.performance_metrics['whisper']['avg_response_time'] * (self.performance_metrics['whisper']['total_calls'] - 1) + response_time) / self.performance_metrics['whisper']['total_calls'] ) if response.status_code == 200: result = response.json() self.usage_stats['whisper'] += 1 transcribed_text = result.get('text', '') confidence = result.get('confidence', 1.0) language = result.get('language', 'en') # Enhanced post-processing with context cleaned_text = self.clean_transcription(transcribed_text, context) # Check for iPhone forwarding indicators forwarding_indicators = self.detect_forwarding_context(cleaned_text, context) response_data = { "success": True, "transcription": cleaned_text, "original_transcription": transcribed_text, "confidence": confidence, "response_time": response_time, "language_detected": language, "audio_duration": self.get_audio_duration(audio_data), "forwarding_context": forwarding_indicators, "quality_score": self.assess_transcription_quality(cleaned_text, confidence), "api_used": "whisper", "model": 
config.WHISPER_MODEL } # Cache successful response self.cache_response('whisper', cache_key_data, response_data) # Update Prometheus metrics AI_PROCESSING_TIME.labels(api='whisper').observe(response_time) ai_logger.info(f"Whisper transcription completed: {cleaned_text[:100]}... (confidence: {confidence:.2f})") return response_data else: error_msg = f"Whisper API returned {response.status_code}: {response.text}" ai_logger.error(error_msg) self.performance_metrics['whisper']['success_rate'] = ( self.performance_metrics['whisper']['success_rate'] * 0.9 ) return {"error": error_msg, "success": False, "response_time": response_time} except Exception as e: response_time = time.time() - start_time error_msg = f"Whisper processing error: {e}" ai_logger.error(error_msg) return {"error": error_msg, "success": False, "response_time": response_time} def detect_forwarding_context(self, text: str, context: Dict = None) -> Dict: """Detect iPhone forwarding context from transcription""" indicators = { 'likely_forwarded': False, 'forwarding_reason': None, 'urgency_level': 'normal', 'customer_mood': 'neutral' } if not text or not context: return indicators text_lower = text.lower() # Check for forwarding indicators forwarding_phrases = [ 'tried calling', 'called earlier', 'no answer', 'voicemail', 'forwarded', 'transferred', 'calling back', 'return call' ] urgency_phrases = [ 'urgent', 'emergency', 'asap', 'immediately', 'right now', 'stuck', 'problem', 'issue', 'help' ] frustration_phrases = [ 'frustrated', 'annoyed', 'upset', 'angry', 'disappointed', 'waiting', 'still waiting', 'called multiple times' ] # Detect forwarding if any(phrase in text_lower for phrase in forwarding_phrases): indicators['likely_forwarded'] = True indicators['forwarding_reason'] = 'customer_indication' # Detect urgency if any(phrase in text_lower for phrase in urgency_phrases): indicators['urgency_level'] = 'high' # Detect customer mood if any(phrase in text_lower for phrase in frustration_phrases): indicators['customer_mood'] = 'frustrated' # Check call context if context.get('forwarded_from'): indicators['likely_forwarded'] = True indicators['forwarding_reason'] = context.get('forwarding_reason', 'system_forwarded') return indicators def preprocess_audio(self, audio_data: bytes) -> bytes: """Enhanced audio preprocessing for better transcription""" try: # Basic audio preprocessing # In a production environment, you would implement: # - Noise reduction # - Audio normalization # - Format conversion # - Quality enhancement # For now, return as-is return audio_data except Exception as e: logger.warning(f"Audio preprocessing failed: {e}") return audio_data def clean_transcription(self, text: str, context: Dict = None) -> str: """Enhanced transcription cleaning with context awareness""" if not text: return "" # Remove filler words filler_words = ['um', 'uh', 'like', 'you know', 'so', 'well', 'actually', 'basically'] words = text.split() cleaned_words = [word for word in words if word.lower() not in filler_words] # Fix common transcription errors for car wash domain corrections = { 'jay': 'Jay', 'jays': "Jay's", 'mobile wash': 'Mobile Wash', 'car wash': 'car wash', 'detailing': 'detailing', 'ceramic': 'ceramic', 'wax': 'wax', 'vacuum': 'vacuum', 'interior': 'interior', 'exterior': 'exterior' } cleaned_text = ' '.join(cleaned_words) # Apply corrections for old, new in corrections.items(): cleaned_text = re.sub(rf'\b{re.escape(old)}\b', new, cleaned_text, flags=re.IGNORECASE) # Context-aware cleaning if context and 
context.get('likely_forwarded'): # If this is a forwarded call, customer might be explaining the situation pass # Keep more context words return cleaned_text.strip() def get_audio_duration(self, audio_data: bytes) -> float: """Estimate audio duration in seconds""" try: # This is a simplified estimation # In production, you would use audio libraries like librosa or pydub return len(audio_data) / 16000 # Assuming 16kHz sample rate except: return 0.0 def assess_transcription_quality(self, text: str, confidence: float) -> float: """Assess transcription quality based on multiple factors""" if not text: return 0.0 quality_score = confidence # Length factor (very short or very long transcriptions might be low quality) word_count = len(text.split()) if 5 <= word_count <= 100: quality_score += 0.1 elif word_count < 3: quality_score -= 0.2 # Business domain relevance business_keywords = [ 'wash', 'detail', 'car', 'service', 'book', 'schedule', 'price', 'cost', 'jay', 'mobile', 'clean', 'wax' ] keyword_matches = sum(1 for keyword in business_keywords if keyword.lower() in text.lower()) quality_score += min(keyword_matches * 0.05, 0.2) # Grammar and coherence (simplified check) if text.count('.') > 0 or text.count('?') > 0: quality_score += 0.05 return max(0.0, min(1.0, quality_score)) def analyze_sentiment_advanced(self, text: str, context: Dict = None) -> Dict: """Advanced sentiment analysis with iPhone forwarding context""" start_time = time.time() try: if not self.check_rate_limit('distilbert_sentiment'): return {"error": "Rate limit exceeded for sentiment analysis", "success": False} # Check cache first cached_response = self.get_cached_response('distilbert_sentiment', text) if cached_response: return cached_response api_key = self.api_keys.get('distilbert_sentiment') if not api_key: # Fallback to TextBlob return self.fallback_sentiment_analysis(text, context) headers = {"Authorization": f"Bearer {api_key}"} response = requests.post( self.api_endpoints['distilbert_sentiment'], headers=headers, json={"inputs": text}, timeout=config.AI_RESPONSE_TIMEOUT ) response_time = time.time() - start_time if response.status_code == 200: result = response.json() self.usage_stats['distilbert_sentiment'] += 1 if isinstance(result, list) and len(result) > 0: sentiment_data = result[0] label = sentiment_data.get('label', 'NEUTRAL') score = sentiment_data.get('score', 0.5) # Convert to -1 to 1 scale sentiment_value = score if label == 'POSITIVE' else -score # Apply context adjustments if context: sentiment_value = self.adjust_sentiment_for_context(sentiment_value, context) response_data = { "sentiment": sentiment_value, "confidence": score, "label": label, "source": "huggingface_distilbert", "response_time": response_time, "context_adjusted": bool(context), "success": True } # Cache successful response self.cache_response('distilbert_sentiment', text, response_data) return response_data # Fallback to TextBlob return self.fallback_sentiment_analysis(text, context) except Exception as e: ai_logger.error(f"Sentiment analysis error: {e}") return self.fallback_sentiment_analysis(text, context) def adjust_sentiment_for_context(self, sentiment: float, context: Dict) -> float: """Adjust sentiment based on iPhone forwarding context""" adjusted_sentiment = sentiment # If call was forwarded, customer might be more frustrated if context.get('likely_forwarded'): if sentiment > -0.3: # Not already very negative adjusted_sentiment -= 0.2 # Make slightly more negative # If customer had to wait or call multiple times if 
context.get('forwarding_reason') == 'customer_indication': adjusted_sentiment -= 0.1 # If this is an urgent situation if context.get('urgency_level') == 'high': if sentiment < 0: # Already negative adjusted_sentiment -= 0.1 # Make more negative return max(-1.0, min(1.0, adjusted_sentiment)) def fallback_sentiment_analysis(self, text: str, context: Dict = None) -> Dict: """Fallback sentiment analysis using TextBlob""" try: blob = TextBlob(text) sentiment_value = blob.sentiment.polarity # Apply context adjustments if context: sentiment_value = self.adjust_sentiment_for_context(sentiment_value, context) return { "sentiment": sentiment_value, "confidence": abs(sentiment_value), "label": "POSITIVE" if sentiment_value > 0 else "NEGATIVE" if sentiment_value < 0 else "NEUTRAL", "source": "textblob_fallback", "response_time": 0.1, "context_adjusted": bool(context), "success": True } except Exception as e: ai_logger.error(f"Fallback sentiment analysis error: {e}") return { "sentiment": 0.0, "confidence": 0.0, "label": "NEUTRAL", "source": "default", "response_time": 0.0, "context_adjusted": False, "success": False, "error": str(e) } def extract_intent_advanced(self, text: str, context: Dict = None) -> Dict: """Advanced intent extraction with iPhone forwarding awareness""" try: # Enhanced intent patterns for iPhone forwarding scenarios intent_patterns = { 'pricing_inquiry': [ r'\b(?:price|cost|how much|expensive|cheap|fee|charge|rate|pricing|costs|charges|fees|rates)\b', r'\$\d+', r'\b(?:dollar|buck|money|payment|pay)\b' ], 'scheduling_inquiry': [ r'\b(?:book|schedule|appointment|available|when|time|date|timing)\b', r'\b(?:tomorrow|today|next week|weekend|monday|tuesday|wednesday|thursday|friday|saturday|sunday)\b', r'\b(?:morning|afternoon|evening|noon|am|pm)\b', r'\b(?:availability|slot|opening)\b' ], 'service_inquiry': [ r'\b(?:service|wash|detail|clean|wax|ceramic|polish|vacuum|detailing)\b', r'\b(?:interior|exterior|full|basic|premium|complete|express)\b', r'\b(?:headlight|tire|rim|dashboard|seat|carpet|window)\b' ], 'location_inquiry': [ r'\b(?:location|where|area|travel|distance|come to|visit|address)\b', r'\b(?:directions|map|gps|miles|radius|zone)\b' ], 'complaint': [ r'\b(?:problem|issue|complaint|wrong|bad|terrible|awful|disappointed|unsatisfied)\b', r'\b(?:unhappy|refund|cancel|dissatisfied|poor|horrible)\b' ], 'compliment': [ r'\b(?:great|excellent|amazing|fantastic|wonderful|perfect|love|awesome)\b', r'\b(?:thank you|thanks|appreciate|recommend|satisfied|happy)\b' ], 'emergency': [ r'\b(?:emergency|urgent|asap|immediately|now|help|stuck)\b', r'\b(?:accident|breakdown|stranded|crisis)\b' ], 'callback_request': [ r'\b(?:call back|callback|return call|call me back|phone me)\b', r'\b(?:tried calling|called earlier|no answer|voicemail)\b' ], 'forwarding_explanation': [ r'\b(?:forwarded|transferred|redirected|routed)\b', r'\b(?:calling again|second time|multiple times)\b' ], 'frustration_forwarding': [ r'\b(?:waiting|still waiting|tried multiple|keep trying)\b', r'\b(?:busy|no one answered|goes to voicemail)\b' ] } text_lower = text.lower() intent_scores = {} # Calculate intent scores for intent, patterns in intent_patterns.items(): score = 0 for pattern in patterns: matches = len(re.findall(pattern, text_lower)) score += matches if score > 0: # Normalize score based on text length normalized_score = score / len(text.split()) * 10 intent_scores[intent] = min(normalized_score, 1.0) # Apply context-based adjustments if context: intent_scores = self.adjust_intent_for_context(intent_scores, 
context) # Get primary intent if intent_scores: primary_intent = max(intent_scores, key=intent_scores.get) confidence = intent_scores[primary_intent] # Combine related intents combined_intents = self.combine_related_intents(intent_scores) return { "intent": primary_intent, "confidence": confidence, "all_intents": intent_scores, "combined_intents": combined_intents, "method": "rule_based_enhanced", "context_aware": bool(context), "success": True } return { "intent": "general_inquiry", "confidence": 0.5, "all_intents": {}, "combined_intents": [], "method": "default", "context_aware": bool(context), "success": True } except Exception as e: ai_logger.error(f"Intent extraction error: {e}") return { "intent": "general_inquiry", "confidence": 0.0, "error": str(e), "method": "error_fallback", "context_aware": False, "success": False } def adjust_intent_for_context(self, intent_scores: Dict, context: Dict) -> Dict: """Adjust intent scores based on iPhone forwarding context""" adjusted_scores = intent_scores.copy() # If call was forwarded, boost relevant intents if context.get('likely_forwarded'): # Boost callback and frustration intents if 'callback_request' in adjusted_scores: adjusted_scores['callback_request'] *= 1.5 if 'forwarding_explanation' in adjusted_scores: adjusted_scores['forwarding_explanation'] *= 1.3 if 'frustration_forwarding' in adjusted_scores: adjusted_scores['frustration_forwarding'] *= 1.4 # If customer mood is frustrated if context.get('customer_mood') == 'frustrated': if 'complaint' in adjusted_scores: adjusted_scores['complaint'] *= 1.3 if 'frustration_forwarding' in adjusted_scores: adjusted_scores['frustration_forwarding'] *= 1.2 # If urgency is high if context.get('urgency_level') == 'high': if 'emergency' in adjusted_scores: adjusted_scores['emergency'] *= 1.4 if 'scheduling_inquiry' in adjusted_scores: adjusted_scores['scheduling_inquiry'] *= 1.2 # Normalize scores to stay within 0-1 range for intent in adjusted_scores: adjusted_scores[intent] = min(adjusted_scores[intent], 1.0) return adjusted_scores def combine_related_intents(self, intent_scores: Dict) -> List[str]: """Combine related intents for better understanding""" combined = [] # Service-related intents service_intents = ['service_inquiry', 'pricing_inquiry', 'scheduling_inquiry'] if any(intent in intent_scores for intent in service_intents): combined.append('service_related') # Forwarding-related intents forwarding_intents = ['callback_request', 'forwarding_explanation', 'frustration_forwarding'] if any(intent in intent_scores for intent in forwarding_intents): combined.append('forwarding_related') # Problem-related intents problem_intents = ['complaint', 'emergency', 'frustration_forwarding'] if any(intent in intent_scores for intent in problem_intents): combined.append('problem_related') return combined def extract_keywords_advanced(self, text: str, context: Dict = None) -> Dict: """Advanced keyword extraction with business domain focus""" try: keywords = [] # Enhanced car wash domain keywords domain_keywords = { 'services': { 'keywords': ['wash', 'detail', 'clean', 'wax', 'polish', 'vacuum', 'ceramic', 'coating', 'headlight', 'restoration', 'interior', 'exterior'], 'weight': 1.0 }, 'vehicle_parts': { 'keywords': ['dashboard', 'seats', 'carpet', 'windows', 'tires', 'rims', 'engine', 'trunk', 'hood', 'bumper'], 'weight': 0.8 }, 'service_levels': { 'keywords': ['basic', 'premium', 'full', 'complete', 'express', 'deluxe', 'standard', 'quick', 'thorough'], 'weight': 0.9 }, 'pricing': { 'keywords': ['price', 
'cost', 'expensive', 'cheap', 'fee', 'charge', 'rate', 'discount', 'deal', 'special', 'promotion'], 'weight': 1.0 }, 'scheduling': { 'keywords': ['book', 'schedule', 'appointment', 'available', 'time', 'date', 'when', 'today', 'tomorrow', 'weekend'], 'weight': 1.0 }, 'location': { 'keywords': ['location', 'address', 'where', 'distance', 'travel', 'come', 'visit', 'area', 'radius', 'miles'], 'weight': 0.9 }, 'quality': { 'keywords': ['quality', 'professional', 'excellent', 'perfect', 'thorough', 'careful', 'detailed'], 'weight': 0.7 }, 'problems': { 'keywords': ['problem', 'issue', 'wrong', 'bad', 'complaint', 'disappointed', 'unhappy', 'poor'], 'weight': 1.2 }, 'forwarding': { 'keywords': ['forwarded', 'callback', 'tried calling', 'no answer', 'voicemail', 'busy', 'waiting'], 'weight': 1.1 } } text_lower = text.lower() words = re.findall(r'\b\w+\b', text_lower) word_count = len(words) # Extract keywords by category for category, data in domain_keywords.items(): category_keywords = data['keywords'] weight = data['weight'] for keyword in category_keywords: if keyword in words: frequency = words.count(keyword) relevance = (frequency / word_count) * weight keywords.append({ 'keyword': keyword, 'category': category, 'frequency': frequency, 'relevance': relevance, 'context_boosted': False }) # Apply context-based boosting if context: keywords = self.boost_keywords_for_context(keywords, context) # Sort by relevance keywords.sort(key=lambda x: x['relevance'], reverse=True) # Use HuggingFace API if available for additional extraction if self.api_keys.get('keybert') and self.check_rate_limit('keybert'): try: hf_keywords = self.extract_keywords_huggingface(text) if hf_keywords: keywords.extend(hf_keywords) except Exception as e: ai_logger.warning(f"HuggingFace keyword extraction failed: {e}") # Remove duplicates and limit to top keywords unique_keywords = [] seen_keywords = set() for kw in keywords: if kw['keyword'] not in seen_keywords: unique_keywords.append(kw) seen_keywords.add(kw['keyword']) return { "keywords": unique_keywords[:15], # Top 15 keywords "total_found": len(unique_keywords), "extraction_methods": ["domain_specific", "context_aware"], "context_applied": bool(context), "success": True } except Exception as e: ai_logger.error(f"Keyword extraction error: {e}") return {"keywords": [], "error": str(e), "success": False} def boost_keywords_for_context(self, keywords: List[Dict], context: Dict) -> List[Dict]: """Boost keyword relevance based on context""" for keyword in keywords: # Boost forwarding-related keywords if call was forwarded if context.get('likely_forwarded') and keyword['category'] == 'forwarding': keyword['relevance'] *= 1.5 keyword['context_boosted'] = True # Boost problem keywords if customer is frustrated if context.get('customer_mood') == 'frustrated' and keyword['category'] == 'problems': keyword['relevance'] *= 1.3 keyword['context_boosted'] = True # Boost scheduling keywords if urgency is high if context.get('urgency_level') == 'high' and keyword['category'] == 'scheduling': keyword['relevance'] *= 1.2 keyword['context_boosted'] = True return keywords def extract_keywords_huggingface(self, text: str) -> List[Dict]: """Extract keywords using HuggingFace API""" try: api_key = self.api_keys.get('keybert') if not api_key: return [] headers = {"Authorization": f"Bearer {api_key}"} response = requests.post( self.api_endpoints['keybert'], headers=headers, json={"inputs": text}, timeout=10 ) if response.status_code == 200: self.usage_stats['keybert'] += 1 result = 
response.json() # Process HuggingFace response and convert to our format hf_keywords = [] if isinstance(result, list): for item in result: if isinstance(item, dict) and 'word' in item: hf_keywords.append({ 'keyword': item['word'], 'category': 'huggingface_extracted', 'frequency': 1, 'relevance': item.get('score', 0.5), 'context_boosted': False }) return hf_keywords except Exception as e: ai_logger.error(f"HuggingFace keyword extraction error: {e}") return [] def generate_with_multiple_apis(self, prompt: str, context: Dict = None) -> Dict: """Generate response using multiple AI APIs with intelligent fallback""" response_chain = [] # Enhanced context preparation enhanced_context = self.prepare_enhanced_context(context) # Try DeepSeek first (best for customer service) if config.DEEPSEEK_API_KEY: try: deepseek_response = self.generate_with_deepseek(prompt, enhanced_context) if deepseek_response and not deepseek_response.get('error'): quality_score = self.assess_response_quality( deepseek_response.get('text', ''), prompt, enhanced_context ) response_chain.append({ 'api': 'deepseek', 'response': deepseek_response.get('text', ''), 'full_response': deepseek_response, 'quality': quality_score, 'response_time': deepseek_response.get('response_time', 0) }) except Exception as e: ai_logger.warning(f"DeepSeek generation failed: {e}") # Try HuggingFace LLaMA if DeepSeek quality is low or failed if (not response_chain or response_chain[0]['quality'] < 0.7) and self.api_keys.get('llama'): try: llama_response = self.generate_with_llama(prompt, enhanced_context) if llama_response: quality_score = self.assess_response_quality(llama_response, prompt, enhanced_context) response_chain.append({ 'api': 'llama', 'response': llama_response, 'full_response': {'text': llama_response}, 'quality': quality_score, 'response_time': 0 }) except Exception as e: ai_logger.warning(f"LLaMA generation failed: {e}") # Try GPT-Neo as additional fallback if (not response_chain or max(r['quality'] for r in response_chain) < 0.6) and self.api_keys.get('gpt_neo'): try: gpt_neo_response = self.generate_with_gpt_neo(prompt, enhanced_context) if gpt_neo_response: quality_score = self.assess_response_quality(gpt_neo_response, prompt, enhanced_context) response_chain.append({ 'api': 'gpt_neo', 'response': gpt_neo_response, 'full_response': {'text': gpt_neo_response}, 'quality': quality_score, 'response_time': 0 }) except Exception as e: ai_logger.warning(f"GPT-Neo generation failed: {e}") # Select best response if response_chain: best_response = max(response_chain, key=lambda x: x['quality']) # Apply post-processing based on context processed_response = self.post_process_response( best_response['response'], enhanced_context ) return { 'text': processed_response, 'original_text': best_response['response'], 'api_used': best_response['api'], 'quality_score': best_response['quality'], 'alternatives': len(response_chain) - 1, 'response_time': best_response['response_time'], 'context_enhanced': bool(enhanced_context), 'success': True } # Final rule-based fallback fallback_response = self.generate_rule_based_fallback(prompt, enhanced_context) return { 'text': fallback_response, 'original_text': fallback_response, 'api_used': 'rule_based_fallback', 'quality_score': 0.5, 'alternatives': 0, 'response_time': 0.1, 'context_enhanced': bool(enhanced_context), 'success': True } def prepare_enhanced_context(self, context: Dict = None) -> Dict: """Prepare enhanced context for AI generation""" if not context: context = {} enhanced = context.copy() # Add 
business context enhanced['business_info'] = { 'name': config.BUSINESS_NAME, 'services': config.get_service_pricing(), 'hours': { 'weekday': config.BUSINESS_HOURS_WEEKDAY, 'saturday': config.BUSINESS_HOURS_SATURDAY, 'sunday': config.BUSINESS_HOURS_SUNDAY }, 'service_area': f"{config.SERVICE_RADIUS_MILES} mile radius", 'phone': config.BUSINESS_PHONE, 'email': config.BUSINESS_EMAIL } # Add iPhone forwarding context if enhanced.get('likely_forwarded'): enhanced['communication_context'] = { 'forwarded_call': True, 'customer_waited': True, 'may_be_frustrated': enhanced.get('customer_mood') == 'frustrated', 'urgency_level': enhanced.get('urgency_level', 'normal') } # Add current system status enhanced['system_context'] = { 'jay_available': not system_state.get('emergency_mode'), 'current_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'forwarding_enabled': system_state.get('iphone_forwarding_enabled'), 'ai_mode': system_state.get('ai_enabled') } return enhanced def generate_with_deepseek(self, prompt: str, context: Dict = None) -> Dict: """Enhanced DeepSeek generation with iPhone forwarding awareness""" try: if not config.DEEPSEEK_API_KEY: return {"error": "DeepSeek API not configured"} headers = { "Authorization": f"Bearer {config.DEEPSEEK_API_KEY}", "Content-Type": "application/json" } # Enhanced system prompt with iPhone forwarding context system_prompt = f"""You are Jay's Mobile Wash AI assistant. You represent a professional mobile car detailing service owned by Jay. BUSINESS INFORMATION: - Business: {config.BUSINESS_NAME} - Owner: Jay - Service Area: {config.SERVICE_RADIUS_MILES} mile radius from Los Angeles, CA - Hours: {config.BUSINESS_HOURS_WEEKDAY} (Mon-Fri), {config.BUSINESS_HOURS_SATURDAY} (Sat), {config.BUSINESS_HOURS_SUNDAY} (Sun) - Payment: Cash, Venmo, Zelle, card on arrival - Weather Policy: Service suspended during rain SERVICES & PRICING: - Basic Wash: ${config.BASIC_WASH_PRICE} (30 min) - Exterior wash and dry - Premium Wash: ${config.PREMIUM_WASH_PRICE} (45 min) - Exterior wash, wax, interior vacuum - Full Detail: ${config.FULL_DETAIL_PRICE} (90 min) - Complete interior and exterior detailing - Ceramic Coating: ${config.CERAMIC_COATING_PRICE} (2 hours) - Premium ceramic protection - Headlight Restoration: ${config.HEADLIGHT_RESTORATION_PRICE} (30 min) - Headlight cleaning and restoration IPHONE FORWARDING CONTEXT: Jay uses iPhone call forwarding. If customers call his main number (+1 {config.JAY_PHONE}) and he doesn't answer within {config.FORWARDING_DELAY_SECONDS} seconds, calls automatically forward to this AI system. COMMUNICATION GUIDELINES: - Be friendly, professional, and helpful - Sound natural and conversational like Jay would - Focus on customer satisfaction and booking appointments - If customer seems frustrated (especially if call was forwarded), acknowledge their wait time - Always offer to connect them with Jay directly if needed - Use first person ("I can help you") not third person ("Jay can help you") ESCALATION TRIGGERS: - Customer expresses frustration or anger - Complex technical issues - Complaints or refund requests - Emergency situations - Customer specifically asks for Jay IMPORTANT: If this is a forwarded call or customer mentions waiting/calling multiple times, acknowledge their patience and offer priority service.""" # Add context-specific instructions if context: if context.get('likely_forwarded'): system_prompt += "\n\nCONTEXT: This call was forwarded from Jay's iPhone. Customer may have been waiting. Be empathetic about any wait time." 
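            # The context flags carried through prepare_enhanced_context()
            # ('likely_forwarded', 'customer_mood', 'urgency_level') are folded into
            # the system prompt one instruction at a time: the forwarded-call note was
            # appended above, and the frustrated-mood and high-urgency notes follow
            # below. A minimal sketch of the same idea as a lookup table (illustrative
            # only; the inline if-blocks are what actually runs):
            #
            #     extra_instructions = {
            #         ('customer_mood', 'frustrated'): "\n\nCONTEXT: Customer appears frustrated...",
            #         ('urgency_level', 'high'): "\n\nCONTEXT: This appears to be an urgent situation...",
            #     }
            #     for (key, value), note in extra_instructions.items():
            #         if context.get(key) == value:
            #             system_prompt += note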
if context.get('customer_mood') == 'frustrated': system_prompt += "\n\nCONTEXT: Customer appears frustrated. Prioritize empathy and offer to connect with Jay directly." if context.get('urgency_level') == 'high': system_prompt += "\n\nCONTEXT: This appears to be an urgent situation. Prioritize immediate assistance." data = { "model": config.LANGUAGE_MODEL, "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": prompt} ], "max_tokens": 250, "temperature": 0.7, "top_p": 0.9, "frequency_penalty": 0.1, "presence_penalty": 0.1, "stop": ["\n\nUser:", "\n\nCustomer:", "\n\nHuman:"] } start_time = time.time() response = requests.post( "https://api.deepseek.com/v1/chat/completions", headers=headers, json=data, timeout=config.AI_RESPONSE_TIMEOUT ) response_time = time.time() - start_time if response.status_code == 200: result = response.json() ai_response = result['choices'][0]['message']['content'] # Clean up response cleaned_response = self.clean_ai_response(ai_response) ai_logger.info(f"DeepSeek generated response in {response_time:.2f}s") return { "text": cleaned_response, "original_text": ai_response, "response_time": response_time, "model": config.LANGUAGE_MODEL, "tokens_used": result.get('usage', {}).get('total_tokens', 0), "success": True } else: error_msg = f"DeepSeek API returned {response.status_code}: {response.text}" ai_logger.error(error_msg) return {"error": error_msg, "success": False} except Exception as e: error_msg = f"DeepSeek API error: {e}" ai_logger.error(error_msg) return {"error": error_msg, "success": False} def clean_ai_response(self, response: str) -> str: """Clean and enhance AI response""" if not response: return "" # Remove unwanted prefixes/suffixes cleaned = response.strip() # Remove common AI artifacts artifacts = [ "AI Assistant:", "Assistant:", "AI:", "Bot:", "Response:", "Here's my response:", "I would say:", "My response is:" ] for artifact in artifacts: if cleaned.startswith(artifact): cleaned = cleaned[len(artifact):].strip() # Ensure proper punctuation if cleaned and not cleaned.endswith(('.', '!', '?')): cleaned += '.' # Ensure it sounds like Jay's business if "I can help" not in cleaned and "I'd be happy" not in cleaned: # Make it more personal if too robotic pass return cleaned def generate_with_llama(self, prompt: str, context: Dict = None) -> str: """Generate response using HuggingFace LLaMA with context""" try: if not self.check_rate_limit('llama'): return None api_key = self.api_keys.get('llama') if not api_key: return None headers = {"Authorization": f"Bearer {api_key}"} # Enhanced prompt for LLaMA with context context_info = "" if context and context.get('likely_forwarded'): context_info = " (Note: Customer's call was forwarded from Jay's phone)" formatted_prompt = f"""[INST] You are a helpful assistant for Jay's Mobile Wash, a professional mobile car detailing service. Services: Basic wash ($25), Premium wash ($45), Full detail ($85), Ceramic coating ($150), Headlight restoration ($35). Customer question{context_info}: {prompt} Respond naturally and helpfully as Jay's assistant. 
[/INST]""" response = requests.post( self.api_endpoints['llama'], headers=headers, json={ "inputs": formatted_prompt, "parameters": { "max_new_tokens": 180, "temperature": 0.6, "top_p": 0.9, "do_sample": True, "return_full_text": False, "stop": ["", "[INST]", "[/INST]", "\n\nCustomer:", "\n\nUser:"] } }, timeout=config.AI_RESPONSE_TIMEOUT ) if response.status_code == 200: result = response.json() self.usage_stats['llama'] += 1 if isinstance(result, list) and len(result) > 0: generated_text = result[0].get('generated_text', '') # Clean up the response if '[/INST]' in generated_text: generated_text = generated_text.split('[/INST]')[-1].strip() return self.clean_ai_response(generated_text) except Exception as e: ai_logger.error(f"LLaMA generation error: {e}") return None def generate_with_gpt_neo(self, prompt: str, context: Dict = None) -> str: """Generate response using HuggingFace GPT-Neo with context""" try: if not self.check_rate_limit('gpt_neo'): return None api_key = self.api_keys.get('gpt_neo') if not api_key: return None headers = {"Authorization": f"Bearer {api_key}"} # Create context-aware prompt context_note = "" if context and context.get('likely_forwarded'): context_note = " [Customer called Jay's phone, forwarded to AI]" context_prompt = f"""Customer Service for Jay's Mobile Wash - Professional Mobile Car Detailing Services: Basic wash ($25), Premium ($45), Full detail ($85), Ceramic coating ($150) Business hours: Mon-Sat 8AM-6PM, Sun 10AM-4PM Service area: 15 mile radius Customer{context_note}: {prompt} Representative:""" response = requests.post( self.api_endpoints['gpt_neo'], headers=headers, json={ "inputs": context_prompt, "parameters": { "max_new_tokens": 150, "temperature": 0.7, "top_p": 0.9, "do_sample": True, "return_full_text": False, "stop": ["\n", "Customer:", "Representative:", "\n\n"] } }, timeout=config.AI_RESPONSE_TIMEOUT ) if response.status_code == 200: result = response.json() self.usage_stats['gpt_neo'] += 1 if isinstance(result, list) and len(result) > 0: generated_text = result[0].get('generated_text', '') # Clean up the response clean_text = generated_text.replace(context_prompt, '').strip() return self.clean_ai_response(clean_text) except Exception as e: ai_logger.error(f"GPT-Neo generation error: {e}") return None def generate_rule_based_fallback(self, prompt: str, context: Dict = None) -> str: """Generate rule-based fallback response""" prompt_lower = prompt.lower() # Check for forwarding context forwarding_acknowledgment = "" if context and context.get('likely_forwarded'): forwarding_acknowledgment = "Thank you for your patience while the call was being connected. " # Rule-based responses for common scenarios if any(word in prompt_lower for word in ['price', 'cost', 'how much']): return f"{forwarding_acknowledgment}I'd be happy to help with pricing! Our basic wash is $25, premium wash is $45, and full detail is $85. Which service interests you most?" elif any(word in prompt_lower for word in ['book', 'schedule', 'appointment']): return f"{forwarding_acknowledgment}I can help you schedule an appointment! We're available Monday through Saturday 8AM-6PM, and Sunday 10AM-4PM. What day works best for you?" elif any(word in prompt_lower for word in ['service', 'wash', 'detail']): return f"{forwarding_acknowledgment}We offer several services: basic wash ($25), premium wash with wax ($45), full interior and exterior detail ($85), and ceramic coating ($150). Which would you like to know more about?" 
        elif any(word in prompt_lower for word in ['location', 'where', 'area']):
            return f"{forwarding_acknowledgment}We provide mobile service within a 15-mile radius. I can come to your location! Where are you located?"
        elif any(word in prompt_lower for word in ['urgent', 'emergency', 'asap']):
            return f"{forwarding_acknowledgment}I understand this is urgent. Let me connect you with Jay directly right away for immediate assistance."
        elif any(word in prompt_lower for word in ['frustrated', 'angry', 'complaint', 'problem']):
            return f"{forwarding_acknowledgment}I sincerely apologize for any inconvenience. Let me connect you with Jay personally to make sure we take care of this properly."
        else:
            return f"{forwarding_acknowledgment}Thanks for contacting Jay's Mobile Wash! I'm here to help with scheduling, pricing, or any questions about our mobile car detailing services. How can I assist you today?"

    def post_process_response(self, response: str, context: Dict = None) -> str:
        """Post-process AI response based on context"""
        if not response:
            return response

        processed = response

        # Add forwarding acknowledgment if missing and context indicates forwarding
        if context and context.get('likely_forwarded'):
            forwarding_phrases = ['thank you for your patience', 'sorry for the wait', 'thanks for holding']
            if not any(phrase in processed.lower() for phrase in forwarding_phrases):
                # Add subtle acknowledgment
                if not processed.startswith(('Thank', 'I appreciate', 'Thanks')):
                    processed = f"Thanks for your patience. {processed}"

        # Ensure call-to-action for bookings
        if 'book' in processed.lower() or 'schedule' in processed.lower():
            if '?' not in processed:
                processed += " When would work best for you?"

        # Ensure contact information is provided when relevant
        if 'call' in processed.lower() and 'jay' in processed.lower():
            if config.JAY_PHONE not in processed:
                processed += f" You can reach Jay directly at {config.JAY_PHONE}."
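        # Worked example (illustrative): with context['likely_forwarded'] set, a bare
        # reply such as "I can get you booked for a premium wash." comes back as
        # "Thanks for your patience. I can get you booked for a premium wash. When
        # would work best for you?" (the prefix from the forwarding check, the trailing
        # question from the booking call-to-action check above).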
return processed def assess_response_quality(self, response_text: str, original_prompt: str, context: Dict = None) -> float: """Enhanced response quality assessment""" if not response_text or len(response_text.strip()) < 10: return 0.0 quality_score = 0.5 # Base score # Length appropriateness (not too short, not too long) word_count = len(response_text.split()) if 15 <= word_count <= 80: quality_score += 0.15 elif word_count < 8: quality_score -= 0.2 elif word_count > 120: quality_score -= 0.1 # Business relevance business_keywords = [ 'wash', 'detail', 'service', 'clean', 'price', 'book', 'schedule', 'jay', 'mobile', 'car', 'vehicle', 'appointment', 'available' ] keyword_matches = sum(1 for keyword in business_keywords if keyword.lower() in response_text.lower()) quality_score += min(keyword_matches * 0.05, 0.25) # Professionalism and politeness polite_phrases = [ 'please', 'thank you', 'thanks', 'happy to help', 'i\'d be happy', 'would you like', 'let me', 'i can help', 'i\'ll be glad' ] polite_matches = sum(1 for phrase in polite_phrases if phrase.lower() in response_text.lower()) quality_score += min(polite_matches * 0.08, 0.2) # Context awareness (iPhone forwarding) if context and context.get('likely_forwarded'): forwarding_awareness = [ 'patience', 'wait', 'holding', 'connected', 'transferred' ] if any(phrase in response_text.lower() for phrase in forwarding_awareness): quality_score += 0.15 # Question answering relevance prompt_lower = original_prompt.lower() response_lower = response_text.lower() # Check if response addresses the prompt topic if 'price' in prompt_lower and any(word in response_lower for word in ['$', 'price', 'cost']): quality_score += 0.1 if 'book' in prompt_lower and any(word in response_lower for word in ['schedule', 'appointment', 'available']): quality_score += 0.1 if 'service' in prompt_lower and any(word in response_lower for word in ['wash', 'detail', 'clean']): quality_score += 0.1 # Penalty for generic responses generic_phrases = [ 'how can i help', 'what can i do', 'i\'m here to assist', 'please let me know', 'feel free to ask' ] generic_matches = sum(1 for phrase in generic_phrases if phrase.lower() in response_text.lower()) quality_score -= min(generic_matches * 0.05, 0.15) # Penalty for AI artifacts ai_artifacts = [ 'as an ai', 'i\'m an artificial', 'as a language model', 'i don\'t have access', 'i cannot provide' ] if any(artifact in response_lower for artifact in ai_artifacts): quality_score -= 0.3 # Bonus for specific pricing or service information pricing_mentioned = any(price in response_text for price in ['$25', '$45', '$85', '$150', '$35']) if pricing_mentioned: quality_score += 0.1 # Bonus for call-to-action cta_phrases = [ 'when would work', 'what day', 'which service', 'would you like', 'let me know', 'feel free to call' ] if any(phrase in response_lower for phrase in cta_phrases): quality_score += 0.05 return max(0.0, min(1.0, quality_score)) def check_all_api_health(self) -> Dict[str, Dict]: """Check health of all APIs""" health_results = {} # Check SignalWire try: start_time = time.time() # Simple SignalWire API test account = signalwire_client.api.accounts(config.SIGNALWIRE_PROJECT_ID).fetch() response_time = time.time() - start_time health_results['signalwire'] = { 'status': 'healthy', 'response_time': response_time, 'last_check': datetime.now() } except Exception as e: health_results['signalwire'] = { 'status': 'unhealthy', 'error': str(e), 'response_time': 0, 'last_check': datetime.now() } # Check DeepSeek if config.DEEPSEEK_API_KEY: 
try: start_time = time.time() headers = { "Authorization": f"Bearer {config.DEEPSEEK_API_KEY}", "Content-Type": "application/json" } response = requests.post( "https://api.deepseek.com/v1/chat/completions", headers=headers, json={ "model": "deepseek-chat", "messages": [{"role": "user", "content": "Test"}], "max_tokens": 5 }, timeout=10 ) response_time = time.time() - start_time health_results['deepseek'] = { 'status': 'healthy' if response.status_code == 200 else 'unhealthy', 'response_time': response_time, 'last_check': datetime.now() } except Exception as e: health_results['deepseek'] = { 'status': 'unhealthy', 'error': str(e), 'response_time': 0, 'last_check': datetime.now() } # Check HuggingFace APIs (sample a few key ones) key_hf_apis = ['whisper', 'distilbert_sentiment', 'llama'] for api_name in key_hf_apis: api_key = self.api_keys.get(api_name) if api_key: try: start_time = time.time() headers = {"Authorization": f"Bearer {api_key}"} # Simple test request response = requests.get( self.api_endpoints[api_name], headers=headers, timeout=5 ) response_time = time.time() - start_time health_results[f'huggingface_{api_name}'] = { 'status': 'healthy' if response.status_code in [200, 503] else 'unhealthy', 'response_time': response_time, 'last_check': datetime.now() } except Exception as e: health_results[f'huggingface_{api_name}'] = { 'status': 'unhealthy', 'error': str(e), 'response_time': 0, 'last_check': datetime.now() } # Update system state system_state.set('api_health', health_results) return health_results def get_usage_stats(self) -> Dict[str, Any]: """Get comprehensive usage statistics""" return { 'api_usage': dict(self.usage_stats), 'performance_metrics': dict(self.performance_metrics), 'health_status': dict(self.health_status), 'cache_stats': { 'redis_available': self.redis_client is not None, 'memory_cache_size': len(self.response_cache), 'cache_hit_rate': 0.0 # Would be calculated based on actual cache hits }, 'rate_limits': { api: {'count': data['count'], 'reset_time': data['reset_time'].isoformat()} for api, data in self.rate_limits.items() } } # Initialize global API processor api_processor_global = AdvancedAPIProcessor() # ===== DRIP LEARNβ„’ SYSTEM ===== class DripLearnSystem: """Enhanced learning system with iPhone forwarding awareness""" def __init__(self): self.knowledge_base = { 'services': config.get_service_pricing(), 'business_info': { 'owner': 'Jay', 'name': config.BUSINESS_NAME, 'phone': config.BUSINESS_PHONE, 'email': config.BUSINESS_EMAIL, 'service_radius': config.SERVICE_RADIUS_MILES, 'address': config.BUSINESS_ADDRESS }, 'common_responses': {}, 'corrections': {}, 'patterns': {}, 'forwarding_scenarios': {} } self.learning_queue = deque(maxlen=1000) self.training_sessions = {} self.improvement_metrics = { 'total_corrections': 0, 'accuracy_improvements': 0, 'customer_satisfaction_boost': 0, 'response_time_optimization': 0 } # Initialize learning patterns self.init_learning_patterns() # Start background learning process self.start_background_learning() def init_learning_patterns(self): """Initialize learning patterns for iPhone forwarding scenarios""" self.knowledge_base['forwarding_scenarios'] = { 'customer_frustrated_forwarded': { 'triggers': ['tried calling', 'no answer', 'busy', 'voicemail', 'waited'], 'response_pattern': 'acknowledge_wait_apologize_help', 'escalation_threshold': 0.7 }, 'urgent_forwarded_call': { 'triggers': ['urgent', 'emergency', 'asap', 'now', 'stuck'], 'response_pattern': 'immediate_assistance_jay_connection', 'escalation_threshold': 0.9 
}, 'repeat_caller_forwarded': { 'triggers': ['called again', 'second time', 'multiple times'], 'response_pattern': 'priority_service_acknowledgment', 'escalation_threshold': 0.8 }, 'pricing_inquiry_forwarded': { 'triggers': ['price', 'cost', 'how much'], 'response_pattern': 'immediate_pricing_with_booking', 'escalation_threshold': 0.3 } } self.knowledge_base['common_responses'] = { 'acknowledge_wait_apologize_help': [ "Thank you for your patience while the call was being connected. I sincerely apologize for any wait time. I'm here to help you right away!", "I appreciate you waiting while the call transferred. Sorry about that delay - let me take care of you immediately!", "Thanks for holding while I got connected. I apologize for the wait, and I'm ready to help you now!" ], 'immediate_assistance_jay_connection': [ "I understand this is urgent. Let me connect you with Jay directly right now for immediate assistance.", "This sounds urgent - I'm connecting you with Jay immediately to make sure you get the help you need right away.", "I can see this needs immediate attention. Let me get Jay on the line for you right now." ], 'priority_service_acknowledgment': [ "I see you've called before - thank you for your patience. Let me give you priority service and take care of this right away.", "Since you've tried reaching us multiple times, you're getting priority attention. How can I help you immediately?", "I appreciate your persistence in reaching us. You have my full attention now - what can I do for you?" ], 'immediate_pricing_with_booking': [ "I'd be happy to help with pricing! Our basic wash is $25, premium is $45, and full detail is $85. I can book you right now - which service interests you?", "Absolutely! Here's our pricing: Basic $25, Premium $45, Full Detail $85, Ceramic Coating $150. When would you like to schedule?", "Great question! We offer Basic wash ($25), Premium wash ($45), Full detail ($85), and Ceramic coating ($150). Ready to book today?" 
] } def start_background_learning(self): """Start background learning process""" def learning_worker(): while True: try: self.process_learning_queue() self.update_knowledge_patterns() time.sleep(60) # Process every minute except Exception as e: logger.error(f"Background learning error: {e}") time.sleep(30) learning_thread = Thread(target=learning_worker, daemon=True) learning_thread.start() training_logger.info("βœ… Background learning system started") def learn_from_interaction(self, interaction_data: Dict) -> Dict: """Learn from customer interaction with iPhone forwarding context""" try: learning_entry = { 'timestamp': datetime.now(), 'interaction_type': interaction_data.get('type', 'unknown'), 'customer_input': interaction_data.get('input', ''), 'ai_response': interaction_data.get('ai_response', ''), 'human_correction': interaction_data.get('correction', ''), 'context': interaction_data.get('context', {}), 'sentiment_score': interaction_data.get('sentiment', 0.0), 'quality_score': interaction_data.get('quality', 0.0), 'forwarding_context': interaction_data.get('forwarding_context', {}), 'customer_satisfaction': interaction_data.get('satisfaction', 5.0) } # Add to learning queue self.learning_queue.append(learning_entry) # Immediate learning for high-impact scenarios if self.should_learn_immediately(learning_entry): improvement = self.apply_immediate_learning(learning_entry) return improvement return {'status': 'queued_for_learning', 'immediate': False} except Exception as e: training_logger.error(f"Learning from interaction error: {e}") return {'status': 'error', 'error': str(e)} def should_learn_immediately(self, learning_entry: Dict) -> bool: """Determine if learning should be applied immediately""" # Learn immediately for: # 1. Customer corrections if learning_entry.get('human_correction'): return True # 2. Low quality responses if learning_entry.get('quality_score', 1.0) < 0.4: return True # 3. Very negative sentiment if learning_entry.get('sentiment_score', 0.0) < -0.7: return True # 4. Low customer satisfaction if learning_entry.get('customer_satisfaction', 5.0) < 3.0: return True # 5. 
iPhone forwarding issues forwarding_context = learning_entry.get('forwarding_context', {}) if forwarding_context.get('customer_mood') == 'frustrated': return True return False def apply_immediate_learning(self, learning_entry: Dict) -> Dict: """Apply immediate learning from interaction""" try: improvements = { 'pattern_updates': [], 'response_improvements': [], 'context_enhancements': [] } customer_input = learning_entry.get('customer_input', '') ai_response = learning_entry.get('ai_response', '') correction = learning_entry.get('human_correction', '') context = learning_entry.get('context', {}) # Pattern learning if customer_input and ai_response: pattern_key = self.extract_pattern_key(customer_input, context) if pattern_key not in self.knowledge_base['patterns']: self.knowledge_base['patterns'][pattern_key] = { 'examples': [], 'best_responses': [], 'success_rate': 0.0, 'context_factors': [] } pattern = self.knowledge_base['patterns'][pattern_key] pattern['examples'].append({ 'input': customer_input, 'response': ai_response, 'correction': correction, 'quality': learning_entry.get('quality_score', 0.0), 'context': context, 'timestamp': learning_entry['timestamp'] }) improvements['pattern_updates'].append(pattern_key) # Response improvement learning if correction and ai_response: improvement = self.learn_response_improvement( ai_response, correction, customer_input, context ) improvements['response_improvements'].append(improvement) # iPhone forwarding context learning forwarding_context = learning_entry.get('forwarding_context', {}) if forwarding_context: context_improvement = self.learn_forwarding_context( customer_input, ai_response, forwarding_context ) improvements['context_enhancements'].append(context_improvement) # Update improvement metrics self.improvement_metrics['total_corrections'] += 1 if correction: self.improvement_metrics['accuracy_improvements'] += 1 # Store learning data in database self.store_learning_data(learning_entry, improvements) training_logger.info(f"Applied immediate learning: {len(improvements['pattern_updates'])} patterns updated") return { 'status': 'learning_applied', 'improvements': improvements, 'immediate': True, 'timestamp': learning_entry['timestamp'] } except Exception as e: training_logger.error(f"Immediate learning application error: {e}") return {'status': 'error', 'error': str(e)} def extract_pattern_key(self, customer_input: str, context: Dict) -> str: """Extract pattern key from customer input and context""" # Analyze input for key patterns input_lower = customer_input.lower() # Service type patterns if any(word in input_lower for word in ['price', 'cost', 'how much']): pattern_type = 'pricing_inquiry' elif any(word in input_lower for word in ['book', 'schedule', 'appointment']): pattern_type = 'booking_request' elif any(word in input_lower for word in ['service', 'wash', 'detail']): pattern_type = 'service_inquiry' elif any(word in input_lower for word in ['location', 'where', 'area']): pattern_type = 'location_inquiry' elif any(word in input_lower for word in ['complaint', 'problem', 'issue']): pattern_type = 'complaint' elif any(word in input_lower for word in ['urgent', 'emergency', 'asap']): pattern_type = 'urgent_request' else: pattern_type = 'general_inquiry' # Add context modifiers context_modifiers = [] if context.get('likely_forwarded'): context_modifiers.append('forwarded') if context.get('customer_mood') == 'frustrated': context_modifiers.append('frustrated') if context.get('urgency_level') == 'high': 
context_modifiers.append('urgent') # Combine pattern type with context if context_modifiers: pattern_key = f"{pattern_type}_{'_'.join(context_modifiers)}" else: pattern_key = pattern_type return pattern_key def learn_response_improvement(self, original_response: str, correction: str, customer_input: str, context: Dict) -> Dict: """Learn from response corrections""" try: improvement_data = { 'original_response': original_response, 'corrected_response': correction, 'customer_input': customer_input, 'context': context, 'improvement_type': self.classify_improvement_type(original_response, correction), 'quality_gain': self.calculate_quality_gain(original_response, correction), 'timestamp': datetime.now() } # Store in corrections database correction_key = hashlib.md5(f"{customer_input}_{context}".encode()).hexdigest()[:16] self.knowledge_base['corrections'][correction_key] = improvement_data # Update common responses if applicable improvement_type = improvement_data['improvement_type'] if improvement_type in ['tone_improvement', 'context_awareness', 'accuracy_improvement']: self.update_common_responses(correction, context) return improvement_data except Exception as e: training_logger.error(f"Response improvement learning error: {e}") return {'error': str(e)} def learn_forwarding_context(self, customer_input: str, ai_response: str, forwarding_context: Dict) -> Dict: """Learn from iPhone forwarding context""" try: context_learning = { 'customer_input': customer_input, 'ai_response': ai_response, 'forwarding_context': forwarding_context, 'learning_points': [], 'timestamp': datetime.now() } # Analyze forwarding-specific patterns if forwarding_context.get('likely_forwarded'): context_learning['learning_points'].append('call_forwarded') # Learn acknowledgment patterns if 'patience' not in ai_response.lower() and 'wait' not in ai_response.lower(): context_learning['learning_points'].append('missing_wait_acknowledgment') if forwarding_context.get('customer_mood') == 'frustrated': context_learning['learning_points'].append('customer_frustrated') # Learn empathy patterns if 'sorry' not in ai_response.lower() and 'apologize' not in ai_response.lower(): context_learning['learning_points'].append('missing_empathy') if forwarding_context.get('urgency_level') == 'high': context_learning['learning_points'].append('high_urgency') # Learn escalation patterns if 'jay' not in ai_response.lower() and 'connect' not in ai_response.lower(): context_learning['learning_points'].append('missing_escalation_offer') # Update forwarding scenarios knowledge scenario_key = self.determine_forwarding_scenario(forwarding_context) if scenario_key: self.update_forwarding_scenario(scenario_key, context_learning) return context_learning except Exception as e: training_logger.error(f"Forwarding context learning error: {e}") return {'error': str(e)} def classify_improvement_type(self, original: str, correction: str) -> str: """Classify the type of improvement made""" original_lower = original.lower() correction_lower = correction.lower() # Tone improvement polite_words = ['please', 'thank you', 'sorry', 'apologize', 'appreciate'] if any(word in correction_lower for word in polite_words) and not any(word in original_lower for word in polite_words): return 'tone_improvement' # Context awareness context_words = ['patience', 'wait', 'forwarded', 'transferred', 'connect'] if any(word in correction_lower for word in context_words) and not any(word in original_lower for word in context_words): return 'context_awareness' # Accuracy improvement 
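        # e.g. a correction that adds "$45" or "premium" where the original reply had
        # neither is tagged 'accuracy_improvement'; the checks run in order, so a
        # correction that also adds politeness or forwarding language is claimed by the
        # earlier tone/context categories first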
business_terms = ['$25', '$45', '$85', '$150', 'basic', 'premium', 'full', 'ceramic'] if any(term in correction_lower for term in business_terms) and not any(term in original_lower for term in business_terms): return 'accuracy_improvement' # Length adjustment if abs(len(correction.split()) - len(original.split())) > 5: return 'length_adjustment' return 'general_improvement' def calculate_quality_gain(self, original: str, correction: str) -> float: """Calculate quality improvement from correction""" try: # Use API processor to assess both responses original_quality = api_processor_global.assess_response_quality(original, "test prompt") correction_quality = api_processor_global.assess_response_quality(correction, "test prompt") return max(0.0, correction_quality - original_quality) except: return 0.1 # Default small improvement def update_common_responses(self, correction: str, context: Dict): """Update common responses based on corrections""" try: # Determine response category if context.get('likely_forwarded'): if 'patience' in correction.lower() or 'wait' in correction.lower(): category = 'acknowledge_wait_apologize_help' elif 'urgent' in correction.lower() or 'jay' in correction.lower(): category = 'immediate_assistance_jay_connection' else: category = 'general_forwarded_response' else: category = 'general_response' # Add to common responses if it's high quality if category not in self.knowledge_base['common_responses']: self.knowledge_base['common_responses'][category] = [] # Add if not already present and limit to 5 best responses per category responses = self.knowledge_base['common_responses'][category] if correction not in responses: responses.append(correction) if len(responses) > 5: responses.pop(0) # Remove oldest except Exception as e: training_logger.error(f"Common responses update error: {e}") def determine_forwarding_scenario(self, forwarding_context: Dict) -> str: """Determine which forwarding scenario applies""" if forwarding_context.get('urgency_level') == 'high': return 'urgent_forwarded_call' elif forwarding_context.get('customer_mood') == 'frustrated': return 'customer_frustrated_forwarded' elif forwarding_context.get('forwarding_reason') == 'customer_indication': return 'repeat_caller_forwarded' elif forwarding_context.get('likely_forwarded'): return 'general_forwarded_call' else: return None def update_forwarding_scenario(self, scenario_key: str, context_learning: Dict): """Update forwarding scenario knowledge""" try: if scenario_key not in self.knowledge_base['forwarding_scenarios']: self.knowledge_base['forwarding_scenarios'][scenario_key] = { 'triggers': [], 'response_pattern': 'general_help', 'escalation_threshold': 0.5, 'examples': [] } scenario = self.knowledge_base['forwarding_scenarios'][scenario_key] scenario['examples'].append(context_learning) # Update triggers based on learning points for point in context_learning.get('learning_points', []): if point not in scenario['triggers']: scenario['triggers'].append(point) # Adjust escalation threshold based on outcomes if 'missing_escalation_offer' in context_learning.get('learning_points', []): scenario['escalation_threshold'] = max(0.3, scenario['escalation_threshold'] - 0.1) except Exception as e: training_logger.error(f"Forwarding scenario update error: {e}") def process_learning_queue(self): """Process the learning queue for batch learning""" try: if not self.learning_queue: return # Process up to 10 items per batch batch_size = min(10, len(self.learning_queue)) batch = [self.learning_queue.popleft() for _ in 
range(batch_size)] for learning_entry in batch: if not self.should_learn_immediately(learning_entry): # Apply delayed learning for non-critical items self.apply_delayed_learning(learning_entry) training_logger.info(f"Processed {batch_size} learning queue items") except Exception as e: training_logger.error(f"Learning queue processing error: {e}") def apply_delayed_learning(self, learning_entry: Dict): """Apply learning for non-critical items""" try: # Update pattern statistics pattern_key = self.extract_pattern_key( learning_entry.get('customer_input', ''), learning_entry.get('context', {}) ) if pattern_key in self.knowledge_base['patterns']: pattern = self.knowledge_base['patterns'][pattern_key] # Update success rate quality_score = learning_entry.get('quality_score', 0.0) current_rate = pattern.get('success_rate', 0.0) example_count = len(pattern.get('examples', [])) new_rate = (current_rate * example_count + quality_score) / (example_count + 1) pattern['success_rate'] = new_rate # Store in database for analysis self.store_learning_data(learning_entry, {'type': 'delayed_learning'}) except Exception as e: training_logger.error(f"Delayed learning error: {e}") def update_knowledge_patterns(self): """Update knowledge patterns based on accumulated learning""" try: # Analyze patterns for improvements for pattern_key, pattern_data in self.knowledge_base['patterns'].items(): examples = pattern_data.get('examples', []) if len(examples) < 5: # Need minimum examples continue # Calculate average quality avg_quality = sum(ex.get('quality', 0.0) for ex in examples) / len(examples) pattern_data['average_quality'] = avg_quality # Identify best practices best_examples = [ex for ex in examples if ex.get('quality', 0.0) > 0.8] if best_examples: pattern_data['best_practices'] = best_examples[-3:] # Keep 3 best recent examples # Update success rate recent_examples = examples[-10:] # Last 10 examples if recent_examples: recent_quality = sum(ex.get('quality', 0.0) for ex in recent_examples) / len(recent_examples) pattern_data['recent_success_rate'] = recent_quality training_logger.info("Knowledge patterns updated") except Exception as e: training_logger.error(f"Knowledge pattern update error: {e}") def store_learning_data(self, learning_entry: Dict, improvements: Dict): """Store learning data in database""" try: if SessionLocal is None: return db = SessionLocal() try: learning_record = LearningData( interaction_type=learning_entry.get('interaction_type', 'unknown'), customer_phone=learning_entry.get('context', {}).get('from_number', ''), input_text=learning_entry.get('customer_input', ''), bot_response=learning_entry.get('ai_response', ''), human_correction=learning_entry.get('human_correction', ''), sentiment_score=learning_entry.get('sentiment_score', 0.0), confidence_score=learning_entry.get('quality_score', 0.0), approved=True, correction_applied=bool(learning_entry.get('human_correction')), improvement_score=improvements.get('quality_gain', 0.0), api_used=learning_entry.get('api_used', 'unknown'), response_quality=learning_entry.get('quality_score', 0.0), learning_category=self.extract_pattern_key( learning_entry.get('customer_input', ''), learning_entry.get('context', {}) ), context_data=json.dumps(learning_entry.get('context', {})) ) db.add(learning_record) db.commit() finally: db.close() except Exception as e: training_logger.error(f"Learning data storage error: {e}") def get_best_response(self, customer_input: str, context: Dict = None) -> str: """Get best response based on learned patterns""" try: 
pattern_key = self.extract_pattern_key(customer_input, context or {}) # Check if we have learned patterns for this scenario if pattern_key in self.knowledge_base['patterns']: pattern = self.knowledge_base['patterns'][pattern_key] best_practices = pattern.get('best_practices', []) if best_practices: # Return the most recent best practice response return best_practices[-1].get('response', '') # Check forwarding scenarios if context and context.get('likely_forwarded'): scenario_key = self.determine_forwarding_scenario(context.get('forwarding_context', {})) if scenario_key and scenario_key in self.knowledge_base['forwarding_scenarios']: scenario = self.knowledge_base['forwarding_scenarios'][scenario_key] response_pattern = scenario.get('response_pattern', '') if response_pattern in self.knowledge_base['common_responses']: responses = self.knowledge_base['common_responses'][response_pattern] if responses: # Return the most recent/best response return responses[-1] return None # No learned response available except Exception as e: training_logger.error(f"Best response retrieval error: {e}") return None def get_learning_metrics(self) -> Dict[str, Any]: """Get comprehensive learning metrics""" return { 'improvement_metrics': self.improvement_metrics.copy(), 'knowledge_base_stats': { 'total_patterns': len(self.knowledge_base['patterns']), 'total_corrections': len(self.knowledge_base['corrections']), 'forwarding_scenarios': len(self.knowledge_base['forwarding_scenarios']), 'common_responses': sum(len(responses) for responses in self.knowledge_base['common_responses'].values()) }, 'learning_queue_size': len(self.learning_queue), 'training_sessions': len(self.training_sessions), 'learning_enabled': system_state.get('learning_enabled', True) } # Initialize global learning system drip_learn_system = DripLearnSystem() # ===== VOICE WEBHOOK HANDLERS (IPHONE FORWARDING) ===== @app.route('/voice/incoming', methods=['POST']) def handle_incoming_voice(): """πŸ”₯ ENHANCED: Handle incoming voice calls with iPhone forwarding detection""" try: # Extract call information call_sid = request.form.get('CallSid') from_number = request.form.get('From') to_number = request.form.get('To') call_status = request.form.get('CallStatus', 'ringing') forwarded_from = request.form.get('ForwardedFrom') # SignalWire provides this for forwarded calls # Determine if this is a forwarded call is_forwarded = bool(forwarded_from) or (to_number == config.SIGNALWIRE_PHONE) forwarding_reason = 'system_forwarded' if is_forwarded else None call_logger.info(f"πŸ“ž INCOMING CALL: {from_number} β†’ {to_number} (Forwarded: {is_forwarded})") # Create call context call_context = { 'call_sid': call_sid, 'from_number': from_number, 'to_number': to_number, 'forwarded_from': forwarded_from, 'forwarding_reason': forwarding_reason, 'likely_forwarded': is_forwarded, 'timestamp': datetime.now(), 'system_state': { 'jay_phone': config.JAY_PHONE, 'ai_phone': config.SIGNALWIRE_PHONE, 'forwarding_enabled': config.IPHONE_FORWARDING_ENABLED } } # Store call data call_data = { 'call_sid': call_sid, 'from_number': from_number, 'to_number': to_number, 'forwarded_from': forwarded_from, 'forwarding_reason': forwarding_reason, 'start_time': datetime.now(), 'handled_by': 'ai', 'call_status': 'ai_handling', 'forwarding_status': 'forwarded' if is_forwarded else 'direct' } # Update system state system_state.set('total_calls_today', system_state.get('total_calls_today') + 1) if is_forwarded: system_state.set('calls_forwarded_today', 
system_state.get('calls_forwarded_today') + 1) system_state.get('active_calls')[call_sid] = call_data # Store in database if SessionLocal: db = SessionLocal() try: call_record = Call( call_sid=call_sid, from_number=from_number, to_number=to_number, forwarded_from=forwarded_from, forwarding_reason=forwarding_reason, handled_by='ai', call_status='ai_handling', forwarding_status='forwarded' if is_forwarded else 'direct' ) db.add(call_record) db.commit() finally: db.close() # Update Prometheus metrics CALL_COUNT.labels(type='voice', status='received').inc() if is_forwarded: CALL_COUNT.labels(type='voice', status='forwarded').inc() # Create SignalWire response response = VoiceResponse() # Determine greeting based on forwarding context if is_forwarded: if forwarded_from == config.JAY_PHONE: greeting = f"Hi! Thanks for calling Jay's Mobile Wash. I'm Jay's AI assistant, and I'm here to help while Jay is with another customer. How can I assist you today?" else: greeting = f"Hello! You've reached Jay's Mobile Wash AI assistant. I'm here to help you with our mobile car detailing services. What can I do for you?" else: greeting = f"Hi! Welcome to Jay's Mobile Wash! I'm Jay's AI assistant, ready to help with scheduling, pricing, or any questions about our mobile car detailing services. How can I help you today?" # Speak greeting and start gathering input response.say(greeting, voice='alice', rate='medium') # Gather customer speech gather = response.gather( input='speech', timeout=10, action=f'/voice/process_speech?call_sid={call_sid}', method='POST', speech_timeout=3, speech_model='experimental_conversations' ) # Fallback if no speech detected response.say("I didn't catch that. Let me ask again - how can I help you with your car detailing needs today?", voice='alice') # Second attempt response.gather( input='speech', timeout=8, action=f'/voice/process_speech?call_sid={call_sid}', method='POST', speech_timeout=2 ) # Final fallback - offer to connect to Jay response.say("I'm having trouble hearing you clearly. Let me connect you with Jay directly.", voice='alice') response.dial(config.JAY_PHONE, timeout=30) return str(response) except Exception as e: call_logger.error(f"❌ Incoming voice handler error: {e}") # Emergency fallback response response = VoiceResponse() response.say("I'm sorry, there's a technical issue. 
Let me connect you with Jay right away.", voice='alice') response.dial(config.JAY_PHONE, timeout=30) return str(response) @app.route('/voice/process_speech', methods=['POST']) def process_customer_speech(): """🎀 Process customer speech with advanced AI and iPhone forwarding awareness""" try: # Extract call information call_sid = request.args.get('call_sid') or request.form.get('CallSid') speech_result = request.form.get('SpeechResult', '') confidence = float(request.form.get('Confidence', '0.0')) recording_url = request.form.get('RecordingUrl') call_logger.info(f"🎀 SPEECH PROCESSING: Call {call_sid}, Confidence: {confidence:.2f}") # Get call context call_data = system_state.get('active_calls', {}).get(call_sid, {}) is_forwarded = call_data.get('forwarding_status') == 'forwarded' # Enhanced context for AI processing processing_context = { 'call_sid': call_sid, 'from_number': call_data.get('from_number'), 'likely_forwarded': is_forwarded, 'forwarding_reason': call_data.get('forwarding_reason'), 'recording_url': recording_url, 'confidence': confidence, 'timestamp': datetime.now() } # Process with Whisper if needed for better transcription if confidence < 0.7 and recording_url and api_processor_global.api_keys.get('whisper'): try: # Download and process audio with Whisper audio_data = download_recording(recording_url) whisper_result = api_processor_global.process_audio_with_whisper(audio_data, processing_context) if whisper_result.get('success') and whisper_result.get('confidence', 0) > confidence: speech_result = whisper_result['transcription'] confidence = whisper_result['confidence'] processing_context['enhanced_by_whisper'] = True call_logger.info(f"✨ Whisper enhanced transcription: {speech_result}") except Exception as e: ai_logger.warning(f"Whisper enhancement failed: {e}") if not speech_result or confidence < 0.3: return handle_low_confidence_speech(call_sid, confidence) # Clean and validate speech input cleaned_speech = clean_speech_input(speech_result) if not cleaned_speech: return handle_unclear_speech(call_sid) # Advanced AI processing with multiple APIs ai_start_time = time.time() # Analyze sentiment with forwarding context sentiment_result = api_processor_global.analyze_sentiment_advanced(cleaned_speech, processing_context) # Extract intent with forwarding awareness intent_result = api_processor_global.extract_intent_advanced(cleaned_speech, processing_context) # Extract keywords keywords_result = api_processor_global.extract_keywords_advanced(cleaned_speech, processing_context) # Create enhanced context for response generation enhanced_context = { **processing_context, 'sentiment': sentiment_result, 'intent': intent_result, 'keywords': keywords_result, 'customer_mood': 'frustrated' if sentiment_result.get('sentiment', 0) < -0.4 else 'neutral', 'urgency_level': 'high' if 'emergency' in intent_result.get('intent', '') else 'normal', 'forwarding_context': { 'likely_forwarded': is_forwarded, 'customer_mood': 'frustrated' if sentiment_result.get('sentiment', 0) < -0.4 else 'neutral', 'urgency_level': 'high' if 'emergency' in intent_result.get('intent', '') else 'normal' } } # Check for escalation triggers should_escalate = check_escalation_triggers( cleaned_speech, sentiment_result, intent_result, enhanced_context ) if should_escalate: return handle_escalation_to_jay(call_sid, cleaned_speech, enhanced_context) # Try to get learned response first learned_response = drip_learn_system.get_best_response(cleaned_speech, enhanced_context) if learned_response: ai_response_text = 
learned_response ai_response_data = { 'text': learned_response, 'api_used': 'learned_pattern', 'quality_score': 0.9, 'source': 'drip_learn' } else: # Generate response with multiple AI APIs ai_response_data = api_processor_global.generate_with_multiple_apis(cleaned_speech, enhanced_context) ai_response_text = ai_response_data.get('text', '') ai_processing_time = time.time() - ai_start_time # Validate AI response if not ai_response_text or len(ai_response_text.strip()) < 10: return handle_ai_failure(call_sid, cleaned_speech) # Update call data with AI interaction call_data.update({ 'ai_interactions': call_data.get('ai_interactions', 0) + 1, 'last_customer_input': cleaned_speech, 'last_ai_response': ai_response_text, 'sentiment_score': sentiment_result.get('sentiment', 0.0), 'intent_detected': intent_result.get('intent', 'unknown'), 'ai_confidence_score': ai_response_data.get('quality_score', 0.0), 'processing_time': ai_processing_time }) # Store interaction for learning interaction_data = { 'type': 'voice_call', 'input': cleaned_speech, 'ai_response': ai_response_text, 'context': enhanced_context, 'sentiment': sentiment_result.get('sentiment', 0.0), 'quality': ai_response_data.get('quality_score', 0.0), 'api_used': ai_response_data.get('api_used', 'unknown'), 'forwarding_context': enhanced_context.get('forwarding_context', {}), 'processing_time': ai_processing_time } # Learn from this interaction drip_learn_system.learn_from_interaction(interaction_data) # Update system metrics system_state.increment('ai_handled_calls') system_state.set('average_response_time', (system_state.get('average_response_time', 0.0) + ai_processing_time) / 2) # Update Prometheus metrics AI_PROCESSING_TIME.labels(api=ai_response_data.get('api_used', 'unknown')).observe(ai_processing_time) # Store in database store_voice_interaction(call_sid, cleaned_speech, ai_response_text, enhanced_context, ai_response_data) # Create voice response response = VoiceResponse() # Speak AI response response.say(ai_response_text, voice='alice', rate='medium') # Check if conversation should continue if should_continue_conversation(intent_result, ai_response_text): # Continue gathering input gather = response.gather( input='speech', timeout=12, action=f'/voice/process_speech?call_sid={call_sid}', method='POST', speech_timeout=4 ) # Prompt for more input response.say("Is there anything else I can help you with today?", voice='alice') # Final gather attempt response.gather( input='speech', timeout=8, action=f'/voice/process_speech?call_sid={call_sid}', method='POST', speech_timeout=3 ) # Conversation ending response.say("Thank you for calling Jay's Mobile Wash! 
Have a great day!", voice='alice') # Update call status call_data['call_status'] = 'completed' call_data['end_time'] = datetime.now() call_logger.info(f"βœ… Voice call completed: {call_sid}") return str(response) except Exception as e: call_logger.error(f"❌ Speech processing error: {e}") return handle_speech_processing_error(call_sid) def download_recording(recording_url: str) -> bytes: """Download call recording for processing""" try: response = requests.get(recording_url, timeout=30) response.raise_for_status() return response.content except Exception as e: ai_logger.error(f"Recording download failed: {e}") return b"" def clean_speech_input(speech_text: str) -> str: """Clean and validate speech input""" if not speech_text: return "" # Basic cleaning cleaned = speech_text.strip() # Remove common speech recognition artifacts artifacts = ['uh', 'um', 'er', 'ah'] words = cleaned.split() cleaned_words = [word for word in words if word.lower() not in artifacts] # Rejoin and validate result = ' '.join(cleaned_words) # Must have at least 2 words and reasonable length if len(result.split()) < 2 or len(result) > 500: return "" return result def check_escalation_triggers(speech: str, sentiment: Dict, intent: Dict, context: Dict) -> bool: """Check if call should be escalated to Jay""" # Explicit request for Jay if any(phrase in speech.lower() for phrase in ['talk to jay', 'speak to jay', 'get jay', 'jay please']): return True # Very negative sentiment if sentiment.get('sentiment', 0) < -0.7: return True # Emergency situations if intent.get('intent') == 'emergency': return True # Complex complaints if intent.get('intent') == 'complaint' and sentiment.get('sentiment', 0) < -0.5: return True # Customer frustrated from forwarding if context.get('likely_forwarded') and sentiment.get('sentiment', 0) < -0.3: return True return False def should_continue_conversation(intent: Dict, ai_response: str) -> bool: """Determine if conversation should continue""" # Continue for certain intent types continue_intents = ['pricing_inquiry', 'service_inquiry', 'scheduling_inquiry'] if intent.get('intent') in continue_intents: return True # Continue if response asks a question if '?' in ai_response: return True # Continue if booking process started if any(word in ai_response.lower() for word in ['book', 'schedule', 'when', 'available']): return True return False def handle_low_confidence_speech(call_sid: str, confidence: float) -> str: """Handle low confidence speech recognition""" call_logger.warning(f"Low confidence speech: {call_sid}, confidence: {confidence:.2f}") response = VoiceResponse() response.say("I'm sorry, I didn't catch that clearly. Could you please repeat what you need help with?", voice='alice') # Try again with longer timeout response.gather( input='speech', timeout=10, action=f'/voice/process_speech?call_sid={call_sid}', method='POST', speech_timeout=4 ) # Fallback to Jay response.say("I'm having trouble understanding. Let me connect you with Jay directly.", voice='alice') response.dial(config.JAY_PHONE, timeout=30) return str(response) def handle_unclear_speech(call_sid: str) -> str: """Handle unclear or empty speech""" response = VoiceResponse() response.say("I didn't quite understand that. 
Can you tell me how I can help you today?", voice='alice') response.gather( input='speech', timeout=8, action=f'/voice/process_speech?call_sid={call_sid}', method='POST', speech_timeout=3 ) response.say("Let me connect you with Jay for better assistance.", voice='alice') response.dial(config.JAY_PHONE, timeout=30) return str(response) def handle_escalation_to_jay(call_sid: str, customer_speech: str, context: Dict) -> str: """Handle escalation to Jay""" call_logger.info(f"πŸ”„ ESCALATING TO JAY: {call_sid}") # Update call data call_data = system_state.get('active_calls', {}).get(call_sid, {}) call_data.update({ 'escalated_to_human': True, 'escalation_reason': determine_escalation_reason(customer_speech, context), 'escalation_time': datetime.now() }) response = VoiceResponse() # Determine escalation message based on context if context.get('likely_forwarded'): escalation_msg = "I understand you've been waiting, and I want to make sure you get the best service. Let me connect you with Jay right now." elif 'emergency' in customer_speech.lower(): escalation_msg = "I can see this is urgent. I'm connecting you with Jay immediately." elif context.get('sentiment', {}).get('sentiment', 0) < -0.5: escalation_msg = "I want to make sure we take care of this properly. Let me get Jay on the line for you right away." else: escalation_msg = "Let me connect you with Jay directly to make sure you get exactly what you need." response.say(escalation_msg, voice='alice') # Attempt to connect to Jay dial = response.dial(config.JAY_PHONE, timeout=30, action=f'/voice/jay_unavailable?call_sid={call_sid}') return str(response) def determine_escalation_reason(speech: str, context: Dict) -> str: """Determine reason for escalation""" speech_lower = speech.lower() if 'emergency' in speech_lower: return 'emergency' elif any(word in speech_lower for word in ['jay', 'owner', 'manager']): return 'requested_human' elif context.get('sentiment', {}).get('sentiment', 0) < -0.7: return 'very_negative_sentiment' elif 'complaint' in speech_lower or 'problem' in speech_lower: return 'complaint' elif context.get('likely_forwarded') and context.get('sentiment', {}).get('sentiment', 0) < -0.3: return 'forwarded_frustrated' else: return 'general_escalation' def handle_ai_failure(call_sid: str, customer_speech: str) -> str: """Handle AI response failure""" ai_logger.error(f"AI response failure for call: {call_sid}") response = VoiceResponse() response.say("I apologize, but I'm having technical difficulties. Let me connect you with Jay right away.", voice='alice') response.dial(config.JAY_PHONE, timeout=30) return str(response) def handle_speech_processing_error(call_sid: str) -> str: """Handle speech processing errors""" call_logger.error(f"Speech processing error for call: {call_sid}") response = VoiceResponse() response.say("I'm sorry, there's a technical issue. 
Let me connect you with Jay immediately.", voice='alice') response.dial(config.JAY_PHONE, timeout=30) return str(response) def store_voice_interaction(call_sid: str, customer_input: str, ai_response: str, context: Dict, ai_data: Dict): """Store voice interaction in database""" try: if SessionLocal is None: return db = SessionLocal() try: # Update call record call_record = db.query(Call).filter(Call.call_sid == call_sid).first() if call_record: call_record.transcription = customer_input call_record.transcription_confidence = context.get('confidence', 0.0) call_record.sentiment_score = context.get('sentiment', {}).get('sentiment', 0.0) call_record.intent_detected = context.get('intent', {}).get('intent', 'unknown') call_record.ai_confidence_score = ai_data.get('quality_score', 0.0) call_record.escalated_to_human = context.get('escalated_to_human', False) db.commit() finally: db.close() except Exception as e: call_logger.error(f"Voice interaction storage error: {e}") @app.route('/voice/jay_unavailable', methods=['POST']) def handle_jay_unavailable(): """Handle when Jay is unavailable for escalated calls""" call_sid = request.args.get('call_sid') dial_status = request.form.get('DialStatus', 'no-answer') call_logger.warning(f"Jay unavailable for escalated call: {call_sid}, status: {dial_status}") response = VoiceResponse() if dial_status in ['busy', 'no-answer']: response.say( "Jay is currently unavailable, but I'll make sure he calls you back as soon as possible. " "You can also text us anytime, and we'll respond quickly. Thank you for your patience!", voice='alice' ) else: response.say( "I'm sorry, we're having trouble connecting you right now. " "Please try calling back in a few minutes, or send us a text message. " "We'll get back to you very quickly!", voice='alice' ) # Mark call as completed with escalation failure call_data = system_state.get('active_calls', {}).get(call_sid, {}) call_data.update({ 'call_status': 'escalation_failed', 'end_time': datetime.now(), 'notes': f'Escalation failed: {dial_status}' }) return str(response) # ===== SMS WEBHOOK HANDLERS ===== @app.route('/sms/incoming', methods=['POST']) def handle_incoming_sms(): """πŸ“± Handle incoming SMS with enhanced AI processing""" try: # Extract SMS information message_sid = request.form.get('MessageSid') from_number = request.form.get('From') to_number = request.form.get('To') message_body = request.form.get('Body', '') num_media = int(request.form.get('NumMedia', '0')) sms_logger.info(f"πŸ“± INCOMING SMS: {from_number} β†’ {to_number} | {message_body[:100]}...") # Get or create conversation ID conversation_id = get_or_create_conversation_id(from_number) # Create SMS context sms_context = { 'message_sid': message_sid, 'from_number': from_number, 'to_number': to_number, 'conversation_id': conversation_id, 'has_media': num_media > 0, 'timestamp': datetime.now() } # Update system state system_state.increment('total_sms_today') system_state.get('active_sms')[message_sid] = { 'from_number': from_number, 'conversation_id': conversation_id, 'timestamp': datetime.now(), 'status': 'processing' } # Basic input validation if not message_body.strip(): if num_media > 0: return handle_media_only_message(message_sid, sms_context) else: return send_sms_response(from_number, "Hi! I didn't receive any text. How can I help you with Jay's Mobile Wash today?") # Clean message cleaned_message = clean_sms_input(message_body) if not cleaned_message: return send_sms_response(from_number, "I didn't quite understand that. 
How can I help you with our mobile car detailing services?") # Advanced AI processing ai_start_time = time.time() # Analyze sentiment sentiment_result = api_processor_global.analyze_sentiment_advanced(cleaned_message, sms_context) # Extract intent intent_result = api_processor_global.extract_intent_advanced(cleaned_message, sms_context) # Extract keywords keywords_result = api_processor_global.extract_keywords_advanced(cleaned_message, sms_context) # Create enhanced context enhanced_context = { **sms_context, 'sentiment': sentiment_result, 'intent': intent_result, 'keywords': keywords_result, 'message_length': len(cleaned_message), 'word_count': len(cleaned_message.split()), 'conversation_history': get_conversation_history(conversation_id, limit=5) } # Check for escalation triggers should_escalate = check_sms_escalation_triggers(cleaned_message, sentiment_result, intent_result) if should_escalate: return handle_sms_escalation(from_number, cleaned_message, enhanced_context) # Try learned response first learned_response = drip_learn_system.get_best_response(cleaned_message, enhanced_context) if learned_response: ai_response_text = learned_response ai_response_data = { 'text': learned_response, 'api_used': 'learned_pattern', 'quality_score': 0.9, 'source': 'drip_learn' } else: # Generate AI response ai_response_data = api_processor_global.generate_with_multiple_apis(cleaned_message, enhanced_context) ai_response_text = ai_response_data.get('text', '') ai_processing_time = time.time() - ai_start_time # Validate AI response if not ai_response_text or len(ai_response_text.strip()) < 5: ai_response_text = generate_fallback_sms_response(cleaned_message, enhanced_context) ai_response_data = {'api_used': 'fallback', 'quality_score': 0.5} # Optimize response for SMS (character limit) optimized_response = optimize_sms_response(ai_response_text) # Store interaction for learning interaction_data = { 'type': 'sms', 'input': cleaned_message, 'ai_response': optimized_response, 'context': enhanced_context, 'sentiment': sentiment_result.get('sentiment', 0.0), 'quality': ai_response_data.get('quality_score', 0.0), 'api_used': ai_response_data.get('api_used', 'unknown'), 'processing_time': ai_processing_time } # Learn from interaction drip_learn_system.learn_from_interaction(interaction_data) # Store in database store_sms_interaction(message_sid, from_number, cleaned_message, optimized_response, enhanced_context, ai_response_data) # Update system metrics system_state.increment('ai_handled_sms') # Send response response_result = send_sms_response(from_number, optimized_response) # Update SMS status sms_data = system_state.get('active_sms', {}).get(message_sid, {}) sms_data.update({ 'status': 'completed', 'ai_response': optimized_response, 'processing_time': ai_processing_time, 'response_sent': True }) sms_logger.info(f"βœ… SMS processed and responded: {message_sid}") return response_result except Exception as e: sms_logger.error(f"❌ SMS processing error: {e}") return handle_sms_error(from_number, message_sid) def get_or_create_conversation_id(phone_number: str) -> str: """Get or create conversation ID for SMS threading""" # Simple conversation ID based on phone number and date date_str = datetime.now().strftime('%Y%m%d') conversation_id = f"{phone_number}_{date_str}" return conversation_id def clean_sms_input(message: str) -> str: """Clean and validate SMS input""" if not message: return "" # Basic cleaning cleaned = message.strip() # Remove excessive whitespace cleaned = ' '.join(cleaned.split()) # Validate 
length if len(cleaned) < 2 or len(cleaned) > 1000: return "" return cleaned def check_sms_escalation_triggers(message: str, sentiment: Dict, intent: Dict) -> bool: """Check if SMS should be escalated""" message_lower = message.lower() # Explicit request for Jay if any(phrase in message_lower for phrase in ['talk to jay', 'speak to jay', 'get jay', 'jay please', 'human']): return True # Very negative sentiment if sentiment.get('sentiment', 0) < -0.7: return True # Emergency situations if intent.get('intent') == 'emergency': return True # Complex complaints if intent.get('intent') == 'complaint' and sentiment.get('sentiment', 0) < -0.5: return True return False def handle_sms_escalation(from_number: str, message: str, context: Dict) -> str: """Handle SMS escalation to human""" sms_logger.info(f"πŸ“± SMS ESCALATION: {from_number}") escalation_reason = determine_sms_escalation_reason(message, context) if escalation_reason == 'emergency': response_text = f"I understand this is urgent. I'm alerting Jay immediately. You can also call him directly at {config.JAY_PHONE} for immediate assistance." elif escalation_reason == 'requested_human': response_text = f"I'll connect you with Jay right away. He'll respond to this text shortly, or you can call him at {config.JAY_PHONE}." elif escalation_reason == 'complaint': response_text = f"I sincerely apologize for any issue. Jay will personally handle this and respond shortly. For immediate assistance, call {config.JAY_PHONE}." else: response_text = f"Let me have Jay handle this personally. He'll respond to your text soon, or feel free to call {config.JAY_PHONE}." # TODO: Send notification to Jay (email, SMS, webhook, etc.) notify_jay_of_escalation(from_number, message, escalation_reason) return send_sms_response(from_number, response_text) def determine_sms_escalation_reason(message: str, context: Dict) -> str: """Determine reason for SMS escalation""" message_lower = message.lower() if 'emergency' in message_lower: return 'emergency' elif any(word in message_lower for word in ['jay', 'human', 'person', 'someone']): return 'requested_human' elif context.get('sentiment', {}).get('sentiment', 0) < -0.7: return 'very_negative_sentiment' elif 'complaint' in message_lower or 'problem' in message_lower: return 'complaint' else: return 'general_escalation' def notify_jay_of_escalation(from_number: str, message: str, reason: str): """Notify Jay of SMS escalation""" try: # This would integrate with Jay's preferred notification method # For now, log the escalation sms_logger.info(f"πŸ“§ Jay notification: {from_number} escalated ({reason}): {message[:100]}...") # TODO: Implement actual notification (email, SMS to Jay, Slack, etc.) except Exception as e: sms_logger.error(f"Jay notification failed: {e}") def handle_media_only_message(message_sid: str, context: Dict) -> str: """Handle SMS with only media attachments""" from_number = context['from_number'] response_text = ("Thanks for the photo! I can see you sent an image. " "How can I help you with our mobile car detailing services? 
" "Are you looking for a quote or wanting to schedule a wash?") return send_sms_response(from_number, response_text) def get_conversation_history(conversation_id: str, limit: int = 5) -> List[Dict]: """Get conversation history for context""" try: if SessionLocal is None: return [] db = SessionLocal() try: messages = db.query(SMS).filter( SMS.conversation_id == conversation_id ).order_by(SMS.timestamp.desc()).limit(limit).all() history = [] for msg in reversed(messages): # Reverse to get chronological order history.append({ 'message_body': msg.message_body, 'response_body': msg.response_body, 'timestamp': msg.timestamp.isoformat(), 'sentiment_score': msg.sentiment_score, 'intent_detected': msg.intent_detected }) return history finally: db.close() except Exception as e: sms_logger.error(f"Conversation history retrieval error: {e}") return [] def optimize_sms_response(response_text: str, max_length: int = 320) -> str: """Optimize response for SMS character limits""" if not response_text: return "Hi! How can I help you with Jay's Mobile Wash today?" # If response is within limit, return as-is if len(response_text) <= max_length: return response_text # Try to truncate at sentence boundary sentences = response_text.split('. ') optimized = "" for sentence in sentences: if len(optimized + sentence + '. ') <= max_length: optimized += sentence + '. ' else: break if optimized: return optimized.strip() # Fallback: truncate at word boundary words = response_text.split() optimized = "" for word in words: if len(optimized + word + ' ') <= max_length - 3: # Leave room for "..." optimized += word + ' ' else: optimized += "..." break return optimized.strip() def generate_fallback_sms_response(message: str, context: Dict) -> str: """Generate fallback SMS response""" message_lower = message.lower() # Basic intent detection for fallback if any(word in message_lower for word in ['price', 'cost', 'how much']): return "Our services: Basic wash $25, Premium $45, Full detail $85, Ceramic coating $150. Which interests you?" elif any(word in message_lower for word in ['book', 'schedule', 'appointment']): return "I'd love to schedule you! We're available Mon-Sat 8AM-6PM, Sun 10AM-4PM. What day works for you?" elif any(word in message_lower for word in ['location', 'where', 'area']): return "We come to you! We service within 15 miles of LA. What's your location?" elif any(word in message_lower for word in ['service', 'wash', 'detail']): return "We offer mobile car washing and detailing! Basic wash, premium wash, full detail, and ceramic coating. What service interests you?" else: return "Hi! Thanks for texting Jay's Mobile Wash! I can help with pricing, scheduling, or questions about our mobile car detailing. How can I help?" 
def send_sms_response(to_number: str, message: str) -> str:
    """Send SMS response using SignalWire"""
    try:
        if not signalwire_client:
            sms_logger.error("SignalWire client not available")
            return ""

        # Send SMS
        message_obj = signalwire_client.messages.create(
            body=message,
            from_=config.SIGNALWIRE_PHONE,
            to=to_number
        )
        sms_logger.info(f"πŸ“€ SMS sent to {to_number}: {message[:50]}...")

        # Update Prometheus metrics
        SMS_COUNT.labels(status='sent').inc()

        # Create MessagingResponse for webhook
        response = MessagingResponse()
        return str(response)
    except Exception as e:
        sms_logger.error(f"SMS sending error: {e}")
        SMS_COUNT.labels(status='failed').inc()
        return ""

def store_sms_interaction(message_sid: str, from_number: str, customer_message: str, ai_response: str, context: Dict, ai_data: Dict):
    """Store SMS interaction in database"""
    try:
        if SessionLocal is None:
            return
        db = SessionLocal()
        try:
            sms_record = SMS(
                message_sid=message_sid,
                from_number=from_number,
                to_number=context.get('to_number', config.SIGNALWIRE_PHONE),
                message_body=customer_message,
                response_body=ai_response,
                conversation_id=context.get('conversation_id'),
                sentiment_score=context.get('sentiment', {}).get('sentiment', 0.0),
                intent_detected=context.get('intent', {}).get('intent', 'unknown'),
                ai_confidence_score=ai_data.get('quality_score', 0.0),
                handled_by='ai',
                contains_media=context.get('has_media', False),
                response_time=ai_data.get('processing_time', 0.0)
            )
            db.add(sms_record)
            db.commit()
        finally:
            db.close()
    except Exception as e:
        sms_logger.error(f"SMS interaction storage error: {e}")

def handle_sms_error(from_number: str, message_sid: str) -> str:
    """Handle SMS processing errors"""
    sms_logger.error(f"SMS error for {message_sid}")
    error_response = ("Sorry, I'm having technical difficulties. "
                      f"Please call Jay directly at {config.JAY_PHONE} or try texting again in a moment.")
    return send_sms_response(from_number, error_response)

# ===== WEB INTERFACE ROUTES =====

@app.route('/')
def dashboard():
    """🏠 Main dashboard with complete system overview"""
    try:
        # Get comprehensive system metrics
        metrics = system_state.get_metrics()

        # Get recent activity
        recent_calls = get_recent_calls(limit=10)
        recent_sms = get_recent_sms(limit=10)

        # Get learning metrics
        learning_metrics = drip_learn_system.get_learning_metrics()

        # Get API usage stats
        api_stats = api_processor_global.get_usage_stats()

        # Calculate performance indicators
        performance_indicators = calculate_performance_indicators(metrics)

        dashboard_data = {
            'metrics': metrics,
            'recent_calls': recent_calls,
            'recent_sms': recent_sms,
            'learning_metrics': learning_metrics,
            'api_stats': api_stats,
            'performance_indicators': performance_indicators,
            'system_health': get_system_health_status(),
            'forwarding_status': {
                'enabled': config.IPHONE_FORWARDING_ENABLED,
                'delay_seconds': config.FORWARDING_DELAY_SECONDS,
                'jay_phone': config.JAY_PHONE,
                'ai_phone': config.SIGNALWIRE_PHONE
            }
        }
        return render_template('dashboard.html', **dashboard_data)
    except Exception as e:
        logger.error(f"Dashboard error: {e}")
        return render_template('error.html', error="Dashboard temporarily unavailable"), 500

@app.route('/api/metrics')
def api_metrics():
    """πŸ“Š API endpoint for real-time metrics"""
    try:
        metrics = {
            'system': system_state.get_metrics(),
            'learning': drip_learn_system.get_learning_metrics(),
            'apis': api_processor_global.get_usage_stats(),
            'health': get_system_health_status(),
            'timestamp': datetime.now().isoformat()
        }
        return jsonify(metrics)
    except Exception as e:
        logger.error(f"API metrics error: {e}")
        return jsonify({'error': str(e)}), 500
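# --- Illustrative usage sketch (hedged example; run manually, not wired into the app) ---
# Shows one way the JSON endpoint above can be smoke-tested in-process with
# Flask's built-in test client. The function name `_smoke_test_metrics_endpoint`
# is hypothetical and is never called by the application itself.
def _smoke_test_metrics_endpoint():
    """Fetch /api/metrics in-process and confirm it returns JSON."""
    with app.test_client() as client:
        resp = client.get('/api/metrics')
        payload = resp.get_json()
        # On a healthy instance this is a 200 with the metric sections assembled
        # above; on a degraded one it may be a 500 carrying an 'error' key instead.
        assert resp.status_code in (200, 500)
        if resp.status_code == 200:
            assert 'timestamp' in payload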
@app.route('/train', methods=['GET', 'POST'])
def training_interface():
    """πŸŽ“ Training interface for Drip Learnβ„’ system"""
    if request.method == 'POST':
        try:
            action = request.form.get('action')
            if action == 'test_message':
                return handle_training_test(request.form)
            elif action == 'submit_correction':
                return handle_training_correction(request.form)
            elif action == 'bulk_train':
                return handle_bulk_training(request.files.get('training_file'))
        except Exception as e:
            training_logger.error(f"Training interface error: {e}")
            flash(f"Training error: {e}", "error")

    # GET request - show training interface
    training_data = {
        'learning_metrics': drip_learn_system.get_learning_metrics(),
        'recent_sessions': get_recent_training_sessions(),
        'available_scenarios': get_training_scenarios(),
        'knowledge_base_stats': get_knowledge_base_stats()
    }
    return render_template('training.html', **training_data)
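# --- Illustrative data sketch (hedged example) --------------------------------
# handle_bulk_training(), defined a bit further down, reads an uploaded CSV with
# the columns 'customer_input', 'expected_response', and an optional 'scenario'.
# The rows below are made-up samples showing the shape of such a file; the
# constant name EXAMPLE_BULK_TRAINING_CSV is hypothetical and unused by the app.
EXAMPLE_BULK_TRAINING_CSV = """customer_input,expected_response,scenario
How much is a full detail?,A full detail is $85 and takes about two hours. Want me to book you in?,pricing
Can you come out Saturday morning?,"Yes! Saturday mornings are open between 8AM and 11AM. What's your address?",scheduling
My last wash left water spots,"I'm sorry about that - Jay will make it right with a free touch-up. When works for you?",complaint
"""
# Parsing this sample with csv.DictReader(StringIO(EXAMPLE_BULK_TRAINING_CSV))
# yields one dict per row with those three keys, which is exactly what the
# per-row loop in handle_bulk_training() consumes.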
def handle_training_test(form_data: Dict) -> str:
    """Handle training test message"""
    try:
        test_message = form_data.get('test_message', '').strip()
        scenario_type = form_data.get('scenario_type', 'general')

        if not test_message:
            flash("Please enter a test message", "error")
            return redirect(url_for('training_interface'))

        # Create test context
        test_context = {
            'type': 'training_test',
            'scenario': scenario_type,
            'timestamp': datetime.now()
        }

        # Process with AI
        ai_start_time = time.time()

        # Analyze with multiple APIs
        sentiment_result = api_processor_global.analyze_sentiment_advanced(test_message, test_context)
        intent_result = api_processor_global.extract_intent_advanced(test_message, test_context)
        keywords_result = api_processor_global.extract_keywords_advanced(test_message, test_context)

        # Generate response
        enhanced_context = {
            **test_context,
            'sentiment': sentiment_result,
            'intent': intent_result,
            'keywords': keywords_result
        }
        ai_response_data = api_processor_global.generate_with_multiple_apis(test_message, enhanced_context)
        ai_processing_time = time.time() - ai_start_time

        # Create training session
        session_id = f"train_{int(time.time())}_{uuid.uuid4().hex[:8]}"
        training_session = {
            'session_id': session_id,
            'test_message': test_message,
            'ai_response': ai_response_data.get('text', ''),
            'scenario_type': scenario_type,
            'processing_time': ai_processing_time,
            'sentiment': sentiment_result,
            'intent': intent_result,
            'keywords': keywords_result,
            'quality_score': ai_response_data.get('quality_score', 0.0),
            'api_used': ai_response_data.get('api_used', 'unknown'),
            'timestamp': datetime.now()
        }

        # Store session
        drip_learn_system.training_sessions[session_id] = training_session

        # Store in database
        store_training_session(training_session)

        flash(f"Test completed! Quality score: {ai_response_data.get('quality_score', 0.0):.2f}", "success")

        return jsonify({
            'success': True,
            'session_id': session_id,
            'response': ai_response_data.get('text', ''),
            'metrics': {
                'quality_score': ai_response_data.get('quality_score', 0.0),
                'processing_time': ai_processing_time,
                'sentiment': sentiment_result.get('sentiment', 0.0),
                'intent': intent_result.get('intent', 'unknown'),
                'api_used': ai_response_data.get('api_used', 'unknown')
            }
        })
    except Exception as e:
        training_logger.error(f"Training test error: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500

def handle_training_correction(form_data: Dict) -> str:
    """Handle training correction submission"""
    try:
        session_id = form_data.get('session_id')
        corrected_response = form_data.get('corrected_response', '').strip()
        feedback = form_data.get('feedback', '')

        if not session_id or not corrected_response:
            return jsonify({'success': False, 'error': 'Missing required fields'}), 400

        # Get original training session
        training_session = drip_learn_system.training_sessions.get(session_id)
        if not training_session:
            return jsonify({'success': False, 'error': 'Training session not found'}), 404

        # Apply correction learning
        correction_data = {
            'type': 'training_correction',
            'input': training_session['test_message'],
            'ai_response': training_session['ai_response'],
            'correction': corrected_response,
            'context': {
                'session_id': session_id,
                'scenario_type': training_session['scenario_type'],
                'trainer_feedback': feedback
            },
            'quality': training_session['quality_score']
        }

        # Learn from correction
        learning_result = drip_learn_system.learn_from_interaction(correction_data)

        # Update training session
        training_session.update({
            'corrected_response': corrected_response,
            'trainer_feedback': feedback,
            'correction_applied': True,
            'learning_result': learning_result,
            'corrected_at': datetime.now()
        })

        # Update database
        update_training_session_with_correction(session_id, corrected_response, feedback)

        training_logger.info(f"Training correction applied: {session_id}")

        return jsonify({
            'success': True,
            'learning_applied': learning_result.get('immediate', False),
            'improvements': learning_result.get('improvements', {})
        })
    except Exception as e:
        training_logger.error(f"Training correction error: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500

def handle_bulk_training(training_file) -> str:
    """Handle bulk training file upload"""
    try:
        if not training_file or not training_file.filename.endswith('.csv'):
            return jsonify({'success': False, 'error': 'Please upload a CSV file'}), 400

        # Read CSV data
        csv_data = training_file.read().decode('utf-8')
        csv_reader = csv.DictReader(StringIO(csv_data))

        training_results = []
        processed_count = 0
        error_count = 0

        for row in csv_reader:
            try:
                customer_input = row.get('customer_input', '').strip()
                expected_response = row.get('expected_response', '').strip()
                scenario = row.get('scenario', 'general')

                if not customer_input or not expected_response:
                    continue

                # Process training example
                result = process_bulk_training_example(customer_input, expected_response, scenario)
                training_results.append(result)
                processed_count += 1
            except Exception as e:
                error_count += 1
                training_logger.error(f"Bulk training row error: {e}")

        training_logger.info(f"Bulk training completed: {processed_count} processed, {error_count} errors")

        return jsonify({
            'success': True,
            'processed_count': processed_count,
            'error_count': error_count,
            'results': training_results[:10]  # Return first 10 results
        })
    except Exception as e:
        training_logger.error(f"Bulk 
training error: {e}") return jsonify({'success': False, 'error': str(e)}), 500 def process_bulk_training_example(customer_input: str, expected_response: str, scenario: str) -> Dict: """Process individual bulk training example""" try: # Generate AI response context = {'type': 'bulk_training', 'scenario': scenario} ai_response_data = api_processor_global.generate_with_multiple_apis(customer_input, context) ai_response = ai_response_data.get('text', '') # Compare with expected response similarity_score = calculate_response_similarity(ai_response, expected_response) # If similarity is low, treat expected response as correction if similarity_score < 0.7: correction_data = { 'type': 'bulk_training', 'input': customer_input, 'ai_response': ai_response, 'correction': expected_response, 'context': context, 'quality': similarity_score } drip_learn_system.learn_from_interaction(correction_data) return { 'customer_input': customer_input, 'ai_response': ai_response, 'expected_response': expected_response, 'similarity_score': similarity_score, 'learning_applied': similarity_score < 0.7, 'scenario': scenario } except Exception as e: training_logger.error(f"Bulk training example error: {e}") return {'error': str(e)} def calculate_response_similarity(response1: str, response2: str) -> float: """Calculate similarity between two responses""" try: if not response1 or not response2: return 0.0 # Simple word-based similarity words1 = set(response1.lower().split()) words2 = set(response2.lower().split()) if not words1 or not words2: return 0.0 intersection = words1.intersection(words2) union = words1.union(words2) return len(intersection) / len(union) except: return 0.0 @app.route('/settings', methods=['GET', 'POST']) def settings_interface(): """βš™οΈ System settings interface""" if request.method == 'POST': try: action = request.form.get('action') if action == 'update_forwarding': return handle_forwarding_settings_update(request.form) elif action == 'update_ai_settings': return handle_ai_settings_update(request.form) elif action == 'update_business_settings': return handle_business_settings_update(request.form) elif action == 'test_api': return handle_api_test(request.form) except Exception as e: logger.error(f"Settings update error: {e}") flash(f"Settings update error: {e}", "error") # GET request - show settings interface settings_data = { 'forwarding_settings': { 'enabled': config.IPHONE_FORWARDING_ENABLED, 'delay_seconds': config.FORWARDING_DELAY_SECONDS, 'jay_phone': config.JAY_PHONE, 'ai_phone': config.SIGNALWIRE_PHONE }, 'ai_settings': { 'learning_enabled': system_state.get('learning_enabled'), 'ai_enabled': system_state.get('ai_enabled'), 'whisper_enabled': bool(config.HF_WHISPER_API_KEY), 'deepseek_enabled': bool(config.DEEPSEEK_API_KEY) }, 'api_health': system_state.get('api_health', {}), 'business_settings': { 'name': config.BUSINESS_NAME, 'phone': config.BUSINESS_PHONE, 'email': config.BUSINESS_EMAIL, 'service_radius': config.SERVICE_RADIUS_MILES, 'pricing': config.get_service_pricing() }, 'feature_flags': system_state.get('feature_flags', {}), 'system_metrics': system_state.get_metrics() } return render_template('settings.html', **settings_data) def handle_forwarding_settings_update(form_data: Dict) -> str: """Handle iPhone forwarding settings update""" try: enabled = form_data.get('forwarding_enabled') == 'on' delay_seconds = int(form_data.get('delay_seconds', config.FORWARDING_DELAY_SECONDS)) # Validate delay if delay_seconds < 5 or delay_seconds > 60: flash("Forwarding delay must be between 5 
and 60 seconds", "error") return redirect(url_for('settings_interface')) # Update system state system_state.update({ 'iphone_forwarding_enabled': enabled, 'forwarding_delay_seconds': delay_seconds }) # Update forwarding rules forwarding_rules = system_state.get('forwarding_rules', {}) forwarding_rules.update({ 'unanswered_after_seconds': delay_seconds, 'enabled': enabled }) system_state.set('forwarding_rules', forwarding_rules) logger.info(f"Forwarding settings updated: enabled={enabled}, delay={delay_seconds}s") flash("iPhone forwarding settings updated successfully!", "success") return redirect(url_for('settings_interface')) except Exception as e: logger.error(f"Forwarding settings update error: {e}") flash(f"Failed to update forwarding settings: {e}", "error") return redirect(url_for('settings_interface')) def handle_ai_settings_update(form_data: Dict) -> str: """Handle AI settings update""" try: learning_enabled = form_data.get('learning_enabled') == 'on' ai_enabled = form_data.get('ai_enabled') == 'on' # Update system state system_state.update({ 'learning_enabled': learning_enabled, 'ai_enabled': ai_enabled }) logger.info(f"AI settings updated: learning={learning_enabled}, ai={ai_enabled}") flash("AI settings updated successfully!", "success") return redirect(url_for('settings_interface')) except Exception as e: logger.error(f"AI settings update error: {e}") flash(f"Failed to update AI settings: {e}", "error") return redirect(url_for('settings_interface')) def handle_business_settings_update(form_data: Dict) -> str: """Handle business settings update""" try: # Extract business information business_name = form_data.get('business_name', config.BUSINESS_NAME) business_phone = form_data.get('business_phone', config.BUSINESS_PHONE) business_email = form_data.get('business_email', config.BUSINESS_EMAIL) service_radius = int(form_data.get('service_radius', config.SERVICE_RADIUS_MILES)) # Validate inputs if service_radius < 1 or service_radius > 50: flash("Service radius must be between 1 and 50 miles", "error") return redirect(url_for('settings_interface')) # Update pricing if provided pricing_updates = {} for service in ['basic_wash', 'premium_wash', 'full_detail', 'ceramic_coating', 'headlight_restoration']: price_key = f"{service}_price" if price_key in form_data: try: price = float(form_data[price_key]) if price > 0: pricing_updates[service] = price except ValueError: flash(f"Invalid price for {service.replace('_', ' ').title()}", "error") return redirect(url_for('settings_interface')) # Store updates (in a real implementation, these would update environment variables or database) business_updates = { 'business_name': business_name, 'business_phone': business_phone, 'business_email': business_email, 'service_radius': service_radius } if pricing_updates: business_updates['pricing_updates'] = pricing_updates # Update knowledge base drip_learn_system.knowledge_base['business_info'].update(business_updates) logger.info(f"Business settings updated: {business_updates}") flash("Business settings updated successfully!", "success") return redirect(url_for('settings_interface')) except Exception as e: logger.error(f"Business settings update error: {e}") flash(f"Failed to update business settings: {e}", "error") return redirect(url_for('settings_interface')) def handle_api_test(form_data: Dict) -> str: """Handle API health test""" try: api_name = form_data.get('api_name') if api_name == 'signalwire': result = test_signalwire_api() elif api_name == 'deepseek': result = test_deepseek_api() elif 
api_name == 'whisper': result = test_whisper_api() elif api_name == 'all': result = api_processor_global.check_all_api_health() else: flash("Unknown API for testing", "error") return redirect(url_for('settings_interface')) if result.get('status') == 'healthy': flash(f"{api_name.title()} API test successful!", "success") else: flash(f"{api_name.title()} API test failed: {result.get('error', 'Unknown error')}", "error") return redirect(url_for('settings_interface')) except Exception as e: logger.error(f"API test error: {e}") flash(f"API test failed: {e}", "error") return redirect(url_for('settings_interface')) def test_signalwire_api() -> Dict: """Test SignalWire API health""" try: if not signalwire_client: return {'status': 'unhealthy', 'error': 'SignalWire client not initialized'} # Test by fetching account info account = signalwire_client.api.accounts(config.SIGNALWIRE_PROJECT_ID).fetch() return { 'status': 'healthy', 'account_sid': account.sid, 'account_status': account.status } except Exception as e: return {'status': 'unhealthy', 'error': str(e)} def test_deepseek_api() -> Dict: """Test DeepSeek API health""" try: if not config.DEEPSEEK_API_KEY: return {'status': 'unhealthy', 'error': 'DeepSeek API key not configured'} test_response = api_processor_global.generate_with_deepseek("Test", {}) if test_response.get('success'): return {'status': 'healthy', 'response_time': test_response.get('response_time', 0)} else: return {'status': 'unhealthy', 'error': test_response.get('error', 'Unknown error')} except Exception as e: return {'status': 'unhealthy', 'error': str(e)} def test_whisper_api() -> Dict: """Test Whisper API health""" try: if not config.HF_WHISPER_API_KEY: return {'status': 'unhealthy', 'error': 'Whisper API key not configured'} # Create dummy audio data for testing dummy_audio = b'\x00' * 1024 # Simple dummy audio data test_response = api_processor_global.process_audio_with_whisper(dummy_audio, {}) if test_response.get('success'): return {'status': 'healthy', 'response_time': test_response.get('response_time', 0)} else: return {'status': 'unhealthy', 'error': test_response.get('error', 'Unknown error')} except Exception as e: return {'status': 'unhealthy', 'error': str(e)} # ===== ANALYTICS AND REPORTING ===== @app.route('/analytics') def analytics_dashboard(): """πŸ“Š Advanced analytics dashboard""" try: # Get date range from query parameters days = int(request.args.get('days', 7)) end_date = datetime.now() start_date = end_date - timedelta(days=days) # Generate analytics data analytics_data = { 'summary_metrics': get_summary_metrics(start_date, end_date), 'call_analytics': get_call_analytics(start_date, end_date), 'sms_analytics': get_sms_analytics(start_date, end_date), 'ai_performance': get_ai_performance_analytics(start_date, end_date), 'business_metrics': get_business_analytics(start_date, end_date), 'forwarding_analytics': get_forwarding_analytics(start_date, end_date), 'customer_analytics': get_customer_analytics(start_date, end_date), 'charts_data': generate_charts_data(start_date, end_date), 'date_range': { 'start': start_date.strftime('%Y-%m-%d'), 'end': end_date.strftime('%Y-%m-%d'), 'days': days } } return render_template('analytics.html', **analytics_data) except Exception as e: logger.error(f"Analytics dashboard error: {e}") return render_template('error.html', error="Analytics temporarily unavailable"), 500 def get_summary_metrics(start_date: datetime, end_date: datetime) -> Dict: """Get summary metrics for the date range""" try: if SessionLocal is None: return 
{} db = SessionLocal() try: # Call metrics total_calls = db.query(Call).filter( Call.start_time >= start_date, Call.start_time <= end_date ).count() forwarded_calls = db.query(Call).filter( Call.start_time >= start_date, Call.start_time <= end_date, Call.forwarding_status == 'forwarded' ).count() ai_handled_calls = db.query(Call).filter( Call.start_time >= start_date, Call.start_time <= end_date, Call.handled_by == 'ai' ).count() # SMS metrics total_sms = db.query(SMS).filter( SMS.timestamp >= start_date, SMS.timestamp <= end_date ).count() # Revenue metrics total_revenue = db.query(Call.revenue_generated).filter( Call.start_time >= start_date, Call.start_time <= end_date ).scalar() or 0.0 # Customer satisfaction avg_satisfaction = db.query(func.avg(Call.customer_satisfaction)).filter( Call.start_time >= start_date, Call.start_time <= end_date, Call.customer_satisfaction.isnot(None) ).scalar() or 5.0 return { 'total_calls': total_calls, 'forwarded_calls': forwarded_calls, 'ai_handled_calls': ai_handled_calls, 'forwarding_rate': (forwarded_calls / max(total_calls, 1)) * 100, 'ai_success_rate': (ai_handled_calls / max(total_calls, 1)) * 100, 'total_sms': total_sms, 'total_revenue': total_revenue, 'avg_satisfaction': float(avg_satisfaction), 'total_interactions': total_calls + total_sms } finally: db.close() except Exception as e: logger.error(f"Summary metrics error: {e}") return {} def get_call_analytics(start_date: datetime, end_date: datetime) -> Dict: """Get detailed call analytics""" try: if SessionLocal is None: return {} db = SessionLocal() try: # Call volume by hour hourly_calls = db.query( func.extract('hour', Call.start_time).label('hour'), func.count(Call.id).label('count') ).filter( Call.start_time >= start_date, Call.start_time <= end_date ).group_by(func.extract('hour', Call.start_time)).all() # Call duration statistics avg_duration = db.query(func.avg(Call.duration)).filter( Call.start_time >= start_date, Call.start_time <= end_date, Call.duration.isnot(None) ).scalar() or 0.0 # Most common intents intent_distribution = db.query( Call.intent_detected, func.count(Call.id).label('count') ).filter( Call.start_time >= start_date, Call.start_time <= end_date, Call.intent_detected.isnot(None) ).group_by(Call.intent_detected).order_by(func.count(Call.id).desc()).limit(10).all() # Escalation rate escalated_calls = db.query(Call).filter( Call.start_time >= start_date, Call.start_time <= end_date, Call.escalated_to_human == True ).count() total_calls = db.query(Call).filter( Call.start_time >= start_date, Call.start_time <= end_date ).count() return { 'hourly_distribution': [{'hour': row.hour, 'count': row.count} for row in hourly_calls], 'avg_duration_minutes': float(avg_duration) / 60 if avg_duration else 0, 'intent_distribution': [{'intent': row.intent_detected, 'count': row.count} for row in intent_distribution], 'escalation_rate': (escalated_calls / max(total_calls, 1)) * 100, 'total_escalations': escalated_calls } finally: db.close() except Exception as e: logger.error(f"Call analytics error: {e}") return {} def get_sms_analytics(start_date: datetime, end_date: datetime) -> Dict: """Get detailed SMS analytics""" try: if SessionLocal is None: return {} db = SessionLocal() try: # SMS volume by hour hourly_sms = db.query( func.extract('hour', SMS.timestamp).label('hour'), func.count(SMS.id).label('count') ).filter( SMS.timestamp >= start_date, SMS.timestamp <= end_date ).group_by(func.extract('hour', SMS.timestamp)).all() # Average response time avg_response_time = 
db.query(func.avg(SMS.response_time)).filter( SMS.timestamp >= start_date, SMS.timestamp <= end_date, SMS.response_time.isnot(None) ).scalar() or 0.0 # Conversation length distribution conversation_lengths = db.query( SMS.conversation_id, func.count(SMS.id).label('message_count') ).filter( SMS.timestamp >= start_date, SMS.timestamp <= end_date ).group_by(SMS.conversation_id).all() # Language distribution language_distribution = db.query( SMS.language_detected, func.count(SMS.id).label('count') ).filter( SMS.timestamp >= start_date, SMS.timestamp <= end_date ).group_by(SMS.language_detected).all() return { 'hourly_distribution': [{'hour': row.hour, 'count': row.count} for row in hourly_sms], 'avg_response_time_seconds': float(avg_response_time), 'avg_conversation_length': sum(row.message_count for row in conversation_lengths) / max(len(conversation_lengths), 1), 'language_distribution': [{'language': row.language_detected, 'count': row.count} for row in language_distribution], 'total_conversations': len(conversation_lengths) } finally: db.close() except Exception as e: logger.error(f"SMS analytics error: {e}") return {} def get_ai_performance_analytics(start_date: datetime, end_date: datetime) -> Dict: """Get AI performance analytics""" try: learning_metrics = drip_learn_system.get_learning_metrics() api_stats = api_processor_global.get_usage_stats() # Get AI accuracy trends from database if SessionLocal: db = SessionLocal() try: # Average AI confidence scores avg_ai_confidence = db.query(func.avg(Call.ai_confidence_score)).filter( Call.start_time >= start_date, Call.start_time <= end_date, Call.ai_confidence_score.isnot(None) ).scalar() or 0.0 # Sentiment accuracy (positive sentiment with positive outcomes) sentiment_accuracy = calculate_sentiment_accuracy(db, start_date, end_date) # Learning improvements over time learning_trends = get_learning_trends(db, start_date, end_date) finally: db.close() else: avg_ai_confidence = 0.8 sentiment_accuracy = 0.85 learning_trends = [] return { 'api_usage': api_stats.get('api_usage', {}), 'performance_metrics': api_stats.get('performance_metrics', {}), 'avg_ai_confidence': float(avg_ai_confidence), 'sentiment_accuracy': sentiment_accuracy, 'learning_metrics': learning_metrics, 'learning_trends': learning_trends, 'api_health': api_stats.get('health_status', {}) } except Exception as e: logger.error(f"AI performance analytics error: {e}") return {} def calculate_sentiment_accuracy(db: Session, start_date: datetime, end_date: datetime) -> float: """Calculate sentiment analysis accuracy""" try: # This is a simplified accuracy calculation # In a real implementation, you would compare predicted sentiment with actual outcomes positive_sentiment_calls = db.query(Call).filter( Call.start_time >= start_date, Call.start_time <= end_date, Call.sentiment_score > 0.3, Call.customer_satisfaction >= 4 ).count() negative_sentiment_calls = db.query(Call).filter( Call.start_time >= start_date, Call.start_time <= end_date, Call.sentiment_score < -0.3, Call.customer_satisfaction <= 2 ).count() total_sentiment_calls = db.query(Call).filter( Call.start_time >= start_date, Call.start_time <= end_date, Call.sentiment_score.isnot(None), Call.customer_satisfaction.isnot(None) ).count() if total_sentiment_calls == 0: return 0.85 # Default accuracy accuracy = (positive_sentiment_calls + negative_sentiment_calls) / total_sentiment_calls return min(1.0, accuracy) except Exception as e: logger.error(f"Sentiment accuracy calculation error: {e}") return 0.85 def get_learning_trends(db: 
Session, start_date: datetime, end_date: datetime) -> List[Dict]: """Get learning improvement trends""" try: # Group learning data by day daily_learning = db.query( func.date(LearningData.timestamp).label('date'), func.count(LearningData.id).label('learning_count'), func.avg(LearningData.improvement_score).label('avg_improvement') ).filter( LearningData.timestamp >= start_date, LearningData.timestamp <= end_date ).group_by(func.date(LearningData.timestamp)).order_by(func.date(LearningData.timestamp)).all() return [ { 'date': row.date.isoformat(), 'learning_count': row.learning_count, 'avg_improvement': float(row.avg_improvement) if row.avg_improvement else 0.0 } for row in daily_learning ] except Exception as e: logger.error(f"Learning trends error: {e}") return [] def get_business_analytics(start_date: datetime, end_date: datetime) -> Dict: """Get business performance analytics""" try: if SessionLocal is None: return {} db = SessionLocal() try: # Revenue by service type revenue_by_service = db.query( BusinessData.service_type, func.sum(BusinessData.revenue).label('total_revenue'), func.count(BusinessData.id).label('booking_count') ).filter( BusinessData.date >= start_date, BusinessData.date <= end_date ).group_by(BusinessData.service_type).all() # Revenue trends daily_revenue = db.query( func.date(BusinessData.date).label('date'), func.sum(BusinessData.revenue).label('revenue'), func.count(BusinessData.id).label('bookings') ).filter( BusinessData.date >= start_date, BusinessData.date <= end_date ).group_by(func.date(BusinessData.date)).order_by(func.date(BusinessData.date)).all() # Customer metrics total_customers = db.query(Customer).filter( Customer.last_contact >= start_date, Customer.last_contact <= end_date ).count() new_customers = db.query(Customer).filter( Customer.first_contact >= start_date, Customer.first_contact <= end_date ).count() # Performance by location location_performance = db.query( BusinessData.location, func.avg(BusinessData.customer_rating).label('avg_rating'), func.sum(BusinessData.revenue).label('total_revenue'), func.count(BusinessData.id).label('booking_count') ).filter( BusinessData.date >= start_date, BusinessData.date <= end_date, BusinessData.location.isnot(None) ).group_by(BusinessData.location).order_by(func.sum(BusinessData.revenue).desc()).limit(10).all() return { 'revenue_by_service': [ { 'service': row.service_type, 'revenue': float(row.total_revenue), 'bookings': row.booking_count } for row in revenue_by_service ], 'daily_revenue': [ { 'date': row.date.isoformat(), 'revenue': float(row.revenue), 'bookings': row.bookings } for row in daily_revenue ], 'customer_metrics': { 'total_customers': total_customers, 'new_customers': new_customers, 'customer_growth_rate': (new_customers / max(total_customers, 1)) * 100 }, 'location_performance': [ { 'location': row.location, 'avg_rating': float(row.avg_rating) if row.avg_rating else 5.0, 'revenue': float(row.total_revenue), 'bookings': row.booking_count } for row in location_performance ] } finally: db.close() except Exception as e: logger.error(f"Business analytics error: {e}") return {} def get_forwarding_analytics(start_date: datetime, end_date: datetime) -> Dict: """Get iPhone forwarding analytics""" try: if SessionLocal is None: return {} db = SessionLocal() try: # Forwarding success rate total_forwarded = db.query(Call).filter( Call.start_time >= start_date, Call.start_time <= end_date, Call.forwarding_status == 'forwarded' ).count() successful_forwarded = db.query(Call).filter( Call.start_time >= 
start_date, Call.start_time <= end_date, Call.forwarding_status == 'forwarded', Call.call_status == 'completed' ).count() # Forwarding reasons forwarding_reasons = db.query( Call.forwarding_reason, func.count(Call.id).label('count') ).filter( Call.start_time >= start_date, Call.start_time <= end_date, Call.forwarding_status == 'forwarded' ).group_by(Call.forwarding_reason).all() # Forwarding by time of day forwarding_by_hour = db.query( func.extract('hour', Call.start_time).label('hour'), func.count(Call.id).label('forwarded_count') ).filter( Call.start_time >= start_date, Call.start_time <= end_date, Call.forwarding_status == 'forwarded' ).group_by(func.extract('hour', Call.start_time)).all() # Customer satisfaction for forwarded calls forwarded_satisfaction = db.query(func.avg(Call.customer_satisfaction)).filter( Call.start_time >= start_date, Call.start_time <= end_date, Call.forwarding_status == 'forwarded', Call.customer_satisfaction.isnot(None) ).scalar() or 5.0 return { 'forwarding_success_rate': (successful_forwarded / max(total_forwarded, 1)) * 100, 'total_forwarded': total_forwarded, 'successful_forwarded': successful_forwarded, 'forwarding_reasons': [ {'reason': row.forwarding_reason, 'count': row.count} for row in forwarding_reasons ], 'forwarding_by_hour': [ {'hour': row.hour, 'count': row.forwarded_count} for row in forwarding_by_hour ], 'avg_forwarded_satisfaction': float(forwarded_satisfaction) } finally: db.close() except Exception as e: logger.error(f"Forwarding analytics error: {e}") return {} def get_customer_analytics(start_date: datetime, end_date: datetime) -> Dict: """Get customer behavior analytics""" try: if SessionLocal is None: return {} db = SessionLocal() try: # Customer segmentation customer_segments = db.query( Customer.customer_segment, func.count(Customer.id).label('count'), func.avg(Customer.lifetime_value).label('avg_ltv') ).filter( Customer.last_contact >= start_date, Customer.last_contact <= end_date ).group_by(Customer.customer_segment).all() # Communication preferences comm_preferences = db.query( Customer.communication_preference, func.count(Customer.id).label('count') ).filter( Customer.last_contact >= start_date, Customer.last_contact <= end_date ).group_by(Customer.communication_preference).all() # Top customers by value top_customers = db.query( Customer.phone_number, Customer.name, Customer.lifetime_value, Customer.total_bookings, Customer.customer_segment ).filter( Customer.last_contact >= start_date, Customer.last_contact <= end_date ).order_by(Customer.lifetime_value.desc()).limit(10).all() # Customer retention metrics total_customers = db.query(Customer).filter( Customer.first_contact < start_date ).count() returning_customers = db.query(Customer).filter( Customer.first_contact < start_date, Customer.last_contact >= start_date, Customer.last_contact <= end_date ).count() return { 'customer_segments': [ { 'segment': row.customer_segment, 'count': row.count, 'avg_lifetime_value': float(row.avg_ltv) if row.avg_ltv else 0.0 } for row in customer_segments ], 'communication_preferences': [ {'preference': row.communication_preference, 'count': row.count} for row in comm_preferences ], 'top_customers': [ { 'phone': row.phone_number, 'name': row.name or 'Unknown', 'lifetime_value': float(row.lifetime_value), 'total_bookings': row.total_bookings, 'segment': row.customer_segment } for row in top_customers ], 'retention_metrics': { 'total_existing_customers': total_customers, 'returning_customers': returning_customers, 'retention_rate': 
(returning_customers / max(total_customers, 1)) * 100 } } finally: db.close() except Exception as e: logger.error(f"Customer analytics error: {e}") return {} def generate_charts_data(start_date: datetime, end_date: datetime) -> Dict: """Generate data for charts and visualizations""" try: # This would generate data for various charts # For now, returning sample structure # Generate date range for time series date_range = [] current_date = start_date while current_date <= end_date: date_range.append(current_date.strftime('%Y-%m-%d')) current_date += timedelta(days=1) return { 'date_range': date_range, 'call_volume_chart': generate_call_volume_chart_data(start_date, end_date), 'revenue_chart': generate_revenue_chart_data(start_date, end_date), 'sentiment_trend_chart': generate_sentiment_trend_chart_data(start_date, end_date), 'forwarding_performance_chart': generate_forwarding_chart_data(start_date, end_date) } except Exception as e: logger.error(f"Charts data generation error: {e}") return {} def generate_call_volume_chart_data(start_date: datetime, end_date: datetime) -> Dict: """Generate call volume chart data""" try: if SessionLocal is None: return {} db = SessionLocal() try: daily_calls = db.query( func.date(Call.start_time).label('date'), func.count(Call.id).label('total_calls'), func.sum(case([(Call.forwarding_status == 'forwarded', 1)], else_=0)).label('forwarded_calls'), func.sum(case([(Call.handled_by == 'ai', 1)], else_=0)).label('ai_handled_calls') ).filter( Call.start_time >= start_date, Call.start_time <= end_date ).group_by(func.date(Call.start_time)).order_by(func.date(Call.start_time)).all() return { 'labels': [row.date.strftime('%Y-%m-%d') for row in daily_calls], 'datasets': [ { 'label': 'Total Calls', 'data': [row.total_calls for row in daily_calls], 'borderColor': 'rgb(75, 192, 192)', 'backgroundColor': 'rgba(75, 192, 192, 0.2)' }, { 'label': 'Forwarded Calls', 'data': [row.forwarded_calls for row in daily_calls], 'borderColor': 'rgb(255, 99, 132)', 'backgroundColor': 'rgba(255, 99, 132, 0.2)' }, { 'label': 'AI Handled', 'data': [row.ai_handled_calls for row in daily_calls], 'borderColor': 'rgb(54, 162, 235)', 'backgroundColor': 'rgba(54, 162, 235, 0.2)' } ] } finally: db.close() except Exception as e: logger.error(f"Call volume chart error: {e}") return {} def generate_revenue_chart_data(start_date: datetime, end_date: datetime) -> Dict: """Generate revenue chart data""" try: if SessionLocal is None: return {} db = SessionLocal() try: daily_revenue = db.query( func.date(BusinessData.date).label('date'), func.sum(BusinessData.revenue).label('revenue'), func.sum(BusinessData.profit).label('profit') ).filter( BusinessData.date >= start_date, BusinessData.date <= end_date ).group_by(func.date(BusinessData.date)).order_by(func.date(BusinessData.date)).all() return { 'labels': [row.date.strftime('%Y-%m-%d') for row in daily_revenue], 'datasets': [ { 'label': 'Revenue', 'data': [float(row.revenue) for row in daily_revenue], 'borderColor': 'rgb(34, 197, 94)', 'backgroundColor': 'rgba(34, 197, 94, 0.2)' }, { 'label': 'Profit', 'data': [float(row.profit) for row in daily_revenue], 'borderColor': 'rgb(168, 85, 247)', 'backgroundColor': 'rgba(168, 85, 247, 0.2)' } ] } finally: db.close() except Exception as e: logger.error(f"Revenue chart error: {e}") return {} def generate_sentiment_trend_chart_data(start_date: datetime, end_date: datetime) -> Dict: """Generate sentiment trend chart data""" try: if SessionLocal is None: return {} db = SessionLocal() try: daily_sentiment = db.query( 
def generate_sentiment_trend_chart_data(start_date: datetime, end_date: datetime) -> Dict:
    """Generate sentiment trend chart data"""
    try:
        if SessionLocal is None:
            return {}
        db = SessionLocal()
        try:
            daily_sentiment = db.query(
                func.date(Call.start_time).label('date'),
                func.avg(Call.sentiment_score).label('avg_sentiment'),
                func.avg(Call.customer_satisfaction).label('avg_satisfaction')
            ).filter(
                Call.start_time >= start_date,
                Call.start_time <= end_date,
                Call.sentiment_score.isnot(None)
            ).group_by(func.date(Call.start_time)).order_by(func.date(Call.start_time)).all()

            return {
                'labels': [row.date.strftime('%Y-%m-%d') for row in daily_sentiment],
                'datasets': [
                    {
                        'label': 'Average Sentiment',
                        'data': [float(row.avg_sentiment) if row.avg_sentiment else 0 for row in daily_sentiment],
                        'borderColor': 'rgb(234, 179, 8)',
                        'backgroundColor': 'rgba(234, 179, 8, 0.2)',
                        'yAxisID': 'y'
                    },
                    {
                        'label': 'Customer Satisfaction',
                        'data': [float(row.avg_satisfaction) if row.avg_satisfaction else 5 for row in daily_sentiment],
                        'borderColor': 'rgb(239, 68, 68)',
                        'backgroundColor': 'rgba(239, 68, 68, 0.2)',
                        'yAxisID': 'y1'
                    }
                ]
            }
        finally:
            db.close()
    except Exception as e:
        logger.error(f"Sentiment trend chart error: {e}")
        return {}


def generate_forwarding_chart_data(start_date: datetime, end_date: datetime) -> Dict:
    """Generate iPhone forwarding performance chart data"""
    try:
        if SessionLocal is None:
            return {}
        db = SessionLocal()
        try:
            daily_forwarding = db.query(
                func.date(Call.start_time).label('date'),
                func.count(Call.id).label('total_calls'),
                func.sum(case([(Call.forwarding_status == 'forwarded', 1)], else_=0)).label('forwarded_calls'),
                func.sum(case([
                    (and_(Call.forwarding_status == 'forwarded', Call.call_status == 'completed'), 1)
                ], else_=0)).label('successful_forwarded')
            ).filter(
                Call.start_time >= start_date,
                Call.start_time <= end_date
            ).group_by(func.date(Call.start_time)).order_by(func.date(Call.start_time)).all()

            # Calculate forwarding and success rates for each day
            forwarding_rates = []
            success_rates = []
            for row in daily_forwarding:
                if row.total_calls > 0:
                    forwarding_rate = (row.forwarded_calls / row.total_calls) * 100
                    forwarding_rates.append(forwarding_rate)
                else:
                    forwarding_rates.append(0)
                if row.forwarded_calls > 0:
                    success_rate = (row.successful_forwarded / row.forwarded_calls) * 100
                    success_rates.append(success_rate)
                else:
                    success_rates.append(100)  # No forwarded calls = 100% success

            return {
                'labels': [row.date.strftime('%Y-%m-%d') for row in daily_forwarding],
                'datasets': [
                    {
                        'label': 'Forwarding Rate (%)',
                        'data': forwarding_rates,
                        'borderColor': 'rgb(139, 92, 246)',
                        'backgroundColor': 'rgba(139, 92, 246, 0.2)',
                        'type': 'line'
                    },
                    {
                        'label': 'Forwarding Success Rate (%)',
                        'data': success_rates,
                        'borderColor': 'rgb(16, 185, 129)',
                        'backgroundColor': 'rgba(16, 185, 129, 0.2)',
                        'type': 'line'
                    }
                ]
            }
        finally:
            db.close()
    except Exception as e:
        logger.error(f"Forwarding chart error: {e}")
        return {}
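# --- Illustrative addition (not part of the original code) -----------------
# A minimal sketch of how the Chart.js-style payloads built above could be
# exposed to the dashboard over HTTP. The '/api/charts' route name and the
# 'days' query parameter are assumptions for illustration only; adjust or
# drop this if a chart endpoint already exists elsewhere in the app.
@app.route('/api/charts')
def api_charts():
    """Return chart datasets for the dashboard (illustrative sketch)."""
    try:
        days = int(request.args.get('days', 7))
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days)
        return jsonify(generate_charts_data(start_date, end_date))
    except Exception as e:
        logger.error(f"Chart API error: {e}")
        return jsonify({'error': str(e)}), 500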
# ===== UTILITY FUNCTIONS =====

def get_recent_calls(limit: int = 10) -> List[Dict]:
    """Get recent call records"""
    try:
        if SessionLocal is None:
            return []
        db = SessionLocal()
        try:
            recent_calls = db.query(Call).order_by(
                Call.start_time.desc()
            ).limit(limit).all()
            return [
                {
                    'call_sid': call.call_sid,
                    'from_number': call.from_number,
                    'start_time': call.start_time.strftime('%Y-%m-%d %H:%M:%S') if call.start_time else '',
                    'duration': call.duration or 0,
                    'handled_by': call.handled_by,
                    'call_status': call.call_status,
                    'forwarding_status': call.forwarding_status,
                    'customer_satisfaction': call.customer_satisfaction,
                    'revenue_generated': call.revenue_generated or 0.0
                }
                for call in recent_calls
            ]
        finally:
            db.close()
    except Exception as e:
        logger.error(f"Recent calls retrieval error: {e}")
        return []


def get_recent_sms(limit: int = 10) -> List[Dict]:
    """Get recent SMS records"""
    try:
        if SessionLocal is None:
            return []
        db = SessionLocal()
        try:
            recent_sms = db.query(SMS).order_by(
                SMS.timestamp.desc()
            ).limit(limit).all()
            return [
                {
                    'message_sid': sms.message_sid,
                    'from_number': sms.from_number,
                    'timestamp': sms.timestamp.strftime('%Y-%m-%d %H:%M:%S') if sms.timestamp else '',
                    'message_body': sms.message_body[:100] + '...' if len(sms.message_body) > 100 else sms.message_body,
                    'response_body': sms.response_body[:100] + '...' if sms.response_body and len(sms.response_body) > 100 else sms.response_body,
                    'conversation_id': sms.conversation_id,
                    'sentiment_score': sms.sentiment_score,
                    'intent_detected': sms.intent_detected,
                    'handled_by': sms.handled_by,
                    'customer_satisfaction': sms.customer_satisfaction,
                    'revenue_generated': sms.revenue_generated or 0.0
                }
                for sms in recent_sms
            ]
        finally:
            db.close()
    except Exception as e:
        logger.error(f"Recent SMS retrieval error: {e}")
        return []


def calculate_performance_indicators(metrics: Dict) -> Dict:
    """Calculate key performance indicators"""
    try:
        total_interactions = metrics.get('total_calls_today', 0) + metrics.get('total_sms_today', 0)
        return {
            # Interactions per hour of uptime
            'interaction_velocity': total_interactions / max(1, metrics.get('system_uptime_seconds', 1) / 3600),
            'ai_efficiency': metrics.get('ai_success_rate', 0) / 100,
            'customer_satisfaction_index': metrics.get('customer_satisfaction', 0) / 5,
            'revenue_per_interaction': metrics.get('revenue_today', 0) / max(total_interactions, 1),
            'forwarding_effectiveness': metrics.get('jay_answer_rate', 0) / 100,
            'system_reliability': 1.0 if metrics.get('system_health') == 'healthy' else 0.5,
            'learning_rate': metrics.get('ai_accuracy', 0)
        }
    except Exception as e:
        logger.error(f"Performance indicators calculation error: {e}")
        return {}
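# --- Illustrative usage example (not part of the original code) ------------
# Shows the metrics keys calculate_performance_indicators() expects and the
# kind of values it produces. The sample numbers below are invented purely
# for illustration and are never used by the running system.
def _example_performance_indicators() -> Dict:
    """Illustrative only: feed a sample metrics dict through the KPI helper."""
    sample_metrics = {
        'total_calls_today': 12,
        'total_sms_today': 8,
        'system_uptime_seconds': 4 * 3600,  # 4 hours of uptime
        'ai_success_rate': 90,              # percent
        'customer_satisfaction': 4.4,       # out of 5
        'revenue_today': 600.0,
        'jay_answer_rate': 70,              # percent
        'system_health': 'healthy',
        'ai_accuracy': 0.82
    }
    # With these inputs: interaction_velocity = 20 / 4 = 5.0 per hour,
    # revenue_per_interaction = 600 / 20 = 30.0, ai_efficiency = 0.9,
    # forwarding_effectiveness = 0.7, system_reliability = 1.0.
    return calculate_performance_indicators(sample_metrics)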
def get_system_health_status() -> Dict:
    """Get comprehensive system health status"""
    try:
        health_status = {
            'overall_status': 'healthy',
            'components': {},
            'alerts': [],
            'last_check': datetime.now().isoformat()
        }

        # Check SignalWire
        if signalwire_client:
            health_status['components']['signalwire'] = 'healthy'
        else:
            health_status['components']['signalwire'] = 'unhealthy'
            health_status['alerts'].append('SignalWire client not available')
            health_status['overall_status'] = 'degraded'

        # Check Database
        if SessionLocal:
            health_status['components']['database'] = 'healthy'
        else:
            health_status['components']['database'] = 'unhealthy'
            health_status['alerts'].append('Database not available')
            health_status['overall_status'] = 'degraded'

        # Check AI APIs
        api_health = system_state.get('api_health', {})
        healthy_apis = sum(1 for status in api_health.values() if status.get('status') == 'healthy')
        total_apis = len(api_health)
        if total_apis > 0:
            api_health_rate = healthy_apis / total_apis
            if api_health_rate >= 0.8:
                health_status['components']['ai_apis'] = 'healthy'
            elif api_health_rate >= 0.5:
                health_status['components']['ai_apis'] = 'degraded'
                health_status['overall_status'] = 'degraded'
            else:
                health_status['components']['ai_apis'] = 'unhealthy'
                health_status['alerts'].append('Multiple AI APIs unhealthy')
                health_status['overall_status'] = 'unhealthy'

        # Check system resources
        try:
            cpu_usage = psutil.cpu_percent()
            memory_usage = psutil.virtual_memory().percent
            if cpu_usage > 90 or memory_usage > 90:
                health_status['components']['system_resources'] = 'critical'
                health_status['alerts'].append('High resource usage')
                health_status['overall_status'] = 'critical'
            elif cpu_usage > 70 or memory_usage > 70:
                health_status['components']['system_resources'] = 'degraded'
                health_status['overall_status'] = 'degraded'
            else:
                health_status['components']['system_resources'] = 'healthy'
        except Exception:
            health_status['components']['system_resources'] = 'unknown'

        return health_status
    except Exception as e:
        logger.error(f"System health check error: {e}")
        return {
            'overall_status': 'unknown',
            'components': {},
            'alerts': [f'Health check failed: {e}'],
            'last_check': datetime.now().isoformat()
        }


def get_recent_training_sessions(limit: int = 10) -> List[Dict]:
    """Get recent training sessions"""
    try:
        if SessionLocal is None:
            return []
        db = SessionLocal()
        try:
            recent_sessions = db.query(TrainingSession).order_by(
                TrainingSession.timestamp.desc()
            ).limit(limit).all()
            return [
                {
                    'session_id': session.session_id,
                    'session_type': session.session_type,
                    'test_message': session.test_message[:100] + '...' if len(session.test_message) > 100 else session.test_message,
                    'bot_response': session.bot_response[:100] + '...' if session.bot_response and len(session.bot_response) > 100 else session.bot_response,
                    'approved': session.approved,
                    'quality_score': session.quality_score,
                    'timestamp': session.timestamp.strftime('%Y-%m-%d %H:%M:%S') if session.timestamp else ''
                }
                for session in recent_sessions
            ]
        finally:
            db.close()
    except Exception as e:
        training_logger.error(f"Recent training sessions error: {e}")
        return []


def get_training_scenarios() -> List[Dict]:
    """Get available training scenarios"""
    return [
        {
            'id': 'pricing_inquiry',
            'name': 'Pricing Inquiry',
            'description': 'Customer asking about service prices',
            'example': 'How much does a car wash cost?'
        },
        {
            'id': 'scheduling_request',
            'name': 'Scheduling Request',
            'description': 'Customer wanting to book an appointment',
            'example': 'I want to schedule a car wash for tomorrow'
        },
        {
            'id': 'service_inquiry',
            'name': 'Service Inquiry',
            'description': 'Customer asking about available services',
            'example': 'What kind of car detailing do you offer?'
        },
        {
            'id': 'location_inquiry',
            'name': 'Location Inquiry',
            'description': 'Customer asking about service area',
            'example': 'Do you come to my area?'
        },
        {
            'id': 'complaint_handling',
            'name': 'Complaint Handling',
            'description': 'Customer with a complaint or issue',
            'example': 'I\'m not satisfied with the service I received'
        },
        {
            'id': 'forwarded_call',
            'name': 'Forwarded Call',
            'description': 'Customer whose call was forwarded from Jay\'s iPhone',
            'example': 'I tried calling Jay but got transferred here'
        },
        {
            'id': 'urgent_request',
            'name': 'Urgent Request',
            'description': 'Customer with an urgent or emergency situation',
            'example': 'I need help right away, this is urgent'
        },
        {
            'id': 'general_inquiry',
            'name': 'General Inquiry',
            'description': 'General questions about the business',
            'example': 'Tell me about your company'
        }
    ]


def get_knowledge_base_stats() -> Dict:
    """Get knowledge base statistics"""
    try:
        knowledge_stats = drip_learn_system.knowledge_base
        return {
            'total_patterns': len(knowledge_stats.get('patterns', {})),
            'total_corrections': len(knowledge_stats.get('corrections', {})),
            'common_responses': sum(len(responses) for responses in knowledge_stats.get('common_responses', {}).values()),
            'forwarding_scenarios': len(knowledge_stats.get('forwarding_scenarios', {})),
            'services_configured': len(knowledge_stats.get('services', {})),
            'business_info_items': len(knowledge_stats.get('business_info', {}))
        }
    except Exception as e:
        training_logger.error(f"Knowledge base stats error: {e}")
        return {}


def store_training_session(session_data: Dict):
    """Store training session in database"""
    try:
        if SessionLocal is None:
            return
        db = SessionLocal()
        try:
            training_record = TrainingSession(
                session_id=session_data['session_id'],
                session_type=session_data.get('scenario_type', 'general'),
                test_message=session_data['test_message'],
                bot_response=session_data.get('ai_response', ''),
                quality_score=session_data.get('quality_score', 0.0),
                learning_category=session_data.get('scenario_type', 'general'),
                model_performance=json.dumps({
                    'processing_time': session_data.get('processing_time', 0.0),
                    'sentiment': session_data.get('sentiment', {}),
                    'intent': session_data.get('intent', {}),
                    'api_used': session_data.get('api_used', 'unknown')
                })
            )
            db.add(training_record)
            db.commit()
        finally:
            db.close()
    except Exception as e:
        training_logger.error(f"Training session storage error: {e}")


def update_training_session_with_correction(session_id: str, corrected_response: str, feedback: str):
    """Update training session with correction"""
    try:
        if SessionLocal is None:
            return
        db = SessionLocal()
        try:
            session = db.query(TrainingSession).filter(
                TrainingSession.session_id == session_id
            ).first()
            if session:
                session.corrected_response = corrected_response
                session.trainer_feedback = feedback
                session.correction_applied = True
                session.approved = True
                db.commit()
        finally:
            db.close()
    except Exception as e:
        training_logger.error(f"Training session update error: {e}")
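# --- Illustrative usage example (not part of the original code) ------------
# Sketch showing the session_data shape store_training_session() expects and
# how a trainer correction would be applied afterwards. All values below are
# invented for illustration; the scenario id comes from get_training_scenarios().
def _example_training_round():
    """Illustrative only: store a training session, then apply a correction."""
    session_data = {
        'session_id': str(uuid.uuid4()),
        'scenario_type': 'pricing_inquiry',
        'test_message': 'How much does a full detail cost?',
        'ai_response': 'Our full detail packages vary by vehicle size...',
        'quality_score': 0.8,
        'processing_time': 0.42,
        'sentiment': {'polarity': 0.3},
        'intent': {'name': 'pricing_inquiry', 'confidence': 0.9},
        'api_used': 'deepseek'
    }
    store_training_session(session_data)
    update_training_session_with_correction(
        session_data['session_id'],
        corrected_response='Pricing depends on vehicle size and package; quote the current rate card.',
        feedback='Always mention that pricing depends on vehicle size.'
    )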
# ===== SOCKETIO EVENTS FOR REAL-TIME UPDATES =====

@socketio.on('connect')
def handle_connect():
    """Handle client connection"""
    emit('status', {'message': 'Connected to Jay\'s Mobile Wash AI System'})
    logger.info(f"Client connected: {request.sid}")


@socketio.on('disconnect')
def handle_disconnect():
    """Handle client disconnection"""
    logger.info(f"Client disconnected: {request.sid}")


@socketio.on('request_metrics')
def handle_metrics_request():
    """Handle real-time metrics request"""
    try:
        metrics = {
            'system': system_state.get_metrics(),
            'learning': drip_learn_system.get_learning_metrics(),
            'apis': api_processor_global.get_usage_stats(),
            'timestamp': datetime.now().isoformat()
        }
        emit('metrics_update', metrics)
    except Exception as e:
        logger.error(f"Metrics request error: {e}")
        emit('error', {'message': f'Metrics request failed: {e}'})


@socketio.on('request_live_calls')
def handle_live_calls_request():
    """Handle live calls request"""
    try:
        active_calls = system_state.get('active_calls', {})
        calls_data = []
        for call_sid, call_data in active_calls.items():
            calls_data.append({
                'call_sid': call_sid,
                'from_number': call_data.get('from_number'),
                'status': call_data.get('call_status'),
                'handled_by': call_data.get('handled_by'),
                'start_time': call_data.get('start_time').isoformat() if call_data.get('start_time') else '',
                'forwarding_status': call_data.get('forwarding_status')
            })
        emit('live_calls_update', {'active_calls': calls_data})
    except Exception as e:
        logger.error(f"Live calls request error: {e}")
        emit('error', {'message': f'Live calls request failed: {e}'})


@socketio.on('request_live_sms')
def handle_live_sms_request():
    """Handle live SMS request"""
    try:
        active_sms = system_state.get('active_sms', {})
        sms_data = []
        for message_sid, sms_info in active_sms.items():
            sms_data.append({
                'message_sid': message_sid,
                'from_number': sms_info.get('from_number'),
                'conversation_id': sms_info.get('conversation_id'),
                'status': sms_info.get('status'),
                'timestamp': sms_info.get('timestamp').isoformat() if sms_info.get('timestamp') else ''
            })
        emit('live_sms_update', {'active_sms': sms_data})
    except Exception as e:
        logger.error(f"Live SMS request error: {e}")
        emit('error', {'message': f'Live SMS request failed: {e}'})
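# --- Illustrative client example (not part of the original code) -----------
# Minimal sketch of how an external monitor could consume the Socket.IO events
# defined above from Python, using the `python-socketio` client package (which
# flask_socketio already depends on). The dashboard itself talks to these
# events from the browser; this helper is never called by the running system.
def _example_socketio_client():
    """Illustrative only: request metrics over Socket.IO and print the reply."""
    import socketio as socketio_client  # client package, distinct from the server instance above

    sio = socketio_client.Client()

    @sio.on('metrics_update')
    def on_metrics(data):
        # Server replies to 'request_metrics' with the metrics payload
        print('metrics update at', data.get('timestamp'))

    sio.connect('http://localhost:7860')
    sio.emit('request_metrics')
    sio.wait()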
# ===== BACKGROUND TASKS =====

def start_background_tasks():
    """Start background monitoring and maintenance tasks"""

    def performance_monitor():
        """Monitor system performance"""
        while True:
            try:
                system_state.update_performance_metrics()
                time.sleep(30)  # Update every 30 seconds
            except Exception as e:
                logger.error(f"Performance monitoring error: {e}")
                time.sleep(60)

    def cleanup_old_data():
        """Clean up old data periodically"""
        while True:
            try:
                cleanup_old_records()
                time.sleep(3600)  # Run every hour
            except Exception as e:
                logger.error(f"Data cleanup error: {e}")
                time.sleep(1800)  # Retry in 30 minutes

    def health_check_broadcast():
        """Broadcast health status to connected clients"""
        while True:
            try:
                health_status = get_system_health_status()
                socketio.emit('health_update', health_status)
                time.sleep(60)  # Broadcast every minute
            except Exception as e:
                logger.error(f"Health broadcast error: {e}")
                time.sleep(120)

    def metrics_broadcast():
        """Broadcast metrics to connected clients"""
        while True:
            try:
                metrics = {
                    'system': system_state.get_metrics(),
                    'timestamp': datetime.now().isoformat()
                }
                socketio.emit('metrics_broadcast', metrics)
                time.sleep(30)  # Broadcast every 30 seconds
            except Exception as e:
                logger.error(f"Metrics broadcast error: {e}")
                time.sleep(60)

    # Start background threads
    threads = [
        Thread(target=performance_monitor, daemon=True),
        Thread(target=cleanup_old_data, daemon=True),
        Thread(target=health_check_broadcast, daemon=True),
        Thread(target=metrics_broadcast, daemon=True)
    ]
    for thread in threads:
        thread.start()

    logger.info("βœ… Background tasks started")


def cleanup_old_records():
    """Clean up old database records"""
    try:
        if SessionLocal is None:
            return
        db = SessionLocal()
        try:
            # Clean up old system logs (keep 30 days)
            cutoff_date = datetime.now() - timedelta(days=30)
            deleted_logs = db.query(SystemLog).filter(
                SystemLog.timestamp < cutoff_date
            ).delete()

            # Clean up old learning data (keep successful learning, remove failed attempts older than 7 days)
            learning_cutoff = datetime.now() - timedelta(days=7)
            deleted_learning = db.query(LearningData).filter(
                LearningData.timestamp < learning_cutoff,
                LearningData.approved == False,
                LearningData.correction_applied == False
            ).delete()

            db.commit()
            if deleted_logs > 0 or deleted_learning > 0:
                logger.info(f"Cleaned up {deleted_logs} old logs and {deleted_learning} failed learning records")
        finally:
            db.close()
    except Exception as e:
        logger.error(f"Data cleanup error: {e}")


# ===== ERROR HANDLERS =====

@app.errorhandler(404)
def not_found_error(error):
    """Handle 404 errors"""
    return render_template('error.html',
                           error="Page not found",
                           error_code=404,
                           error_description="The page you requested could not be found."), 404


@app.errorhandler(500)
def internal_error(error):
    """Handle 500 errors"""
    logger.error(f"Internal server error: {error}")
    return render_template('error.html',
                           error="Internal server error",
                           error_code=500,
                           error_description="An internal error occurred. Please try again."), 500


@app.errorhandler(Exception)
def handle_exception(e):
    """Handle all unhandled exceptions"""
    logger.error(f"Unhandled exception: {e}", exc_info=True)

    # Return JSON error for API requests
    if request.path.startswith('/api/'):
        return jsonify({'error': 'Internal server error', 'message': str(e)}), 500

    # Return HTML error page for web requests
    return render_template('error.html',
                           error="System error",
                           error_code=500,
                           error_description="An unexpected error occurred."), 500


# ===== HEALTH CHECK ENDPOINT =====

@app.route('/health')
def health_check():
    """Health check endpoint for monitoring"""
    try:
        health_status = get_system_health_status()
        if health_status['overall_status'] == 'healthy':
            return jsonify(health_status), 200
        elif health_status['overall_status'] == 'degraded':
            return jsonify(health_status), 200
        else:
            return jsonify(health_status), 503
    except Exception as e:
        logger.error(f"Health check error: {e}")
        return jsonify({
            'overall_status': 'unhealthy',
            'error': str(e),
            'timestamp': datetime.now().isoformat()
        }), 503


# ===== SCHEDULED TASKS =====

def setup_scheduled_tasks():
    """Setup scheduled maintenance tasks"""
    scheduler = BackgroundScheduler()

    # Daily report generation
    scheduler.add_job(
        func=generate_daily_report,
        trigger=CronTrigger(hour=6, minute=0),  # 6 AM daily
        id='daily_report',
        name='Generate daily report',
        replace_existing=True
    )

    # Weekly analytics (APScheduler uses 0 = Monday, so 'sun' is used here to match the intended Sunday run)
    scheduler.add_job(
        func=generate_weekly_analytics,
        trigger=CronTrigger(day_of_week='sun', hour=7, minute=0),  # Sunday 7 AM
        id='weekly_analytics',
        name='Generate weekly analytics',
        replace_existing=True
    )

    # API health monitoring
    scheduler.add_job(
        func=api_processor_global.check_all_api_health,
        trigger='interval',
        minutes=5,
        id='api_health_check',
        name='Check API health',
        replace_existing=True
    )

    # Database optimization
    scheduler.add_job(
        func=optimize_database,
        trigger=CronTrigger(hour=2, minute=0),  # 2 AM daily
        id='database_optimization',
        name='Optimize database',
        replace_existing=True
    )

    scheduler.start()
    logger.info("βœ… Scheduled tasks configured")


def generate_daily_report():
    """Generate daily performance report"""
    try:
        logger.info("Generating daily report...")
        end_date = datetime.now()
        start_date = end_date - timedelta(days=1)

        # Generate comprehensive daily metrics
        daily_metrics = {
            'date': start_date.strftime('%Y-%m-%d'),
            'summary': get_summary_metrics(start_date, end_date),
            'calls': get_call_analytics(start_date, end_date),
            'sms': get_sms_analytics(start_date, end_date),
            'ai_performance': get_ai_performance_analytics(start_date, end_date),
            'business': get_business_analytics(start_date, end_date),
            'forwarding': get_forwarding_analytics(start_date, end_date),
            'customers': get_customer_analytics(start_date, end_date)
        }

        # Store report (in a real implementation, this would be saved to file or sent via email)
        logger.info(f"Daily report generated: {daily_metrics['summary']}")

        # TODO: Send email report to Jay
        # send_daily_report_email(daily_metrics)
    except Exception as e:
        logger.error(f"Daily report generation error: {e}")
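# --- Illustrative sketch (not part of the original code) -------------------
# One possible shape for the send_daily_report_email() helper referenced by
# the TODO in generate_daily_report(). The SMTP_* and REPORT_* environment
# variables and addresses are assumptions; wire them to the real config before
# enabling the call above.
def send_daily_report_email(daily_metrics: Dict):
    """Email the daily report to Jay (illustrative sketch only)."""
    from email.mime.text import MIMEText  # note the MIMEText capitalization

    body = json.dumps(daily_metrics, indent=2, default=str)
    message = MIMEText(body)
    message['Subject'] = f"Jay's Mobile Wash daily report - {daily_metrics.get('date', '')}"
    message['From'] = os.environ.get('REPORT_FROM_EMAIL', 'reports@example.com')
    message['To'] = os.environ.get('REPORT_TO_EMAIL', 'jay@example.com')

    try:
        with smtplib.SMTP(os.environ.get('SMTP_HOST', 'localhost'),
                          int(os.environ.get('SMTP_PORT', '587'))) as smtp:
            smtp.starttls()
            smtp.login(os.environ.get('SMTP_USER', ''), os.environ.get('SMTP_PASSWORD', ''))
            smtp.send_message(message)
        logger.info("Daily report email sent")
    except Exception as e:
        logger.error(f"Daily report email error: {e}")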
def generate_weekly_analytics():
    """Generate weekly analytics report"""
    try:
        logger.info("Generating weekly analytics...")
        end_date = datetime.now()
        start_date = end_date - timedelta(days=7)

        # Generate comprehensive weekly analytics
        weekly_analytics = {
            'week_ending': end_date.strftime('%Y-%m-%d'),
            'summary': get_summary_metrics(start_date, end_date),
            'performance_trends': calculate_performance_trends(start_date, end_date),
            'learning_progress': drip_learn_system.get_learning_metrics(),
            'recommendations': generate_improvement_recommendations()
        }

        logger.info(f"Weekly analytics generated: {weekly_analytics['summary']}")

        # TODO: Send comprehensive weekly report
        # send_weekly_analytics_email(weekly_analytics)
    except Exception as e:
        logger.error(f"Weekly analytics generation error: {e}")


def calculate_performance_trends(start_date: datetime, end_date: datetime) -> Dict:
    """Calculate performance trends over the period"""
    try:
        # This would analyze trends in various metrics
        # For now, returning basic structure
        return {
            'call_volume_trend': 'increasing',
            'ai_accuracy_trend': 'stable',
            'customer_satisfaction_trend': 'improving',
            'forwarding_efficiency_trend': 'stable',
            'revenue_trend': 'increasing'
        }
    except Exception as e:
        logger.error(f"Performance trends calculation error: {e}")
        return {}
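# --- Illustrative sketch (not part of the original code) -------------------
# calculate_performance_trends() currently returns fixed labels. A minimal way
# to derive one of those labels from real data, reusing the Call model and
# SessionLocal already used throughout this file, could look like this; the
# 10% thresholds are arbitrary assumptions.
def _example_call_volume_trend(start_date: datetime, end_date: datetime) -> str:
    """Return 'increasing', 'decreasing' or 'stable' by comparing the two halves of the window."""
    if SessionLocal is None:
        return 'stable'
    db = SessionLocal()
    try:
        midpoint = start_date + (end_date - start_date) / 2
        first_half = db.query(func.count(Call.id)).filter(
            Call.start_time >= start_date, Call.start_time < midpoint).scalar() or 0
        second_half = db.query(func.count(Call.id)).filter(
            Call.start_time >= midpoint, Call.start_time <= end_date).scalar() or 0
        if second_half > first_half * 1.1:
            return 'increasing'
        if second_half < first_half * 0.9:
            return 'decreasing'
        return 'stable'
    finally:
        db.close()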
console.print("\nπŸš€ [bold blue]INITIALIZING JAY'S MOBILE WASH AI SYSTEM[/bold blue] πŸš—\n") # Display startup banner console.print(Panel.fit( f"""[bold green]Jay's Mobile Wash AI System[/bold green] [cyan]Complete iPhone Forwarding + AI Assistant[/cyan] [yellow]Configuration:[/yellow] β€’ iPhone Forwarding: {config.IPHONE_FORWARDING_ENABLED} β€’ Jay's Phone: {config.JAY_PHONE} β€’ AI Phone: {config.SIGNALWIRE_PHONE} β€’ Forwarding Delay: {config.FORWARDING_DELAY_SECONDS}s [yellow]Features Enabled:[/yellow] β€’ 🎀 Whisper Speech Recognition β€’ 🧠 15 HuggingFace APIs β€’ πŸŽ“ Drip Learnβ„’ Training System β€’ πŸ“ž iPhone Call Forwarding β€’ πŸ“± SMS AI Assistant β€’ πŸ“Š Advanced Analytics β€’ πŸ”„ Real-time Learning [green]Status: READY FOR DEPLOYMENT[/green]""", title="πŸš— Mobile Wash AI", border_style="green" )) # Initialize components console.print("βœ… Configuration validated") console.print("βœ… Database initialized") console.print("βœ… SignalWire client ready") console.print("βœ… AI APIs configured") console.print("βœ… Learning system active") console.print("βœ… iPhone forwarding configured") # Start background tasks start_background_tasks() console.print("βœ… Background tasks started") # Setup scheduled tasks setup_scheduled_tasks() console.print("βœ… Scheduled tasks configured") # Final system status system_state.set('system_health', 'healthy') system_state.set('last_health_check', datetime.now()) console.print(f"\nπŸŽ‰ [bold green]SYSTEM READY![/bold green] πŸŽ‰") console.print(f"[cyan]Dashboard:[/cyan] http://localhost:7860") console.print(f"[cyan]Health Check:[/cyan] http://localhost:7860/health") console.print(f"[cyan]Metrics API:[/cyan] http://localhost:7860/api/metrics") return True except Exception as e: console.print(f"❌ [bold red]INITIALIZATION FAILED:[/bold red] {e}") return False # ===== MAIN APPLICATION ENTRY POINT ===== if __name__ == '__main__': try: # Initialize application if not initialize_application(): sys.exit(1) # Run the application if config.DEVELOPMENT_MODE: # Development mode with debug socketio.run( app, host='0.0.0.0', port=7860, debug=True, use_reloader=True, log_output=True ) else: # Production mode socketio.run( app, host='0.0.0.0', port=7860, debug=False, use_reloader=False, log_output=False ) except KeyboardInterrupt: console.print("\nπŸ›‘ [yellow]Application stopped by user[/yellow]") sys.exit(0) except Exception as e: console.print(f"\nπŸ’₯ [bold red]APPLICATION CRASH:[/bold red] {e}") logger.error(f"Application crash: {e}", exc_info=True) sys.exit(1)