Spaces:
Sleeping
Sleeping
import asyncio
import base64
import json
import os
import threading
import traceback
from typing import Dict, List, Any, Optional, Tuple

import gradio as gr

from handler import AudioLoop
class GeminiGradioIntegration:
    """Bridge between the Gemini ``AudioLoop`` and Gradio callbacks.

    The instance owns one ``AudioLoop``, the two asyncio tasks that drive it,
    and a queue of responses that Gradio can poll through
    :meth:`get_next_response`.

    The tasks spawned by :meth:`initialize` belong to the event loop that was
    running when it was awaited; they stop as soon as that loop shuts down, so
    the other coroutine methods should be awaited on the same loop.
    """

    def __init__(self):
        # The AudioLoop instance; None whenever we are disconnected.
        self.audio_loop: Optional[Any] = None
        # Items are dicts shaped like {"type": "audio", "data": <pcm>}.
        self.response_queue: asyncio.Queue = asyncio.Queue()
        self.is_connected: bool = False
        # Background tasks created by initialize(); None until then so that
        # cleanup() never has to probe for missing attributes.
        self.loop_task: Optional[asyncio.Task] = None
        self.response_task: Optional[asyncio.Task] = None

    def _reset_connection(self) -> None:
        """Cancel the background tasks and return to the disconnected state."""
        for task in (self.loop_task, self.response_task):
            if task is not None and not task.done():
                task.cancel()
        self.loop_task = None
        self.response_task = None
        # Clearing audio_loop is what allows a later initialize() to reconnect
        # instead of reporting "Already connected".
        self.audio_loop = None
        self.is_connected = False

    async def initialize(self) -> str:
        """Create the ``AudioLoop`` and start its background tasks.

        Returns:
            A human-readable status message: success, already connected, or
            the failure reason.
        """
        if self.audio_loop is not None:
            return "Already connected to Gemini"

        try:
            self.audio_loop = AudioLoop()
            # Start the AudioLoop itself.
            self.loop_task = asyncio.create_task(self.audio_loop.run())
            # Start forwarding its audio output into response_queue.
            self.response_task = asyncio.create_task(self.listen_for_responses())
        except Exception as e:
            traceback.print_exc()
            # Undo any partial setup so the next attempt starts from scratch.
            self._reset_connection()
            return f"Failed to connect to Gemini: {str(e)}"

        self.is_connected = True
        return "Connected to Gemini successfully"

    async def listen_for_responses(self) -> None:
        """Forward PCM chunks from the AudioLoop into ``response_queue``.

        Runs until cancelled or until an unexpected error occurs.

        Raises:
            asyncio.CancelledError: Re-raised after logging so that the task
                is correctly recorded as cancelled.
        """
        try:
            while True:
                # Get PCM data from the audio_loop.
                pcm_data = await self.audio_loop.audio_in_queue.get()
                # Hand it over for Gradio to pick up.
                await self.response_queue.put({
                    "type": "audio",
                    "data": pcm_data,
                })
                # TODO: also listen for text responses and tool calls from
                # Gemini, process them, and update the UI.
        except asyncio.CancelledError:
            print("[GeminiGradioIntegration] Response listener cancelled")
            raise
        except Exception as e:
            print(f"[GeminiGradioIntegration] Error in response listener: {str(e)}")
            traceback.print_exc()

    async def send_query(self, repo_url: str, query: str) -> str:
        """Send a text query to Gemini about a GitHub repository.

        Args:
            repo_url: URL of the repository the question refers to.
            query: The question to ask about that repository.

        Returns:
            A status message describing whether the query was queued.
        """
        if not self.is_connected:
            return "Not connected to Gemini. Please connect first."

        try:
            # Format the query to include the repository context.
            formatted_query = f"Analyze GitHub repository: {repo_url}\nQuery: {query}"
            forward_msg = {
                "client_content": {
                    "turn_complete": True,
                    "turns": [
                        {
                            "role": "user",
                            "parts": [
                                {"text": formatted_query},
                            ],
                        },
                    ],
                },
            }
            await self.audio_loop.out_queue.put(forward_msg)
            return "Query sent to Gemini"
        except Exception as e:
            traceback.print_exc()
            return f"Error sending query: {str(e)}"

    async def send_audio(self, audio_data) -> str:
        """Send audio data to Gemini.

        Args:
            audio_data: One of: an object exposing ``tobytes()`` (e.g. a numpy
                array), a bytes-like object, or a ``(sample_rate, samples)``
                tuple as produced by Gradio's numpy-typed Audio component.

        Returns:
            A status message describing whether the audio was queued.
        """
        if not self.is_connected:
            return "Not connected to Gemini. Please connect first."

        try:
            # Gradio's numpy-typed Audio component delivers a
            # (sample_rate, samples) pair; only the samples are forwarded.
            if isinstance(audio_data, tuple):
                _sample_rate, audio_data = audio_data

            # NOTE(review): no resampling or dtype conversion happens here; the
            # samples are assumed to already be PCM in the layout Gemini
            # expects -- TODO confirm against the Audio component settings.
            if isinstance(audio_data, (bytes, bytearray, memoryview)):
                raw_pcm = bytes(audio_data)
            else:
                raw_pcm = audio_data.tobytes()

            forward_msg = {
                "realtime_input": {
                    "media_chunks": [
                        {
                            "data": base64.b64encode(raw_pcm).decode(),
                            "mime_type": "audio/pcm",
                        },
                    ],
                },
            }
            await self.audio_loop.out_queue.put(forward_msg)
            return "Audio sent to Gemini"
        except Exception as e:
            traceback.print_exc()
            return f"Error sending audio: {str(e)}"

    async def get_next_response(self) -> Optional[Dict[str, Any]]:
        """Return the next queued response, or ``None`` when nothing is waiting.

        Never blocks, so it is safe for Gradio to poll.
        """
        try:
            return self.response_queue.get_nowait()
        except asyncio.QueueEmpty:
            return None

    def cleanup(self) -> None:
        """Cancel the background tasks and mark the integration disconnected.

        Does nothing when no connection was ever established. After cleanup,
        :meth:`initialize` may be awaited again to reconnect.
        """
        if self.audio_loop is not None:
            self._reset_connection()
            print("[GeminiGradioIntegration] Cleaned up resources")
