Spaces:
Running
Running
# app.py | |
from flask import Flask, render_template, send_from_directory, request, jsonify | |
import os | |
from supabase import create_client | |
from dotenv import load_dotenv | |
from werkzeug.utils import secure_filename | |
import uuid | |
import logging | |
import sys | |
# Configure logging for production deployment, before anything else logs.
# This replaces print() statements which don't show up in Hugging Face Spaces.
# It runs ahead of the dotenv step so that step's status message is handled
# by the handlers below instead of being dropped by an unconfigured root logger.
#
# To view logs in Hugging Face Spaces:
# 1. Go to your Space settings → Logs tab
# 2. Use the /api/debug/logs endpoint to view logs in the web interface
# 3. Check the app.log file that gets created
#
# For debugging:
# - Use logger.info() for informational messages
# - Use logger.error() for errors
# - Use logger.warning() for warnings
# - Use logger.debug() for detailed debugging (set level to DEBUG)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        # UTF-8 so the non-ASCII status glyphs used in log messages can be
        # written regardless of the process locale.
        logging.FileHandler('app.log', mode='a', encoding='utf-8'),
        logging.StreamHandler(sys.stdout),
    ],
    force=True,
)
logger = logging.getLogger(__name__)

# Ensure logs are flushed immediately. Iterate rather than index [0]/[1] so
# this keeps working if the handler list above is changed.
for _handler in logging.getLogger().handlers:
    _handler.flush()

# Try to load .env file (for local development).
# In production (Hugging Face Spaces), environment variables are usually
# available directly.
logger_temp = logging.getLogger('dotenv')
try:
    load_dotenv()
    logger_temp.info("✅ dotenv loaded successfully")
except Exception as e:
    logger_temp.warning(f"⚠️ dotenv loading failed (this is normal in production): {e}")

app = Flask(__name__)
# Upload settings: where audio files are stored, which extensions are
# accepted, and the largest request body allowed.
UPLOAD_FOLDER = 'static/audio'
ALLOWED_EXTENSIONS = {'mp3', 'wav', 'm4a', 'flac', 'ogg', 'aac'}
MAX_CONTENT_LENGTH = 50 * 1024 * 1024  # 50MB max file size

app.config.update(
    UPLOAD_FOLDER=UPLOAD_FOLDER,
    MAX_CONTENT_LENGTH=MAX_CONTENT_LENGTH,
)

# Create the upload directory up front; a pre-existing directory is fine.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
def allowed_file(filename):
    """Return True when *filename* has an extension listed in ALLOWED_EXTENSIONS.

    The extension is the text after the last dot and is compared
    case-insensitively. A name without any dot is rejected.
    """
    _, dot, extension = filename.rpartition('.')
    if not dot:
        return False
    return extension.lower() in ALLOWED_EXTENSIONS
# Supabase client
def get_supabase_client():
    """Create a Supabase client from environment variables and verify it.

    Reads SUPABASE_URL and SUPABASE_KEY from the environment, builds a client
    with create_client(), and runs a small count query against the
    "playlists" table as a connectivity check.

    Returns:
        The client object returned by create_client().

    Raises:
        ValueError: If SUPABASE_URL or SUPABASE_KEY is missing or empty.
        Exception: Anything raised by create_client() or the test query is
            logged and re-raised unchanged.
    """
    url = os.environ.get("SUPABASE_URL")
    key = os.environ.get("SUPABASE_KEY")

    # Log presence and length only. These messages are written to app.log,
    # so the URL and any part of the key are deliberately kept out of them.
    logger.info(f"Environment variables loaded: URL={'✓' if url else '✗'}, KEY={'✓' if key else '✗'}")
    logger.info(f"URL length: {len(url) if url else 0}")
    logger.info(f"Key length: {len(key) if key else 0}")

    if not url or not key:
        logger.error("❌ Failed to initialize Supabase client: SUPABASE_URL and SUPABASE_KEY environment variables are required")
        raise ValueError("SUPABASE_URL and SUPABASE_KEY environment variables are required")

    # Only claim success once both values have actually been validated.
    logger.info("✅ Environment variables loaded successfully")

    try:
        client = create_client(url, key)
        logger.info("✅ Supabase client created successfully")
        # Test the connection with a simple query
        test_response = client.table("playlists").select("count", count="exact").limit(1).execute()
        logger.info(f"✅ Supabase connection test successful: {test_response}")
        return client
    except Exception as e:
        logger.error(f"❌ Failed to create or test Supabase client: {e}")
        logger.error(f"Error type: {type(e).__name__}")
        # Bare raise keeps the original traceback intact.
        raise
