|
import base64 |
|
import logging |
|
from typing import Optional, Dict, Any |
|
from fastapi import FastAPI, HTTPException, Request |
|
import requests |
|
from bs4 import BeautifulSoup |
|
import os |
|
from datetime import datetime, timedelta |
|
import time |
|
from requests.adapters import HTTPAdapter |
|
from urllib3.util.retry import Retry |
|
import asyncio |
|
from typing import Optional, Dict, Tuple |
|
import urllib.parse |
|
from fastapi.responses import JSONResponse |
|
|
|
|
|
# Root logging configuration: INFO and above goes to a stream handler and to
# the file 'spotify_api.log', using one shared record format.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
_LOG_HANDLERS = [
    logging.StreamHandler(),
    logging.FileHandler('spotify_api.log'),
]
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT, handlers=_LOG_HANDLERS)

# Module-level logger used by the helpers and endpoints in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
from pydantic import BaseModel, HttpUrl |
|
|
|
class TrackDlRequest(BaseModel):
    """JSON request body accepted by the ``/track_dl`` endpoint."""

    # Spotify URL of the track; sent to the download service as "urls".
    spotify_url: str
    # Cover image URL sent to the download service as "cover"; may be omitted.
    album_cover_url: Optional[str] = None
|
|
|
# FastAPI application object; the route decorators in this file register on it.
app = FastAPI(
    title="Spotify Track API",
    description="API for retrieving Spotify track information and download URLs",
)
|
|
|
|
|
# Base URL that every Spotify Web API request in this module is built on.
SPOTIFY_API_URL = "https://api.spotify.com/v1"

# Client credentials are read from the environment once, at import time.
# Either value is None when the corresponding variable is not set.
SPOTIFY_CLIENT_ID = os.environ.get("SPOTIFY_CLIENT_ID")
SPOTIFY_CLIENT_SECRET = os.environ.get("SPOTIFY_CLIENT_SECRET")

# Number of seconds a cached access token is treated as valid.
TOKEN_EXPIRY = 3500
|
|
|
|
|
class TokenCache:
    """In-memory holder for a single access token and its expiry moment."""

    def __init__(self):
        # Both stay None until set_token() is called for the first time.
        self.token: Optional[str] = None
        self.expiry_time: Optional[datetime] = None

    def set_token(self, token: str):
        """Store ``token`` and mark it valid for TOKEN_EXPIRY seconds from now."""
        self.token = token
        self.expiry_time = datetime.now() + timedelta(seconds=TOKEN_EXPIRY)

    def get_token(self) -> Optional[str]:
        """Return the stored token, or None when there is none or it has expired."""
        return None if self.is_expired() else self.token

    def is_expired(self) -> bool:
        """Return True when no usable token is stored (missing or past expiry)."""
        if not self.token:
            return True
        if not self.expiry_time:
            return True
        return datetime.now() >= self.expiry_time


# Single shared cache instance used by the token helper and the endpoints.
token_cache = TokenCache()
|
|
|
|
|
class SpotifyAPIError(Exception):
    """Raised in this module when a Spotify request fails or credentials are missing."""
