# Spaces: Sleeping
from flask import Flask, render_template, send_from_directory, jsonify, request | |
import os | |
import mutagen | |
from mutagen.flac import FLAC | |
from mutagen.mp3 import MP3 | |
from mutagen.wave import WAVE | |
from functools import lru_cache | |
import base64 | |
from pathlib import Path | |
app = Flask(__name__)

# Configure music folder using Path for better path handling
app.config['MUSIC_FOLDER'] = Path(__file__).parent / 'music'

# Global cache for metadata
METADATA_CACHE = {}  # file path -> metadata dict, filled by get_audio_metadata
STRUCTURE_CACHE = None  # folder listing stored by get_folder_structure; None until the first scan
CACHE_VERSION = 1  # Increment this when changing cache logic

# In-memory playlist store used by the playlist handlers: playlist name -> list
# of songs. It lives only in this process, so it is empty after every restart.
playlists = {}
def _artwork_data_uri(image_bytes, mime=None):
    """Encode raw cover-art bytes as a ``data:`` URI string."""
    # Fall back to JPEG (the previously hard-coded type) when the tag does not
    # carry a usable image MIME type.
    if not mime or not str(mime).startswith('image/'):
        mime = 'image/jpeg'
    encoded = base64.b64encode(image_bytes).decode('utf-8')
    return f"data:{mime};base64,{encoded}"


def get_audio_metadata(file_path):
    """Read title/artist/album, stream info, artwork and lyrics for one audio file.

    Successful reads are stored in METADATA_CACHE keyed by ``file_path`` and
    returned from there on later calls.

    Args:
        file_path: Path of a ``.flac``, ``.mp3`` or ``.wav`` file (the
            extension is matched case-insensitively).

    Returns:
        A dict with the keys ``title``, ``artist``, ``album``, ``duration``,
        ``sample_rate``, ``channels``, ``artwork``, ``lyrics`` and
        ``synchronized_lyrics``; ``None`` for an unsupported extension. When
        the file cannot be read, a placeholder dict with the same keys is
        returned (and not cached) so callers can still list the file.
    """
    file_name = Path(file_path).name
    try:
        # Check if metadata is in cache
        if file_path in METADATA_CACHE:
            return METADATA_CACHE[file_path]

        # Match the extension case-insensitively, the same way
        # get_folder_structure selects files; otherwise "SONG.MP3" would be
        # selected by the scanner but rejected here.
        suffix = Path(file_path).suffix.lower()
        metadata = None

        if suffix == '.flac':
            audio = FLAC(file_path)
            metadata = {
                'title': str(audio.get('title', [file_name])[0]),
                'artist': str(audio.get('artist', ['Unknown'])[0]),
                'album': str(audio.get('album', ['Unknown'])[0]),
                'duration': int(audio.info.length),
                'sample_rate': audio.info.sample_rate,
                'channels': audio.info.channels,
                'artwork': None,
                'lyrics': str(audio.get('lyrics', [''])[0]),
                'synchronized_lyrics': get_synchronized_lyrics(audio),
            }
            if audio.pictures:
                # Artwork is best-effort: a broken picture must not lose the
                # rest of the metadata.
                try:
                    picture = audio.pictures[0]
                    metadata['artwork'] = _artwork_data_uri(
                        picture.data, getattr(picture, 'mime', None))
                except Exception as e:
                    print(f"Error reading artwork for {file_path}: {str(e)}")

        elif suffix == '.mp3':
            audio = MP3(file_path)
            # ``tags`` can be None when the file carries no tag, so a
            # hasattr() check is not enough.
            tags = audio.tags if audio.tags is not None else {}
            metadata = {
                'title': str(tags.get('TIT2', [file_name])[0]),
                'artist': str(tags.get('TPE1', ['Unknown'])[0]),
                'album': str(tags.get('TALB', ['Unknown'])[0]),
                'duration': int(audio.info.length),
                'sample_rate': audio.info.sample_rate,
                'channels': audio.info.channels,
                'artwork': None,
                'lyrics': get_mp3_lyrics(audio),
                'synchronized_lyrics': get_mp3_synchronized_lyrics(audio),
            }
            try:
                apic_keys = [k for k in tags.keys() if k.startswith('APIC:')]
                if apic_keys:
                    picture = tags[apic_keys[0]]
                    metadata['artwork'] = _artwork_data_uri(
                        picture.data, getattr(picture, 'mime', None))
            except Exception as e:
                print(f"Error reading artwork for {file_path}: {str(e)}")

        elif suffix == '.wav':
            audio = WAVE(file_path)
            metadata = {
                'title': file_name,
                'artist': 'Unknown',
                'album': 'Unknown',
                'duration': int(audio.info.length),
                'sample_rate': audio.info.sample_rate,
                'channels': audio.info.channels,
                'artwork': None,
                'lyrics': '',
                'synchronized_lyrics': [],
            }

        if metadata:
            METADATA_CACHE[file_path] = metadata
            return metadata
        return None
    except Exception as e:
        print(f"Error reading metadata for {file_path}: {str(e)}")
        # Same keys as a successful read so consumers never hit a missing key.
        return {
            'title': file_name,
            'artist': 'Unknown',
            'album': 'Unknown',
            'duration': 0,
            'sample_rate': 0,
            'channels': 0,
            'artwork': None,
            'lyrics': '',
            'synchronized_lyrics': [],
        }