# Initialize database tables
def init_db():
    """Check that a Supabase client can be created.

    No tables are created here. Per the original note, the Supabase client
    has no table-creation methods, so tables are expected to be created by
    Supabase or manually in the dashboard.

    Returns:
        True if get_supabase_client() succeeds, False if it raises.
    """
    try:
        # The client itself is not needed here; only whether it can be built.
        get_supabase_client()
    except Exception as e:
        logger.error(f"❌ Failed to initialize Supabase client: {e}")
        return False
    logger.info("✅ Supabase client initialized successfully!")
    return True
# Serves audio files (route registration is not visible in this view)
def serve_audio(filename):
    """Send *filename* from the static/audio directory."""
    audio_directory = 'static/audio'
    return send_from_directory(audio_directory, filename)
def home():
    """Render index.html with the sorted list of available audio files.

    Files are filtered with allowed_file(), so every extension accepted by
    the upload endpoint (not just .mp3/.wav) is listed, regardless of the
    extension's letter case. This matches what refresh_songs() reports.
    """
    # Get list of audio files
    audio_dir = os.path.join('static', 'audio')
    audio_files = sorted(f for f in os.listdir(audio_dir) if allowed_file(f))
    return render_template("index.html", audio_files=audio_files)
# API Routes for playlist management
def get_playlists():
    """Return all rows of the "playlists" table as JSON, newest first.

    Rows are ordered by created_at descending. Any exception is reported as
    a JSON error body with status 500.
    """
    try:
        client = get_supabase_client()
        query = client.table("playlists").select("*").order("created_at", desc=True)
        result = query.execute()
        return jsonify(result.data)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def create_playlist():
    """Create a playlist and optionally its initial songs.

    Expects a JSON object body with "name" (required) and "songs" (optional
    list of song names). The playlist row is inserted first; the songs are
    then inserted into "playlist_songs" with song_order set to each song's
    0-based position in the list.

    Returns:
        201 with {'id', 'name', 'songs'} on success.
        400 if the body is not a JSON object, the name is missing, or
        "songs" is not a list.
        500 with the error text if anything else fails.
    """
    try:
        logger.info("Creating new playlist")
        # silent=True returns None for a missing or invalid JSON body
        # instead of raising, so bad input can be answered with 400.
        data = request.get_json(silent=True)
        logger.info(f"Playlist data: {data}")
        if not isinstance(data, dict):
            logger.error("Request body must be a JSON object")
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        name = data.get('name')
        # Treat an explicit null the same as an omitted list.
        songs = data.get('songs') or []
        if not name:
            logger.error("Playlist name is required")
            return jsonify({'error': 'Playlist name is required'}), 400
        if not isinstance(songs, list):
            logger.error("Songs must be a list")
            return jsonify({'error': 'Songs must be a list'}), 400

        logger.info(f"Creating playlist: {name} with {len(songs)} songs")
        supabase = get_supabase_client()

        # Create playlist
        logger.info("Inserting playlist into database")
        playlist_response = supabase.table("playlists").insert({"name": name}).execute()
        logger.info(f"Playlist creation response: {playlist_response}")
        playlist = playlist_response.data[0]
        playlist_id = playlist['id']
        logger.info(f"Created playlist with ID: {playlist_id}")

        # Add songs to playlist
        if songs:
            logger.info(f"Adding {len(songs)} songs to playlist")
            song_data = [
                {"playlist_id": playlist_id, "song_name": song, "song_order": i}
                for i, song in enumerate(songs)
            ]
            song_response = supabase.table("playlist_songs").insert(song_data).execute()
            logger.info(f"Songs added response: {song_response}")

        result = {'id': playlist_id, 'name': name, 'songs': songs}
        logger.info(f"Playlist created successfully: {result}")
        return jsonify(result), 201
    except Exception as e:
        logger.error(f"❌ Error creating playlist: {e}")
        logger.error(f"Error type: {type(e).__name__}")
        logger.error(f"Error details: {str(e)}")
        return jsonify({'error': str(e)}), 500