# Shared integration instance used by every Gradio callback in this module.
gemini_integration = GeminiGradioIntegration()

# All coroutines issued by the callbacks below run on one long-lived event
# loop hosted by a daemon thread.  asyncio.run() would instead create a fresh
# loop per call and cancel every still-pending task before closing it, which
# would stop the tasks started by initialize() the moment connect_to_gemini()
# returned.
_background_loop_lock = threading.Lock()
_background_loop = None


def _get_background_loop():
    """Return the shared background event loop, starting it on first use."""
    global _background_loop
    # The lock keeps two concurrent first calls from starting two loops.
    with _background_loop_lock:
        if _background_loop is None or _background_loop.is_closed():
            loop = asyncio.new_event_loop()
            threading.Thread(
                target=loop.run_forever,
                name="gemini-gradio-loop",
                daemon=True,  # never keeps the interpreter alive at exit
            ).start()
            _background_loop = loop
        return _background_loop


def _run_async(coro):
    """Run *coro* on the background loop and block until it has a result."""
    future = asyncio.run_coroutine_threadsafe(coro, _get_background_loop())
    return future.result()


# Functions to be called from Gradio
def connect_to_gemini():
    """Connect to Gemini (for a Gradio button).

    Returns:
        The status message produced by ``gemini_integration.initialize()``.
    """
    return _run_async(gemini_integration.initialize())


def send_text_query(repo_url, query):
    """Send a text query about a repository to Gemini (for Gradio).

    Returns:
        The status message produced by ``gemini_integration.send_query()``.
    """
    return _run_async(gemini_integration.send_query(repo_url, query))


def send_audio_query(audio_data):
    """Send recorded audio to Gemini (for Gradio).

    Returns:
        "No audio data provided" when *audio_data* is ``None``; otherwise the
        status message produced by ``gemini_integration.send_audio()``.
    """
    if audio_data is None:
        return "No audio data provided"
    return _run_async(gemini_integration.send_audio(audio_data))


# Intended to be called periodically by Gradio to check for new responses.
def check_for_responses():
    """Return the data of the next queued audio response, if there is one.

    Returns:
        The ``"data"`` value of the next response when its type is
        ``"audio"``; ``None`` when nothing is queued or the response has a
        different type.
    """
    response = _run_async(gemini_integration.get_next_response())
    if response and response["type"] == "audio":
        # NOTE(review): the data is returned as queued; converting it into a
        # format Gradio can play is still left to the caller.
        return response["data"]
    return None