Spaces:
Build error
Build error
import asyncio | |
import base64 | |
import os | |
import time | |
from dotenv import load_dotenv | |
import datetime | |
# Import Hume client and related classes | |
from hume.client import AsyncHumeClient | |
from hume.empathic_voice.chat.socket_client import ChatConnectOptions, ChatWebsocketConnection | |
from hume.empathic_voice.chat.types import SubscribeEvent | |
from hume.core.api_error import ApiError | |
from hume import MicrophoneInterface, Stream | |
# Import simpleaudio for playback | |
import simpleaudio as sa | |
import wave | |
def play_audio(audio_bytes):
    """
    Play audio bytes through simpleaudio and block until playback finishes.

    If the payload starts with a RIFF/WAVE header, the channel count, sample
    width and sample rate are read from that header and only the PCM frames
    are played. Any other payload is treated as raw PCM with the original
    defaults: mono, 16-bit, 16 kHz.

    Args:
        audio_bytes: Audio payload, either a WAV container or raw PCM samples.
            Empty input is ignored.
    """
    import io  # local import so this block does not depend on file-level changes

    if not audio_bytes:
        return

    # Defaults used for header-less (raw PCM) payloads.
    num_channels, bytes_per_sample, sample_rate = 1, 2, 16000
    pcm = audio_bytes

    if audio_bytes[:4] == b"RIFF" and audio_bytes[8:12] == b"WAVE":
        try:
            with wave.open(io.BytesIO(audio_bytes), "rb") as wav_file:
                parsed = (
                    wav_file.getnchannels(),
                    wav_file.getsampwidth(),
                    wav_file.getframerate(),
                )
                frames = wav_file.readframes(wav_file.getnframes())
        except (wave.Error, EOFError):
            # Malformed or unsupported container: keep the raw-PCM defaults.
            frames = b""
        if frames:
            num_channels, bytes_per_sample, sample_rate = parsed
            pcm = frames

    play_obj = sa.play_buffer(
        pcm,
        num_channels=num_channels,
        bytes_per_sample=bytes_per_sample,
        sample_rate=sample_rate,
    )
    # Blocks the calling thread until the buffer has finished playing.
    play_obj.wait_done()
class WebSocketHandler:
    """Interface for containing the EVI WebSocket and associated socket handling behavior.

    The handler prints every event it recognises, queues decoded assistant
    audio on ``byte_strs`` and plays it back from ``audio_player``. Assistant
    text and audio are ignored until a ``user_message`` event has been seen.
    """

    def __init__(self):
        """Construct the handler with no socket, a new byte stream and all flags cleared."""
        self.socket = None
        self.byte_strs = Stream.new()
        self.waiting_for_response = False
        self.assistant_speaking = False
        self.audio_player_task = None
        self.user_has_spoken = False  # Set once a user_message event arrives
        self.chat_initialized = False  # Set once a chat_metadata event arrives

    def set_socket(self, socket: ChatWebsocketConnection):
        """Set the socket."""
        self.socket = socket

    async def on_open(self):
        """Logic invoked when the WebSocket connection is opened."""
        print("WebSocket connection opened.")
        print("Waiting for you to speak... (Press Ctrl+C to exit)")

    async def on_message(self, message: SubscribeEvent):
        """Handle one WebSocket message event, dispatching on ``message.type``.

        Args:
            message: The event received from the socket.

        Raises:
            ApiError: When the event type is ``"error"``.
        """
        now = datetime.datetime.now().strftime("%H:%M:%S")

        if message.type == "chat_metadata":
            chat_id = message.chat_id
            chat_group_id = message.chat_group_id
            print(f"[{now}] Chat initialized - ID: {chat_id}, Group: {chat_group_id}")
            self.chat_initialized = True
        elif message.type == "user_message":
            role = message.message.role.upper()
            message_text = message.message.content
            print(f"[{now}] {role}: {message_text}")
            self.waiting_for_response = True
            self.user_has_spoken = True  # Unlocks assistant text/audio handling
        elif message.type == "assistant_message":
            # Only process assistant messages after the user has spoken
            if self.user_has_spoken:
                role = message.message.role.upper()
                message_text = message.message.content
                print(f"[{now}] {role}: {message_text}")
                self.assistant_speaking = True
        elif message.type == "audio_output":
            # Only queue audio if the user has spoken first
            if self.user_has_spoken:
                message_str: str = message.data
                message_bytes = base64.b64decode(message_str.encode("utf-8"))
                await self.byte_strs.put(message_bytes)
        elif message.type == "assistant_message_done":
            if self.user_has_spoken:
                self.waiting_for_response = False
                self.assistant_speaking = False
                print(f"[{now}] Assistant finished speaking. Ready for your input...")
        elif message.type == "error":
            error_message = message.message
            error_code = message.code
            print(f"[{now}] ERROR ({error_code}): {error_message}")
            # NOTE(review): the SDK's ApiError appears to accept keyword-only
            # arguments (status_code, body); a positional message would raise
            # TypeError instead of ApiError. Confirm against the installed
            # hume version.
            raise ApiError(body=f"Error ({error_code}): {error_message}")
        elif message.type == "speech_detection":
            if message.is_speech_detected:
                print(f"[{now}] Speech detected...")
            else:
                print(f"[{now}] Speech ended.")
        elif message.type == "transcript_partial":
            print(f"[{now}] Partial: {message.text}")
        elif message.type == "transcript_final":
            print(f"[{now}] Final: {message.text}")

    async def audio_player(self):
        """Consume audio chunks from ``byte_strs`` forever and play each one.

        ``play_audio`` blocks until the chunk has finished playing, so it is
        run in the default executor; the event loop stays free to receive
        WebSocket events while audio plays. A chunk that fails to play is
        reported and skipped rather than ending the player.
        """
        loop = asyncio.get_running_loop()
        try:
            while True:
                audio_chunk = await self.byte_strs.get()
                # Skip empty chunks and anything received before the user spoke.
                if not (audio_chunk and self.user_has_spoken):
                    continue
                try:
                    await loop.run_in_executor(None, play_audio, audio_chunk)
                except Exception as e:
                    print(f"Error in audio player: {e}")
        except Exception as e:
            # Failure of the stream itself: report it and stop the player.
            print(f"Error in audio player: {e}")

    async def on_close(self):
        """Logic invoked when the WebSocket connection is closed."""
        print("WebSocket connection closed.")

    async def on_error(self, error):
        """Logic invoked when an error occurs in the WebSocket connection."""
        print(f"Error: {error}")
