# Music_Player / app.py
# AdityaAdaki
# docker update
# f988d03
from flask import Flask, render_template, send_from_directory, jsonify, request
import os
import mutagen
from mutagen.flac import FLAC
from mutagen.mp3 import MP3
from mutagen.wave import WAVE
from functools import lru_cache
import base64
from pathlib import Path
app = Flask(__name__)

# Configure music folder using Path for better path handling
app.config['MUSIC_FOLDER'] = Path(__file__).parent / 'music'

# Global cache for metadata
METADATA_CACHE = {}
STRUCTURE_CACHE = None
CACHE_VERSION = 1  # Increment this when changing cache logic

# In-memory playlist store (playlist name -> list of songs) shared by the
# /api/playlists endpoints. It must exist at import time because those
# endpoints read and mutate it as a module-level global; its contents are
# lost when the process exits.
playlists = {}
def _artwork_data_uri(image_bytes, mime=None):
    """Return *image_bytes* as a base64 ``data:`` URI.

    Uses ``image/jpeg`` when the tag does not declare a MIME type.
    """
    encoded = base64.b64encode(image_bytes).decode('utf-8')
    return f"data:{mime or 'image/jpeg'};base64,{encoded}"


def _fallback_metadata(file_path):
    """Return placeholder metadata carrying the same keys as a successful read."""
    return {
        'title': Path(file_path).name,
        'artist': 'Unknown',
        'album': 'Unknown',
        'duration': 0,
        'sample_rate': 0,
        'channels': 0,
        'artwork': None,
        'lyrics': '',
        'synchronized_lyrics': [],
    }


@lru_cache(maxsize=100)
def get_audio_metadata(file_path):
    """Read title/artist/album, stream info, artwork and lyrics for one file.

    Args:
        file_path: Path of the audio file as a string (it is used as the
            cache key, so it must be hashable).

    Returns:
        A metadata dict for ``.flac``, ``.mp3`` and ``.wav`` files (extension
        matched case-insensitively), ``None`` for any other extension, or a
        placeholder dict with the same keys when the file cannot be read.

    Results are stored in ``METADATA_CACHE`` and additionally memoized by
    ``lru_cache`` (which also remembers the placeholder/None results) until
    ``get_audio_metadata.cache_clear()`` is called.
    """
    try:
        # Check if metadata is in cache
        if file_path in METADATA_CACHE:
            return METADATA_CACHE[file_path]

        file_name = Path(file_path).name
        # Lower-case the suffix so 'SONG.MP3' is handled like 'song.mp3'.
        extension = Path(file_path).suffix.lower()
        metadata = None

        if extension == '.flac':
            audio = FLAC(file_path)
            metadata = {
                'title': str(audio.get('title', [file_name])[0]),
                'artist': str(audio.get('artist', ['Unknown'])[0]),
                'album': str(audio.get('album', ['Unknown'])[0]),
                'duration': int(audio.info.length),
                'sample_rate': audio.info.sample_rate,
                'channels': audio.info.channels,
                'artwork': None,
                'lyrics': str(audio.get('lyrics', [''])[0]),
                'synchronized_lyrics': get_synchronized_lyrics(audio),
            }
            if audio.pictures:
                # Artwork is best-effort: a broken picture must not discard
                # the rest of the metadata.
                try:
                    picture = audio.pictures[0]
                    metadata['artwork'] = _artwork_data_uri(
                        picture.data, getattr(picture, 'mime', None)
                    )
                except Exception as e:
                    print(f"Error reading artwork for {file_path}: {str(e)}")

        elif extension == '.mp3':
            audio = MP3(file_path)
            # ``tags`` is None for an MP3 without an ID3 header, so test the
            # value itself rather than the attribute's existence.
            tags = audio.tags

            def tag_text(frame_id, default):
                if tags is None:
                    return default
                return str(tags.get(frame_id, [default])[0])

            metadata = {
                'title': tag_text('TIT2', file_name),
                'artist': tag_text('TPE1', 'Unknown'),
                'album': tag_text('TALB', 'Unknown'),
                'duration': int(audio.info.length),
                'sample_rate': audio.info.sample_rate,
                'channels': audio.info.channels,
                'artwork': None,
                'lyrics': get_mp3_lyrics(audio),
                'synchronized_lyrics': get_mp3_synchronized_lyrics(audio),
            }
            if tags is not None:
                try:
                    apic_keys = [k for k in tags.keys() if k.startswith('APIC:')]
                    if apic_keys:
                        cover = tags[apic_keys[0]]
                        metadata['artwork'] = _artwork_data_uri(
                            cover.data, getattr(cover, 'mime', None)
                        )
                except Exception as e:
                    print(f"Error reading artwork for {file_path}: {str(e)}")

        elif extension == '.wav':
            audio = WAVE(file_path)
            metadata = {
                'title': file_name,
                'artist': 'Unknown',
                'album': 'Unknown',
                'duration': int(audio.info.length),
                'sample_rate': audio.info.sample_rate,
                'channels': audio.info.channels,
                'artwork': None,
                'lyrics': '',
                'synchronized_lyrics': [],
            }

        if metadata:
            METADATA_CACHE[file_path] = metadata
            return metadata
        return None
    except Exception as e:
        print(f"Error reading metadata for {file_path}: {str(e)}")
        # Same key set as a successful read so consumers need no special case.
        return _fallback_metadata(file_path)
