# Scraped page header (not part of the program; line-number gutter 1-178 omitted):
# Spaces: Build error | File size: 7,068 Bytes | 8d72f48
import asyncio
import base64
import datetime
import io
import os
import time
import wave

from dotenv import load_dotenv
# Import simpleaudio for playback
import simpleaudio as sa
# Import Hume client and related classes
from hume import MicrophoneInterface, Stream
from hume.client import AsyncHumeClient
from hume.core.api_error import ApiError
from hume.empathic_voice.chat.socket_client import ChatConnectOptions, ChatWebsocketConnection
from hume.empathic_voice.chat.types import SubscribeEvent
def play_audio(audio_bytes):
    """
    Play one chunk of audio with simpleaudio and block until it has finished.

    If ``audio_bytes`` is a ``bytes``/``bytearray`` holding a WAV container
    (it starts with a RIFF/WAVE header), the channel count, sample width and
    sample rate are taken from that header and only the PCM frames are played.
    Anything else is played as headerless PCM using the original assumptions:
    mono, 16-bit samples, 16 kHz.

    Empty ``bytes``/``bytearray`` input is ignored.
    """
    # Parameters used for headerless PCM (the original hard-coded values).
    frames = audio_bytes
    num_channels, bytes_per_sample, sample_rate = 1, 2, 16000

    if isinstance(audio_bytes, (bytes, bytearray)):
        if not audio_bytes:
            return

        # NOTE(review): the chunks decoded from "audio_output" messages look
        # like complete WAV files rather than bare PCM -- confirm against the
        # Hume EVI documentation. Sniffing the header keeps both cases working.
        if audio_bytes[:4] == b"RIFF" and audio_bytes[8:12] == b"WAVE":
            try:
                with wave.open(io.BytesIO(audio_bytes), "rb") as wav_file:
                    wav_channels = wav_file.getnchannels()
                    wav_width = wav_file.getsampwidth()
                    wav_rate = wav_file.getframerate()
                    wav_frames = wav_file.readframes(wav_file.getnframes())
            except (wave.Error, EOFError):
                # Header present but unreadable: keep the raw-PCM defaults so
                # the chunk is still played the way the original code did.
                pass
            else:
                if not wav_frames:
                    return
                frames = wav_frames
                num_channels = wav_channels
                bytes_per_sample = wav_width
                sample_rate = wav_rate

    play_obj = sa.play_buffer(
        frames,
        num_channels=num_channels,
        bytes_per_sample=bytes_per_sample,
        sample_rate=sample_rate,
    )
    play_obj.wait_done()
class WebSocketHandler:
    """Interface for containing the EVI WebSocket and associated socket handling behavior.

    The handler prints every chat event it receives, queues decoded assistant
    audio on ``byte_strs`` and plays it back from ``audio_player``. Assistant
    text and audio are ignored until a ``user_message`` has been received.
    """

    def __init__(self):
        """Construct the handler with no socket, a new byte stream and all state flags cleared."""
        self.socket = None
        self.byte_strs = Stream.new()
        self.waiting_for_response = False
        self.assistant_speaking = False
        self.audio_player_task = None
        self.user_has_spoken = False  # Flag to track if the user has spoken
        self.chat_initialized = False  # Flag to track if chat is initialized

    def set_socket(self, socket: ChatWebsocketConnection):
        """Set the socket."""
        self.socket = socket

    async def on_open(self):
        """Logic invoked when the WebSocket connection is opened."""
        print("WebSocket connection opened.")
        print("Waiting for you to speak... (Press Ctrl+C to exit)")

    async def on_message(self, message: SubscribeEvent):
        """Callback function to handle a WebSocket message event.

        Dispatches on ``message.type``: prints a timestamped line for the
        event, updates the state flags, and queues decoded audio for playback.

        Raises:
            ApiError: when the message type is ``"error"``.
        """
        now = datetime.datetime.now().strftime("%H:%M:%S")

        if message.type == "chat_metadata":
            chat_id = message.chat_id
            chat_group_id = message.chat_group_id
            print(f"[{now}] Chat initialized - ID: {chat_id}, Group: {chat_group_id}")
            self.chat_initialized = True
        elif message.type == "user_message":
            role = message.message.role.upper()
            message_text = message.message.content
            print(f"[{now}] {role}: {message_text}")
            self.waiting_for_response = True
            self.user_has_spoken = True  # Mark that the user has spoken
        elif message.type == "assistant_message":
            # Only process assistant messages after the user has spoken
            if self.user_has_spoken:
                role = message.message.role.upper()
                message_text = message.message.content
                print(f"[{now}] {role}: {message_text}")
                self.assistant_speaking = True
        elif message.type == "audio_output":
            # Only process audio if the user has spoken first
            if self.user_has_spoken:
                message_str: str = message.data
                # The payload is base64 text; decode it to the audio bytes.
                message_bytes = base64.b64decode(message_str.encode("utf-8"))
                await self.byte_strs.put(message_bytes)
        elif message.type == "assistant_message_done":
            if self.user_has_spoken:
                self.waiting_for_response = False
                self.assistant_speaking = False
                print(f"[{now}] Assistant finished speaking. Ready for your input...")
        elif message.type == "error":
            error_message = message.message
            error_code = message.code
            print(f"[{now}] ERROR ({error_code}): {error_message}")
            raise ApiError(f"Error ({error_code}): {error_message}")
        elif message.type == "speech_detection":
            if message.is_speech_detected:
                print(f"[{now}] Speech detected...")
            else:
                print(f"[{now}] Speech ended.")
        elif message.type == "transcript_partial":
            print(f"[{now}] Partial: {message.text}")
        elif message.type == "transcript_final":
            print(f"[{now}] Final: {message.text}")

    async def audio_player(self):
        """Play queued audio chunks one after another until an error occurs.

        ``play_audio`` blocks until the chunk has finished playing, so it is
        run in the default executor. Calling it directly would freeze the event
        loop, and with it message handling and microphone streaming, for the
        whole duration of every chunk.
        """
        loop = asyncio.get_running_loop()
        try:
            while True:
                # NOTE(review): assumes the hume Stream object exposes an
                # awaitable get(); confirm against the installed SDK version.
                audio_chunk = await self.byte_strs.get()
                if audio_chunk and self.user_has_spoken:
                    # Play the audio chunk only if user has spoken. Awaiting
                    # the executor call keeps chunks strictly sequential.
                    await loop.run_in_executor(None, play_audio, audio_chunk)
        except Exception as e:
            print(f"Error in audio player: {e}")

    async def on_close(self):
        """Logic invoked when the WebSocket connection is closed."""
        print("WebSocket connection closed.")

    async def on_error(self, error):
        """Logic invoked when an error occurs in the WebSocket connection."""
        print(f"Error: {error}")
