import asyncio import base64 import json import os import traceback from typing import Dict, List, Any, Optional, Tuple from handler import AudioLoop import gradio as gr class GeminiGradioIntegration: """ Integration class that connects the Gemini AudioLoop with Gradio components """ def __init__(self): self.audio_loop = None self.response_queue = asyncio.Queue() self.is_connected = False async def initialize(self): """Initialize the connection to Gemini""" try: if self.audio_loop is None: self.audio_loop = AudioLoop() # Start the AudioLoop self.loop_task = asyncio.create_task(self.audio_loop.run()) # Start response listener self.response_task = asyncio.create_task(self.listen_for_responses()) self.is_connected = True return "Connected to Gemini successfully" else: return "Already connected to Gemini" except Exception as e: self.is_connected = False traceback.print_exc() return f"Failed to connect to Gemini: {str(e)}" async def listen_for_responses(self): """Listen for responses from Gemini's audio queue and process them""" try: while True: # Get PCM data from the audio_loop pcm_data = await self.audio_loop.audio_in_queue.get() # Put the data in our response queue for Gradio to pick up await self.response_queue.put({ "type": "audio", "data": pcm_data }) # In a real implementation, we'd also listen for text responses # and tool calls from Gemini, process them, and update the UI except asyncio.CancelledError: print("[GeminiGradioIntegration] Response listener cancelled") except Exception as e: print(f"[GeminiGradioIntegration] Error in response listener: {str(e)}") traceback.print_exc() async def send_query(self, repo_url: str, query: str) -> str: """ Send a text query to Gemini about a GitHub repository """ if not self.is_connected: return "Not connected to Gemini. Please connect first." try: # Format the query to include the repository context formatted_query = f"Analyze GitHub repository: {repo_url}\nQuery: {query}" # Send the query to Gemini forward_msg = { "client_content": { "turn_complete": True, "turns": [ { "role": "user", "parts": [ {"text": formatted_query} ] } ] } } await self.audio_loop.out_queue.put(forward_msg) return "Query sent to Gemini" except Exception as e: traceback.print_exc() return f"Error sending query: {str(e)}" async def send_audio(self, audio_data): """ Send audio data to Gemini """ if not self.is_connected: return "Not connected to Gemini. Please connect first." try: # Convert the audio data to PCM format # Note: In a real implementation, you'd need to convert from Gradio's # audio format to the format expected by Gemini # For simplicity, let's assume audio_data is already in PCM format raw_pcm = audio_data.tobytes() # Create the message to send to Gemini forward_msg = { "realtime_input": { "media_chunks": [ { "data": base64.b64encode(raw_pcm).decode(), "mime_type": "audio/pcm" } ] } } # Send to Gemini await self.audio_loop.out_queue.put(forward_msg) return "Audio sent to Gemini" except Exception as e: traceback.print_exc() return f"Error sending audio: {str(e)}" async def get_next_response(self): """ Get the next response from Gemini (for Gradio to poll) """ try: # Non-blocking check if there's a response if self.response_queue.empty(): return None # Get the response response = await self.response_queue.get() return response except Exception as e: print(f"[GeminiGradioIntegration] Error getting response: {str(e)}") return None def cleanup(self): """Clean up resources when shutting down""" if self.audio_loop: # Cancel the tasks if hasattr(self, 'loop_task') and self.loop_task: self.loop_task.cancel() if hasattr(self, 'response_task') and self.response_task: self.response_task.cancel() self.is_connected = False print("[GeminiGradioIntegration] Cleaned up resources") # Initialize the integration gemini_integration = GeminiGradioIntegration() # Function to be called from Gradio def connect_to_gemini(): """Connect to Gemini (for Gradio button)""" return asyncio.run(gemini_integration.initialize()) def send_text_query(repo_url, query): """Send a text query to Gemini (for Gradio interface)""" return asyncio.run(gemini_integration.send_query(repo_url, query)) def send_audio_query(audio_data): """Send audio to Gemini (for Gradio interface)""" if audio_data is None: return "No audio data provided" return asyncio.run(gemini_integration.send_audio(audio_data)) # This function would be called periodically by Gradio to check for new responses def check_for_responses(): """Check for new responses from Gemini""" response = asyncio.run(gemini_integration.get_next_response()) if response and response["type"] == "audio": # Convert PCM data back to a format Gradio can understand # This depends on how Gradio expects audio data return response["data"] return None