def _scan_folder(folder):
    """Recursively list audio files and sub-folders of ``folder`` (never cached)."""
    music_root = app.config['MUSIC_FOLDER']
    entries = []
    for item in folder.iterdir():
        if item.is_file() and item.suffix.lower() in ('.mp3', '.wav', '.flac'):
            metadata = get_audio_metadata(str(item))
            if metadata:
                entries.append({
                    'type': 'file',
                    'name': item.name,
                    'path': str(item.relative_to(music_root)),
                    'metadata': metadata,
                })
        elif item.is_dir():
            # An unreadable sub-folder is reported as empty instead of
            # aborting the scan of its siblings.
            try:
                contents = _scan_folder(item)
            except Exception as e:
                print(f"Error scanning directory {item}: {str(e)}")
                contents = []
            entries.append({
                'type': 'folder',
                'name': item.name,
                'path': str(item.relative_to(music_root)),
                'contents': contents,
            })
    return entries


def get_folder_structure(path):
    """Get folder structure with caching.

    Only the listing of the configured music root is stored in
    STRUCTURE_CACHE. The recursion runs in an uncached helper: sharing the
    single global cache with recursive calls made every sub-folder after the
    first one return the first sub-folder's contents.

    Args:
        path: Directory to scan (str or Path), expected to be the music
            folder or a directory inside it.

    Returns:
        A list of ``{'type': 'file', ...}`` / ``{'type': 'folder', ...}``
        dicts, or ``[]`` when the directory is missing or cannot be scanned.
    """
    global STRUCTURE_CACHE
    try:
        root = Path(path)
        is_music_root = root == Path(app.config['MUSIC_FOLDER'])
        if is_music_root and STRUCTURE_CACHE is not None:
            return STRUCTURE_CACHE
        if not root.exists():
            return []
        structure = _scan_folder(root)
        if is_music_root:
            STRUCTURE_CACHE = structure
        return structure
    except Exception as e:
        print(f"Error scanning directory {path}: {str(e)}")
        return []
def get_music_structure():
    """Return the music library listing as a JSON response.

    Creates the music folder when it is missing. A missing or empty folder
    yields an empty JSON list; otherwise the (cached) folder structure is
    returned.
    """
    root = app.config['MUSIC_FOLDER']

    if not root.exists():
        root.mkdir(parents=True, exist_ok=True)
        return jsonify([])

    # Peek at the first directory entry to find out whether the folder is empty.
    has_entries = next(root.iterdir(), None) is not None
    if has_entries:
        return jsonify(get_folder_structure(root))
    return jsonify([])
# Add cache invalidation endpoint for development
def clear_cache():
    """Clear all caches - use in development only.

    Resets the folder-structure cache and empties the metadata cache, then
    returns a JSON confirmation message.
    """
    global STRUCTURE_CACHE, METADATA_CACHE
    STRUCTURE_CACHE = None
    METADATA_CACHE.clear()
    # get_audio_metadata only has cache_clear() when it is wrapped by
    # functools.lru_cache; calling it unconditionally raised AttributeError
    # on the plain function.
    cache_clear = getattr(get_audio_metadata, 'cache_clear', None)
    if callable(cache_clear):
        cache_clear()
    return jsonify({"message": "Cache cleared"})
def serve_music(filename):
    """Send the requested audio file from the configured music folder.

    Args:
        filename: Path of the file relative to the music folder.
    """
    # Serve from the same configured folder that the library scan reads,
    # rather than a hard-coded relative 'music' directory.
    return send_from_directory(app.config['MUSIC_FOLDER'], filename)
def index():
    """Render the main page from the ``index.html`` template."""
    page = render_template('index.html')
    return page
