# fastrtc-voice-assistant / webrtc_handler.py
# Twelve2five's picture
# Upload 4 files
# dff5fe4 verified
import asyncio
import json
import logging
import os
import ssl
import uuid
from typing import Dict, Optional, Callable
import aiohttp
from aiortc import RTCPeerConnection, RTCSessionDescription, MediaStreamTrack, RTCIceCandidate
from aiortc.contrib.media import MediaBlackhole, MediaRelay
# Module-level logger used by the handlers in this file.
logger = logging.getLogger("webrtc_handler")
# Peer connections created by handle_offer; a connection is added when it is
# created and discarded once its state becomes "failed".
pcs = set()
# Shared relay; handle_offer calls relay.subscribe(track) on each incoming
# audio track before wrapping it in AudioTransformTrack.
relay = MediaRelay()
class AudioTransformTrack(MediaStreamTrack):
    """
    A track that passes audio frames through and hands each one to a callback.

    Args:
        track: Source track; its ``recv()`` supplies the frames.
        callback: Optional callable invoked with every received frame. It may
            be a plain function or an ``async def``; a falsy value disables it.
    """

    kind = "audio"

    def __init__(self, track, callback):
        super().__init__()
        self.track = track
        self.callback = callback

    async def recv(self):
        """
        Receive the next frame from the source track and return it unchanged.

        The callback (if any) sees the frame before it is returned. Exceptions
        raised by the source track or by the callback propagate to the caller.
        """
        frame = await self.track.recv()
        if self.callback:
            outcome = self.callback(frame)
            # An ``async def`` callback only produces a coroutine object here;
            # unless it is awaited, the callback body never actually runs.
            if asyncio.iscoroutine(outcome):
                await outcome
        return frame
async def handle_offer(offer, audio_callback=None):
    """
    Create a peer connection for an SDP offer and return the SDP answer.

    Args:
        offer: Mapping with the remote description under ``"sdp"`` and
            ``"type"``.
        audio_callback: Optional callable handed to AudioTransformTrack; it is
            invoked with every audio frame received from the remote peer.

    Returns:
        A dict with the local description under ``"sdp"`` and ``"type"``.

    Raises:
        KeyError: If ``offer`` lacks ``"sdp"`` or ``"type"``.
        Exception: Whatever the negotiation calls raise; the peer connection
            is closed and removed from ``pcs`` before the error propagates.
    """
    offer_data = RTCSessionDescription(sdp=offer["sdp"], type=offer["type"])

    pc = RTCPeerConnection()
    pcs.add(pc)

    @pc.on("connectionstatechange")
    async def on_connectionstatechange():
        state = pc.connectionState
        logger.info(f"Connection state is {state}")
        if state == "failed":
            await pc.close()
        # Drop the registry entry for closed connections as well, otherwise
        # every connection that ends normally stays in ``pcs`` forever.
        if state in ("failed", "closed"):
            pcs.discard(pc)

    @pc.on("track")
    def on_track(track):
        logger.info(f"Track {track.kind} received")

        if track.kind == "audio":
            # Send the (relayed) incoming audio back out on this connection,
            # tapping each frame with audio_callback on the way through.
            pc.addTrack(AudioTransformTrack(relay.subscribe(track), audio_callback))

        @track.on("ended")
        async def on_ended():
            logger.info(f"Track {track.kind} ended")

    try:
        # Handle the incoming offer
        await pc.setRemoteDescription(offer_data)

        # Create an answer
        answer = await pc.createAnswer()
        await pc.setLocalDescription(answer)
    except Exception:
        # Negotiation failed: release the half-configured connection instead
        # of leaving it open and registered in ``pcs``.
        await pc.close()
        pcs.discard(pc)
        raise

    return {
        "sdp": pc.localDescription.sdp,
        "type": pc.localDescription.type,
    }
async def add_ice_candidate(candidate, pc):
    """
    Add a remote ICE candidate received from signalling to a peer connection.

    Args:
        candidate: Mapping with the keys ``"candidate"`` (the SDP candidate
            line), ``"sdpMid"`` and ``"sdpMLineIndex"``. A falsy value is
            ignored.
        pc: Peer connection to add the candidate to. A falsy value is ignored.

    Returns:
        None.
    """
    if not candidate or not pc:
        return

    sdp_line = candidate.get("candidate")
    if not sdp_line:
        # An empty candidate string carries no address to add (browsers use it
        # to signal end-of-candidates), so there is nothing to parse.
        return

    # Imported here so the block stays self-contained; aiortc is already a
    # dependency of this module.
    from aiortc.sdp import candidate_from_sdp

    # RTCIceCandidate takes parsed fields (component, foundation, ip, ...) and
    # has no ``candidate=`` keyword, so the raw line must be parsed. The parser
    # expects the attribute value without its "candidate:" (or "a=candidate:")
    # prefix.
    _, prefix, attributes = sdp_line.partition("candidate:")
    ice_candidate = candidate_from_sdp(attributes if prefix else sdp_line)
    ice_candidate.sdpMid = candidate.get("sdpMid")
    ice_candidate.sdpMLineIndex = candidate.get("sdpMLineIndex")

    await pc.addIceCandidate(ice_candidate)