import shutil from fastapi import FastAPI, HTTPException, Request from deezspot.deezloader import DeeLogin import requests import os import logging from typing import Optional from fastapi.staticfiles import StaticFiles from dotenv import load_dotenv from pydantic import BaseModel from urllib.parse import quote from pathlib import Path import uuid import gc # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = FastAPI(title="Deezer API") # Load environment variables load_dotenv() # Mount a static files directory to serve downloaded files os.makedirs("downloads", exist_ok=True) app.mount("/downloads", StaticFiles(directory="downloads"), name="downloads") # Deezer API base URL DEEZER_API_URL = "https://api.deezer.com" # Deezer ARL token (required for deezspot downloads) ARL_TOKEN = os.getenv('ARL') # 定义请求体模型 class DownloadRequest(BaseModel): url: str quality: str def cleanup_dir(dir_path: Path): """Background task to clean up directory""" try: shutil.rmtree(dir_path) logger.info(f"Cleaned up directory: {dir_path}") except Exception as e: logger.error(f"Error cleaning up {dir_path}: {e}") @app.get("/") def read_root(): return {"message": "running"} # Download a track and return a download URL class deezloadRequest(BaseModel): url: str quality: str arl: str def convert_deezer_short_link_async(short_link: str) -> str: try: response = requests.get(short_link, allow_redirects=True) return response.url except requests.RequestException as e: print(f"An error occurred: {e}") return "" # Helper function to get track info def get_track_info(track_id: str): try: response = requests.get(f"{DEEZER_API_URL}/track/{track_id}") if response.status_code != 200: raise HTTPException(status_code=404, detail="Track not found") return response.json() except requests.exceptions.RequestException as e: logger.error(f"Network error fetching track metadata: {e}") raise HTTPException(status_code=500, detail=str(e)) except Exception as e: logger.error(f"Error fetching track metadata: {e}") raise HTTPException(status_code=500, detail=str(e)) # Fetch track metadata from Deezer API @app.get("/track/{track_id}") def get_d_track(track_id: str): return get_track_info(track_id) # Download a track and return a download URL @app.post("/download/track") def download_d_track(request: Request, deezload_request: deezloadRequest): try: if deezload_request.arl is None or deezload_request.arl.strip() == "": ARL = ARL_TOKEN else: ARL = deezload_request.arl logger.info(f'arl: {ARL}') url = deezload_request.url if 'dzr.page' in url or 'deezer.page' in url or 'link.deezer' in url: url = convert_deezer_short_link_async(url) quality = deezload_request.quality dl = DeeLogin(arl=ARL) if quality not in ["MP3_320", "MP3_128", "FLAC"]: raise HTTPException(status_code=400, detail="Invalid quality specified") # 提取 track_id (假设 url 格式为 https://api.deezer.com/track/{track_id}) track_id = url.split("/")[-1] # Fetch track info track_info = get_track_info(track_id) track_link = track_info.get("link") if not track_link: raise HTTPException(status_code=404, detail="Track link not found") # Sanitize filename track_title = track_info.get("title", "track") artist_name = track_info.get("artist", {}).get("name", "unknown") file_extension = "flac" if quality == "FLAC" else "mp3" expected_filename = f"{artist_name} - {track_title}.{file_extension}".replace("/", "_") # Sanitize filename # Clear the downloads directory for root, dirs, files in os.walk("downloads"): for file in files: os.remove(os.path.join(root, file)) for dir in dirs: shutil.rmtree(os.path.join(root, dir)) # Download the track using deezspot try: # 下载文件的代码 dl.download_trackdee( link_track=track_link, output_dir="downloads", quality_download=quality, recursive_quality=False, recursive_download=False ) except Exception as e: logger.error(f"Error downloading file: {e}") raise HTTPException(status_code=500, detail="File download failed") # Recursively search for the file in the downloads directory filepath = None for root, dirs, files in os.walk("downloads"): for file in files: if file.endswith(f'.{file_extension}'): filepath = os.path.join(root, file) break if filepath: break if not filepath: raise HTTPException(status_code=500, detail=f"{file_extension} file not found after download") if filepath: file_size = os.path.getsize(filepath) logger.info(f"Downloaded file size: {file_size} bytes") # Return the download URL relative_path = quote(str(os.path.relpath(filepath, "downloads"))) # Auto-detect base URL from request base_url = str(request.base_url).rstrip('/') download_url = f"{base_url}/downloads/{relative_path}" logger.info(f"Download successful: {download_url}") gc.collect() return {"download_url": download_url} except Exception as e: logger.error(f"Error downloading track: {e}") raise HTTPException(status_code=500, detail=str(e)) # Pydantic model for album request class AlbumRequest(BaseModel): album_id: str # Fetch album data @app.post("/z_album") def fetch_d_album(request: AlbumRequest): album_id = request.album_id try: response = requests.get(f"{DEEZER_API_URL}/album/{album_id}") response.raise_for_status() album_data = response.json() tracks = album_data.get("tracks", {}).get("data", []) result = [] for track in tracks: title = track.get("title") link = track.get("link") if title and link: result.append({ "title": title, "link": link }) return result except requests.exceptions.RequestException as e: logger.error(f"Network error fetching album: {e}") raise HTTPException(status_code=500, detail=str(e)) except Exception as e: logger.error(f"Error fetching album: {e}") raise HTTPException(status_code=500, detail=str(e)) # Pydantic model for playlist request class PlaylistRequest(BaseModel): playlist_id: str # Fetch playlist data @app.post("/z_playlist") def fetch_playlist(request: PlaylistRequest): playlist_id = request.playlist_id # This is correct in app_pre.py try: response = requests.get(f"{DEEZER_API_URL}/playlist/{playlist_id}") response.raise_for_status() playlist_data = response.json() tracks = playlist_data.get("tracks", {}).get("data", []) result = [] for track in tracks: title = track.get("title") link = track.get("link") if title and link: result.append({ "title": title, "link": link }) return result except requests.exceptions.RequestException as e: logger.error(f"Network error fetching playlist: {e}") # Fixed error message raise HTTPException(status_code=500, detail=str(e)) except Exception as e: logger.error(f"Error fetching playlist: {e}") # Fixed error message raise HTTPException(status_code=500, detail=str(e)) # Search tracks using Deezer API @app.get("/z_search") def search_tracks(query: str, limit: Optional[int] = 10): try: response = requests.get(f"{DEEZER_API_URL}/search", params={"q": query, "limit": limit}) return response.json() except requests.exceptions.RequestException as e: logger.error(f"Network error searching tracks: {e}") raise HTTPException(status_code=500, detail=str(e)) except Exception as e: logger.error(f"Error searching tracks: {e}") raise HTTPException(status_code=500, detail=str(e))