|
|
|
|
|
def get_spotify_token() -> str:
    """
    Get a Spotify access token, reusing the cached one while it is valid.

    When ``token_cache`` holds no usable token, a new one is requested from
    the Spotify accounts service using the client-credentials grant, stored
    in the cache and returned.

    Returns:
        The access token string.

    Raises:
        HTTPException: 503 when the token request fails with a ``requests``
            exception; 500 for every other failure (credentials not
            configured, non-200 answer, response without a token, ...).
            The original exception is attached as ``__cause__``.
    """
    try:
        cached_token = token_cache.get_token()
        if cached_token:
            logger.info("Using cached Spotify token")
            return cached_token

        logger.info("Requesting new Spotify access token")
        # Monotonic clock: the measured duration is immune to wall-clock jumps.
        start_time = time.monotonic()

        if not SPOTIFY_CLIENT_ID or not SPOTIFY_CLIENT_SECRET:
            raise SpotifyAPIError("Spotify credentials not configured")

        # HTTP Basic credentials: base64("client_id:client_secret").
        credentials = f"{SPOTIFY_CLIENT_ID}:{SPOTIFY_CLIENT_SECRET}"
        encoded_credentials = base64.b64encode(credentials.encode()).decode()

        auth_response = requests.post(
            'https://accounts.spotify.com/api/token',
            data={'grant_type': 'client_credentials'},
            headers={'Authorization': f'Basic {encoded_credentials}'},
            timeout=10,
        )

        if auth_response.status_code != 200:
            raise SpotifyAPIError(f"Failed to get token: {auth_response.text}")

        new_token = auth_response.json().get('access_token')
        if not new_token:
            # Fail with a readable message instead of an opaque KeyError.
            raise SpotifyAPIError("Token response did not contain 'access_token'")
        token_cache.set_token(new_token)

        logger.info(
            "New token obtained successfully in %.2fs",
            time.monotonic() - start_time,
        )
        return new_token

    except requests.exceptions.RequestException as e:
        logger.error("Network error during token request: %s", e)
        raise HTTPException(
            status_code=503,
            detail="Spotify authentication service unavailable",
        ) from e
    except SpotifyAPIError as e:
        # Expected failure modes: the message is enough, no traceback needed.
        logger.error("Spotify token request failed: %s", e)
        raise HTTPException(status_code=500, detail="Internal server error") from e
    except Exception as e:
        # Truly unexpected: keep the traceback in the log for diagnosis.
        logger.exception("Unexpected error during token request: %s", e)
        raise HTTPException(status_code=500, detail="Internal server error") from e
|
|
|
def extract_album_id(album_url: str) -> str:
    """
    Extract the album ID from a Spotify album URL or URI.

    The query string and fragment are removed first, then surrounding
    whitespace and trailing slashes; the last path segment is the ID.
    ``spotify:album:<id>`` URIs are accepted as well.

    Args:
        album_url: For example ``https://open.spotify.com/album/<id>?si=...``.

    Returns:
        The non-empty album ID.

    Raises:
        HTTPException: 400 when ``album_url`` is not a string or no ID can
            be found in it.
    """
    try:
        # Cut "?query" / "#fragment" before splitting on "/", so a slash
        # inside them cannot be mistaken for a path separator.
        path = album_url.split("?", 1)[0].split("#", 1)[0].strip().rstrip("/")
        # Last "/" segment for URLs, last ":" segment for spotify: URIs.
        album_id = path.rsplit("/", 1)[-1].rsplit(":", 1)[-1]
    except (AttributeError, TypeError) as e:
        logger.error("Failed to extract album ID from URL %s: %s", album_url, e)
        raise HTTPException(status_code=400, detail="Invalid Spotify album URL format") from e

    if not album_id:
        logger.error("Failed to extract album ID from URL %s: no ID present", album_url)
        raise HTTPException(status_code=400, detail="Invalid Spotify album URL format")
    return album_id