def get_playlist_songs(playlist_id):
    """Return the songs of one playlist as JSON, ordered by song_order.

    Each entry carries the song_name and song_order columns of the
    "playlist_songs" rows whose playlist_id matches. Any exception is
    reported as a JSON error body with status 500.
    """
    try:
        client = get_supabase_client()
        query = (
            client.table("playlist_songs")
            .select("song_name, song_order")
            .eq("playlist_id", playlist_id)
            .order("song_order")
        )
        result = query.execute()
        return jsonify(result.data)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def delete_playlist(playlist_id):
    """Delete the playlist whose id equals *playlist_id*.

    Returns a success message, 404 when the delete returned no rows, or 500
    with the error text when an exception occurs.
    """
    try:
        client = get_supabase_client()
        # Only the playlist row is deleted here. The original note says a
        # cascade removes its playlist_songs rows (schema not visible here).
        deletion = client.table("playlists").delete().eq("id", playlist_id).execute()
        if deletion.data:
            return jsonify({'message': 'Playlist deleted successfully'})
        return jsonify({'error': 'Playlist not found'}), 404
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def get_preference(key):
    """Look up one entry of the "user_preferences" table.

    Args:
        key: Value matched against the preference_key column.

    Returns:
        {'value': ...} with the first matching row's preference_value,
        404 when no row matches, or 500 with the error text on failure.
    """
    try:
        logger.info(f"Getting preference for key: {key}")
        supabase = get_supabase_client()
        response = supabase.table("user_preferences").select("preference_value").eq("preference_key", key).execute()
        logger.info(f"Supabase response: {response}")

        if not response.data:
            logger.info(f"Preference not found for key: {key}")
            return jsonify({'error': 'Preference not found'}), 404

        logger.info(f"Preference found: {response.data[0]}")
        return jsonify({'value': response.data[0]['preference_value']})
    except Exception as e:
        # logger.exception records the message plus the traceback, replacing
        # the separate "details" line that repeated the same text.
        logger.exception(f"❌ Error getting preference '{key}': {e}")
        logger.error(f"Error type: {type(e).__name__}")
        return jsonify({'error': str(e)}), 500
def set_preference(key):
    """Store a preference value, updating an existing row or inserting one.

    Expects a JSON object body; its "value" entry (None when absent) is
    written to preference_value for the row whose preference_key equals
    *key*. If the update affects no rows, a new row is inserted instead.

    Returns:
        A success message, 400 if the body is not a JSON object, or 500
        with the error text on failure.
    """
    try:
        # silent=True returns None for a missing or invalid JSON body
        # instead of raising, so bad input can be answered with 400.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        value = data.get('value')
        logger.info(f"Setting preference: {key} = {value}")
        supabase = get_supabase_client()

        # First, try to update existing preference
        update_response = supabase.table("user_preferences").update({
            "preference_value": value
        }).eq("preference_key", key).execute()

        # If no rows were updated, insert new preference
        if not update_response.data:
            insert_response = supabase.table("user_preferences").insert({
                "preference_key": key,
                "preference_value": value
            }).execute()
            logger.info(f"Preference inserted successfully: {insert_response.data}")
        else:
            logger.info(f"Preference updated successfully: {update_response.data}")

        return jsonify({'message': 'Preference updated successfully'})
    except Exception as e:
        logger.error(f"Error setting preference {key}: {e}")
        return jsonify({'error': str(e)}), 500