def default_cover():
    """Send the placeholder cover image from the ``static`` directory."""
    cover_dir = 'static'
    cover_file = 'default-cover.png'
    return send_from_directory(cover_dir, cover_file)
def manage_playlists():
    """List, create/replace, or delete playlists depending on the HTTP method.

    * GET    - return every playlist as JSON.
    * POST   - JSON body ``{"name": ..., "songs": [...]}``; stores ``songs``
               under ``name``. Responds 400 when no name is given.
    * DELETE - query parameter ``name``; responds 404 when it is unknown.
    """
    if request.method == 'GET':
        return jsonify(playlists)

    if request.method == 'POST':
        # silent=True returns None instead of raising on a missing or
        # malformed JSON body; anything that is not a JSON object is treated
        # as "no name provided" rather than crashing on ``.get``.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        playlist_name = data.get('name')
        songs = data.get('songs', [])
        if playlist_name:
            playlists[playlist_name] = songs
            return jsonify({'message': f'Playlist {playlist_name} created/updated'})
        return jsonify({'error': 'No playlist name provided'}), 400

    if request.method == 'DELETE':
        playlist_name = request.args.get('name')
        if playlist_name in playlists:
            del playlists[playlist_name]
            return jsonify({'message': f'Playlist {playlist_name} deleted'})
        return jsonify({'error': 'Playlist not found'}), 404

    # Any other method would previously fall through and return None.
    return jsonify({'error': 'Method not allowed'}), 405
def add_to_playlist(name):
    """Append songs to a playlist, creating the playlist when it is missing.

    Expects a JSON body ``{"songs": [...]}``.

    Args:
        name: Playlist name.

    Returns:
        The updated playlist as JSON, or a 400 error when ``songs`` is not a
        list.
    """
    # Tolerate a missing, malformed or non-object JSON body instead of
    # crashing on ``.get``.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    songs = data.get('songs', [])
    # list.extend() on a string would add it character by character.
    if not isinstance(songs, list):
        return jsonify({'error': 'songs must be a list'}), 400
    playlists.setdefault(name, []).extend(songs)
    return jsonify(playlists[name])
def remove_from_playlist(name):
    """Remove songs from a playlist by position.

    Expects a JSON body ``{"indices": [...]}`` of zero-based positions.
    Out-of-range, duplicate and non-integer entries are ignored.

    Args:
        name: Playlist name.

    Returns:
        The updated playlist as JSON, or a 404 error when the playlist does
        not exist.
    """
    if name not in playlists:
        return jsonify({'error': 'Playlist not found'}), 404

    # Tolerate a missing, malformed or non-object JSON body.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    indices = data.get('indices', [])
    if not isinstance(indices, list):
        indices = []

    songs = playlists[name]
    # De-duplicate first: popping the same index twice would remove a second,
    # unrelated song. bool is excluded because it is a subclass of int.
    valid = {
        i for i in indices
        if isinstance(i, int) and not isinstance(i, bool) and 0 <= i < len(songs)
    }
    # Remove songs in reverse order to avoid index shifting
    for index in sorted(valid, reverse=True):
        songs.pop(index)
    return jsonify(songs)
def get_synchronized_lyrics(audio):
    """Return time-stamped lyrics parsed from the ``LYRICS`` tag of ``audio``.

    The first ``LYRICS`` value is handed to parse_lrc_format. An empty list
    is returned when the tag is absent or anything goes wrong (the error is
    printed).
    """
    timed_lines = []
    try:
        tags = audio.tags
        if 'LYRICS' in tags:
            # Try to parse synchronized lyrics in LRC format
            timed_lines = parse_lrc_format(tags['LYRICS'][0])
    except Exception as e:
        print(f"Error parsing synchronized lyrics: {str(e)}")
    return timed_lines
def get_mp3_lyrics(audio):
    """Extract plain (unsynchronized) lyrics from MP3 metadata.

    Args:
        audio: Object whose ``tags`` attribute is a mapping of tag keys to
            frames (or ``None`` when the file has no tag).

    Returns:
        ``str()`` of the first lyrics frame found, or ``''`` when there is
        none or the tags cannot be read.
    """
    # ``tags`` may be None or empty; hasattr() alone cannot tell.
    tags = getattr(audio, 'tags', None)
    if not tags:
        return ''
    try:
        keys = list(tags.keys())
        # USLT frames are keyed as "USLT:<description>:<language>" (for
        # example "USLT::eng"), so match by prefix instead of only "USLT::".
        uslt_keys = [k for k in keys if k == 'USLT' or k.startswith('USLT:')]
        # Stable sort: keep preferring the exact "USLT::" key when present.
        uslt_keys.sort(key=lambda k: k != 'USLT::')
        other_keys = [k for k in ('LYRICS', 'LYRICS:') if k in keys]
        candidates = uslt_keys + other_keys
        if candidates:
            return str(tags[candidates[0]])
    except Exception as e:
        print(f"Error extracting MP3 lyrics: {str(e)}")
    return ''
