Spaces:
Sleeping
Sleeping
File size: 4,453 Bytes
3e11f9b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
import base64
import json
import os
import traceback
from typing import List
from mcp.server.fastmcp import FastMCP
from openai import OpenAI
from pydantic import Field
from aworld.logs.util import logger
from mcp_servers.utils import get_file_from_source
# Initialize MCP server
# FastMCP instance named "audio-server"; the @mcp.tool decorator in this file
# registers tools on it and main() starts it.
mcp = FastMCP("audio-server")

# OpenAI client used by the transcription tool. Credentials and endpoint come
# from the AUDIO_LLM_API_KEY / AUDIO_LLM_BASE_URL environment variables.
# NOTE(review): this runs at import time, i.e. before main() calls
# load_dotenv(), so values that exist only in a .env file are not visible to
# these os.getenv() calls -- confirm the variables are exported in the real
# process environment.
client = OpenAI(
    api_key=os.getenv("AUDIO_LLM_API_KEY"), base_url=os.getenv("AUDIO_LLM_BASE_URL")
)

# Prompt text asking a model to transcribe base64 audio and answer with a JSON
# object holding an "audio_text" key. It is not referenced anywhere in the
# visible part of this file.
AUDIO_TRANSCRIBE = (
    "Input is a base64 encoded audio. Transcribe the audio content. "
    "Return a json string with the following format: "
    '{"audio_text": "transcribed text from audio"}'
)
def encode_audio(audio_source: str, with_header: bool = True) -> str:
    """
    Encode audio to base64 format with robust file handling.

    The source is resolved through ``get_file_from_source`` (restricted to
    ``audio/`` MIME types with a 50 MB limit) and the returned content is
    base64-encoded. If the helper handed back a path that is a different file
    from ``audio_source`` (presumably a temporary download made for a URL),
    that file is removed afterwards, whether or not encoding succeeded.

    Args:
        audio_source: URL or local file path of the audio.
        with_header: Whether to prefix the result with a
            ``data:<mime_type>;base64,`` header.

    Returns:
        str: Base64 encoded audio string, with the MIME type prefix if
        ``with_header`` is True.

    Raises:
        ValueError: When ``audio_source`` is empty.
        Exception: Any error raised while fetching or encoding the audio is
            logged with its traceback and re-raised unchanged.
    """
    if not audio_source:
        raise ValueError("Audio source cannot be empty")

    # Stays None until the helper returns, so the cleanup below knows whether
    # there is anything to remove.
    file_path = None
    try:
        # Get file with validation (only audio files allowed)
        file_path, mime_type, content = get_file_from_source(
            audio_source,
            allowed_mime_prefixes=["audio/"],
            max_size_mb=50.0,  # 50MB limit for audio files
            type="audio",  # Specify type as audio to handle audio files
        )

        audio_base64 = base64.b64encode(content).decode()
        if with_header:
            return f"data:{mime_type};base64,{audio_base64}"
        return audio_base64
    except Exception:
        logger.error(
            f"Error encoding audio from {audio_source}: {traceback.format_exc()}"
        )
        raise
    finally:
        # Compare fully resolved paths on both sides: a relative or symlinked
        # path pointing at the caller's own file must never be treated as a
        # temporary copy and deleted.
        if file_path and os.path.realpath(file_path) != os.path.realpath(
            audio_source
        ):
            try:
                os.unlink(file_path)
            except FileNotFoundError:
                pass  # Already gone; nothing to clean up.
            except OSError:
                # Cleanup is best-effort: a leftover temp file should not
                # discard a successful result or mask the original error.
                logger.error(
                    f"Error removing temporary file {file_path}: "
                    f"{traceback.format_exc()}"
                )