def _scan_folder(folder):
    """Recursively list audio files and sub-folders of *folder* (no caching)."""
    music_root = app.config['MUSIC_FOLDER']
    entries = []
    # Sort by name so the listing is stable across runs and platforms.
    for item in sorted(folder.iterdir(), key=lambda p: p.name.lower()):
        if item.is_file() and item.suffix.lower() in ('.mp3', '.wav', '.flac'):
            metadata = get_audio_metadata(str(item))
            if metadata:
                entries.append({
                    'type': 'file',
                    'name': item.name,
                    'path': str(item.relative_to(music_root)),
                    'metadata': metadata,
                })
        elif item.is_dir():
            # A failing sub-folder yields empty contents instead of aborting
            # the scan of its parent.
            try:
                contents = _scan_folder(item)
            except Exception as e:
                print(f"Error scanning directory {item}: {str(e)}")
                contents = []
            entries.append({
                'type': 'folder',
                'name': item.name,
                'path': str(item.relative_to(music_root)),
                'contents': contents,
            })
    return entries


def get_folder_structure(path):
    """Get folder structure with caching.

    Args:
        path: Directory to scan (``str`` or ``Path``).

    Returns:
        A list of ``file`` / ``folder`` dicts describing *path* recursively.
        Once a scan succeeds the result is stored in ``STRUCTURE_CACHE`` and
        returned for every later call (whatever *path* is) until the cache is
        reset. Returns ``[]`` when *path* does not exist or scanning fails;
        those results are not cached.
    """
    global STRUCTURE_CACHE
    try:
        if STRUCTURE_CACHE is not None:
            return STRUCTURE_CACHE
        root = Path(path)
        if not root.exists():
            return []
        # The recursion lives in _scan_folder so that scanning a sub-folder
        # can never read or overwrite the cache of the whole tree.
        structure = _scan_folder(root)
        STRUCTURE_CACHE = structure
        return structure
    except Exception as e:
        print(f"Error scanning directory {path}: {str(e)}")
        return []
@app.route('/api/music-structure')
def get_music_structure():
    """Return the music library tree as JSON (an empty list when there is no music)."""
    root = app.config['MUSIC_FOLDER']

    # First run: create the folder and report an empty library.
    if not root.exists():
        root.mkdir(parents=True, exist_ok=True)
        return jsonify([])

    # An existing but empty folder is answered without scanning.
    has_entries = any(root.iterdir())
    if not has_entries:
        return jsonify([])

    return jsonify(get_folder_structure(root))
# Development helper: endpoint that drops every cache
@app.route('/api/clear-cache', methods=['POST'])
def clear_cache():
    """Reset the structure cache and empty both metadata caches (development use only)."""
    global STRUCTURE_CACHE
    get_audio_metadata.cache_clear()
    METADATA_CACHE.clear()
    STRUCTURE_CACHE = None
    response_body = {"message": "Cache cleared"}
    return jsonify(response_body)
@app.route('/music/<path:filename>')
def serve_music(filename):
    """Serve an audio file from the configured music folder.

    Args:
        filename: Path of the requested file relative to the music folder.

    Uses ``app.config['MUSIC_FOLDER']`` (the same location the library scan
    reads) instead of a hard-coded ``'music'`` directory, so the listing and
    the download endpoint always agree on where the files live.
    """
    return send_from_directory(str(app.config['MUSIC_FOLDER']), filename)
@app.route('/')
def index():
    """Render the ``index.html`` template for the root URL."""
    page = render_template('index.html')
    return page
@app.route('/static/default-cover.png')
def default_cover():
    """Send ``default-cover.png`` from the ``static`` directory."""
    cover_name = 'default-cover.png'
    return send_from_directory('static', cover_name)