def get_mp3_synchronized_lyrics(audio):
    """Extract synchronized lyrics from MP3 metadata.

    Args:
        audio: Object whose ``tags`` attribute is a mapping of tag keys to
            frames (or ``None`` when the file has no tag).

    Returns:
        The result of parse_sylt_format for the first key starting with
        ``SYLT:``, or ``[]`` when there is none or the tags cannot be read.
    """
    # ``tags`` may be None or empty; iterating None used to raise and log a
    # spurious error for every untagged file.
    tags = getattr(audio, 'tags', None)
    if not tags:
        return []
    try:
        # Look for SYLT (Synchronized Lyrics) frames
        sylt_keys = [key for key in tags.keys() if key.startswith('SYLT:')]
        if sylt_keys:
            return parse_sylt_format(tags[sylt_keys[0]])
    except Exception as e:
        print(f"Error extracting MP3 synchronized lyrics: {str(e)}")
    return []
def _parse_lrc_timestamp(tag):
    """Convert the inside of an LRC tag (``mm:ss`` or ``mm:ss.xx``) to seconds.

    Returns None when ``tag`` is not a timestamp (e.g. ``ar:Artist``).
    """
    clock, _, fraction = tag.partition('.')
    parts = clock.split(':')
    if len(parts) != 2:
        return None
    if fraction and not fraction.isdigit():
        return None
    try:
        minutes, seconds = int(parts[0]), int(parts[1])
    except ValueError:
        return None
    if minutes < 0 or seconds < 0:
        return None
    return minutes * 60 + seconds + float(f'0.{fraction or 0}')


def parse_lrc_format(lrc_text):
    """Parse LRC format synchronized lyrics.

    Each lyric line looks like ``[mm:ss.xx]text``. A line may start with
    several timestamps (``[00:10.00][01:15.00]text``); it then produces one
    entry per timestamp. Lines that do not start with a valid timestamp
    (blank lines, ``[ar:...]`` metadata, plain text) are skipped.

    Args:
        lrc_text: The LRC document as a single string.

    Returns:
        A list of ``{'time': seconds_as_float, 'text': str}`` dicts sorted by
        time.
    """
    if not lrc_text:
        return []

    lyrics = []
    for raw_line in lrc_text.split('\n'):
        # strip() also drops a trailing '\r' and any leading indentation
        remainder = raw_line.strip()
        timestamps = []
        # Peel off every leading [..] tag that is a timestamp.
        while remainder.startswith('['):
            tag, closing, rest = remainder[1:].partition(']')
            if not closing:
                break
            seconds = _parse_lrc_timestamp(tag)
            if seconds is None:
                break
            timestamps.append(seconds)
            remainder = rest.lstrip()
        text = remainder.strip()
        for seconds in timestamps:
            lyrics.append({'time': seconds, 'text': text})
    return sorted(lyrics, key=lambda entry: entry['time'])
def parse_sylt_format(sylt):
    """Parse SYLT format synchronized lyrics.

    Args:
        sylt: Object whose ``text`` attribute is an iterable of pairs.
            mutagen stores SYLT entries as ``(text, time)``; the reversed
            ``(time, text)`` order is accepted as well.

    Returns:
        A list of ``{'time': seconds, 'text': str}`` dicts sorted by time.
        NOTE(review): times are divided by 1000, which assumes the frame
        uses millisecond timestamps - confirm against the frame's ``format``.
    """
    lyrics = []
    for first, second in sylt.text:
        # Unpacking as (time, text) made ``time / 1000.0`` operate on the
        # lyric string and raise TypeError; detect the order by type.
        if isinstance(first, str):
            text, millis = first, second
        else:
            millis, text = first, second
        lyrics.append({
            'time': millis / 1000.0,  # Convert milliseconds to seconds
            'text': text,
        })
    return sorted(lyrics, key=lambda entry: entry['time'])
if __name__ == '__main__':
    # Make sure the music directory is present before the server starts
    music_folder = app.config['MUSIC_FOLDER']
    if not os.path.exists(music_folder):
        os.makedirs(music_folder)

    # Listen on every interface; the PORT environment variable overrides the
    # default port 7860
    listen_port = int(os.environ.get('PORT', 7860))
    app.run(host='0.0.0.0', port=listen_port)