def _discard_temp_copy(file_path, source: str) -> None:
    """Remove ``file_path`` unless it resolves to the same file as ``source``.

    Best-effort: a missing file is ignored and any other OS error is logged
    rather than raised, so cleanup can never alter a transcription result.
    """
    if not file_path:
        return
    # Resolve both sides so a relative or symlinked path to the caller's own
    # local file is never mistaken for a temporary copy.
    if os.path.realpath(file_path) == os.path.realpath(source):
        return
    try:
        os.unlink(file_path)
    except FileNotFoundError:
        pass  # Already gone; nothing to clean up.
    except OSError:
        logger.error(
            f"Error removing temporary file {file_path}: {traceback.format_exc()}"
        )


@mcp.tool(description="Transcribe the given audio in a list of filepaths or urls.")
async def mcp_transcribe_audio(
    audio_urls: List[str] = Field(
        description="The input audio in given a list of filepaths or urls."
    ),
) -> str:
    """
    Transcribe the given audio in a list of filepaths or urls.

    Each source is resolved through ``get_file_from_source`` (``audio/`` MIME
    types only, 50 MB limit), opened in binary mode and sent to the
    transcription endpoint of the module-level OpenAI ``client`` using the
    model named by the ``AUDIO_LLM_MODEL_NAME`` environment variable. A
    failure for one source is logged and recorded as an ``"Error: ..."``
    entry, so the output always holds exactly one entry per input, in order.

    NOTE(review): ``client`` is the synchronous OpenAI client and the call is
    not awaited, so it blocks the event loop for the duration of each request.

    Args:
        audio_urls: List of audio file paths or URLs.

    Returns:
        str: JSON string containing a list with one transcription (or error
        message) per input.
    """
    transcriptions = []
    for audio_url in audio_urls:
        # Reset per item so cleanup never touches a previous item's path.
        file_path = None
        try:
            # Get file with validation (only audio files allowed)
            file_path, _, _ = get_file_from_source(
                audio_url,
                allowed_mime_prefixes=["audio/"],
                max_size_mb=50.0,  # 50MB limit for audio files
                type="audio",  # Specify type as audio to handle audio files
            )

            # Use the file for transcription
            with open(file_path, "rb") as audio_file:
                transcription = client.audio.transcriptions.create(
                    file=audio_file,
                    model=os.getenv("AUDIO_LLM_MODEL_NAME"),
                    response_format="text",
                )
            transcriptions.append(transcription)
        except Exception as e:
            logger.error(f"Error transcribing {audio_url}: {traceback.format_exc()}")
            transcriptions.append(f"Error: {str(e)}")
        finally:
            # Runs on success and failure alike so a failed transcription does
            # not leak the temporary copy, and sits outside the try body so a
            # cleanup problem cannot append a second entry for the same input.
            _discard_temp_copy(file_path, audio_url)

    logger.info(f"---get_text_by_transcribe-transcription:{transcriptions}")
    return json.dumps(transcriptions, ensure_ascii=False)
def main():
    """Load environment variables from a .env file and serve MCP over stdio.

    The startup banner is written to stderr, leaving stdout free for the
    stdio transport that ``mcp.run`` is asked to use.
    """
    # Imported lazily so python-dotenv is only needed when the server is run.
    import dotenv

    dotenv.load_dotenv()

    print("Starting Audio MCP Server...", file=sys.stderr)
    mcp.run(transport="stdio")
# Module-level entry hook (attached to the module object further below)
def __call__():
    """
    Start the server by delegating to ``main()``.

    Per the original author's note, this exists so the module can be invoked
    by uvx; it takes no arguments and adds nothing beyond calling ``main()``.
    """
    main()
# Add this for compatibility with uvx
# `sys` is imported here rather than at the top of the file; main() above also
# relies on this module-level import for its `sys.stderr` reference.
import sys

# Attach the __call__ function to this module's entry in sys.modules.
# NOTE(review): Python looks up __call__ on an object's type, not on the
# instance, so this assignment alone may not make the module object callable
# -- confirm what uvx actually requires here.
sys.modules[__name__].__call__ = __call__

# Run the server when the script is executed directly
if __name__ == "__main__":
    main()
|