# -*- coding: utf-8 -*-
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
https://ai.google.dev/gemini-api/docs/live?hl=zh-cn#python_1
https://github.com/google-gemini/cookbook/blob/main/quickstarts/Get_started_LiveAPI_NativeAudio.py

## Setup

To install the dependencies for this script, run:

```
brew install portaudio
pip install -U google-genai pyaudio
```

## API key

Ensure the `GOOGLE_API_KEY` environment variable is set to the api-key
you obtained from Google AI Studio.

## Run

To run the script:

```
python Get_started_LiveAPI_NativeAudio.py
```

Start talking to Gemini
"""
import asyncio
import os
import sys
import traceback

import pyaudio
from google import genai

from project_settings import environment

if sys.version_info < (3, 11, 0):
    import taskgroup
    import exceptiongroup

    asyncio.TaskGroup = taskgroup.TaskGroup
    asyncio.ExceptionGroup = exceptiongroup.ExceptionGroup
    # Bind the bare name as well so that `except ExceptionGroup` in
    # AudioLoop.run resolves on interpreters that lack the builtin.
    ExceptionGroup = exceptiongroup.ExceptionGroup

# Audio format shared by the capture and playback streams.
FORMAT = pyaudio.paInt16
CHANNELS = 1
SEND_SAMPLE_RATE = 16000
RECEIVE_SAMPLE_RATE = 24000
CHUNK_SIZE = 1024

pya = pyaudio.PyAudio()

# The key is read from the project settings under GEMINI_API_KEY and exported
# as GOOGLE_API_KEY, which is the variable genai.Client() picks up.
GOOGLE_API_KEY = environment.get("GEMINI_API_KEY")
if GOOGLE_API_KEY is None:
    # os.environ rejects None with an opaque TypeError; fail with a clear
    # message instead.
    raise RuntimeError(
        "GEMINI_API_KEY is not configured in the project settings."
    )
os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY

client = genai.Client()  # GOOGLE_API_KEY must be set as env variable

MODEL = "gemini-2.5-flash-preview-native-audio-dialog"
CONFIG = {"response_modalities": ["AUDIO"]}


class AudioLoop:
    """Stream microphone audio to a live session and play back the replies.

    Four coroutines cooperate through two queues:

    * ``listen_audio`` reads microphone chunks into ``out_queue``.
    * ``send_realtime`` forwards ``out_queue`` items to the session.
    * ``receive_audio`` moves audio received from the session into
      ``audio_in_queue``.
    * ``play_audio`` writes ``audio_in_queue`` items to the output device.
    """

    def __init__(self):
        # Queues and session are created in run(), inside the event loop.
        self.audio_in_queue = None
        self.out_queue = None

        self.session = None

        # Microphone stream; opened by listen_audio(), closed by run().
        self.audio_stream = None

        self.receive_audio_task = None
        self.play_audio_task = None

    async def listen_audio(self) -> None:
        """Open the default input device and enqueue PCM chunks forever."""
        mic_info = pya.get_default_input_device_info()
        self.audio_stream = await asyncio.to_thread(
            pya.open,
            format=FORMAT,
            channels=CHANNELS,
            rate=SEND_SAMPLE_RATE,
            input=True,
            input_device_index=mic_info["index"],
            frames_per_buffer=CHUNK_SIZE,
        )
        # Outside of `python -O`, suppress input-overflow exceptions from
        # the blocking read.
        kwargs = {"exception_on_overflow": False} if __debug__ else {}
        while True:
            # The read blocks, so run it in a worker thread.
            data = await asyncio.to_thread(
                self.audio_stream.read, CHUNK_SIZE, **kwargs
            )
            await self.out_queue.put({"data": data, "mime_type": "audio/pcm"})

    async def send_realtime(self) -> None:
        """Forward every queued microphone chunk to the live session."""
        while True:
            msg = await self.out_queue.get()
            await self.session.send_realtime_input(audio=msg)

    async def receive_audio(self) -> None:
        """Read from the session and queue received PCM chunks for playback.

        Text parts of a response are printed to stdout as they arrive.
        """
        while True:
            turn = self.session.receive()
            async for response in turn:
                if data := response.data:
                    self.audio_in_queue.put_nowait(data)
                    continue
                if text := response.text:
                    print(text, end="")

            # If you interrupt the model, it sends a turn_complete.
            # For interruptions to work, we need to stop playback.
            # So empty out the audio queue because it may have loaded
            # much more audio than has played yet.
            while not self.audio_in_queue.empty():
                self.audio_in_queue.get_nowait()

    async def play_audio(self) -> None:
        """Open an output stream and play queued audio chunks forever."""
        stream = await asyncio.to_thread(
            pya.open,
            format=FORMAT,
            channels=CHANNELS,
            rate=RECEIVE_SAMPLE_RATE,
            output=True,
        )
        while True:
            bytestream = await self.audio_in_queue.get()
            # The write blocks, so run it in a worker thread.
            await asyncio.to_thread(stream.write, bytestream)

    async def run(self) -> None:
        """Connect to the live session and run all four workers until stopped.

        Cancellation is swallowed; an exception group raised by the task
        group is printed. The microphone stream is closed on every exit path.
        """
        try:
            async with (
                client.aio.live.connect(model=MODEL, config=CONFIG) as session,
                asyncio.TaskGroup() as tg,
            ):
                self.session = session

                self.audio_in_queue = asyncio.Queue()
                # Small bound so capture cannot run far ahead of sending.
                self.out_queue = asyncio.Queue(maxsize=5)

                tg.create_task(self.send_realtime())
                tg.create_task(self.listen_audio())
                tg.create_task(self.receive_audio())
                tg.create_task(self.play_audio())
        except asyncio.CancelledError:
            pass
        except ExceptionGroup as eg:
            traceback.print_exception(eg)
        finally:
            # Release the microphone on cancellation as well as on errors.
            if self.audio_stream is not None:
                self.audio_stream.close()
                self.audio_stream = None


if __name__ == "__main__":
    loop = AudioLoop()
    asyncio.run(loop.run())