| !pip install -q -U watermark |
|
|
| !pip install -qq transformers |
|
|
|
|
| import transformers |
| from transformers import BertModel, BertTokenizer, AdamW, get_linear_schedule_with_warmup |
| import torch |
|
|
| import numpy as np |
| import pandas as pd |
| import seaborn as sns |
| from pylab import rcParams |
| import matplotlib.pyplot as plt |
| from matplotlib import rc |
| from sklearn.model_selection import train_test_split |
| from sklearn.metrics import confusion_matrix, classification_report |
| from collections import defaultdict |
| from textwrap import wrap |
|
|
| from torch import nn, optim |
| from torch.utils.data import Dataset, DataLoader |
| import torch.nn.functional as F |
|
|
|
|
|
|
# Plot styling: seaborn's whitegrid theme first, then the custom palette on top.
sns.set(style='whitegrid', palette='muted', font_scale=1.2)

HAPPY_COLORS_PALETTE = [
    "#01BEFE",
    "#FFDD00",
    "#FF7D00",
    "#FF006D",
    "#ADFF02",
    "#8F00FF",
]
sns.set_palette(sns.color_palette(HAPPY_COLORS_PALETTE))

# Default matplotlib figure size (width, height).
rcParams['figure.figsize'] = (12, 8)

# Seed NumPy and PyTorch with the same fixed value.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)

# Use the first CUDA device when one is available, otherwise fall back to CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
|
|
|
|
| !gdown --id 1S6qMioqPJjyBLpLVz4gmRTnJHnjitnuV |
| !gdown --id 1zdmewp7ayS4js4VtrJEHzAheSW-5NBZv |
|
|
| df = pd.read_csv("reviews.csv") |
|
|
|
|
| sns.countplot(x='score', data = df) |
| plt.xlabel('review score'); |
|
|
def to_sentiment(rating):
    """Map a review score to a sentiment class index.

    The rating is converted with ``int()`` first. Scores of 2 or lower map
    to 0, a score of exactly 3 maps to 1, and anything higher maps to 2.
    """
    score = int(rating)
    if score > 3:
        return 2
    if score == 3:
        return 1
    return 0
|
|
# Derive the class index for every review from its score.
df['sentiment'] = df['score'].apply(to_sentiment)

# Display names for the class indices 0, 1 and 2 produced by to_sentiment.
class_names = ['negative', 'neutral', 'positive']

print(df['sentiment'])

# Bar chart of the class distribution, with the tick labels replaced by names.
ax = sns.countplot(data=df, x='sentiment')
plt.xlabel('review sentiment')
ax.set_xticklabels(class_names);
|
|
PRE_TRAINED_MODEL_NAME = 'bert-base-uncased'

tokenizer = BertTokenizer.from_pretrained(PRE_TRAINED_MODEL_NAME)

# Tokenize one sample sentence to see what the tokenizer produces.
sample_txt = 'When was I last outside? I am stuck at home for 2 weeks.'

tokens = tokenizer.tokenize(sample_txt)
token_ids = tokenizer.convert_tokens_to_ids(tokens)

print(f' Sentence: {sample_txt}')
print(f' Tokens: {tokens}')
print(f'Token IDs: {token_ids}')

# Special tokens and their ids (displayed as notebook cell output).
tokenizer.sep_token, tokenizer.sep_token_id

tokenizer.cls_token, tokenizer.cls_token_id

tokenizer.pad_token, tokenizer.pad_token_id

tokenizer.unk_token, tokenizer.unk_token_id

# Encode the sample to a fixed length of 32 ids. `padding='max_length'`
# replaces the deprecated `pad_to_max_length=True`, and truncation is
# requested explicitly instead of relying on the tokenizer's implicit fallback.
encoding = tokenizer.encode_plus(
    sample_txt,
    max_length=32,
    add_special_tokens=True,
    return_token_type_ids=False,
    padding='max_length',
    truncation=True,
    return_attention_mask=True,
    return_tensors='pt',
)

encoding.keys()

print(len(encoding['input_ids'][0]))
encoding['input_ids'][0]

print(len(encoding['attention_mask'][0]))
encoding['attention_mask']

