Spaces:
Running
Running
import gradio as gr | |
import subprocess | |
import os | |
import base64 | |
from typing import List, Dict, Any | |
from pathlib import Path | |
from datetime import datetime | |
class MusicRecognitionMCPServer: | |
def __init__(self, allowed_directories: List[str] = None): | |
"""Initialize MCP Server with configurable file access""" | |
self.allowed_directories = allowed_directories or ["/tmp", "uploads", "output"] | |
self.processed_files = {} # Cache of processed results | |
# Convert relative paths to absolute and check access | |
abs_directories = [] | |
for directory in self.allowed_directories: | |
# Convert relative paths to absolute | |
if not os.path.isabs(directory): | |
directory = os.path.abspath(directory) | |
abs_directories.append(directory) | |
print(f"Checking directory: {directory}") | |
# Only try to create if it doesn't exist and we have permission | |
if not os.path.exists(directory): | |
try: | |
os.makedirs(directory, exist_ok=True) | |
os.chmod(directory, 0o755) | |
print(f"β Directory created: {directory}") | |
except PermissionError: | |
print(f"β οΈ Directory doesn't exist but can't create: {directory}") | |
print(f" This is normal for system directories like /tmp") | |
except Exception as e: | |
print(f"β οΈ Warning creating {directory}: {e}") | |
else: | |
print(f"β Directory exists: {directory}") | |
self.allowed_directories = abs_directories | |
print(f"Final allowed directories: {self.allowed_directories}") | |
# Test Audiveris installation | |
self._test_audiveris() | |
def list_resources(self) -> List[Dict[str, Any]]: | |
"""List available resources following MCP patterns""" | |
resources = [] | |
# Add processed files as resources | |
for file_id, file_info in self.processed_files.items(): | |
resources.append({ | |
"uri": f"musicxml://{file_id}", | |
"name": file_info["original_name"], | |
"description": f"MusicXML file converted from {file_info['original_name']} on {file_info['processed_at']}", | |
"mimeType": "application/vnd.recordare.musicxml+xml" | |
}) | |
# Add available PDF files in allowed directories | |
for directory in self.allowed_directories: | |
if os.path.exists(directory): | |
for file_path in Path(directory).rglob("*.pdf"): | |
if self._is_file_accessible(str(file_path)): | |
resources.append({ | |
"uri": f"file://{file_path}", | |
"name": file_path.name, | |
"description": f"PDF music score available for processing: {file_path.name}", | |
"mimeType": "application/pdf" | |
}) | |
return resources | |
def read_resource(self, uri: str) -> Dict[str, Any]: | |
"""Read resource content following MCP patterns""" | |
if uri.startswith("musicxml://"): | |
# Return processed MusicXML file | |
file_id = uri.replace("musicxml://", "") | |
if file_id in self.processed_files: | |
file_info = self.processed_files[file_id] | |
try: | |
with open(file_info["output_path"], "rb") as f: | |
content = base64.b64encode(f.read()).decode() | |
return { | |
"contents": [{ | |
"type": "resource", | |
"resource": { | |
"uri": uri, | |
"text": content, | |
"mimeType": "application/vnd.recordare.musicxml+xml" | |
} | |
}] | |
} | |
except FileNotFoundError: | |
raise Exception(f"MusicXML file not found: {file_info['output_path']}") | |
else: | |
raise Exception(f"Resource not found: {uri}") | |
elif uri.startswith("file://"): | |
# Return PDF file content | |
file_path = uri.replace("file://", "") | |
if not self._is_file_accessible(file_path): | |
raise Exception(f"File access denied: {file_path}") | |
try: | |
with open(file_path, "rb") as f: | |
content = base64.b64encode(f.read()).decode() | |
return { | |
"contents": [{ | |
"type": "resource", | |
"resource": { | |
"uri": uri, | |
"text": content, | |
"mimeType": "application/pdf" | |
} | |
}] | |
} | |
except FileNotFoundError: | |
raise Exception(f"File not found: {file_path}") | |
else: | |
raise Exception(f"Unsupported URI scheme: {uri}") | |
def _is_file_accessible(self, file_path: str) -> bool: | |
"""Check if file is within allowed directories""" | |
abs_path = os.path.abspath(file_path) | |
return any(abs_path.startswith(os.path.abspath(d)) for d in self.allowed_directories) | |
def recognize_music_tool(self, pdf_uri: str, output_dir: str = None) -> Dict[str, Any]: | |
"""Tool for music recognition following MCP patterns""" | |
# Handle different URI formats | |
if pdf_uri.startswith("file://"): | |
pdf_path = pdf_uri.replace("file://", "") | |
elif pdf_uri.startswith("data:"): | |
# Handle data URIs (base64 encoded) | |
return self._process_data_uri(pdf_uri, output_dir) | |
else: | |
# Assume it's a direct file path | |
pdf_path = pdf_uri | |
if not self._is_file_accessible(pdf_path): | |
raise Exception(f"File access denied: {pdf_path}") | |
if not os.path.exists(pdf_path): | |
raise Exception(f"PDF file not found: {pdf_path}") | |
try: | |
output_file = self._recognize_music_core(pdf_path, output_dir) | |
# Store in processed files cache | |
file_id = f"music_{len(self.processed_files) + 1}_{int(datetime.now().timestamp())}" | |
self.processed_files[file_id] = { | |
"original_name": os.path.basename(pdf_path), | |
"original_path": pdf_path, | |
"output_path": output_file, | |
"processed_at": datetime.now().isoformat(), | |
"file_id": file_id | |
} | |
# Return MCP-compliant response | |
return { | |
"content": [{ | |
"type": "text", | |
"text": f"β Successfully converted '{os.path.basename(pdf_path)}' to MusicXML.\n\n" | |
f"π Output file: {output_file}\n" | |
f"π Resource URI: musicxml://{file_id}\n" | |
f"π File size: {os.path.getsize(output_file)} bytes\n\n" | |
f"You can now access this MusicXML file as a resource using the URI: `musicxml://{file_id}`" | |
}], | |
"isError": False | |
} | |
except Exception as e: | |
return { | |
"content": [{ | |
"type": "text", | |
"text": f"β Music recognition failed: {str(e)}" | |
}], | |
"isError": True | |
} | |
def _process_data_uri(self, data_uri: str, output_dir: str = None) -> Dict[str, Any]: | |
"""Process base64 encoded data URI""" | |
try: | |
# Parse data URI: data:application/pdf;base64,<data> | |
header, data = data_uri.split(',', 1) | |
mime_type = header.split(';')[0].replace('data:', '') | |
if mime_type != 'application/pdf': | |
raise Exception(f"Unsupported MIME type: {mime_type}") | |
# Fix base64 padding if needed | |
data = self._fix_base64_padding(data) | |
# Decode base64 data | |
pdf_data = base64.b64decode(data) | |
# Save to temporary file | |
temp_dir = output_dir or "/tmp" | |
temp_pdf = os.path.join(temp_dir, f"temp_{int(datetime.now().timestamp())}.pdf") | |
with open(temp_pdf, 'wb') as f: | |
f.write(pdf_data) | |
# Process the file | |
return self.recognize_music_tool(f"file://{temp_pdf}", output_dir) | |
except Exception as e: | |
raise Exception(f"Failed to process data URI: {str(e)}") | |
def _fix_base64_padding(self, data: str) -> str: | |
"""Fix base64 padding to make it valid""" | |
# Remove any whitespace | |
data = data.strip().replace('\n', '').replace('\r', '').replace(' ', '') | |
# Add padding if needed | |
missing_padding = len(data) % 4 | |
if missing_padding: | |
data += '=' * (4 - missing_padding) | |
return data | |
def _recognize_music_core(self, pdf_file_path: str, output_dir: str = None) -> str: | |
"""Core music recognition function""" | |
audiveris = "/opt/audiveris/bin/Audiveris" | |
if output_dir is None: | |
output_dir = "/tmp" | |
# Ensure output directory exists with proper permissions | |
os.makedirs(output_dir, exist_ok=True) | |
try: | |
os.chmod(output_dir, 0o755) | |
except Exception as e: | |
print(f"Warning: Could not set permissions for {output_dir}: {e}") | |
if not self._is_file_accessible(output_dir): | |
raise Exception(f"Output directory access denied: {output_dir}") | |
# Verify input file exists | |
if not os.path.exists(pdf_file_path): | |
raise Exception(f"Input PDF file not found: {pdf_file_path}") | |
pdf_file_name = os.path.basename(pdf_file_path) | |
pdf_name_without_ext = os.path.splitext(pdf_file_name)[0] | |
# Try both possible extensions | |
possible_extensions = [".mxl", ".xml", ".musicxml"] | |
output_files = [os.path.join(output_dir, f"{pdf_name_without_ext}{ext}") for ext in possible_extensions] | |
cmd = [ | |
audiveris, "-batch", "-export", "-output", output_dir, pdf_file_path | |
] | |
print(f"Running Audiveris command: {' '.join(cmd)}") | |
result = subprocess.run(cmd, capture_output=True, text=True) | |
print(f"Audiveris stdout: {result.stdout}") | |
print(f"Audiveris stderr: {result.stderr}") | |
print(f"Audiveris return code: {result.returncode}") | |
# List files in output directory for debugging | |
if os.path.exists(output_dir): | |
files_in_output = os.listdir(output_dir) | |
print(f"Files in output directory: {files_in_output}") | |
# Check if any of the possible output files exist | |
existing_output = None | |
for output_file in output_files: | |
if os.path.exists(output_file): | |
existing_output = output_file | |
break | |
if existing_output: | |
print(f"Found output file: {existing_output}") | |
return existing_output | |
# If no output file found, provide detailed error | |
error_msg = f"Audiveris processing failed.\n" | |
error_msg += f"Return code: {result.returncode}\n" | |
error_msg += f"Stdout: {result.stdout}\n" | |
error_msg += f"Stderr: {result.stderr}\n" | |
error_msg += f"Expected files: {output_files}\n" | |
error_msg += f"Files in output dir: {os.listdir(output_dir) if os.path.exists(output_dir) else 'Directory does not exist'}\n" | |
raise Exception(error_msg) | |
def _test_audiveris(self): | |
"""Test if Audiveris is properly installed""" | |
audiveris = "/opt/audiveris/bin/Audiveris" | |
if not os.path.exists(audiveris): | |
print(f"β οΈ Warning: Audiveris not found at {audiveris}") | |
return False | |
try: | |
# Test Audiveris with help command | |
result = subprocess.run([audiveris, "-help"], capture_output=True, text=True, timeout=10) | |
if "Audiveris" in result.stdout or "Audiveris" in result.stderr: | |
print("β Audiveris installation verified") | |
return True | |
else: | |
print(f"β οΈ Warning: Audiveris may not be working properly") | |
print(f"Output: {result.stdout}") | |
print(f"Error: {result.stderr}") | |
return False | |
except Exception as e: | |
print(f"β οΈ Warning: Could not test Audiveris: {e}") | |
return False | |
# Initialize MCP Server with proper absolute paths | |
mcp_server = MusicRecognitionMCPServer(["/tmp", "/app/uploads", "/app/output"]) | |
def recognize_music_gradio(pdf_file): | |
"""Gradio wrapper for music recognition""" | |
try: | |
print(f"Processing file: {pdf_file.name}") | |
result = mcp_server.recognize_music_tool(f"file://{pdf_file.name}") | |
if result.get("isError"): | |
error_msg = result["content"][0]["text"] | |
print(f"Error in music recognition: {error_msg}") | |
return None | |
# Extract file ID from the response | |
response_text = result["content"][0]["text"] | |
print(f"Response text: {response_text}") | |
if "musicxml://" in response_text: | |
file_id = response_text.split("musicxml://")[1].split("`")[0] | |
print(f"Extracted file ID: {file_id}") | |
if file_id in mcp_server.processed_files: | |
file_info = mcp_server.processed_files[file_id] | |
output_path = file_info["output_path"] | |
print(f"Output path from cache: {output_path}") | |
if os.path.exists(output_path): | |
print(f"β File exists: {output_path}") | |
return output_path | |
else: | |
print(f"β File not found: {output_path}") | |
# If the above doesn't work, try to find the file directly | |
pdf_basename = os.path.splitext(os.path.basename(pdf_file.name))[0] | |
possible_files = [ | |
f"/tmp/{pdf_basename}.mxl", | |
f"/tmp/{pdf_basename}.xml", | |
f"/tmp/{pdf_basename}.musicxml" | |
] | |
for file_path in possible_files: | |
print(f"Checking: {file_path}") | |
if os.path.exists(file_path): | |
print(f"β Found file: {file_path}") | |
return file_path | |
print("β No output file found in any expected location") | |
print(f"Files in /tmp: {os.listdir('/tmp') if os.path.exists('/tmp') else 'Directory not found'}") | |
return None | |
except Exception as e: | |
print(f"Exception in Gradio wrapper: {str(e)}") | |
import traceback | |
traceback.print_exc() | |
return None | |
# Create enhanced Gradio interface with custom CSS and better UX | |
custom_css = """ | |
.gradio-container { | |
max-width: 1200px !important; | |
margin: auto !important; | |
} | |
.main-header { | |
text-align: center; | |
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); | |
color: white; | |
padding: 2rem; | |
border-radius: 15px; | |
margin-bottom: 2rem; | |
box-shadow: 0 8px 32px rgba(0,0,0,0.1); | |
} | |
.feature-card { | |
background: white; | |
border-radius: 12px; | |
padding: 1.5rem; | |
margin: 1rem 0; | |
box-shadow: 0 4px 16px rgba(0,0,0,0.1); | |
border-left: 4px solid #667eea; | |
} | |
.upload-area { | |
border: 2px dashed #667eea; | |
border-radius: 12px; | |
padding: 2rem; | |
text-align: center; | |
background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%); | |
transition: all 0.3s ease; | |
} | |
.upload-area:hover { | |
border-color: #764ba2; | |
background: linear-gradient(135deg, #e3f2fd 0%, #bbdefb 100%); | |
} | |
.status-success { | |
background: linear-gradient(135deg, #4caf50 0%, #45a049 100%); | |
color: white; | |
padding: 1rem; | |
border-radius: 8px; | |
margin: 1rem 0; | |
} | |
.status-error { | |
background: linear-gradient(135deg, #f44336 0%, #d32f2f 100%); | |
color: white; | |
padding: 1rem; | |
border-radius: 8px; | |
margin: 1rem 0; | |
} | |
.info-box { | |
background: linear-gradient(135deg, #e3f2fd 0%, #bbdefb 100%); | |
border-radius: 8px; | |
padding: 1rem; | |
margin: 1rem 0; | |
border-left: 4px solid #2196f3; | |
} | |
""" | |
def create_gradio_interface(): | |
with gr.Blocks(css=custom_css, title="πΌ Audiveris Music Score Recognition", theme=gr.themes.Soft()) as interface: | |
# Header | |
gr.HTML(""" | |
<div class="main-header"> | |
<h1>πΌ Audiveris Music Score Recognition</h1> | |
<p style="font-size: 1.2em; margin-top: 1rem; opacity: 0.9;"> | |
Transform your PDF music scores into editable MusicXML files using advanced AI recognition | |
</p> | |
</div> | |
""") | |
# Main content area | |
with gr.Row(): | |
with gr.Column(scale=1): | |
gr.HTML(""" | |
<div class="feature-card"> | |
<h3>β¨ Features</h3> | |
<ul style="line-height: 1.8;"> | |
<li>π΅ High-accuracy music notation recognition</li> | |
<li>π PDF to MusicXML conversion</li> | |
<li>πΉ Supports complex musical scores</li> | |
<li>β‘ Fast processing with Audiveris engine</li> | |
<li>πΎ Downloadable results</li> | |
</ul> | |
</div> | |
""") | |
gr.HTML(""" | |
<div class="info-box"> | |
<h4>π How to use:</h4> | |
<ol style="line-height: 1.6;"> | |
<li>Upload your PDF music score</li> | |
<li>Click "π΅ Convert to MusicXML"</li> | |
<li>Wait for processing to complete</li> | |
<li>Download your MusicXML file</li> | |
</ol> | |
</div> | |
""") | |
with gr.Column(scale=2): | |
# File upload section | |
gr.HTML("<h3 style='text-align: center; color: #667eea;'>π Upload Your Music Score</h3>") | |
pdf_input = gr.File( | |
file_types=[".pdf"], | |
label="Select PDF File", | |
file_count="single", | |
height=200, | |
elem_classes=["upload-area"] | |
) | |
# Processing button | |
convert_btn = gr.Button( | |
"π΅ Convert to MusicXML", | |
variant="primary", | |
size="lg", | |
scale=1 | |
) | |
# Status and progress | |
status_display = gr.HTML(visible=False) | |
progress_bar = gr.Progress() | |
# Output section | |
gr.HTML("<h3 style='text-align: center; color: #667eea; margin-top: 2rem;'>π₯ Download Results</h3>") | |
output_file = gr.File( | |
label="MusicXML Output", | |
visible=False, | |
height=100 | |
) | |
# Processing info | |
processing_info = gr.Textbox( | |
label="Processing Details", | |
lines=8, | |
visible=False, | |
interactive=False | |
) | |
# Footer | |
gr.HTML(""" | |
<div style="text-align: center; margin-top: 3rem; padding: 2rem; background: #f8f9fa; border-radius: 12px;"> | |
<p style="color: #666; margin: 0;"> | |
Powered by <strong>Audiveris</strong> β’ Built with β€οΈ using Gradio | |
</p> | |
<p style="color: #888; font-size: 0.9em; margin-top: 0.5rem;"> | |
For best results, use high-quality PDF scans with clear musical notation | |
</p> | |
</div> | |
""") | |
# Enhanced processing function with better feedback | |
def process_with_feedback(pdf_file, progress=gr.Progress()): | |
if pdf_file is None: | |
return ( | |
gr.HTML("<div class='status-error'>β Please upload a PDF file first!</div>", visible=True), | |
None, | |
gr.Textbox(visible=False), | |
gr.File(visible=False) | |
) | |
try: | |
# Show processing status | |
progress(0.1, desc="π Analyzing PDF file...") | |
status_html = """ | |
<div class='status-success'> | |
<h4>π Processing your music score...</h4> | |
<p>File: <strong>{}</strong></p> | |
<p>Size: <strong>{:.2f} MB</strong></p> | |
<p>Please wait while Audiveris analyzes your score...</p> | |
</div> | |
""".format( | |
pdf_file.name.split('/')[-1], | |
os.path.getsize(pdf_file.name) / (1024*1024) | |
) | |
progress(0.3, desc="π΅ Running Audiveris recognition...") | |
# Process the file | |
result_file = recognize_music_gradio(pdf_file) | |
progress(0.9, desc="β Finalizing results...") | |
if result_file and os.path.exists(result_file): | |
# Success | |
success_html = """ | |
<div class='status-success'> | |
<h4>β Conversion completed successfully!</h4> | |
<p>π Output: <strong>{}</strong></p> | |
<p>π Size: <strong>{:.2f} KB</strong></p> | |
<p>π Your MusicXML file is ready for download!</p> | |
</div> | |
""".format( | |
os.path.basename(result_file), | |
os.path.getsize(result_file) / 1024 | |
) | |
# Processing details | |
details = f"""β CONVERSION SUCCESSFUL | |
π Input File: {pdf_file.name.split('/')[-1]} | |
π Input Size: {os.path.getsize(pdf_file.name) / (1024*1024):.2f} MB | |
π΅ Output File: {os.path.basename(result_file)} | |
π Output Size: {os.path.getsize(result_file) / 1024:.2f} KB | |
β±οΈ Processing completed at: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | |
πΌ Your PDF music score has been successfully converted to MusicXML format! | |
You can now download the file and use it in music notation software like MuseScore, Finale, or Sibelius.""" | |
progress(1.0, desc="π Complete!") | |
return ( | |
gr.HTML(success_html, visible=True), | |
gr.File(result_file, visible=True), | |
gr.Textbox(details, visible=True), | |
gr.File(visible=True) | |
) | |
else: | |
# Failure | |
error_html = """ | |
<div class='status-error'> | |
<h4>β Conversion failed</h4> | |
<p>The music recognition process encountered an error.</p> | |
<p>Please check that your PDF contains clear musical notation and try again.</p> | |
</div> | |
""" | |
error_details = f"""β CONVERSION FAILED | |
π Input File: {pdf_file.name.split('/')[-1]} | |
π Input Size: {os.path.getsize(pdf_file.name) / (1024*1024):.2f} MB | |
β οΈ Error: No output file was generated by Audiveris | |
β±οΈ Failed at: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | |
π‘ Troubleshooting tips: | |
β’ Ensure your PDF contains clear, high-quality musical notation | |
β’ Check that the PDF is not password-protected | |
β’ Try with a different PDF file | |
β’ Make sure the musical notation is not handwritten""" | |
return ( | |
gr.HTML(error_html, visible=True), | |
None, | |
gr.Textbox(error_details, visible=True), | |
gr.File(visible=False) | |
) | |
except Exception as e: | |
# Exception handling | |
error_html = f""" | |
<div class='status-error'> | |
<h4>β Processing Error</h4> | |
<p>An unexpected error occurred: <code>{str(e)}</code></p> | |
<p>Please try again or contact support if the problem persists.</p> | |
</div> | |
""" | |
error_details = f"""β PROCESSING ERROR | |
π Input File: {pdf_file.name.split('/')[-1] if pdf_file else 'Unknown'} | |
β οΈ Error: {str(e)} | |
β±οΈ Failed at: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | |
π§ Technical Details: | |
{str(e)} | |
Please try again with a different file or contact support.""" | |
return ( | |
gr.HTML(error_html, visible=True), | |
None, | |
gr.Textbox(error_details, visible=True), | |
gr.File(visible=False) | |
) | |
# Connect the button to the processing function | |
convert_btn.click( | |
fn=process_with_feedback, | |
inputs=[pdf_input], | |
outputs=[status_display, output_file, processing_info, output_file], | |
show_progress=True | |
) | |
# Auto-hide status when new file is uploaded | |
pdf_input.change( | |
fn=lambda: (gr.HTML(visible=False), gr.Textbox(visible=False), gr.File(visible=False)), | |
outputs=[status_display, processing_info, output_file] | |
) | |
return interface | |
# Create the enhanced interface | |
gradio_interface = create_gradio_interface() | |
# Removed run_gradio function - now running directly in main thread | |
if __name__ == "__main__": | |
print("===== Application Startup at {} =====".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))) | |
print() | |
print("π΅ MCP-Compliant Music Recognition Service Starting...") | |
print("π± Gradio UI: http://localhost:7860") | |
# Run Gradio directly in the main thread to keep the application alive | |
try: | |
gradio_interface.launch( | |
server_name="0.0.0.0", | |
server_port=7860, | |
mcp_server=True, | |
share=True, # Set to False for container deployment | |
prevent_thread_lock=False # Allow blocking to keep main thread alive | |
) | |
except Exception as e: | |
print(f"β Failed to start Gradio interface: {e}") | |
import traceback | |
traceback.print_exc() | |