Spaces:
Sleeping
Sleeping
import torch | |
from flask import Flask, render_template, request, jsonify | |
import os | |
from transformers import pipeline | |
from gtts import gTTS | |
from pydub import AudioSegment | |
from pydub.silence import detect_nonsilent | |
from waitress import serve | |
from flask import Flask, render_template, request, jsonify, redirect, url_for, session | |
from flask_session import Session # Import the Session class | |
from flask.sessions import SecureCookieSessionInterface # Import the class | |
from salesforce import get_salesforce_connection | |
import logging | |
logging.basicConfig(level=logging.INFO) | |
logging.info("This is an info message") | |
logging.error("This is an error message") | |
# Initialize Flask app and Salesforce connection | |
print("Starting app...") | |
app = Flask(__name__) | |
print("Flask app initialized.") | |
# Salesforce connection setup | |
sf = get_salesforce_connection() | |
print("Salesforce connection established.") | |
# Set the secret key to handle sessions securely | |
app.secret_key = os.getenv("SECRET_KEY", "sSSjyhInIsUohKpG8sHzty2q") # Replace with a secure key | |
# Configure the session type | |
app.config["SESSION_TYPE"] = "filesystem" # Use filesystem for session storage | |
app.config["SESSION_COOKIE_NAME"] = "my_session" # Optional: Change session cookie name | |
app.config["SESSION_COOKIE_SECURE"] = True # Ensure cookies are sent over HTTPS | |
app.config["SESSION_COOKIE_SAMESITE"] = "None" # Allow cross-site cookies | |
# Initialize the session | |
Session(app) # Correctly initialize the Session object | |
print("Session interface configured.") | |
# Ensure secure session handling for environments like Hugging Face | |
app.session_interface = SecureCookieSessionInterface() | |
print("Session interface configured.") | |
def create_salesforce_record(sf, name, email, mobile): | |
try: | |
# Print the values to verify that they are correct before storing | |
print(f"Storing the following data in Salesforce:") | |
print(f"Name: {name}") | |
print(f"Email: {email}") | |
print(f"Mobile: {mobile}") | |
# Create a new record in the Salesforce object (Customer_Login__c) | |
customer_record = sf.guest_user__c.create({ | |
'Name': name, # Salesforce standard field for Name | |
'Email__c': email, # Custom field for Email | |
'Phone_Number__c': mobile, # Custom field for Mobile Number | |
}) | |
# Log the Salesforce response to verify the fields and successful creation | |
logging.info(f"Salesforce record created: {customer_record}") | |
# Check if the record was created successfully | |
if customer_record and 'id' in customer_record: | |
logging.info(f"Record successfully created with ID: {customer_record['id']}") | |
return customer_record | |
else: | |
logging.error("Failed to create Salesforce record") | |
return None | |
except Exception as e: | |
logging.error(f"Error creating Salesforce record: {e}") | |
return None | |
# Query to get the newly created guest user record based on the ID | |
def query_guest_user_record(sf, record_id): | |
try: | |
# SOQL query to fetch the record by its ID | |
query = f"SELECT Id, Name, Email__c, Phone_Number__c FROM guest_user__c WHERE Id = '{record_id}'" | |
# Execute the query and fetch the result | |
result = sf.query(query) | |
# Check if the result contains the record | |
if result['totalSize'] > 0: | |
guest_user_record = result['records'][0] | |
print(f"Guest User Record Retrieved: {guest_user_record}") | |
return guest_user_record | |
else: | |
print(f"No record found with ID: {record_id}") | |
return None | |
except Exception as e: | |
print(f"Error querying the guest user record: {e}") | |
return None | |
# Function to check if user confirmation is "yes" or "it's ok" | |
def is_user_confirmed(transcribed_text): | |
# Convert the transcription to lowercase and check for confirmation | |
confirmation_keywords = ["yes", "it's ok", "okay", "sure", "confirm", "ok yes"] | |
return any(keyword in transcribed_text.lower() for keyword in confirmation_keywords) | |
# Use whisper-small for faster processing and better speed | |
device = "cuda" if torch.cuda.is_available() else "cpu" | |
asr_model = pipeline("automatic-speech-recognition", model="openai/whisper-small", device=0 if device == "cuda" else -1) | |
# Function to convert audio to WAV format | |
def convert_to_wav(input_path, output_path): | |
try: | |
audio = AudioSegment.from_file(input_path) | |
audio = audio.set_frame_rate(16000).set_channels(1) # Convert to 16kHz, mono | |
audio.export(output_path, format="wav") | |
except Exception as e: | |
raise Exception(f"Audio conversion failed: {str(e)}") | |
# Function to check if audio contains actual speech | |
def is_silent_audio(audio_path): | |
audio = AudioSegment.from_wav(audio_path) | |
nonsilent_parts = detect_nonsilent(audio, min_silence_len=500, silence_thresh=audio.dBFS-16) # Reduced silence duration | |
return len(nonsilent_parts) == 0 # If no speech detected | |
def index(): | |
return render_template("index.html") | |
def transcribe(): | |
if "audio" not in request.files: | |
return jsonify({"error": "No audio file provided"}), 400 | |
audio_file = request.files["audio"] | |
input_audio_path = os.path.join("static", "temp_input.wav") | |
output_audio_path = os.path.join("static", "temp.wav") | |
audio_file.save(input_audio_path) | |
try: | |
# Convert to WAV | |
convert_to_wav(input_audio_path, output_audio_path) | |
# Check for silence | |
if is_silent_audio(output_audio_path): | |
return jsonify({"error": "No speech detected. Please try again."}), 400 | |
# Use Whisper ASR model for transcription | |
result = asr_model(output_audio_path, generate_kwargs={"language": "en"}) | |
transcribed_text = result["text"].strip().capitalize() | |
# Log transcribed text to check it | |
logging.info(f"Transcribed text: {transcribed_text}") | |
# Check if user confirmed (saying "yes" or "it's ok") | |
if is_user_confirmed(transcribed_text): | |
# Split the transcription into name, email, and mobile | |
parts = transcribed_text.split(" ") | |
if len(parts) >= 3: | |
name, email, mobile = parts[0], parts[1], parts[2] | |
customer_record = create_salesforce_record(sf, name, email, mobile) | |
if customer_record: | |
return jsonify({"text": transcribed_text, "message": "Data stored in Salesforce!"}) | |
else: | |
return jsonify({"error": "Failed to store data in Salesforce."}), 500 | |
else: | |
return jsonify({"error": "Insufficient data for name, email, or mobile."}), 400 | |
else: | |
return jsonify({"text": transcribed_text, "message": "Please confirm if the details are correct by saying 'yes' or 'it's ok'."}) | |
except Exception as e: | |
logging.error(f"Error during transcription process: {e}") | |
return jsonify({"error": f"Speech recognition error: {str(e)}"}), 500 | |
# Start Production Server | |
if __name__ == "__main__": | |
serve(app, host="0.0.0.0", port=7860) | |