@app.route('/api/playlists', methods=['GET', 'POST', 'DELETE'])
def manage_playlists():
    """List, create/replace or delete playlists in the module-level store.

    * ``GET`` (and the ``HEAD`` Flask registers alongside it): return all
      playlists as JSON.
    * ``POST``: JSON body ``{"name": ..., "songs": [...]}`` creates or
      replaces a playlist. Responds 400 when the name is missing or
      ``songs`` is not a list.
    * ``DELETE``: ``?name=...`` removes a playlist. Responds 404 when it does
      not exist.
    """
    if request.method == 'POST':
        # silent=True: a missing or malformed body becomes None instead of
        # raising, and is then reported through the 400 below.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        playlist_name = data.get('name')
        songs = data.get('songs', [])
        if not playlist_name:
            return jsonify({'error': 'No playlist name provided'}), 400
        if not isinstance(songs, list):
            return jsonify({'error': 'songs must be a list'}), 400
        playlists[playlist_name] = songs
        return jsonify({'message': f'Playlist {playlist_name} created/updated'})

    if request.method == 'DELETE':
        playlist_name = request.args.get('name')
        if playlist_name not in playlists:
            return jsonify({'error': 'Playlist not found'}), 404
        del playlists[playlist_name]
        return jsonify({'message': f'Playlist {playlist_name} deleted'})

    # GET, plus HEAD: falling through without a response would make the view
    # return None, which Flask turns into a server error.
    return jsonify(playlists)
@app.route('/api/playlists/<name>/add', methods=['POST'])
def add_to_playlist(name):
    """Append songs to playlist *name*, creating the playlist if needed.

    Expects a JSON body ``{"songs": [...]}``; a missing body or missing key
    is treated as an empty list. Responds 400 when ``songs`` is not a list
    (extending with a string would otherwise add it character by character).

    Returns:
        The playlist's updated song list as JSON.
    """
    data = request.get_json(silent=True)
    songs = data.get('songs', []) if isinstance(data, dict) else []
    if not isinstance(songs, list):
        return jsonify({'error': 'songs must be a list'}), 400
    playlists.setdefault(name, []).extend(songs)
    return jsonify(playlists[name])
@app.route('/api/playlists/<name>/remove', methods=['POST'])
def remove_from_playlist(name):
    """Remove songs from playlist *name* by position.

    Expects a JSON body ``{"indices": [...]}``. Responds 404 when the
    playlist does not exist and 400 when ``indices`` is not a list.
    Out-of-range and non-integer entries are ignored.

    Returns:
        The playlist's remaining song list as JSON.
    """
    if name not in playlists:
        return jsonify({'error': 'Playlist not found'}), 404
    data = request.get_json(silent=True)
    indices = data.get('indices', []) if isinstance(data, dict) else []
    if not isinstance(indices, list):
        return jsonify({'error': 'indices must be a list'}), 400

    songs = playlists[name]
    # Build a set so a repeated index removes one song only; popping the
    # same position twice would delete the neighbour that shifted into it.
    valid_indices = {
        index for index in indices
        if isinstance(index, int) and not isinstance(index, bool)
        and 0 <= index < len(songs)
    }
    # Remove songs in reverse order to avoid index shifting
    for index in sorted(valid_indices, reverse=True):
        songs.pop(index)
    return jsonify(songs)
def get_synchronized_lyrics(audio):
    """Extract synchronized lyrics from FLAC metadata.

    Args:
        audio: Object exposing a ``tags`` mapping (or ``None`` when the file
            has no tags) whose ``'LYRICS'`` entry is a list of strings.

    Returns:
        The list produced by ``parse_lrc_format`` for the first ``LYRICS``
        value, or ``[]`` when there are no tags, no ``LYRICS`` entry, or
        parsing fails.
    """
    try:
        tags = getattr(audio, 'tags', None)
        # An untagged file is a normal case, not an error worth logging.
        if tags is not None and 'LYRICS' in tags:
            # Try to parse synchronized lyrics in LRC format
            return parse_lrc_format(tags['LYRICS'][0])
    except Exception as e:
        print(f"Error parsing synchronized lyrics: {str(e)}")
    return []
def get_mp3_lyrics(audio):
    """Extract plain lyrics from MP3 metadata.

    Args:
        audio: Object exposing a ``tags`` mapping, or ``None`` when the file
            carries no tags.

    Returns:
        The text of the first unsynchronized-lyrics (USLT) frame, else the
        value of a literal ``'USLT::'``, ``'LYRICS'`` or ``'LYRICS:'`` key,
        else ``''``.
    """
    try:
        tags = getattr(audio, 'tags', None)
        if tags is None:
            return ''
        # USLT frames are keyed 'USLT:<description>:<language>', so the
        # literal key 'USLT::' only matches a frame with neither set. Ask
        # the tag container for every USLT frame when it supports that.
        find_all = getattr(tags, 'getall', None)
        if find_all is not None:
            uslt_frames = find_all('USLT')
            if uslt_frames:
                return str(uslt_frames[0])
        # Try different common lyrics tag formats
        for tag in ['USLT::', 'LYRICS', 'LYRICS:']:
            if tag in tags:
                return str(tags[tag])
    except Exception as e:
        print(f"Error extracting MP3 lyrics: {str(e)}")
    return ''
