from flask import Flask, render_template, send_from_directory, jsonify, request import os import mutagen from mutagen.flac import FLAC from mutagen.mp3 import MP3 from mutagen.wave import WAVE from functools import lru_cache import base64 from pathlib import Path app = Flask(__name__) # Configure music folder using Path for better path handling app.config['MUSIC_FOLDER'] = Path(__file__).parent / 'music' # Global cache for metadata METADATA_CACHE = {} STRUCTURE_CACHE = None CACHE_VERSION = 1 # Increment this when changing cache logic @lru_cache(maxsize=100) def get_audio_metadata(file_path): """Cache metadata for frequently accessed files""" try: # Check if metadata is in cache if file_path in METADATA_CACHE: return METADATA_CACHE[file_path] metadata = None if file_path.endswith('.flac'): audio = FLAC(file_path) metadata = { 'title': str(audio.get('title', [Path(file_path).name])[0]), 'artist': str(audio.get('artist', ['Unknown'])[0]), 'album': str(audio.get('album', ['Unknown'])[0]), 'duration': int(audio.info.length), 'sample_rate': audio.info.sample_rate, 'channels': audio.info.channels, 'artwork': None, 'lyrics': str(audio.get('lyrics', [''])[0]), 'synchronized_lyrics': get_synchronized_lyrics(audio) } if audio.pictures: try: artwork_data = audio.pictures[0].data metadata['artwork'] = f"data:image/jpeg;base64,{base64.b64encode(artwork_data).decode('utf-8')}" except Exception: pass elif file_path.endswith('.mp3'): audio = MP3(file_path) metadata = { 'title': audio.tags.get('TIT2', [Path(file_path).name])[0] if hasattr(audio, 'tags') else Path(file_path).name, 'artist': audio.tags.get('TPE1', ['Unknown'])[0] if hasattr(audio, 'tags') else 'Unknown', 'album': audio.tags.get('TALB', ['Unknown'])[0] if hasattr(audio, 'tags') else 'Unknown', 'duration': int(audio.info.length), 'sample_rate': audio.info.sample_rate, 'channels': audio.info.channels, 'artwork': None, 'lyrics': get_mp3_lyrics(audio), 'synchronized_lyrics': get_mp3_synchronized_lyrics(audio) } if hasattr(audio, 'tags'): try: apic_keys = [k for k in audio.tags.keys() if k.startswith('APIC:')] if apic_keys: artwork_data = audio.tags[apic_keys[0]].data metadata['artwork'] = f"data:image/jpeg;base64,{base64.b64encode(artwork_data).decode('utf-8')}" except Exception: pass elif file_path.endswith('.wav'): audio = WAVE(file_path) metadata = { 'title': Path(file_path).name, 'artist': 'Unknown', 'album': 'Unknown', 'duration': int(audio.info.length), 'sample_rate': audio.info.sample_rate, 'channels': audio.info.channels, 'artwork': None, 'lyrics': '', 'synchronized_lyrics': [] } if metadata: METADATA_CACHE[file_path] = metadata return metadata return None except Exception as e: print(f"Error reading metadata for {file_path}: {str(e)}") return { 'title': Path(file_path).name, 'artist': 'Unknown', 'album': 'Unknown', 'duration': 0, 'sample_rate': 0, 'channels': 0, 'artwork': None } def get_folder_structure(path): """Get folder structure with caching""" global STRUCTURE_CACHE try: if STRUCTURE_CACHE is not None: return STRUCTURE_CACHE if not Path(path).exists(): return [] structure = [] for item in Path(path).iterdir(): if item.is_file() and item.suffix.lower() in ('.mp3', '.wav', '.flac'): metadata = get_audio_metadata(str(item)) if metadata: structure.append({ 'type': 'file', 'name': item.name, 'path': str(item.relative_to(app.config['MUSIC_FOLDER'])), 'metadata': metadata }) elif item.is_dir(): structure.append({ 'type': 'folder', 'name': item.name, 'path': str(item.relative_to(app.config['MUSIC_FOLDER'])), 'contents': get_folder_structure(item) }) STRUCTURE_CACHE = structure return structure except Exception as e: print(f"Error scanning directory {path}: {str(e)}") return [] @app.route('/api/music-structure') def get_music_structure(): """Get music structure with caching""" music_dir = app.config['MUSIC_FOLDER'] if not music_dir.exists(): music_dir.mkdir(parents=True, exist_ok=True) return jsonify([]) if not any(music_dir.iterdir()): return jsonify([]) structure = get_folder_structure(music_dir) return jsonify(structure) # Add cache invalidation endpoint for development @app.route('/api/clear-cache', methods=['POST']) def clear_cache(): """Clear all caches - use in development only""" global STRUCTURE_CACHE, METADATA_CACHE STRUCTURE_CACHE = None METADATA_CACHE.clear() get_audio_metadata.cache_clear() return jsonify({"message": "Cache cleared"}) @app.route('/music/') def serve_music(filename): return send_from_directory('music', filename) @app.route('/') def index(): return render_template('index.html') @app.route('/static/default-cover.png') def default_cover(): return send_from_directory('static', 'default-cover.png') @app.route('/api/playlists', methods=['GET', 'POST', 'DELETE']) def manage_playlists(): global playlists if request.method == 'GET': return jsonify(playlists) if request.method == 'POST': data = request.get_json() playlist_name = data.get('name') songs = data.get('songs', []) if playlist_name: playlists[playlist_name] = songs return jsonify({'message': f'Playlist {playlist_name} created/updated'}) return jsonify({'error': 'No playlist name provided'}), 400 if request.method == 'DELETE': playlist_name = request.args.get('name') if playlist_name in playlists: del playlists[playlist_name] return jsonify({'message': f'Playlist {playlist_name} deleted'}) return jsonify({'error': 'Playlist not found'}), 404 @app.route('/api/playlists//add', methods=['POST']) def add_to_playlist(name): data = request.get_json() songs = data.get('songs', []) if name not in playlists: playlists[name] = [] playlists[name].extend(songs) return jsonify(playlists[name]) @app.route('/api/playlists//remove', methods=['POST']) def remove_from_playlist(name): if name not in playlists: return jsonify({'error': 'Playlist not found'}), 404 data = request.get_json() indices = data.get('indices', []) # Remove songs in reverse order to avoid index shifting for index in sorted(indices, reverse=True): if 0 <= index < len(playlists[name]): playlists[name].pop(index) return jsonify(playlists[name]) def get_synchronized_lyrics(audio): """Extract synchronized lyrics from FLAC metadata""" try: if 'LYRICS' in audio.tags: # Try to parse synchronized lyrics in LRC format lyrics = audio.tags['LYRICS'][0] return parse_lrc_format(lyrics) except Exception as e: print(f"Error parsing synchronized lyrics: {str(e)}") return [] def get_mp3_lyrics(audio): """Extract plain lyrics from MP3 metadata""" try: if hasattr(audio, 'tags'): # Try different common lyrics tag formats for tag in ['USLT::', 'LYRICS', 'LYRICS:']: if tag in audio.tags: return str(audio.tags[tag]) except Exception as e: print(f"Error extracting MP3 lyrics: {str(e)}") return '' def get_mp3_synchronized_lyrics(audio): """Extract synchronized lyrics from MP3 metadata""" try: if hasattr(audio, 'tags'): # Look for SYLT (Synchronized Lyrics) frames sylt_frames = [f for f in audio.tags if f.startswith('SYLT:')] if sylt_frames: sylt = audio.tags[sylt_frames[0]] return parse_sylt_format(sylt) except Exception as e: print(f"Error extracting MP3 synchronized lyrics: {str(e)}") return [] def parse_lrc_format(lrc_text): """Parse LRC format synchronized lyrics""" lyrics = [] for line in lrc_text.split('\n'): if line.strip(): try: # LRC format: [mm:ss.xx]lyrics text time_str = line[1:line.find(']')] text = line[line.find(']')+1:].strip() # Convert time to seconds if '.' in time_str: mm_ss, ms = time_str.split('.') else: mm_ss, ms = time_str, '0' minutes, seconds = map(int, mm_ss.split(':')) timestamp = minutes * 60 + seconds + float(f'0.{ms}') lyrics.append({'time': timestamp, 'text': text}) except: continue return sorted(lyrics, key=lambda x: x['time']) def parse_sylt_format(sylt): """Parse SYLT format synchronized lyrics""" lyrics = [] for time, text in sylt.text: lyrics.append({ 'time': time / 1000.0, # Convert milliseconds to seconds 'text': text }) return sorted(lyrics, key=lambda x: x['time']) if __name__ == '__main__': # Create music directory if it doesn't exist if not os.path.exists(app.config['MUSIC_FOLDER']): os.makedirs(app.config['MUSIC_FOLDER']) # Use environment variables for host and port port = int(os.environ.get('PORT', 7860)) app.run(host='0.0.0.0', port=port)