tokenizer.convert_ids_to_tokens(encoding['input_ids'][0])

# Token count per review, capped at 512 tokens. `str()` mirrors what
# GPReviewDataset does, so non-string cells do not crash the tokenizer.
token_lens = [
    len(tokenizer.encode(str(txt), max_length=512, truncation=True))
    for txt in df.content
]

# `histplot(kde=True, stat="density")` replaces the deprecated `distplot`.
sns.histplot(token_lens, kde=True, stat="density")
plt.xlim([0, 256]);
plt.xlabel('Token count');

# Sequence length used for every encoded review from here on.
MAX_LEN = 160
|
|
class GPReviewDataset(Dataset):
    """Dataset that tokenizes one review per item on access.

    Args:
        reviews: indexable collection of review texts.
        targets: indexable collection of class labels, aligned with ``reviews``.
        tokenizer: object exposing ``encode_plus`` (a BERT tokenizer in this file).
        max_len: fixed sequence length every review is padded/truncated to.
    """

    def __init__(self, reviews, targets, tokenizer, max_len):
        self.reviews = reviews
        self.targets = targets
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        """Return the number of reviews."""
        return len(self.reviews)

    def __getitem__(self, item):
        """Return the encoded review at position ``item``.

        Returns:
            dict with keys ``review_text`` (the review as ``str``),
            ``input_ids`` and ``attention_mask`` (flattened tensors from the
            tokenizer) and ``targets`` (the label as a ``torch.long`` tensor).
        """
        review = str(self.reviews[item])
        target = self.targets[item]

        # `padding='max_length'` replaces the deprecated `pad_to_max_length`;
        # `truncation=True` makes the cut-off at max_len explicit rather than
        # relying on the tokenizer's backward-compatibility fallback.
        encoding = self.tokenizer.encode_plus(
            review,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )

        return {
            'review_text': review,
            # flatten() drops the leading batch dimension added by return_tensors='pt'
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'targets': torch.tensor(target, dtype=torch.long),
        }
|
|
# 90% of the rows go to training; the remaining 10% is halved into
# validation and test sets. Both splits use the same fixed seed.
df_train, df_holdout = train_test_split(
    df,
    test_size=0.1,
    random_state=RANDOM_SEED,
)
df_val, df_test = train_test_split(
    df_holdout,
    test_size=0.5,
    random_state=RANDOM_SEED,
)

# Displayed as notebook cell output.
df_train.shape, df_val.shape, df_test.shape
|
|
def create_data_loader(df, tokenizer, max_len, batch_size, shuffle=False, num_workers=4):
    """Wrap a reviews DataFrame in a ``DataLoader`` over ``GPReviewDataset``.

    Args:
        df: DataFrame with a ``content`` column (texts) and a ``sentiment``
            column (labels).
        tokenizer: tokenizer handed to ``GPReviewDataset``.
        max_len: fixed sequence length for every encoded review.
        batch_size: number of samples per batch.
        shuffle: forwarded to ``DataLoader``; the default ``False`` keeps the
            previous behaviour (rows are served in DataFrame order).
        num_workers: forwarded to ``DataLoader``; defaults to the previously
            hard-coded value of 4.

    Returns:
        A ``DataLoader`` yielding the dict batches produced by ``GPReviewDataset``.
    """
    ds = GPReviewDataset(
        reviews=df.content.to_numpy(),
        targets=df.sentiment.to_numpy(),
        tokenizer=tokenizer,
        max_len=max_len,
    )

    return DataLoader(
        ds,
        batch_size=batch_size,
        shuffle=shuffle,
        num_workers=num_workers,
    )
|
|
BATCH_SIZE = 16

# One loader per split, all sharing the tokenizer, sequence length and batch size.
train_data_loader = create_data_loader(df_train, tokenizer, MAX_LEN, BATCH_SIZE)
val_data_loader = create_data_loader(df_val, tokenizer, MAX_LEN, BATCH_SIZE)
test_data_loader = create_data_loader(df_test, tokenizer, MAX_LEN, BATCH_SIZE)

# Pull a single training batch to inspect it.
data = next(iter(train_data_loader))
data.keys()

