File size: 4,202 Bytes
0ec61d2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# -*- coding: utf-8 -*-
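"""
Realtime voice-chat demo: streams microphone audio to a Qwen-Omni realtime
model and plays back the audio it returns.

Reference: https://help.aliyun.com/zh/model-studio/realtime#1234095db03g3
"""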
import os, time, base64, asyncio
from omni_realtime_client import OmniRealtimeClient, TurnDetectionMode
import pyaudio
import queue
import threading

from project_settings import environment
# Read the API key from the project settings and export it through the process
# environment, where main() looks it up again.
DASHSCOPE_API_KEY = environment.get("QWEN_API_KEY")
if DASHSCOPE_API_KEY is None:
    # os.environ only accepts strings; fail with an actionable message rather
    # than the bare TypeError the assignment below would raise.
    raise RuntimeError(
        "QWEN_API_KEY is not configured; cannot set DASHSCOPE_API_KEY"
    )
os.environ["DASHSCOPE_API_KEY"] = DASHSCOPE_API_KEY

# Global queue of audio chunks awaiting playback, and the playback thread handle
audio_queue = queue.Queue()
audio_player = None

# Initialise PyAudio
p = pyaudio.PyAudio()
RATE = 16000  # sample rate: 16 kHz
CHUNK = 3200  # size of each audio chunk (used as frames_per_buffer)
FORMAT = pyaudio.paInt16  # 16-bit PCM
CHANNELS = 1  # mono

def audio_player_thread():
    """Play queued audio chunks until a ``None`` sentinel is received.

    Intended as the target of a background thread. Chunks are taken from the
    module-level ``audio_queue`` and written to a PyAudio output stream; the
    stream is always stopped and closed on exit.
    """
    # Playback runs at 24 kHz, unlike the 16 kHz capture RATE.
    # NOTE(review): presumably the sample rate of the audio the service
    # returns -- confirm against the API documentation.
    output_rate = 24000
    stream = p.open(format=FORMAT,
                    channels=CHANNELS,
                    rate=output_rate,
                    output=True,
                    frames_per_buffer=CHUNK)

    try:
        while True:
            # Block until a chunk (or the shutdown sentinel) arrives; the
            # sentinel makes a polling timeout unnecessary.
            audio_data = audio_queue.get()
            try:
                if audio_data is None:  # shutdown signal
                    break
                stream.write(audio_data)
            finally:
                # Balance every get() -- including the sentinel and failed
                # writes -- so audio_queue.join() can never hang.
                audio_queue.task_done()
    finally:
        # Release the output stream
        stream.stop_stream()
        stream.close()

def start_audio_player():
    """Start the background playback thread unless one is already running."""
    global audio_player
    already_running = audio_player is not None and audio_player.is_alive()
    if already_running:
        return
    # Created as a daemon thread, so it does not keep the interpreter alive.
    audio_player = threading.Thread(target=audio_player_thread, daemon=True)
    audio_player.start()

def handle_audio_data(audio_data):
    """Log the size of a received audio chunk and queue it for playback."""
    byte_count = len(audio_data)
    # Debug trace of how much audio arrived
    print(f"\n接收到音频数据: {byte_count} 字节")
    # Hand the chunk over to the playback queue
    audio_queue.put(audio_data)

async def start_microphone_streaming(client: OmniRealtimeClient):
    """Capture microphone audio and stream it to the realtime client.

    Opens a 16 kHz, mono, 16-bit PCM input stream and, until the task is
    cancelled or an error occurs, sends every captured chunk base64-encoded
    in an ``input_audio_buffer.append`` event.

    Args:
        client: Client whose ``send_event`` coroutine receives the events.
    """
    CHUNK = 3200
    FORMAT = pyaudio.paInt16
    CHANNELS = 1
    RATE = 16000

    loop = asyncio.get_running_loop()
    p = pyaudio.PyAudio()
    try:
        stream = p.open(format=FORMAT,
                        channels=CHANNELS,
                        rate=RATE,
                        input=True,
                        frames_per_buffer=CHUNK)
        pending_read = None
        try:
            print("开始录音,请讲话...")
            while True:
                # stream.read() blocks until CHUNK frames were captured
                # (3200 frames at 16 kHz is 200 ms). Running it in a worker
                # thread keeps the event loop free for other tasks.
                pending_read = loop.run_in_executor(None, stream.read, CHUNK)
                # shield() keeps a cancellation of this task from cancelling
                # the read future, so the cleanup below can wait for it.
                audio_data = await asyncio.shield(pending_read)
                encoded_data = base64.b64encode(audio_data).decode('utf-8')

                eventd = {
                    "event_id": "event_" + str(int(time.time() * 1000)),
                    "type": "input_audio_buffer.append",
                    "audio": encoded_data
                }
                await client.send_event(eventd)

                # Keep a short pause between chunks to mimic realtime interaction
                await asyncio.sleep(0.05)
        finally:
            # Never close the stream while a worker thread is still inside
            # stream.read(); wait for the in-flight read to finish first.
            if pending_read is not None and not pending_read.done():
                await asyncio.wait([pending_read])
            stream.stop_stream()
            stream.close()
    finally:
        # Runs even when p.open() fails, so the PyAudio instance never leaks.
        p.terminate()

async def main():
    """Run one realtime voice session until a background task stops.

    Starts the playback thread, connects the client, then runs the message
    handler and the microphone streamer concurrently. The session ends as
    soon as either task finishes or fails; a failure is printed. Background
    tasks, the playback thread, the client and PyAudio are always shut down.
    """
    # Start the audio playback thread
    start_audio_player()

    realtime_client = OmniRealtimeClient(
        base_url="wss://dashscope.aliyuncs.com/api-ws/v1/realtime",
        api_key=os.environ.get("DASHSCOPE_API_KEY"),
        model="qwen-omni-turbo-realtime-2025-05-08",
        voice="Chelsie",
        on_text_delta=lambda text: print(f"\nAssistant: {text}", end="", flush=True),
        on_audio_delta=handle_audio_data,
        turn_detection_mode=TurnDetectionMode.SERVER_VAD
    )

    tasks = []
    try:
        await realtime_client.connect()
        # Start message handling and microphone capture
        tasks = [
            asyncio.create_task(realtime_client.handle_messages()),
            asyncio.create_task(start_microphone_streaming(realtime_client)),
        ]

        # Wait on the tasks themselves so that a crash in either one ends
        # the session instead of being silently lost.
        done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            task.result()  # re-raises the task's exception, if any
    except Exception as e:
        print(f"Error: {e}")
    finally:
        # Stop whatever is still running and let it finish its own cleanup.
        for task in tasks:
            task.cancel()
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
        # Tell the playback thread to stop
        audio_queue.put(None)
        if audio_player:
            audio_player.join(timeout=1)
        try:
            await realtime_client.close()
        finally:
            # Release PyAudio even when closing the client fails.
            p.terminate()

if __name__ == "__main__":
    # Requires the pyaudio package and the omni_realtime_client module
    # imported at the top of this file.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is the only way to stop the session; main() has already
        # run its cleanup, so exit quietly instead of printing a traceback.
        pass