def upload_file():
    """Save an uploaded audio file under a unique name in the upload folder.

    Reads the "file" part of the request, checks its extension with
    allowed_file(), and saves it as "<stem>_<8 hex chars>.<ext>" inside
    app.config['UPLOAD_FOLDER'].

    Returns:
        200 with the stored filename and the sanitized original name,
        400 when no file was sent or its type is not accepted, or 500 with
        the error text on failure.
    """
    try:
        file = request.files.get('file')
        # Covers a missing part as well as an empty or absent filename.
        if file is None or not file.filename:
            return jsonify({'error': 'No file selected'}), 400
        if not allowed_file(file.filename):
            return jsonify({'error': 'File type not supported. Please upload MP3, WAV, M4A, FLAC, OGG, or AAC files.'}), 400

        filename = secure_filename(file.filename)

        # Sanitize the stem separately from the extension. secure_filename()
        # can strip the stem entirely (e.g. a name made only of non-ASCII
        # characters), which previously left the saved file with no
        # extension. allowed_file() has already confirmed that a dot exists
        # and that the lowercased extension is one of the accepted values.
        raw_stem, raw_extension = file.filename.rsplit('.', 1)
        stem = secure_filename(raw_stem) or 'upload'
        extension = raw_extension.lower()

        # Generate unique filename to prevent conflicts
        unique_filename = f"{stem}_{uuid.uuid4().hex[:8]}.{extension}"

        # Save file to upload folder
        file_path = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)
        file.save(file_path)

        return jsonify({
            'message': 'File uploaded successfully',
            'filename': unique_filename,
            'original_name': filename
        }), 200
    except Exception as e:
        logger.error(f"Error uploading file: {e}")
        return jsonify({'error': str(e)}), 500
def refresh_songs():
    """Return the sorted names of accepted audio files as JSON.

    Looks in the "audio" directory under the app's static folder; a missing
    directory yields an empty list. Any exception is reported as a JSON
    error body with status 500.
    """
    try:
        audio_dir = os.path.join(app.static_folder, 'audio')
        if os.path.exists(audio_dir):
            songs = sorted(name for name in os.listdir(audio_dir) if allowed_file(name))
        else:
            songs = []
        return jsonify({'songs': songs}), 200
    except Exception as e:
        logger.error(f"Error refreshing songs: {e}")
        return jsonify({'error': str(e)}), 500
def add_song_to_playlist(playlist_id):
    """Add a song to an existing playlist.

    Expects a JSON object body with "song_name". The song is appended after
    the playlist's current highest song_order (or at 1 when the playlist has
    no songs).

    Returns:
        200 with the inserted row data, 400 if the body is not a JSON
        object, the song name is missing, or the song is already in the
        playlist, or 500 with the error text on failure.
    """
    try:
        # silent=True returns None for a missing or invalid JSON body
        # instead of raising, so bad input can be answered with 400.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        song_name = data.get('song_name')
        if not song_name:
            return jsonify({'error': 'Song name is required'}), 400

        supabase = get_supabase_client()

        # Check if song already exists in playlist
        existing = supabase.table("playlist_songs").select("*").eq("playlist_id", playlist_id).eq("song_name", song_name).execute()
        if existing.data:
            return jsonify({'error': 'Song already exists in playlist'}), 400

        # Get the current max order for ordering
        max_order = supabase.table("playlist_songs").select("song_order").eq("playlist_id", playlist_id).order("song_order", desc=True).limit(1).execute()
        next_order = max_order.data[0]['song_order'] + 1 if max_order.data else 1

        # Add song to playlist
        result = supabase.table("playlist_songs").insert({
            "playlist_id": playlist_id,
            "song_name": song_name,
            "song_order": next_order
        }).execute()

        return jsonify({'message': 'Song added to playlist successfully', 'data': result.data}), 200
    except Exception as e:
        logger.error(f"Error adding song to playlist: {e}")
        return jsonify({'error': str(e)}), 500