for key in ('input_ids', 'attention_mask', 'targets'):
    print(data[key].shape)

# Run the bare pretrained BERT on the sample encoding built earlier;
# return_dict=False makes the call return a tuple that is unpacked here.
bert_model = BertModel.from_pretrained(PRE_TRAINED_MODEL_NAME)

last_hidden_state, pooled_output = bert_model(
    input_ids=encoding['input_ids'],
    attention_mask=encoding['attention_mask'],
    return_dict=False,
)

last_hidden_state.shape

bert_model.config.hidden_size

pooled_output.shape
|
|
class SentimentClassifier(nn.Module):
    """Pretrained BERT followed by dropout and a linear classification layer."""

    def __init__(self, n_classes):
        """Build the network.

        Args:
            n_classes: number of output units of the final linear layer.
        """
        super().__init__()
        self.bert = BertModel.from_pretrained(PRE_TRAINED_MODEL_NAME)
        self.drop = nn.Dropout(p=0.3)
        self.out = nn.Linear(self.bert.config.hidden_size, n_classes)

    def forward(self, input_ids, attention_mask):
        """Return the linear layer's output for the given encoded batch.

        BERT's ``pooler_output`` is passed through dropout and then through
        the linear layer; no softmax is applied here.
        """
        bert_outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        dropped = self.drop(bert_outputs["pooler_output"])
        return self.out(dropped)
|
|
# Build the classifier with one output per class name and move it to the device.
model = SentimentClassifier(len(class_names)).to(device)

# Move the sample batch fetched earlier onto the same device.
input_ids = data['input_ids'].to(device)
attention_mask = data['attention_mask'].to(device)

for tensor in (input_ids, attention_mask):
    print(tensor.shape)

# Class probabilities of the untrained classifier head for the sample batch
# (displayed as notebook cell output).
F.softmax(model(input_ids, attention_mask), dim=1)

EPOCHS = 6

optimizer = AdamW(model.parameters(), lr=2e-5, correct_bias=False)

# One scheduler step is taken per batch, so the schedule spans every batch of every epoch.
total_steps = EPOCHS * len(train_data_loader)
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=0,
    num_training_steps=total_steps,
)

loss_fn = nn.CrossEntropyLoss().to(device)
|
|
def train_epoch(
    model,
    data_loader,
    loss_fn,
    optimizer,
    device,
    scheduler,
    n_examples
):
    """Train ``model`` for one pass over ``data_loader``.

    Args:
        model: module called as ``model(input_ids=..., attention_mask=...)``.
        data_loader: iterable of dict batches with the keys ``input_ids``,
            ``attention_mask`` and ``targets``.
        loss_fn: callable ``loss_fn(outputs, targets)`` returning a scalar loss.
        optimizer: optimizer stepped once per batch.
        device: device the batch tensors are moved to.
        scheduler: learning-rate scheduler stepped once per batch.
        n_examples: denominator used for the accuracy.

    Returns:
        ``(accuracy, mean_loss)``. ``accuracy`` is a double tensor equal to
        the number of correct predictions divided by ``n_examples``;
        ``mean_loss`` is the mean of the per-batch losses, or ``nan`` when
        the loader yields no batches.
    """
    model = model.train()

    losses = []
    # Start from a tensor rather than the int 0, so `.double()` below also
    # works when the loader yields no batches at all.
    correct_predictions = torch.zeros((), dtype=torch.long, device=device)

    for d in data_loader:
        input_ids = d["input_ids"].to(device)
        attention_mask = d["attention_mask"].to(device)
        targets = d["targets"].to(device)

        outputs = model(
            input_ids=input_ids,
            attention_mask=attention_mask
        )

        # Predicted class = index of the largest output per row.
        _, preds = torch.max(outputs, dim=1)
        loss = loss_fn(outputs, targets)

        correct_predictions += torch.sum(preds == targets)
        losses.append(loss.item())

        loss.backward()
        # Clip the gradient norm to 1.0 before the optimizer step.
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()

    mean_loss = np.mean(losses) if losses else float("nan")
    return correct_predictions.double() / n_examples, mean_loss