|
|
|
|
|
@app.post("/album")
async def get_album_data(request: Request):
    """
    Return Spotify's data for the album whose URL is given in the request.

    Expects JSON body: {"album_url": "spotify-album-url"}

    Returns:
        The JSON document returned by the Spotify ``/albums/{id}`` endpoint,
        passed through unmodified.

    Raises:
        HTTPException: 400 for an unparsable body or a missing ``album_url``;
            any HTTPException raised by ``extract_album_id`` or
            ``get_spotify_token`` is propagated with its own status code;
            500 when Spotify answers with a non-200 status or anything else
            goes wrong.
    """
    try:
        try:
            data = await request.json()
        except ValueError as e:
            logger.error("Failed to parse request JSON body.")
            raise HTTPException(status_code=400, detail="Invalid JSON body.") from e

        # A JSON array/scalar has no keys; treat it like a missing field.
        album_url = data.get('album_url') if isinstance(data, dict) else None
        if not album_url:
            raise HTTPException(status_code=400, detail="Missing 'album_url' in JSON data")

        album_id = extract_album_id(album_url)
        access_token = get_spotify_token()

        response = requests.get(
            f"{SPOTIFY_API_URL}/albums/{album_id}",
            headers={'Authorization': f'Bearer {access_token}'},
            timeout=10,
        )
        if response.status_code != 200:
            raise SpotifyAPIError(f"Failed to get album data: {response.text}")

        return response.json()

    except HTTPException:
        # Deliberate errors raised above (400, 503, ...) must keep their
        # status code; without this clause the generic handler below would
        # rewrite every one of them to a 500.
        raise
    except SpotifyAPIError as e:
        logger.error("Spotify API error: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
    except Exception as e:
        logger.error("Unexpected error: %s", e)
        raise HTTPException(status_code=500, detail="Internal server error") from e
|
|
|
|
|
def extract_playlist_id(playlist_url: str) -> str:
    """
    Extract the playlist ID from a Spotify playlist URL or URI.

    The query string and fragment are removed first, then surrounding
    whitespace and trailing slashes; the last path segment is the ID.
    ``spotify:playlist:<id>`` URIs are accepted as well.

    Args:
        playlist_url: For example ``https://open.spotify.com/playlist/<id>?si=...``.

    Returns:
        The non-empty playlist ID.

    Raises:
        HTTPException: 400 when ``playlist_url`` is not a string or no ID
            can be found in it.
    """
    try:
        # Cut "?query" / "#fragment" before splitting on "/", so a slash
        # inside them cannot be mistaken for a path separator.
        path = playlist_url.split("?", 1)[0].split("#", 1)[0].strip().rstrip("/")
        # Last "/" segment for URLs, last ":" segment for spotify: URIs.
        playlist_id = path.rsplit("/", 1)[-1].rsplit(":", 1)[-1]
    except (AttributeError, TypeError) as e:
        logger.error("Failed to extract playlist ID from URL %s: %s", playlist_url, e)
        raise HTTPException(status_code=400, detail="Invalid Spotify playlist URL format") from e

    if not playlist_id:
        logger.error("Failed to extract playlist ID from URL %s: no ID present", playlist_url)
        raise HTTPException(status_code=400, detail="Invalid Spotify playlist URL format")
    return playlist_id
|
|
|
|
|
@app.post("/playlist")
async def get_playlist_data(request: Request):
    """
    Return Spotify's track listing for the playlist URL given in the request.

    Expects JSON body: {"playlist_url": "spotify-playlist-url"}

    Returns:
        The JSON document returned by the Spotify
        ``/playlists/{id}/tracks`` endpoint, passed through unmodified.

    Raises:
        HTTPException: 400 for an unparsable body or a missing
            ``playlist_url``; any HTTPException raised by
            ``extract_playlist_id`` or ``get_spotify_token`` is propagated
            with its own status code; 500 when Spotify answers with a
            non-200 status or anything else goes wrong.
    """
    try:
        try:
            data = await request.json()
        except ValueError as e:
            logger.error("Failed to parse request JSON body.")
            raise HTTPException(status_code=400, detail="Invalid JSON body.") from e

        # A JSON array/scalar has no keys; treat it like a missing field.
        playlist_url = data.get('playlist_url') if isinstance(data, dict) else None
        if not playlist_url:
            raise HTTPException(status_code=400, detail="Missing 'playlist_url' in JSON data")

        playlist_id = extract_playlist_id(playlist_url)
        logger.info("Extracted playlist ID: %s", playlist_id)

        access_token = get_spotify_token()

        response = requests.get(
            f"{SPOTIFY_API_URL}/playlists/{playlist_id}/tracks",
            headers={'Authorization': f'Bearer {access_token}'},
            timeout=10,
        )
        if response.status_code != 200:
            raise SpotifyAPIError(f"Failed to get playlist data: {response.text}")

        return response.json()

    except HTTPException:
        # Deliberate errors raised above (400, 503, ...) must keep their
        # status code; without this clause the generic handler below would
        # rewrite every one of them to a 500.
        raise
    except SpotifyAPIError as e:
        logger.error("Spotify API error: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
    except Exception as e:
        logger.error("Unexpected error: %s", e)
        raise HTTPException(status_code=500, detail="Internal server error") from e
|
|
|
|
|
def extract_track_id(track_url: str) -> str:
    """
    Extract the track ID from a Spotify track URL or URI.

    The query string and fragment are removed first, then surrounding
    whitespace and trailing slashes; the last path segment is the ID.
    ``spotify:track:<id>`` URIs are accepted as well.

    Args:
        track_url: For example ``https://open.spotify.com/track/<id>?si=...``.

    Returns:
        The non-empty track ID.

    Raises:
        HTTPException: 400 when ``track_url`` is not a string or no ID can
            be found in it.
    """
    try:
        # Cut "?query" / "#fragment" before splitting on "/", so a slash
        # inside them cannot be mistaken for a path separator.
        path = track_url.split("?", 1)[0].split("#", 1)[0].strip().rstrip("/")
        # Last "/" segment for URLs, last ":" segment for spotify: URIs.
        track_id = path.rsplit("/", 1)[-1].rsplit(":", 1)[-1]
    except (AttributeError, TypeError) as e:
        logger.error("Failed to extract track ID from URL %s: %s", track_url, e)
        raise HTTPException(status_code=400, detail="Invalid Spotify URL format") from e

    if not track_id:
        logger.error("Failed to extract track ID from URL %s: no ID present", track_url)
        raise HTTPException(status_code=400, detail="Invalid Spotify URL format")
    return track_id
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_spowload_session_and_token() -> Optional[Tuple[requests.Session, str]]:
    """
    Open a session against spowload.com and read its CSRF token.

    A ``requests.Session`` with browser-like headers fetches the homepage;
    the token is taken from the ``<meta name="csrf-token" content="...">``
    tag. The session is returned together with the token so that cookies
    set by the homepage are reused for the follow-up request.

    Returns:
        ``(session, csrf_token)`` on success; the caller owns the session
        and must close it. ``None`` on any failure, in which case the
        session has already been closed here.
    """
    spowload_url = "https://spowload.com"

    session = requests.Session()
    session.headers.update({
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.9',
        'Connection': 'keep-alive',
    })

    try:
        logger.info("Attempting to fetch CSRF token and session cookies from %s", spowload_url)

        response = session.get(spowload_url, timeout=15)
        response.raise_for_status()

        soup = BeautifulSoup(response.text, 'html.parser')
        meta_tag = soup.find('meta', attrs={'name': 'csrf-token'})

        if meta_tag and 'content' in meta_tag.attrs:
            csrf_token = meta_tag['content']
            logger.info("Successfully extracted CSRF token and established session.")
            return session, csrf_token

        logger.warning(
            "Could not find meta tag with name='csrf-token' or 'content' attribute at %s",
            spowload_url,
        )

    except requests.exceptions.Timeout:
        logger.error("Request timed out while trying to reach %s", spowload_url)
    except requests.exceptions.RequestException as e:
        logger.error("Error fetching %s: %s", spowload_url, e)
    except Exception as e:
        logger.exception("An unexpected error occurred while getting CSRF token/session: %s", e)

    # Only failure paths reach this point. Nobody else holds a reference to
    # the session, so release its connections here instead of leaking them.
    session.close()
    return None
|
|
|
|
|
|
|
def _select_cover_url(album_images) -> Optional[str]:
    """Return the URL of the second image if there are several, else the first, else None."""
    if len(album_images) > 1:
        return album_images[1].get("url")
    if album_images:
        return album_images[0].get("url")
    return None


def _build_track_info(track_data: Dict[str, Any]) -> Dict[str, Any]:
    """Reduce a Spotify track JSON object to the fields returned by ``/track``."""
    artists = [
        artist.get("name")
        for artist in (track_data.get("artists") or [])
        if artist.get("name")
    ]
    # "or" guards cover keys that are present but null in the response.
    album_images = (track_data.get("album") or {}).get("images") or []
    return {
        "id": track_data.get("id"),
        "title": track_data.get("name"),
        "artists": artists,
        "album_cover_url": _select_cover_url(album_images),
        "duration_ms": track_data.get("duration_ms"),
        "spotify_url": (track_data.get("external_urls") or {}).get("spotify"),
    }


@app.post("/track", response_model=Dict)
async def get_track_data(request: Request):
    """
    Retrieves specific track information from Spotify based on a track URL.

    Expects JSON body: {"track_url": "spotify-track-url"}

    Returns:
        JSON response with track details:
        {
            "id": str,
            "title": str,
            "artists": list[str],
            "album_cover_url": str | None,
            "duration_ms": int,
            "spotify_url": str
        }

    Raises:
        HTTPException: 400 for an invalid body, a missing ``track_url``, an
            unusable URL or a 400 from Spotify; 404 when Spotify does not
            know the track; 502 for any other non-200 Spotify status; 504
            on a network error talking to Spotify; 500 otherwise. Errors
            raised by ``get_spotify_token`` are propagated unchanged.
    """
    try:
        try:
            data = await request.json()
            track_url = data.get('track_url')
        except Exception:
            logger.error("Failed to parse request JSON body.")
            raise HTTPException(status_code=400, detail="Invalid JSON body.")

        if not track_url:
            logger.warning("Request received without 'track_url'.")
            raise HTTPException(status_code=400, detail="Missing 'track_url' in JSON data.")

        track_id = extract_track_id(track_url)
        if not track_id:
            logger.warning("Failed to extract track ID from URL: %s", track_url)
            raise HTTPException(
                status_code=400,
                detail="Invalid Spotify track URL format or unable to extract ID.",
            )

        logger.info("Processing request for track ID: %s", track_id)

        access_token = get_spotify_token()
        headers = {'Authorization': f'Bearer {access_token}'}

        track_api_url = f"{SPOTIFY_API_URL}/tracks/{track_id}"
        logger.info("Requesting track data from Spotify API: %s", track_api_url)

        try:
            response = requests.get(track_api_url, headers=headers, timeout=15)

            if response.status_code == 401:
                logger.warning("Spotify token likely expired or invalid (received 401). Requesting new one and retrying.")
                # Drop the cached token first: otherwise get_spotify_token()
                # would simply hand back the token Spotify just rejected and
                # the retry could never succeed.
                token_cache.token = None
                token_cache.expiry_time = None
                access_token = get_spotify_token()
                headers['Authorization'] = f'Bearer {access_token}'
                response = requests.get(track_api_url, headers=headers, timeout=15)

            if response.status_code != 200:
                logger.error(
                    "Spotify API request failed. Status: %s, URL: %s, Response: %s...",
                    response.status_code, track_api_url, response.text[:200],
                )
                if response.status_code == 400:
                    raise HTTPException(status_code=400, detail="Bad request to Spotify API (check track ID format?).")
                if response.status_code == 404:
                    raise HTTPException(status_code=404, detail=f"Track ID '{track_id}' not found on Spotify.")
                raise HTTPException(
                    status_code=502,
                    detail=f"Failed to retrieve data from Spotify (Status: {response.status_code}).",
                )

            track_info = _build_track_info(response.json())

            logger.info("Successfully retrieved data for track ID: %s", track_id)
            return JSONResponse(content=track_info)

        except HTTPException:
            # The 400/404/502/503 raised inside this try must reach the
            # client as-is; the generic handler below would otherwise turn
            # every one of them into a 500.
            raise
        except requests.exceptions.RequestException as e:
            logger.error("Network error contacting Spotify API at %s: %s", track_api_url, e)
            raise HTTPException(
                status_code=504,
                detail="Gateway timeout or network error contacting Spotify.",
            ) from e
        except Exception as e:
            logger.exception("Error processing Spotify response or formatting data for track %s: %s", track_id, e)
            raise HTTPException(
                status_code=500,
                detail="Internal server error processing track data.",
            ) from e

    except HTTPException:
        raise
    except Exception as e:
        logger.exception("An unexpected critical error occurred in /track endpoint: %s", e)
        raise HTTPException(
            status_code=500,
            detail="An unexpected internal server error occurred.",
        ) from e
|
|
|
|
|
|
|
@app.post("/track_dl", response_model=Dict[str, Any])
async def get_track_download_info(payload: TrackDlRequest):
    """
    Attempts to get download information for a Spotify track via spowload.com,
    reusing the session that fetched the CSRF token for the convert request.

    Expects JSON body: {"spotify_url": "...", "album_cover_url": "..."}

    Returns:
        The JSON response from spowload.com/convert, passed through as-is.

    Raises:
        HTTPException: 503 when no session/CSRF token could be obtained;
            504 on timeout; 419, 429 or 403 when the service answers with
            that status; 502 for other HTTP errors, network errors or a
            non-JSON answer; 500 for anything unexpected.
    """
    logger.info("Received request for /track_dl for URL: %s", payload.spotify_url)

    session_data = get_spowload_session_and_token()
    if not session_data:
        logger.error("Failed to retrieve session and CSRF token from spowload.com.")
        raise HTTPException(
            status_code=503,
            detail="Could not get necessary session/token from the download service.",
        )

    spowload_session, csrf_token = session_data
    convert_url = "https://spowload.com/convert"

    headers = {
        'Content-Type': 'application/json',
        'X-CSRF-Token': csrf_token,
        'Accept': 'application/json, text/plain, */*',
        'Referer': 'https://spowload.com/',
        'Origin': 'https://spowload.com',
    }
    body = {
        "urls": payload.spotify_url,
        "cover": payload.album_cover_url,
    }

    logger.info(
        "Sending request to %s for Spotify URL: %s using established session.",
        convert_url, payload.spotify_url,
    )

    try:
        response = spowload_session.post(convert_url, headers=headers, json=body, timeout=30)
        response.raise_for_status()

        try:
            result = response.json()
        except ValueError as e:
            # ValueError is the common base of the JSON decode errors that
            # Response.json() can raise, and needs no extra import (the file
            # never imports ``json``).
            logger.error(
                "Failed to decode JSON response from %s. Response text: %s...",
                convert_url, response.text[:200],
            )
            raise HTTPException(
                status_code=502,
                detail="Received invalid response format from the download service.",
            ) from e

        logger.info("Successfully received response from %s.", convert_url)
        if isinstance(result, dict) and result.get("success"):
            logger.info("Spowload response indicates success.")
        elif isinstance(result, dict):
            logger.warning("Spowload response received but may indicate failure: %s", result)
        return JSONResponse(content=result)

    except HTTPException:
        # Keep the 502 raised for an undecodable body; the generic handler
        # at the bottom would otherwise rewrite it to a 500.
        raise
    except requests.exceptions.Timeout as e:
        logger.error("Request timed out while contacting %s", convert_url)
        raise HTTPException(status_code=504, detail="Download service timed out.") from e
    except requests.exceptions.HTTPError as e:
        status = e.response.status_code
        if status == 419:
            logger.error(
                "Received 419 Page Expired error from %s. CSRF token or session likely invalid despite using session.",
                convert_url,
            )
            raise HTTPException(
                status_code=419,
                detail="Download service reported 'Page Expired' (CSRF/Session issue).",
            ) from e

        logger.error(
            "HTTP error %s received from %s. Response: %s...",
            status, convert_url, e.response.text[:200],
        )
        if status == 429:
            raise HTTPException(
                status_code=429,
                detail="Rate limited by the download service. Try again later.",
            ) from e
        if status == 403:
            raise HTTPException(
                status_code=403,
                detail="Request forbidden by the download service.",
            ) from e
        raise HTTPException(
            status_code=502,
            detail=f"Download service returned an error (Status: {status}).",
        ) from e
    except requests.exceptions.RequestException as e:
        logger.error("Network error contacting %s: %s", convert_url, e)
        raise HTTPException(
            status_code=502,
            detail="Network error communicating with the download service.",
        ) from e
    except Exception as e:
        logger.exception("An unexpected error occurred during /track_dl processing: %s", e)
        raise HTTPException(
            status_code=500,
            detail="An unexpected internal server error occurred.",
        ) from e
    finally:
        # The session is always bound once this try is entered, so it can be
        # closed unconditionally.
        spowload_session.close()
|
|
|
|
|
|
|
@app.get("/")
async def health_check():
    """
    Health check endpoint.

    Obtains a Spotify token (cached or fresh) and reports how many seconds
    remain until the cached token's expiry time. Any failure is reported in
    the response body as "unhealthy" together with the error text.
    """
    try:
        # Only success or failure matters here; the token value is not used.
        get_spotify_token()

        expiry = token_cache.expiry_time
        if expiry:
            seconds_left = expiry.timestamp() - datetime.now().timestamp()
        else:
            seconds_left = None

        return {
            "status": "healthy",
            "spotify_auth": "ok",
            "token_expires_in": seconds_left,
        }
    except Exception as e:
        reason = str(e)
        logger.error(f"Health check failed: {reason}")
        return {"status": "unhealthy", "error": reason}
|
|