import pandas as pd
import numpy as np
from ast import literal_eval
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from nltk.stem.snowball import SnowballStemmer
from surprise import Reader, Dataset, SVD
from surprise import NormalPredictor
from surprise.model_selection import cross_validate
from Levenshtein import distance
import warnings

warnings.simplefilter('ignore')


class Recommender_Model:
    def __init__(self):
        self.cleaned_data = None
        self.cleaned_data1 = None
        self.cosine_sim = None
        self.titles = None
        self.indices = None
        self.index_movie_id = None
        self.SVD = None
        self.id_map = None
        self.preprocessing()

    def preprocessing(self):
        movie_data = pd.read_csv("Datasets/movies_metadata.csv")
        self.user_rating = pd.read_csv("Datasets/ratings_small.csv")

        # Rows 19730, 29503 and 35587 of movies_metadata.csv carry malformed
        # 'id' values; drop them before ids are cast to int below.
        movie_data = movie_data.drop([19730, 29503, 35587])

        # Keep only movies rated at or above the mean vote average and in the
        # top 20% by vote count.
        vote_counts = movie_data[movie_data['vote_count'].notnull()]['vote_count'].astype('int')
        vote_averages = movie_data[movie_data['vote_average'].notnull()]['vote_average'].astype('int')
        average_vote_score = vote_averages.mean()
        percentile_80_cutoff = np.percentile(vote_counts, 80)
        cleaned_data1 = movie_data[movie_data['vote_average'] >= average_vote_score]
        cleaned_data1 = cleaned_data1[cleaned_data1['vote_count'] > percentile_80_cutoff]

        # Restrict to the subset of movies listed in links_small.csv.
        links_small = pd.read_csv('Datasets/links_small.csv')
        only_subset_movies = list(links_small['tmdbId'])
        cleaned_data1['id'] = cleaned_data1['id'].astype('int')
        self.cleaned_data = cleaned_data1[cleaned_data1['id'].isin(only_subset_movies)]
        self.cleaned_data['tagline'] = self.cleaned_data['tagline'].fillna('')

        # Genres: parse the stringified lists, keep the names, then stem,
        # lowercase, strip spaces and de-duplicate.
        stemmer = SnowballStemmer('english')
        self.cleaned_data['genres'] = self.cleaned_data['genres'].apply(literal_eval)
        self.cleaned_data['genres'] = self.cleaned_data['genres'].apply(lambda x: [i['name'] for i in x])
        self.cleaned_data['genres'] = self.cleaned_data['genres'].apply(lambda x: [stemmer.stem(i) for i in x])
        self.cleaned_data['genres'] = self.cleaned_data['genres'].apply(lambda x: [i.replace(" ", "").lower() for i in x])
        self.cleaned_data['genres'] = self.cleaned_data['genres'].apply(lambda x: list(set(x)))

        credits = pd.read_csv('Datasets/credits.csv')
        keywords = pd.read_csv('Datasets/keywords.csv')
        self.cleaned_data = self.cleaned_data.merge(credits, on='id')
        self.cleaned_data = self.cleaned_data.merge(keywords, on='id')

        # Keywords get the same parse/stem/normalise/de-duplicate treatment.
        self.cleaned_data['keywords'] = self.cleaned_data['keywords'].apply(literal_eval)
        self.cleaned_data['keywords'] = self.cleaned_data['keywords'].apply(lambda x: [i['name'] for i in x])
        self.cleaned_data['keywords'] = self.cleaned_data['keywords'].apply(lambda x: [stemmer.stem(i) for i in x])
        self.cleaned_data['keywords'] = self.cleaned_data['keywords'].apply(lambda x: [i.replace(" ", "").lower() for i in x])
        self.cleaned_data['keywords'] = self.cleaned_data['keywords'].apply(lambda x: list(set(x)))

        # Cast and crew: keep the two top-billed cast members and the director.
        self.cleaned_data['cast'] = self.cleaned_data['cast'].apply(literal_eval)
        self.cleaned_data['crew'] = self.cleaned_data['crew'].apply(literal_eval)
        self.cleaned_data['top_crew'] = self.cleaned_data['cast'].apply(lambda x: [i['name'] for i in x])
        self.cleaned_data['top_crew'] = self.cleaned_data['top_crew'].apply(lambda x: x[:2])
        self.cleaned_data['director'] = self.cleaned_data['crew'].apply(get_director)
        # Build one text field per movie from the content features and compute
        # a cosine-similarity matrix over its count vectors. The tagline is
        # wrapped in a list like the other scalar columns, although it is not
        # folded into 'combine' below.
        imp_cols = ['tagline', 'genres', 'original_language', 'keywords', 'top_crew', 'director']
        self.cleaned_data1 = self.cleaned_data[imp_cols]
        self.cleaned_data1['tagline'] = self.cleaned_data1['tagline'].apply(lambda x: [x])
        self.cleaned_data1['original_language'] = self.cleaned_data1['original_language'].apply(lambda x: [x])
        self.cleaned_data1['director'] = self.cleaned_data1['director'].apply(lambda x: [x])
        self.cleaned_data1['combine'] = (self.cleaned_data1['genres']
                                         + self.cleaned_data1['original_language']
                                         + self.cleaned_data1['keywords']
                                         + self.cleaned_data1['top_crew']
                                         + self.cleaned_data1['director'])
        self.cleaned_data1['combine'] = self.cleaned_data1['combine'].apply(lambda x: ' '.join(x))
        count = CountVectorizer(analyzer='word', ngram_range=(1, 2), min_df=0.01, stop_words='english')
        count_matrix = count.fit_transform(self.cleaned_data1['combine'])
        self.cosine_sim = cosine_similarity(count_matrix, count_matrix)

        # Lookup tables between titles, row indices and TMDB ids.
        self.cleaned_data = self.cleaned_data.reset_index()
        self.titles = self.cleaned_data['title']
        self.indices = pd.Series(self.cleaned_data.index, index=self.cleaned_data['title'])
        self.index_movie_id = self.cleaned_data[['index', 'id']]

        # Collaborative-filtering part: fit SVD on the full ratings set (the
        # NormalPredictor cross-validation result is computed but unused).
        reader = Reader()
        data = Dataset.load_from_df(self.user_rating[['userId', 'movieId', 'rating']], reader)
        cross_validate(NormalPredictor(), data, cv=4)
        self.SVD = SVD()
        trainset = data.build_full_trainset()
        self.SVD.fit(trainset)

        # Map MovieLens movieIds to TMDB ids, indexed by title and by id.
        self.id_map = pd.read_csv('Datasets/links_small.csv')[['movieId', 'tmdbId']]
        self.id_map['tmdbId'] = self.id_map['tmdbId'].apply(convert_int)
        self.id_map.columns = ['movieId', 'id']
        self.id_map = self.id_map.merge(self.cleaned_data[['title', 'id']], on='id').set_index('title')
        self.indices_map = self.id_map.set_index('id')
        self.user_rating.drop(columns=['timestamp'], inplace=True)

    def hybrid2(self, userId, title1, title2, title3, number_of_suggestions: int):
        """Build a content-based shortlist from three seed titles, then
        re-rank it by the SVD model's predicted rating for userId."""
        idx1 = self.indices[title1]
        idx2 = self.indices[title2]
        idx3 = self.indices[title3]
        # Duplicate titles resolve to a Series of indices; keep the first.
        if isinstance(idx1, pd.Series):
            idx1 = idx1.iloc[0]
        if isinstance(idx2, pd.Series):
            idx2 = idx2.iloc[0]
        if isinstance(idx3, pd.Series):
            idx3 = idx3.iloc[0]
        # Sum the three similarity rows, skip the top hit (typically one of
        # the seeds itself) and keep the next 55 candidates.
        sim_scores = list(enumerate(self.cosine_sim[int(idx1)]
                                    + self.cosine_sim[int(idx2)]
                                    + self.cosine_sim[int(idx3)]))
        sim_scores = sorted(sim_scores, key=lambda x: x[1], reverse=True)
        sim_scores = sim_scores[1:56]
        movie_indices = [i[0] for i in sim_scores]
        movies = self.cleaned_data.iloc[movie_indices][['title', 'vote_count', 'vote_average', 'id']]
        movies['est'] = movies['id'].apply(lambda x: self.SVD.predict(userId, self.indices_smoother(x)).est)
        movies = movies.sort_values('est', ascending=False)
        # A few extra rows beyond the requested count are returned as slack.
        return movies.head(number_of_suggestions + 3)

    def get_similar_users(self, movie_names):
        """Return the ten users most similar to a synthetic user who rated
        the given movies 5.0, most similar first."""
        # Translate TMDB ids to MovieLens movieIds so the synthetic user's
        # ratings line up with the columns of the user-item matrix.
        movie_ids = [self.indices_smoother(
                         self.cleaned_data.loc[self.cleaned_data['title'] == movie, 'id'].iloc[0])
                     for movie in movie_names]
        new_user_ratings = pd.DataFrame({
            'userId': [max(self.user_rating['userId']) + 1] * len(movie_ids),
            'movieId': movie_ids,
            'rating': [5.0] * len(movie_ids)
        })
        merged_ratings = pd.concat([self.user_rating, new_user_ratings], ignore_index=True)
        user_item_matrix = merged_ratings.pivot_table(index='userId', columns='movieId',
                                                      values='rating', fill_value=0)
        new_user_vector = user_item_matrix.loc[user_item_matrix.index[-1]].values.reshape(1, -1)
        user_similarity = cosine_similarity(user_item_matrix.values[:-1], new_user_vector)
        similar_users_indices = user_similarity.argsort(axis=0)[-10:].flatten()[::-1]
        similar_users_similarity = user_similarity[similar_users_indices].flatten()
        similar_users = user_item_matrix.iloc[similar_users_indices]
        similar_users_df = pd.DataFrame({'userId': similar_users.index,
                                         'Similarity': similar_users_similarity})
        return similar_users_df

    def suggest(self, movies, number_of_suggestions) -> pd.DataFrame:
        """Recommend movies for three seed titles on behalf of the most
        similar existing user."""
        similar_id = self.get_similar_users(movies).iloc[0, 0]
        return self.hybrid2(similar_id, movies[0], movies[1], movies[2],
                            number_of_suggestions=number_of_suggestions)

    def get_movie_info(self, movie_name: str) -> dict:
        movie_info = {}
        record = self.cleaned_data[self.cleaned_data['title'] == movie_name]
        movie_info['title'] = record['title'].to_numpy()[0]
        movie_info['overview'] = record['overview'].to_numpy()[0]
        movie_info['language'] = get_language_name(record['original_language'].to_numpy()[0])
        movie_info['genres'] = record['genres'].to_numpy()[0]
        return movie_info

    def indices_smoother(self, ids):
        """Map a TMDB id to its MovieLens movieId, collapsing duplicate
        matches to the first one."""
        movie_id = self.indices_map.loc[ids]['movieId']
        if isinstance(movie_id, pd.Series):
            return movie_id.iloc[0]
        return movie_id

    def find_nearest_movie(self, movie_name: str) -> tuple:
        """Return the known title closest to movie_name by Levenshtein
        distance (ignoring case and spaces), together with that distance."""
        lowest_distance = float('inf')
        closest_movie = ''
        for movie in self.cleaned_data['title']:
            current_distance = levenshtein_distance(movie_name.replace(" ", "").lower(),
                                                    movie.replace(" ", "").lower())
            if current_distance < lowest_distance:
                lowest_distance = current_distance
                closest_movie = movie
        return closest_movie, lowest_distance


def get_director(x):
    """Return the director's name from a parsed crew list, or '' if absent."""
    for i in x:
        if i['job'] == 'Director':
            return i['name']
    return ""


def convert_int(x):
    """Coerce a value to int, mapping unparsable values to NaN."""
    try:
        return int(x)
    except (ValueError, TypeError):
        return np.nan


def levenshtein_distance(name1: str, name2: str) -> int:
    return distance(name1, name2)


def get_language_name(code: str) -> str:
    language_dict = {
        'en': 'English',
        'fr': 'French',
        'es': 'Spanish',
        'de': 'German',
        'ja': 'Japanese',
        'zh-cn': 'Chinese',
        'ru': 'Russian',
        'pt': 'Portuguese',
        'ar': 'Arabic',
        'hi': 'Hindi'
    }
    return language_dict.get(code, 'Unknown Language')
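

# Minimal usage sketch, not part of the original module: it assumes the
# Datasets/ folder described above is in place, and the three seed titles are
# hypothetical examples that may or may not survive the vote-count and
# vote-average filtering (which is why they are resolved first).
if __name__ == '__main__':
    model = Recommender_Model()

    # Snap free-form input to the closest known titles before recommending.
    seeds = [model.find_nearest_movie(name)[0]
             for name in ('Toy Story', 'Jumanji', 'Heat')]

    # suggest() expects exactly three seed titles: it finds the existing user
    # most similar to someone who loved the seeds, then re-ranks a
    # content-based shortlist with that user's predicted SVD ratings.
    suggestions = model.suggest(seeds, number_of_suggestions=5)
    print(suggestions[['title', 'est']])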