|
|
def eval_model(model, data_loader, loss_fn, device, n_examples):
    """Evaluate ``model`` over ``data_loader`` without updating it.

    Args:
        model: module called as ``model(input_ids=..., attention_mask=...)``.
        data_loader: iterable of dict batches with the keys ``input_ids``,
            ``attention_mask`` and ``targets``.
        loss_fn: callable ``loss_fn(outputs, targets)`` returning a scalar loss.
        device: device the batch tensors are moved to.
        n_examples: denominator used for the accuracy.

    Returns:
        ``(accuracy, mean_loss)``. ``accuracy`` is a double tensor equal to
        the number of correct predictions divided by ``n_examples``;
        ``mean_loss`` is the mean of the per-batch losses, or ``nan`` when
        the loader yields no batches.
    """
    model = model.eval()

    losses = []
    # Start from a tensor rather than the int 0, so `.double()` below also
    # works when the loader yields no batches at all.
    correct_predictions = torch.zeros((), dtype=torch.long, device=device)

    # No gradients are needed for evaluation.
    with torch.no_grad():
        for d in data_loader:
            input_ids = d["input_ids"].to(device)
            attention_mask = d["attention_mask"].to(device)
            targets = d["targets"].to(device)

            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask
            )
            # Predicted class = index of the largest output per row.
            _, preds = torch.max(outputs, dim=1)

            loss = loss_fn(outputs, targets)

            correct_predictions += torch.sum(preds == targets)
            losses.append(loss.item())

    mean_loss = np.mean(losses) if losses else float("nan")
    return correct_predictions.double() / n_examples, mean_loss
|
|
| %%time |
| history = defaultdict(list) |
| best_accuracy = 0 |
| |
| for epoch in range(EPOCHS): |
| |
| print(f'Epoch {epoch + 1}/{EPOCHS}') |
| print('-' * 10) |
|
|
| train_acc, train_loss = train_epoch( |
| model, |
| train_data_loader, |
| loss_fn, |
| optimizer, |
| device, |
| scheduler, |
| len(df_train) |
| ) |
| |
| print(f'Train loss {train_loss} accuracy {train_acc}') |
| |
| val_acc, val_loss = eval_model( |
| model, |
| val_data_loader, |
| loss_fn, |
| device, |
| len(df_val) |
| ) |
| |
| print(f'Val loss {val_loss} accuracy {val_acc}') |
| print() |
| |
| history['train_acc'].append(train_acc) |
| history['train_loss'].append(train_loss) |
| history['val_acc'].append(val_acc) |
| history['val_loss'].append(val_loss) |
|
|
| if val_acc > best_accuracy: |
| torch.save(model.state_dict(), 'best_model_state.bin') |
| best_accuracy = val_acc |
|
|
print(history['train_acc'])

# The accuracies are stored as tensors; convert them to NumPy values for plotting.
list_of_train_accuracy = [acc.cpu().numpy() for acc in history['train_acc']]
list_of_train_accuracy

print(history['val_acc'])

list_of_val_accuracy = [acc.cpu().numpy() for acc in history['val_acc']]
list_of_val_accuracy

# Training and validation accuracy per epoch on one chart.
plt.plot(list_of_train_accuracy, label='train accuracy')
plt.plot(list_of_val_accuracy, label='validation accuracy')

plt.title('Training history')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend()
plt.ylim([0, 1]);

# Accuracy on the test split; the loss returned alongside it is discarded.
# NOTE(review): this evaluates `model` as it stands after the last epoch; the
# 'best_model_state.bin' checkpoint is not reloaded here - confirm intended.
test_acc, _ = eval_model(
    model=model,
    data_loader=test_data_loader,
    loss_fn=loss_fn,
    device=device,
    n_examples=len(df_test),
)