async def main() -> None:
    """Connect to the EVI chat WebSocket and run microphone capture and audio playback.

    Reads HUMEAI_API_KEY, HUMEAI_CONFIG_ID and HUMEAI_SECRET_KEY from the
    environment (after loading a ``.env`` file), opens the chat connection
    with a ``WebSocketHandler`` as callback target, and runs the microphone
    and audio-player tasks until one of them fails or the program is
    interrupted.

    Raises:
        ValueError: If any of the three environment variables is missing.
    """
    # Load environment variables from the .env file.
    load_dotenv()
    HUME_API_KEY = os.getenv("HUMEAI_API_KEY")
    HUME_CONFIG_ID = os.getenv("HUMEAI_CONFIG_ID")
    HUME_SECRET_KEY = os.getenv("HUMEAI_SECRET_KEY")
    if not HUME_API_KEY or not HUME_CONFIG_ID or not HUME_SECRET_KEY:
        raise ValueError("Please set HUMEAI_API_KEY, HUMEAI_CONFIG_ID, and HUMEAI_SECRET_KEY in your .env file.")

    client = AsyncHumeClient(api_key=HUME_API_KEY)

    # Define the connection options.
    # NOTE(review): wait_for_user_message / enable_auto_ptt are intended to
    # stop the assistant from speaking first — confirm the installed SDK's
    # ChatConnectOptions actually honours these fields.
    options = ChatConnectOptions(
        config_id=HUME_CONFIG_ID,
        secret_key=HUME_SECRET_KEY,
        wait_for_user_message=True,
        enable_auto_ptt=False,
    )

    # Instantiate your WebSocketHandler.
    websocket_handler = WebSocketHandler()

    try:
        # Connect with callbacks for open, message, close, and error.
        async with client.empathic_voice.chat.connect_with_callbacks(
            options=options,
            on_open=websocket_handler.on_open,
            on_message=websocket_handler.on_message,
            on_close=websocket_handler.on_close,
            on_error=websocket_handler.on_error,
        ) as socket:
            # Set the socket into the handler.
            websocket_handler.set_socket(socket)

            # Start the audio player task.
            audio_player_task = asyncio.create_task(websocket_handler.audio_player())

            # Start the microphone interface; the handler's own byte stream is
            # deliberately not passed, playback is done by audio_player().
            mic_task = asyncio.create_task(
                MicrophoneInterface.start(
                    socket,
                    allow_user_interrupt=True,
                )
            )

            try:
                # Wait for both tasks.
                await asyncio.gather(mic_task, audio_player_task)
            finally:
                # If one task failed or main() was cancelled, make sure the
                # other one does not outlive the socket it depends on.
                for task in (mic_task, audio_player_task):
                    task.cancel()
                await asyncio.gather(mic_task, audio_player_task, return_exceptions=True)
    except KeyboardInterrupt:
        print("\nExiting program...")
    except Exception as e:
        print(f"Error: {e}")
if __name__ == "__main__":
    # On Ctrl+C, asyncio.run() cancels the main task and then re-raises
    # KeyboardInterrupt at this call site, so it has to be handled here to
    # exit without a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nExiting program...")