def get_mp3_synchronized_lyrics(audio):
    """Extract synchronized lyrics from MP3 metadata.

    Args:
        audio: Object exposing a ``tags`` mapping, or ``None`` when the file
            carries no tags.

    Returns:
        The list produced by ``parse_sylt_format`` for the first tag whose
        key starts with ``'SYLT:'``, or ``[]`` when there are no tags, no
        such frame, or parsing fails.
    """
    try:
        tags = getattr(audio, 'tags', None)
        # An untagged file is a normal case, not an error worth logging.
        if tags is None:
            return []
        # Look for SYLT (Synchronized Lyrics) frames
        sylt_keys = [key for key in tags if key.startswith('SYLT:')]
        if sylt_keys:
            return parse_sylt_format(tags[sylt_keys[0]])
    except Exception as e:
        print(f"Error extracting MP3 synchronized lyrics: {str(e)}")
    return []
def _lrc_timestamp_to_seconds(stamp):
    """Convert an LRC ``mm:ss`` or ``mm:ss.xx`` stamp to seconds.

    Raises:
        ValueError: If *stamp* is not a timestamp (e.g. ``ar:Artist``).
    """
    mm_ss, _, fraction = stamp.partition('.')
    if fraction and not fraction.isdigit():
        raise ValueError(f'invalid LRC fraction: {fraction!r}')
    minutes, seconds = map(int, mm_ss.split(':'))
    return minutes * 60 + seconds + float(f'0.{fraction or 0}')


def parse_lrc_format(lrc_text):
    """Parse LRC format synchronized lyrics.

    Each line has the form ``[mm:ss.xx]lyrics text``. A line may carry
    several leading timestamps (``[00:10.00][00:45.00]chorus``), in which
    case one entry per timestamp is produced. Lines without a leading
    timestamp, such as ``[ar:Artist]`` metadata or blank lines, are skipped.

    Args:
        lrc_text: The LRC document as a string; empty or ``None`` yields ``[]``.

    Returns:
        A list of ``{'time': seconds, 'text': str}`` dicts sorted by time.
    """
    if not lrc_text:
        return []

    lyrics = []
    for raw_line in lrc_text.splitlines():
        remainder = raw_line.strip()
        timestamps = []
        # Consume every leading [..] tag that parses as a timestamp.
        while remainder.startswith('['):
            closing = remainder.find(']')
            if closing == -1:
                break
            try:
                timestamps.append(_lrc_timestamp_to_seconds(remainder[1:closing]))
            except ValueError:
                # Not a timestamp; whatever is left is treated as text.
                break
            remainder = remainder[closing + 1:].lstrip()
        lyrics.extend({'time': stamp, 'text': remainder} for stamp in timestamps)
    return sorted(lyrics, key=lambda entry: entry['time'])
def parse_sylt_format(sylt):
    """Parse SYLT format synchronized lyrics.

    Args:
        sylt: Object whose ``text`` attribute is an iterable of two-item
            entries. mutagen's SYLT frame stores them as ``(text, time)``;
            ``(time, text)`` pairs are accepted as well.

    Returns:
        A list of ``{'time': seconds, 'text': str}`` dicts sorted by time.
    """
    lyrics = []
    for first, second in sylt.text:
        # Pick the string member as the text so both orderings work;
        # dividing the text by 1000 is what used to make this always fail.
        if isinstance(first, str):
            text, time_ms = first, second
        else:
            time_ms, text = first, second
        lyrics.append({
            # Convert milliseconds to seconds.
            # NOTE(review): assumes the frame's time-stamp format is
            # milliseconds rather than MPEG frames - confirm via sylt.format.
            'time': time_ms / 1000.0,
            'text': text,
        })
    return sorted(lyrics, key=lambda entry: entry['time'])
if __name__ == '__main__':
    # Make sure the music folder is present before the server starts
    music_folder = app.config['MUSIC_FOLDER']
    if not os.path.exists(music_folder):
        os.makedirs(music_folder)

    # Listen on all interfaces; the PORT environment variable overrides
    # the 7860 default
    listen_port = int(os.environ.get('PORT', 7860))
    app.run(host='0.0.0.0', port=listen_port)