print('\n')
print('Test Accuracy : ', test_acc.item())
|
|
def get_predictions(model, data_loader):
    """Run ``model`` over ``data_loader`` and collect its predictions.

    Args:
        model: module called as ``model(input_ids=..., attention_mask=...)``.
            It is switched to eval mode.
        data_loader: iterable of dict batches with the keys ``review_text``,
            ``input_ids``, ``attention_mask`` and ``targets``.

    Returns:
        ``(review_texts, predictions, prediction_probs, real_values)``:
        the list of review texts, the predicted class indices, the softmax
        of the model outputs over dim 1, and the targets. The three tensors
        are on the CPU. When the loader yields no batches, the list and all
        three tensors are empty.
    """
    model = model.eval()

    # Use the device the model's parameters live on instead of a
    # notebook-level global, so the function works on its own.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device("cpu")

    review_texts = []
    pred_batches = []
    prob_batches = []
    target_batches = []

    with torch.no_grad():
        for d in data_loader:
            input_ids = d["input_ids"].to(model_device)
            attention_mask = d["attention_mask"].to(model_device)

            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask
            )
            # Predicted class = index of the largest output per row.
            _, preds = torch.max(outputs, dim=1)
            probs = F.softmax(outputs, dim=1)

            review_texts.extend(d["review_text"])
            pred_batches.append(preds.cpu())
            prob_batches.append(probs.cpu())
            target_batches.append(d["targets"].cpu())

    if not pred_batches:
        # Nothing to concatenate: return empty tensors instead of raising.
        return (
            review_texts,
            torch.empty(0, dtype=torch.long),
            torch.empty(0, 0),
            torch.empty(0, dtype=torch.long),
        )

    # Concatenating whole batches along dim 0 gives the same result as
    # stacking the individual rows.
    predictions = torch.cat(pred_batches)
    prediction_probs = torch.cat(prob_batches)
    real_values = torch.cat(target_batches)
    return review_texts, predictions, prediction_probs, real_values
|
|
# Collect texts, predictions, probabilities and true labels for the test split.
y_review_texts, y_pred, y_pred_probs, y_test = get_predictions(model, test_data_loader)

# Per-class report, labelled with the class names.
report = classification_report(y_test, y_pred, target_names=class_names)
print(report)
|
|
def show_confusion_matrix(confusion_matrix):
    """Draw ``confusion_matrix`` as an annotated seaborn heatmap.

    Cells are annotated with integer formatting on a "Blues" colour map.
    The y tick labels are kept horizontal, the x tick labels are rotated by
    30 degrees, and the axes are labelled as true vs. predicted sentiment.
    """
    heat = sns.heatmap(confusion_matrix, annot=True, fmt="d", cmap="Blues")
    for axis, angle in ((heat.yaxis, 0), (heat.xaxis, 30)):
        axis.set_ticklabels(axis.get_ticklabels(), rotation=angle, ha='right')
    plt.ylabel('True sentiment')
    plt.xlabel('Predicted sentiment');
|
|
# Confusion matrix of the test predictions, labelled with class names.
cm = confusion_matrix(y_test, y_pred)
df_cm = pd.DataFrame(cm, index=class_names, columns=class_names)
show_confusion_matrix(df_cm)

# Look at one test example in detail.
idx = 2

review_text = y_review_texts[idx]
true_sentiment = y_test[idx]
pred_df = pd.DataFrame({
    'class_names': class_names,
    'values': y_pred_probs[idx]
})

print("\n".join(wrap(review_text)))
print()
print(f'True sentiment: {class_names[true_sentiment]}')

# Horizontal bar chart of the predicted probability per class.
sns.barplot(x='values', y='class_names', data=pred_df, orient='h')
plt.ylabel('sentiment')
plt.xlabel('probability')
plt.xlim([0, 1]);

# Classify a comment typed in by the user.
review_text = input("Enter a comment for sentiment analysis: ")

# `padding='max_length'` replaces the deprecated `pad_to_max_length=True`,
# and truncation is requested explicitly so long comments are cut at MAX_LEN.
encoded_review = tokenizer.encode_plus(
    review_text,
    max_length=MAX_LEN,
    add_special_tokens=True,
    return_token_type_ids=False,
    padding='max_length',
    truncation=True,
    return_attention_mask=True,
    return_tensors='pt',
)

input_ids = encoded_review['input_ids'].to(device)
attention_mask = encoded_review['attention_mask'].to(device)

# Inference only: no need to track gradients.
with torch.no_grad():
    output = model(input_ids, attention_mask)
_, prediction = torch.max(output, dim=1)