def remove_song_from_playlist(playlist_id):
    """Remove a song from a playlist.

    Expects a JSON object body with "song_name"; deletes the
    "playlist_songs" rows matching both *playlist_id* and that name.

    Returns:
        200 on success, 400 if the body is not a JSON object or the song
        name is missing, 404 when the delete returned no rows, or 500 with
        the error text on failure.
    """
    try:
        # silent=True returns None for a missing or invalid JSON body
        # instead of raising, so bad input can be answered with 400.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        song_name = data.get('song_name')
        if not song_name:
            return jsonify({'error': 'Song name is required'}), 400

        supabase = get_supabase_client()

        # Remove song from playlist
        result = supabase.table("playlist_songs").delete().eq("playlist_id", playlist_id).eq("song_name", song_name).execute()
        if not result.data:
            return jsonify({'error': 'Song not found in playlist'}), 404

        return jsonify({'message': 'Song removed from playlist successfully'}), 200
    except Exception as e:
        logger.error(f"Error removing song from playlist: {e}")
        return jsonify({'error': str(e)}), 500
def reorder_playlist_songs(playlist_id):
    """Reorder songs in a playlist.

    Expects a JSON object body whose "song_order" is a non-empty list of
    song names in their new order. Each named song in the playlist gets
    song_order set to its 1-based position in that list, one update per song.

    Returns:
        200 on success, 400 if the body is not a JSON object or
        "song_order" is missing, empty, or not a list, or 500 with the
        error text on failure.
    """
    try:
        # silent=True returns None for a missing or invalid JSON body
        # instead of raising, so bad input can be answered with 400.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        song_order = data.get('song_order')  # Array of song names in new order
        if not song_order or not isinstance(song_order, list):
            return jsonify({'error': 'Song order array is required'}), 400

        supabase = get_supabase_client()

        # Update song_order for each song
        for position, song_name in enumerate(song_order, start=1):
            supabase.table("playlist_songs").update({
                "song_order": position
            }).eq("playlist_id", playlist_id).eq("song_name", song_name).execute()

        return jsonify({'message': 'Playlist reordered successfully'}), 200
    except Exception as e:
        logger.error(f"Error reordering playlist: {e}")
        return jsonify({'error': str(e)}), 500
def rename_playlist(playlist_id):
    """Rename a playlist.

    Expects a JSON object body with the new "name" and writes it to the
    "playlists" row whose id equals *playlist_id*.

    Returns:
        200 with the updated row data, 400 if the body is not a JSON object
        or the name is missing, 404 when the update returned no rows, or
        500 with the error text on failure.
    """
    try:
        # silent=True returns None for a missing or invalid JSON body
        # instead of raising, so bad input can be answered with 400.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        new_name = data.get('name')
        if not new_name:
            return jsonify({'error': 'New playlist name is required'}), 400

        supabase = get_supabase_client()

        # Update playlist name
        result = supabase.table("playlists").update({
            "name": new_name
        }).eq("id", playlist_id).execute()
        if not result.data:
            return jsonify({'error': 'Playlist not found'}), 404

        return jsonify({'message': 'Playlist renamed successfully', 'data': result.data}), 200
    except Exception as e:
        logger.error(f"Error renaming playlist: {e}")
        return jsonify({'error': str(e)}), 500
def get_logs():
    """Get recent log entries for debugging.

    Returns:
        200 with {'logs': [...]} holding up to the last 100 lines of
        app.log (an empty list when the file does not exist), or 500 with
        the error text on failure.
    """
    # Local import keeps this block self-contained; deque is only used here.
    from collections import deque

    try:
        log_lines = []
        if os.path.exists('app.log'):
            # errors='replace' keeps one undecodable byte from failing the
            # whole request. The bounded deque retains only the last 100
            # lines instead of loading the entire, ever-growing file.
            with open('app.log', 'r', encoding='utf-8', errors='replace') as f:
                log_lines = list(deque(f, maxlen=100))
        return jsonify({'logs': log_lines}), 200
    except Exception as e:
        logger.error(f"Error reading logs: {e}")
        return jsonify({'error': str(e)}), 500
