Spaces:
Paused
Paused
import spotipy | |
from spotipy.exceptions import SpotifyException | |
from urllib.parse import urlparse, parse_qs | |
import gradio as gr | |
from sklearn.metrics.pairwise import cosine_similarity | |
import lyricsgenius | |
import re | |
import requests | |
from bs4 import BeautifulSoup | |
from langchain.schema import HumanMessage, SystemMessage | |
import os | |
from sentence_transformers import SentenceTransformer | |
from fuzzywuzzy import fuzz | |
import random | |
from dotenv import load_dotenv | |
load_dotenv() | |
### ### ### Authentication ### ### ### | |
redirect_uri = "https://huggingface.co/sjw" | |
scope = ['user-library-read', | |
'user-read-playback-state', | |
'user-modify-playback-state', | |
'playlist-modify-public', | |
'user-top-read'] | |
sp_state = gr.State() | |
device_id_state = gr.State() | |
with gr.Blocks() as auth_page: | |
with gr.Row(): | |
client_id = gr.Textbox(label="Spotify Client ID") | |
generate_link = gr.Button("Get Authentication Link") | |
display_link = gr.Markdown() | |
url = gr.Textbox(label="Paste URL") | |
authorize_url = gr.Button("Authorize URL") | |
auth_result = gr.Textbox() | |
def spotify_auth(client_id, url=None): | |
if url: | |
parsed_url = urlparse(url) | |
fragment = parsed_url.fragment | |
access_token = parse_qs(fragment)['access_token'][0] | |
sp = spotipy.Spotify(auth=access_token) | |
sp_state.value = sp | |
device_id = sp_state.value.devices()['devices'][0]['id'] | |
device_id_state.value = device_id | |
#print(sp_state.value, device_id_state.value) | |
# results = sp_state.value.search(q="Passionfruit", type='track') | |
# track_uri = results['tracks']['items'][0]['uri'] | |
# sp_state.value.start_playback(device_id=device_id_state.value, uris=[track_uri]) | |
return access_token | |
else: | |
auth_url = f"https://accounts.spotify.com/authorize?response_type=token&client_id={client_id}&scope={'%20'.join(scope)}&redirect_uri={redirect_uri}" | |
return f"Please authorize the app by clicking [here]({auth_url}) and then paste the URL you are redirected to below." | |
generate_link.click(spotify_auth, inputs=[client_id], outputs=display_link) | |
authorize_url.click(spotify_auth, inputs=[client_id, url], outputs=auth_result) | |
### ### ### Basic Functions ### ### ### | |
def find_track_by_name(track_name): | |
if sp_state.value is None: | |
return f"Spotify client not initialized. Authenticate Spotify first." | |
results = sp_state.value.search(q=track_name, type='track') | |
track_uri = results['tracks']['items'][0]['uri'] | |
return track_uri | |
def play_track_by_name(track_name): | |
if sp_state.value is None or device_id_state.value is None: | |
return f"Spotify client not initialized. Authenticate Spotify first." | |
track_uri = find_track_by_name(track_name) | |
track_name = sp_state.value.track(track_uri)["name"] | |
artist_name = sp_state.value.track(track_uri)['artists'][0]['name'] | |
try: | |
sp_state.value.start_playback(device_id=device_id_state.value, uris=[track_uri]) | |
return f"♫ Now playing {track_name} by {artist_name} ♫" | |
except SpotifyException as e: | |
return f"An error occurred with Spotify: {e}. \n\n**Remember to wake up Spotify.**" | |
except Exception as e: | |
return f"An unexpected error occurred: {e}." | |
def queue_track_by_name(track_name): | |
""" | |
Queues track given its name | |
""" | |
if sp_state.value is None or device_id_state.value is None: | |
return f"Spotify client not initialized. Authenticate Spotify first." | |
track_uri = find_track_by_name(track_name) | |
track_name = sp_state.value.track(track_uri)["name"] | |
sp_state.value.add_to_queue(uri=track_uri, device_id=device_id_state.value) | |
return f"♫ Added {track_name} to your queue ♫" | |
def pause_track(): | |
if sp_state.value is None or device_id_state.value is None: | |
return f"Spotify client not initialized. Authenticate Spotify first." | |
sp_state.value.pause_playback(device_id=device_id_state.value) | |
return "♫ Playback paused ♫" | |
def play_track(): | |
if sp_state.value is None or device_id_state.value is None: | |
return f"Spotify client not initialized. Authenticate Spotify first." | |
sp_state.value.start_playback(device_id=device_id_state.value) | |
return "♫ Playback started ♫" | |
def skip_track(): | |
if sp_state.value is None or device_id_state.value is None: | |
return f"Spotify client not initialized. Authenticate Spotify first." | |
sp_state.value.next_track(device_id=device_id_state.value) | |
return "♫ Skipped to your next track ♫" | |
### ### ### More Elaborate ### ### ### | |
from requests.exceptions import Timeout | |
def play_album_by_name_and_artist(album_name, artist_name): | |
""" | |
Plays an album given its name and artist | |
context_uri (for artist, album, or playlist) expects a string | |
""" | |
if sp_state.value is None or device_id_state.value is None: | |
return f"Spotify client not initialized. Authenticate Spotify first." | |
results = sp_state.value.search(q=f'{album_name} {artist_name}', type='album') | |
album_id = results['albums']['items'][0]['id'] | |
album_info = sp_state.value.album(album_id) | |
album_name = album_info['name'] | |
artist_name = album_info['artists'][0]['name'] | |
try: | |
sp_state.value.start_playback(device_id=device_id_state.value, context_uri=f'spotify:album:{album_id}') | |
return f"♫ Now playing {album_name} by {artist_name} ♫" | |
except spotipy.SpotifyException as e: | |
return f"An error occurred with Spotify: {e}. \n\n**Remember to wake up Spotify.**" | |
except Timeout: | |
return f"An unexpected error occurred: {e}." | |
def play_playlist_by_name(playlist_name): | |
if sp_state.value is None or device_id_state.value is None: | |
return f"Spotify client not initialized. Authenticate Spotify first." | |
playlists = sp_state.value.current_user_playlists() | |
playlist_dict = {playlist['name']: (playlist['id'], playlist['owner']['display_name']) for playlist in playlists['items']} | |
playlist_names = [key for key in playlist_dict.keys()] | |
# defined inside function to play new playlists created in session | |
playlist_name_embeddings = model.encode(playlist_names) | |
user_playlist_embedding = model.encode([playlist_name]) | |
similarity_scores = cosine_similarity(user_playlist_embedding, playlist_name_embeddings) | |
most_similar_index = similarity_scores.argmax() | |
playlist_name = playlist_names[most_similar_index] | |
try: | |
playlist_id, creator_name = playlist_dict[playlist_name] | |
sp_state.value.start_playback(device_id=device_id_state.value, context_uri=f'spotify:playlist:{playlist_id}') | |
return f'♫ Now playing {playlist_name} by {creator_name} ♫' | |
except: | |
return "Unable to find playlist. Please try again." | |
genius_token = os.getenv("GENIUS_ACCESS_TOKEN") | |
def get_track_info(): | |
""" | |
Harvests information for explain_track() using the Genius API and webscraping. | |
""" | |
genius = lyricsgenius.Genius(genius_token) | |
if sp_state.value is None: | |
return f"Spotify client not initialized. Authenticate Spotify first." | |
current_track_item = sp_state.value.current_user_playing_track()['item'] | |
track_name = current_track_item['name'] | |
artist_name = current_track_item['artists'][0]['name'] | |
album_name = current_track_item['album']['name'] | |
release_date = current_track_item['album']['release_date'] | |
basic_info = { | |
'track_name': track_name, | |
'artist_name': artist_name, | |
'album_name': album_name, | |
'release_date': release_date, | |
} | |
# features hinder Genius search | |
track_name = re.split(' \(with | \(feat\. ', track_name)[0] | |
result = genius.search_song(track_name, artist_name) | |
# if no Geniius page exists | |
if result is not None and hasattr(result, 'artist'): | |
genius_artist = result.artist.lower().replace(" ", "") | |
spotify_artist = artist_name.lower().replace(" ", "") | |
print(spotify_artist) | |
print(genius_artist) | |
if spotify_artist not in genius_artist: | |
return basic_info, None, None, None | |
else: | |
genius_artist = None | |
return basic_info, None, None, None | |
# if a Genius page exists | |
lyrics = result.lyrics | |
url = result.url | |
response = requests.get(url) | |
# parsing the webpage & locating About section | |
soup = BeautifulSoup(response.text, 'html.parser') | |
about_section = soup.select_one('div[class^="RichText__Container-oz284w-0"]') | |
# if no About section exists | |
if not about_section: | |
return basic_info, None, lyrics, url | |
# if an About section exists | |
else: | |
about_section = about_section.get_text(separator='\n') | |
return basic_info, about_section, lyrics, url | |
def explain_track(): | |
from final_agent import llm_state | |
""" | |
Presents track information in an organized, compelling manner. | |
""" | |
basic_info, about_section, lyrics, url = get_track_info() | |
print(basic_info, about_section, lyrics, url) | |
if lyrics: # (essentially, if a Genius page exists; lyrics != None) | |
system_message_content = """ | |
Your task is to create an engaging summary for a track using the available details | |
about the track and its lyrics. If there's insufficient or no additional information | |
besides the lyrics, craft the entire summary based solely on the lyrical content." | |
""" | |
human_message_content = f"{about_section}\n\n{lyrics}" | |
messages = [ | |
SystemMessage(content=system_message_content), | |
HumanMessage(content=human_message_content) | |
] | |
ai_response = llm_state.value(messages).content | |
summary = f""" | |
**Name:** <span style="color: red; font-weight: bold; font-style: italic;">{basic_info["track_name"]}</span> | |
**Artist:** {basic_info["artist_name"]} | |
**Album:** {basic_info["album_name"]} | |
**Release:** {basic_info["release_date"]} | |
**About:** | |
{ai_response} | |
<a href='{url}'>Click here for more information on Genius!</a> | |
""" | |
return summary | |
else: # (if no Genius page exists - Lyrics; None) | |
url = "https://genius.com/Genius-how-to-add-songs-to-genius-annotated" | |
summary = f""" | |
**Name:** <span style="color: red; font-weight: bold; font-style: italic;">{basic_info["track_name"]}</span> | |
**Artist:** {basic_info["artist_name"]} | |
**Album:** {basic_info["album_name"]} | |
**Release:** {basic_info["release_date"]} | |
**About:** | |
Unfortunately, this track has not been uploaded to Genius.com | |
<a href='{url}'>Be the first to change that!</a> | |
""" | |
return summary | |
### ### ### Genre + Mood ### ### ### | |
os.environ["TOKENIZERS_PARALLELISM"] = "false" # satisfies warning | |
model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2') | |
""" | |
* MiniLM models are smaller and faster versions of BERT-like models with competitive performance. | |
* L6 indicates the number of layers in the model - | |
a reduced number of layers compared to larger models like BERT, which typically have 12 or 24 layers. | |
""" | |
mood_list = ["happy", "sad", "energetic", "calm"] | |
mood_embeddings = model.encode(mood_list) | |
def get_user_mood(user_mood): | |
if user_mood.lower() in mood_list: | |
user_mood = user_mood.lower() | |
return user_mood | |
else: | |
user_mood_embedding = model.encode([user_mood.lower()]) | |
similarity_scores = cosine_similarity(user_mood_embedding, mood_embeddings) | |
most_similar_index = similarity_scores.argmax() | |
user_mood = mood_list[most_similar_index] | |
return user_mood | |
if sp_state.value is None or device_id_state.value is None: | |
None | |
else: | |
genre_list = sp_state.value.recommendation_genre_seeds()["genres"] | |
genre_embeddings = model.encode(genre_list) | |
def get_genre_by_name(genre_name): | |
""" | |
Matches genre_name input to closest existing genre in genre_list; | |
recommendations() only accepts from list | |
""" | |
if genre_name.lower() in genre_list: | |
genre_name = genre_name.lower() | |
return genre_name | |
else: | |
genre_name_embedding = model.encode([genre_name.lower()]) | |
similarity_scores = cosine_similarity(genre_name_embedding, genre_embeddings) | |
most_similar_index = similarity_scores.argmax() | |
genre_name = genre_list[most_similar_index] | |
return genre_name | |
MOOD_SETTINGS = { | |
"happy": {"max_instrumentalness": 0.001, "min_valence": 0.6}, | |
"sad": {"max_danceability": 0.65, "max_valence": 0.4}, | |
"energetic": {"min_tempo": 120, "min_danceability": 0.75}, | |
"calm": {"max_energy": 0.65, "max_tempo": 130} | |
} | |
similarity_threshold = 75 # found after testing; out of 100 | |
def is_genre_match(genre1, genre2, threshold=similarity_threshold): | |
""" | |
if you have a short string and a longer one, | |
fuzz.partial_token_set_ratio() will score the similarity | |
of the shorter string to every possible sub-string | |
(of the same length as the shorter string) in the longer string. | |
The highest similarity score is then returned. | |
""" | |
score = fuzz.token_set_ratio(genre1, genre2) | |
print(score) # debugging | |
return score >= threshold | |
def create_track_list_str(track_uris): | |
""" | |
Helper function; creates list of track names and corresponding artists | |
""" | |
if sp_state.value is None: | |
return f"Spotify client not initialized. Authenticate Spotify first." | |
track_details = sp_state.value.tracks(track_uris) | |
track_names_with_artists = [f"{track['name']} by {track['artists'][0]['name']}" for track in track_details['tracks']] | |
track_list_str = "<br>".join(track_names_with_artists) | |
return track_list_str | |
NUM_ARTISTS = 20 # max 50 | |
TIME_RANGE = "medium_term" | |
NUM_TRACKS = 10 | |
MAX_ARTISTS = 4 | |
def play_genre_by_name_and_mood(genre_name, user_mood): | |
""" | |
1. Retrieves user's genre request and their current mood | |
2. Matches genre and mood to existing options | |
3. Gets 4 of user's top artists that align with genre | |
4. Conducts personalized recommendations() search | |
""" | |
if sp_state.value is None or device_id_state.value is None: | |
return f"Spotify client not initialized. Authenticate Spotify first." | |
genre_name = get_genre_by_name(genre_name) | |
user_mood = get_user_mood(user_mood).lower() | |
#mood_setting = mood_settings[user_mood] | |
print(genre_name) | |
print(user_mood) | |
#print(mood_setting) | |
# increased personalization | |
user_top_artists = sp_state.value.current_user_top_artists(limit=NUM_ARTISTS, time_range=TIME_RANGE) | |
matching_artists_ids = [] | |
### READABLE ### | |
for artist in user_top_artists['items']: | |
#print(artist['genres']) | |
for artist_genre in artist['genres']: | |
if is_genre_match(genre_name, artist_genre): | |
matching_artists_ids.append(artist['id']) | |
break # don't waste time cycling artist genres after match | |
if len(matching_artists_ids) == MAX_ARTISTS: # limit to pass to recommendations() | |
break | |
### EFFICIENT ### [WORSE VERSION] ~ INCLUDES ARTIST MULTIPLE TIMES PER MATCH | |
# matching_artists_ids = list(islice((artist['id'] for artist in user_top_artists['items'] | |
# for artist_genre in artist['genres'] | |
# if is_genre_match(genre_name, artist_genre)), MAX_ARTISTS)) | |
if not matching_artists_ids: | |
matching_artists_ids = None | |
else: | |
artist_names = [artist['name'] for artist in sp_state.value.artists(matching_artists_ids)['artists']] | |
print(artist_names) | |
print(matching_artists_ids) | |
recommendations = sp_state.value.recommendations( # can pass maximum {genre + artists} = 5 seeds | |
seed_artists=matching_artists_ids, | |
seed_genres=[genre_name], | |
seed_tracks=None, | |
limit=NUM_TRACKS, # number of tracks to return | |
country=None, | |
**MOOD_SETTINGS[user_mood]) | |
track_uris = [track['uri'] for track in recommendations['tracks']] | |
track_list_str = create_track_list_str(track_uris) | |
sp_state.value.start_playback(device_id=device_id_state.value, uris=track_uris) | |
return f""" | |
**♫ Now Playing:** <span style="color: red; font-weight: bold; font-style: italic;">{genre_name}</span> ♫ | |
**Selected Tracks:** | |
{track_list_str} | |
""" | |
### ### ### Artist + Mood ### ### ### | |
NUM_ALBUMS = 20 # max 20 | |
MAX_TRACKS = 10 | |
def play_artist_by_name_and_mood(artist_name, user_mood): | |
if sp_state.value is None or device_id_state.value is None: | |
return f"Spotify client not initialized. Authenticate Spotify first." | |
user_mood = get_user_mood(user_mood).lower() | |
print(user_mood) | |
# retrieving and shuffling all artist's tracks | |
first_name = artist_name.split(',')[0].strip() | |
results = sp_state.value.search(q=first_name, type='artist') | |
artist_id = results['artists']['items'][0]['id'] | |
# most recent albums retrieved first | |
artist_albums = sp_state.value.artist_albums(artist_id, album_type='album', limit=NUM_ALBUMS) | |
artist_tracks = [] | |
for album in artist_albums['items']: | |
album_tracks = sp_state.value.album_tracks(album['id'])['items'] | |
artist_tracks.extend(album_tracks) | |
random.shuffle(artist_tracks) | |
# filtering until we find enough tracks that match user's mood | |
selected_tracks = [] | |
for track in artist_tracks: | |
if len(selected_tracks) == MAX_TRACKS: # number of tracks to queue | |
break | |
features = sp_state.value.audio_features([track['id']])[0] | |
mood_criteria = MOOD_SETTINGS[user_mood] | |
match = True | |
for criteria, threshold in mood_criteria.items(): | |
if "min_" in criteria and features[criteria[4:]] < threshold: | |
match = False | |
break | |
elif "max_" in criteria and features[criteria[4:]] > threshold: | |
match = False | |
break | |
if match: | |
print(f"{features}\n{mood_criteria}\n\n") # checking our work | |
selected_tracks.append(track) | |
track_names = [track['name'] for track in selected_tracks] | |
track_list_str = "<br>".join(track_names) # Using HTML line breaks for each track name | |
print(track_list_str) | |
track_uris = [track['uri'] for track in selected_tracks] | |
sp_state.value.start_playback(device_id=device_id_state.value, uris=track_uris) | |
return f""" | |
**♫ Now Playing:** <span style="color: red; font-weight: bold; font-style: italic;">{artist_name}</span> ♫ | |
**Selected Tracks:** | |
{track_list_str} | |
""" | |
### ### ### Recommendations ### ### ### | |
NUM_TRACKS = 10 | |
def recommend_tracks(genre_name=None, artist_name=None, track_name=None, user_mood=None): | |
""" | |
1. Retrieves user's preferences based on artist_name, track_name, genre_name, and user_mood. | |
2. Uses these parameters to conduct personalized recommendations() search. | |
""" | |
if sp_state.value is None or device_id_state.value is None: | |
return f"Spotify client not initialized. Authenticate Spotify first." | |
user_mood = get_user_mood(user_mood).lower() if user_mood else None | |
print(user_mood) | |
seed_genre, seed_artist, seed_track = None, None, None | |
if genre_name: | |
first_name = genre_name.split(',')[0].strip() | |
genre_name = get_genre_by_name(first_name) | |
seed_genre = [genre_name] | |
print(seed_genre) | |
if artist_name: | |
first_name = artist_name.split(',')[0].strip() # if user provides multiple artists, use the first | |
results = sp_state.value.search(q='artist:' + first_name, type='artist') | |
seed_artist = [results['artists']['items'][0]['id']] | |
if track_name: | |
first_name = track_name.split(',')[0].strip() | |
results = sp_state.value.search(q='track:' + first_name, type='track') | |
seed_track = [results['tracks']['items'][0]['id']] | |
if seed_genre is None and seed_artist is None and seed_track is None: | |
raise ValueError("At least one genre, artist, or track must be provided.") | |
recommendations = sp_state.value.recommendations( # requires at least one seed value | |
seed_artists=seed_artist, | |
seed_genres=seed_genre, | |
seed_tracks=seed_track, | |
limit=NUM_TRACKS, | |
country=None, | |
**MOOD_SETTINGS[user_mood] if user_mood else {}) | |
track_uris = [track['uri'] for track in recommendations['tracks']] | |
return track_uris | |
def play_recommended_tracks(genre_name=None, artist_name=None, track_name=None, user_mood=None): | |
if sp_state.value is None or device_id_state.value is None: | |
return f"Spotify client not initialized. Authenticate Spotify first." | |
try: | |
track_uris = recommend_tracks(genre_name, artist_name, track_name, user_mood) | |
track_list_str = create_track_list_str(track_uris) | |
sp_state.value.start_playback(device_id=device_id_state.value, uris=track_uris) | |
return f""" | |
**♫ Now Playing Recommendations Based On:** <span style="color: red; font-weight: bold; font-style: italic;">{', '.join(filter(None, [genre_name, artist_name, track_name, "Your Mood"]))}</span> ♫ | |
**Selected Tracks:** | |
{track_list_str} | |
""" | |
except ValueError as e: | |
return str(e) # Return the error message, or handle it as needed. | |
def create_playlist_from_recommendations(genre_name=None, artist_name=None, track_name=None, user_mood=None): | |
if sp_state.value is None or device_id_state.value is None: | |
return f"Spotify client not initialized. Authenticate Spotify first." | |
user_id = sp_state.value.current_user()['id'] | |
user_name = sp_state.value.current_user()["display_name"] | |
playlists = sp_state.value.current_user_playlists() | |
playlist_names = [playlist['name'] for playlist in playlists["items"]] | |
themes = ["Epic", "Hypnotic", "Dreamy", "Legendary", "Majestic", | |
"Enchanting", "Ethereal", "Super Lit", "Harmonious", "Heroic"] | |
chosen_theme = random.choice(themes) | |
playlist_name = f"{user_name}'s {chosen_theme} Playlist" | |
while playlist_name in playlist_names: | |
chosen_theme = random.choice(themes) | |
playlist_name = f"{user_name}'s {chosen_theme} Playlist" | |
playlist_description=f"Apollo AI's personalized playlist for {user_name}. Get yours here: (add link)." | |
new_playlist = sp_state.value.user_playlist_create(user=user_id, name=playlist_name, public=True, collaborative=False, description=playlist_description) | |
track_uris = recommend_tracks(genre_name, artist_name, track_name, user_mood) | |
track_list_str = create_track_list_str(track_uris) | |
sp_state.value.user_playlist_add_tracks(user=user_id, playlist_id=new_playlist['id'], tracks=track_uris, position=None) | |
playlist_url = f"https://open.spotify.com/playlist/{new_playlist['id']}" | |
return f""" | |
♫ Created *{playlist_name}* Based On: <span style='color: red; font-weight: bold; font-style: italic;'>{', '.join(filter(None, [genre_name, artist_name, track_name, 'Your Mood']))}</span> ♫ | |
**Selected Tracks:** | |
{track_list_str} | |
<a href='{playlist_url}'>Click here to listen to the playlist on Spotify!</a> | |
""" | |