print(f'Review text: {review_text}')
# .item() turns the one-element tensor into a plain int index.
print(f'Sentiment : {class_names[prediction.item()]}')
|
|
def suggest_improved_text(review_text, model, tokenizer):
    """Return an "improved" version of a review judged negative or neutral.

    The sentiment label comes from ``analyze_sentiment``. Reviews labelled
    ``'negative'`` or ``'neutral'`` are handed to ``generate_improved_text``
    together with their class index; any other review is returned unchanged.

    Args:
        review_text: the review to analyse.
        model: classifier passed through to ``analyze_sentiment``.
        tokenizer: tokenizer passed through to ``analyze_sentiment``.

    Returns:
        The text returned by ``generate_improved_text``, or ``review_text``
        itself when the sentiment is neither negative nor neutral.
    """
    sentiment = analyze_sentiment(review_text, model, tokenizer)

    if sentiment not in ('negative', 'neutral'):
        return review_text

    # The label already identifies the predicted class, so its index in
    # class_names is used directly instead of tokenizing and running the
    # model a second time. The earlier code also passed an undefined name
    # `text` here, which raised NameError.
    predicted_sentiment = class_names.index(sentiment)
    return generate_improved_text(review_text, predicted_sentiment)
|
|
def analyze_sentiment(review_text, model, tokenizer):
    """Return the predicted sentiment label for ``review_text``.

    The text is encoded to ``MAX_LEN`` tokens, run through ``model`` on the
    module-level ``device``, and the index of the largest output is looked
    up in ``class_names``.

    Note: the model is switched to eval mode, as ``eval_model`` and
    ``get_predictions`` also do, so dropout does not affect the prediction.

    Args:
        review_text: the text to classify.
        model: classifier called as ``model(input_ids, attention_mask)``.
        tokenizer: tokenizer exposing ``encode_plus``.

    Returns:
        The entry of ``class_names`` for the predicted class.
    """
    # `padding='max_length'` replaces the deprecated `pad_to_max_length=True`;
    # truncation is explicit so long inputs are cut at MAX_LEN.
    encoded_input = tokenizer.encode_plus(
        review_text,
        max_length=MAX_LEN,
        add_special_tokens=True,
        return_token_type_ids=False,
        padding='max_length',
        truncation=True,
        return_attention_mask=True,
        return_tensors='pt'
    )

    input_ids = encoded_input['input_ids'].to(device)
    attention_mask = encoded_input['attention_mask'].to(device)

    model.eval()
    # Inference only: no need to track gradients.
    with torch.no_grad():
        outputs = model(input_ids, attention_mask)
    _, predicted_sentiment = torch.max(outputs, dim=1)

    # .item() turns the one-element tensor into a plain int index.
    return class_names[predicted_sentiment.item()]
def generate_improved_text(review_text, predicted_sentiment):
    """Append a fixed list of positive words to reviews of class 0.

    Args:
        review_text: the original review.
        predicted_sentiment: predicted class index; only a value equal to 0
            triggers a change.

    Returns:
        ``review_text`` followed by a space and the space-separated positive
        words when ``predicted_sentiment == 0``; otherwise ``review_text``
        unchanged.
    """
    positive_words = ("marvellous", "fantastic", "excellent", "admirable", "formidable")
    if predicted_sentiment == 0:
        suffix = " ".join(positive_words)
        return review_text + " " + suffix
    return review_text
|
|
from transformers import BertModel

# Reload the stock pretrained BERT and write it to a local folder.
# NOTE(review): this saves the base 'bert-base-uncased' weights, not the
# fine-tuned `model` trained above - confirm that is intended.
bert_model = BertModel.from_pretrained('bert-base-uncased')
bert_model.save_pretrained('C:/Users/Marie-Ange/Downloads')

from transformers import pipeline

# Run a hosted sentiment-analysis pipeline.
# NOTE(review): the pipeline receives the file *name* as its input text, not
# the contents of that file - confirm that is intended.
filename = "Sentiment_analysis_with_bert.py"
pipe = pipeline(task="sentiment-analysis", model="Group209/Sentiment_Analysis")
pipe(filename)
|
|
|
|
|
|
|
|