async def main() -> None:
    """Connect to the EVI chat socket and run microphone capture and audio playback.

    Reads ``HUMEAI_API_KEY``, ``HUMEAI_CONFIG_ID`` and ``HUMEAI_SECRET_KEY``
    from the environment (after loading the ``.env`` file), opens the chat
    connection with a ``WebSocketHandler`` as callback target, and waits on the
    microphone and audio-player tasks.

    Raises:
        ValueError: if any of the three environment variables is missing.
    """
    # Load environment variables from the .env file.
    load_dotenv()
    api_key = os.getenv("HUMEAI_API_KEY")
    config_id = os.getenv("HUMEAI_CONFIG_ID")
    secret_key = os.getenv("HUMEAI_SECRET_KEY")
    if not api_key or not config_id or not secret_key:
        raise ValueError("Please set HUMEAI_API_KEY, HUMEAI_CONFIG_ID, and HUMEAI_SECRET_KEY in your .env file.")

    client = AsyncHumeClient(api_key=api_key)

    # Define the connection options.
    options = ChatConnectOptions(
        config_id=config_id,
        secret_key=secret_key,
        wait_for_user_message=True,  # This ensures the AI waits for user input before responding
        enable_auto_ptt=False,  # Disable auto push-to-talk to ensure AI doesn't speak first
    )

    # Instantiate your WebSocketHandler.
    websocket_handler = WebSocketHandler()

    try:
        # Connect with callbacks for open, message, close, and error.
        async with client.empathic_voice.chat.connect_with_callbacks(
            options=options,
            on_open=websocket_handler.on_open,
            on_message=websocket_handler.on_message,
            on_close=websocket_handler.on_close,
            on_error=websocket_handler.on_error,
        ) as socket:
            # Set the socket into the handler.
            websocket_handler.set_socket(socket)

            # Start the audio player task and record it on the handler.
            audio_player_task = asyncio.create_task(websocket_handler.audio_player())
            websocket_handler.audio_player_task = audio_player_task

            # Start the microphone interface. byte_stream is not passed here;
            # audio is played by WebSocketHandler.audio_player() instead.
            mic_task = asyncio.create_task(
                MicrophoneInterface.start(
                    socket,
                    allow_user_interrupt=True,
                )
            )

            tasks = (mic_task, audio_player_task)
            try:
                # Wait for both tasks
                await asyncio.gather(*tasks)
            finally:
                # If one task fails (or main is cancelled) the other would
                # otherwise keep running while the socket is being closed.
                # Cancel both and wait for them to finish unwinding.
                for task in tasks:
                    task.cancel()
                await asyncio.gather(*tasks, return_exceptions=True)
    except KeyboardInterrupt:
        print("\nExiting program...")
    except Exception as e:
        print(f"Error: {e}")
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # On Python 3.11+, asyncio.run() handles Ctrl+C by cancelling the main
        # task and then raising KeyboardInterrupt from asyncio.run() itself, so
        # it has to be caught here to exit without a traceback.
        print("\nExiting program...")