def get_debug_status():
    """Get system status for debugging.

    Reports whether SUPABASE_URL / SUPABASE_KEY are set (with their lengths
    and a short prefix), the Python version, the working directory, whether
    a .env file exists, and whether a Supabase client can be created and
    queried.

    Returns:
        200 with the status dict, or 500 with the error text on failure.
    """
    try:
        # Read each variable once; the empty-string default keeps len() and
        # slicing safe when a variable is unset.
        supabase_url = os.environ.get("SUPABASE_URL", "")
        supabase_key = os.environ.get("SUPABASE_KEY", "")

        status = {
            'environment_variables': {
                # Distinct glyphs so "set" and "not set" can be told apart.
                'SUPABASE_URL': '✓' if supabase_url else '✗',
                'SUPABASE_KEY': '✓' if supabase_key else '✗',
                'SUPABASE_URL_LENGTH': len(supabase_url),
                'SUPABASE_KEY_LENGTH': len(supabase_key),
                'SUPABASE_URL_PREFIX': supabase_url[:20] + "..." if supabase_url else None,
                'SUPABASE_KEY_PREFIX': supabase_key[:10] + "..." if supabase_key else None,
            },
            'dotenv_loaded': True,  # We know load_dotenv() was called
            'python_version': sys.version,
            'cwd': os.getcwd(),
            'env_file_exists': os.path.exists('.env'),
        }

        # Try to create Supabase client
        try:
            supabase = get_supabase_client()
            status['supabase_client'] = '✓'

            # Try a simple query
            try:
                test_response = supabase.table("playlists").select("count", count="exact").limit(1).execute()
                status['supabase_connection'] = '✓'
                status['test_query_result'] = str(test_response)
            except Exception as e:
                status['supabase_connection'] = '✗'
                status['supabase_error'] = str(e)
        except Exception as e:
            status['supabase_client'] = '✗'
            status['supabase_client_error'] = str(e)

        return jsonify(status), 200
    except Exception as e:
        logger.error(f"Error getting debug status: {e}")
        return jsonify({'error': str(e)}), 500
def check_env():
    """Simple environment variable check.

    Reports the Supabase variables as seen through os.environ.get() and
    os.getenv(), whether each name is present in os.environ, and the names
    of all environment variables. String values under a label containing
    "SUPABASE" are cut to their first 10 characters.

    Returns:
        200 with the report, or 500 with the error text on failure.
    """
    try:
        # Check various ways to access environment variables.
        # Every key is a string label. The membership checks were previously
        # written as bare expressions, which made the keys booleans and
        # broke the masking loop below with a TypeError.
        env_check = {
            'os.environ.get("SUPABASE_URL")': os.environ.get("SUPABASE_URL"),
            'os.getenv("SUPABASE_URL")': os.getenv("SUPABASE_URL"),
            'os.environ.get("SUPABASE_KEY")': os.environ.get("SUPABASE_KEY"),
            'os.getenv("SUPABASE_KEY")': os.getenv("SUPABASE_KEY"),
            '"SUPABASE_URL" in os.environ': "SUPABASE_URL" in os.environ,
            '"SUPABASE_KEY" in os.environ': "SUPABASE_KEY" in os.environ,
            'all_env_vars': list(os.environ),
        }

        # Only show first 10 chars of sensitive data. Values are replaced in
        # place; no keys are added or removed while iterating.
        for label, value in env_check.items():
            if "SUPABASE" in label and isinstance(value, str) and len(value) > 10:
                env_check[label] = value[:10] + "..."

        return jsonify(env_check), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
if __name__ == "__main__":
    # init_db() logs its own outcome; startup continues either way.
    init_db()
    # Debug mode stays on by default, as before. Setting FLASK_DEBUG=0 turns
    # it off, so the debugger need not be exposed when this entry point is
    # used outside local development.
    app.run(debug=os.environ.get("FLASK_DEBUG", "1") != "0")