In [None]:
!pip install keras==2.15.0 tensorflow==2.15.0

In [None]:
import os
os.environ['KERAS_BACKEND'] = 'tensorflow'
import pathlib
import re

In [None]:
import tensorflow as tf
import keras
import numpy as np
from keras import layers

In [None]:
print(keras.__version__)
print(tf.__version__)
print(keras.backend.backend())

2.15.0
2.15.0
tensorflow


In [None]:
# Path to the images
IMAGES_PATH = "Flicker8k_Dataset"

# Desired image dimensions
IMAGE_SIZE = (299, 299)

# Vocabulary size
VOCAB_SIZE = 10000

# Fixed length allowed for any sequence
SEQ_LENGTH = 25

# Dimension for the image embeddings and token embeddings
EMBED_DIM = 512

# Per-layer units in the feed-forward network
FF_DIM = 512

# Other training parameters
BATCH_SIZE = 64
EPOCHS = 30
AUTOTUNE = tf.data.AUTOTUNE

In [None]:
# Download the Flickr8k image archive and the caption archive and extract them
# (extract=True). Both calls use cache_dir='.' with cache_subdir=path, where
# path is ".", i.e. the files are placed relative to the working directory.
path = pathlib.Path(".")
keras.utils.get_file(
 origin='https://github.com/jbrownlee/Datasets/releases/download/Flickr8k/Flickr8k_Dataset.zip',
 cache_dir='.',
 cache_subdir=path,
 extract=True)
keras.utils.get_file(
 origin='https://github.com/jbrownlee/Datasets/releases/download/Flickr8k/Flickr8k_text.zip',
 cache_dir='.',
 cache_subdir=path,
 extract=True)

In [None]:
dataset = pathlib.Path(path, "Flickr8k.token.txt").read_text(encoding='utf-8').splitlines()

In [None]:
dataset

In [None]:
dataset = [line.split('\t') for line in dataset]

In [None]:
dataset

In [None]:
dataset = [[os.path.join(IMAGES_PATH,fname.split('#')[0].strip()), caption] for (fname, caption) in dataset]

In [None]:
dataset

In [None]:
for i in dataset:
 print(i)
 break

In [None]:
caption_mapping = {}
text_data = []
X_en_data = []
X_de_data = []
Y_data = []

In [None]:
# Explicit sequence markers. A leading or trailing blank is not enough: the
# text vectorizer splits on whitespace, so " caption" and "caption " produce the
# same token sequence and the decoder would be trained to copy its own input
# instead of predicting the next word.
START_TOKEN = "<start>"
END_TOKEN = "<end>"

for img_name, caption in dataset:
    # Only keep entries whose path ends with "jpg".
    if not img_name.endswith("jpg"):
        continue

    # Strip surrounding whitespace and drop full stops once, then reuse.
    clean_caption = caption.strip().replace(".", "")

    # Teacher forcing: the decoder reads "<start> w1 ... wn" and has to predict
    # "w1 ... wn <end>", i.e. the same words shifted by one position.
    X_de_data.append(f"{START_TOKEN} {clean_caption}")
    Y_data.append(f"{clean_caption} {END_TOKEN}")
    # Text used to fit the vocabulary; it contains both markers.
    text_data.append(f"{START_TOKEN} {clean_caption} {END_TOKEN}")
    X_en_data.append(img_name)

    # Group the raw (uncleaned) captions by image path.
    caption_mapping.setdefault(img_name, []).append(caption)

In [None]:
for i in X_de_data:
 if len(i) <= 2:
 print("Y")

In [None]:
print(X_en_data[0])
print(X_de_data[0])
print(Y_data[0])

In [None]:
train_size=0.8
shuffle=True
np.random.seed(42)

In [None]:
zipped = list(zip(X_en_data, X_de_data, Y_data))
np.random.shuffle(zipped)
X_en_data, X_de_data, Y_data = zip(*zipped)
print(X_en_data[0])
print(X_de_data[0])
print(Y_data[0])

In [None]:
# Number of samples that go into the training split. A separate name is used so
# that `train_size` keeps holding the fraction and this cell produces the same
# split when it is executed again.
num_train_samples = int(len(X_en_data) * train_size)

# NOTE(review): the lists hold one entry per caption, so an image with several
# captions can end up in both the training and the validation split.
X_train_en = list(X_en_data[:num_train_samples])
X_train_de = list(X_de_data[:num_train_samples])
Y_train = list(Y_data[:num_train_samples])
X_valid_en = list(X_en_data[num_train_samples:])
X_valid_de = list(X_de_data[num_train_samples:])
Y_valid = list(Y_data[num_train_samples:])

In [None]:
print(X_train_en[0])
print(X_train_de[0])
print(Y_train[0])
print(X_valid_en[0])
print(X_valid_de[0])
print(Y_valid[0])

In [None]:
# Punctuation removed from the captions. "<" and ">" are not part of this set,
# so angle brackets survive standardization. The backslash is written as "\\"
# to avoid an invalid escape sequence in the literal.
strip_chars = "!\"#$%&'()*+,-./:;=?@[\\]^_`{|}~"

# Character class matching any single character of `strip_chars`. Without the
# surrounding brackets the pattern would only match the whole run of characters
# in exactly this order, which never occurs in a caption.
STRIP_PATTERN = "[%s]" % re.escape(strip_chars)


def custom_standardization(input_string):
    """Lowercase the text and delete every character listed in `strip_chars`.

    Args:
        input_string: String tensor (or value convertible to one) to clean.

    Returns:
        The lowercased string tensor with the punctuation removed.
    """
    lowercase = tf.strings.lower(input_string)
    return tf.strings.regex_replace(lowercase, STRIP_PATTERN, "")

In [None]:
# Maps caption strings to fixed-length integer sequences: at most VOCAB_SIZE
# distinct tokens, SEQ_LENGTH ids per caption, cleaned with
# custom_standardization.
vectorization = keras.layers.TextVectorization(
    max_tokens=VOCAB_SIZE,
    output_mode="int",
    output_sequence_length=SEQ_LENGTH,
    standardize=custom_standardization,
)

# Learn the vocabulary from the caption texts.
vectorization.adapt(text_data)
vocab = np.array(vectorization.get_vocabulary())

# np.save does not create missing directories, so make sure the target folder
# exists before the vocabulary is written.
os.makedirs("./artifacts", exist_ok=True)
np.save('./artifacts/vocabulary.npy', vocab)

In [None]:
def decode_and_resize(img_path):
    """Read a JPEG file and return it as a float32 image of size IMAGE_SIZE.

    Args:
        img_path: Path of the image file to load.

    Returns:
        The 3-channel image, resized to IMAGE_SIZE and converted to tf.float32.
    """
    raw_bytes = tf.io.read_file(img_path)
    decoded = tf.image.decode_jpeg(raw_bytes, channels=3)
    resized = tf.image.resize(decoded, IMAGE_SIZE)
    return tf.image.convert_image_dtype(resized, tf.float32)


def process_input(img_cap, y_captions):
    """Convert one raw sample into the tensors consumed by the model.

    Args:
        img_cap: Pair of (image path, decoder input caption).
        y_captions: Target caption text.

    Returns:
        ((image, vectorized decoder input), vectorized target).
    """
    img_path, x_captions = img_cap
    image = decode_and_resize(img_path)
    decoder_tokens = vectorization(x_captions)
    target_tokens = vectorization(y_captions)
    return (image, decoder_tokens), target_tokens


def make_dataset(images, x_captions, y_captions):
    """Build a batched, prefetched tf.data pipeline from parallel lists.

    Args:
        images: Image paths.
        x_captions: Decoder input captions, aligned with `images`.
        y_captions: Target captions, aligned with `images`.

    Returns:
        A dataset of processed samples in batches of BATCH_SIZE.
    """
    samples = ((images, x_captions), y_captions)
    return (
        tf.data.Dataset.from_tensor_slices(samples)
        .map(process_input, num_parallel_calls=AUTOTUNE)
        .batch(BATCH_SIZE)
        .prefetch(AUTOTUNE)
    )



train_dataset = make_dataset(X_train_en, X_train_de, Y_train)

valid_dataset = make_dataset(X_valid_en, X_valid_de, Y_valid)


In [None]:
# Image augmentation pipeline handed to ImageCaptioningModel as `image_aug`:
# random horizontal flip, random rotation (factor 0.2) and random contrast
# (factor 0.3), applied in this order.
image_augmentation = keras.Sequential(
 [
 keras.layers.RandomFlip("horizontal"),
 keras.layers.RandomRotation(0.2),
 keras.layers.RandomContrast(0.3),
 ]
)
@keras.saving.register_keras_serializable()
def get_cnn_model():
    """Create the frozen EfficientNetB0 feature extractor.

    Returns:
        A Keras model mapping an image of shape (*IMAGE_SIZE, 3) to the
        backbone's feature map, reshaped so that all spatial positions lie
        along a single axis in front of the channel axis.
    """
    backbone = keras.applications.efficientnet.EfficientNetB0(
        input_shape=(*IMAGE_SIZE, 3),
        include_top=False,
        weights="imagenet",
    )
    # The ImageNet weights are used as-is and are not updated during training.
    backbone.trainable = False

    features = backbone.output
    num_channels = features.shape[-1]
    # Flatten the spatial grid into one sequence axis, keeping the channels.
    features = layers.Reshape((-1, num_channels))(features)
    return keras.models.Model(backbone.input, features)

@keras.saving.register_keras_serializable()
class TransformerEncoderBlock(layers.Layer):
    """Encoder block applied to the image features.

    The input is layer-normalized, projected by a ReLU dense layer with
    `embed_dim` units, passed through self-attention and combined with the
    projection through a residual connection followed by a second
    normalization.

    Args:
        embed_dim: Units of the dense projection and key size of the attention.
        dense_dim: Stored and serialized, but not used by any sub-layer.
        num_heads: Number of attention heads.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=0.0
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.dense_1 = layers.Dense(embed_dim, activation="relu")

    def get_config(self):
        """Return the constructor arguments on top of the base layer config."""
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "dense_dim": self.dense_dim,
                "num_heads": self.num_heads,
            }
        )
        return config

    def call(self, inputs, training):
        """Encode `inputs`; `training` is forwarded to the attention layer."""
        projected = self.dense_1(self.layernorm_1(inputs))
        attended = self.attention_1(
            query=projected,
            value=projected,
            key=projected,
            training=training,
        )
        # Residual connection around the attention, then normalization.
        return self.layernorm_2(projected + attended)

@keras.saving.register_keras_serializable()
class PositionalEmbedding(layers.Layer):
    """Sum of a token embedding and a learned position embedding.

    Args:
        sequence_length: Number of positions the position embedding can index.
        vocab_size: Number of token ids the token embedding can index.
        embed_dim: Size of both embedding vectors.
    """

    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        # Token id 0 is treated as padding (mask_zero=True).
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim, mask_zero=True
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

        self.add = layers.Add()

    def get_config(self):
        """Return the constructor arguments on top of the base layer config."""
        config = super().get_config()
        config.update(
            {
                "sequence_length": self.sequence_length,
                "vocab_size": self.vocab_size,
                "embed_dim": self.embed_dim,
            }
        )
        return config

    def call(self, seq):
        """Embed the token ids in `seq` and add the position embeddings."""
        token_vectors = self.token_embeddings(seq)

        # Positions 0..length-1 along axis 1, with a leading axis of size 1 so
        # the same position vectors broadcast over the batch.
        positions = tf.range(tf.shape(token_vectors)[1])[tf.newaxis, :]
        position_vectors = self.position_embeddings(positions)

        return self.add([token_vectors, position_vectors])

@keras.saving.register_keras_serializable()
class TransformerDecoderBlock(layers.Layer):
    """Caption decoder block.

    Embeds the caption tokens, applies causal self-attention, cross-attention
    over the encoder output and a feed-forward block, and ends with a softmax
    over the vocabulary.

    Args:
        embed_dim: Size of the token/position embeddings, key size of both
            attention layers and units of the second feed-forward layer.
        ff_dim: Units of the first (ReLU) feed-forward layer.
        num_heads: Number of heads of both attention layers.
        sequence_length: Number of positions of the position embedding.
            Defaults to the notebook constant SEQ_LENGTH.
        vocab_size: Size of the token embedding and of the output layer.
            Defaults to the notebook constant VOCAB_SIZE.
    """

    def __init__(
        self,
        embed_dim,
        ff_dim,
        num_heads,
        sequence_length=None,
        vocab_size=None,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.ff_dim = ff_dim
        self.num_heads = num_heads
        # Fall back to the notebook-level constants so existing callers and
        # configs without these keys keep working.
        self.sequence_length = SEQ_LENGTH if sequence_length is None else sequence_length
        self.vocab_size = VOCAB_SIZE if vocab_size is None else vocab_size

        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=0.1
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=0.1
        )
        self.ffn_layer_1 = layers.Dense(ff_dim, activation="relu")
        self.ffn_layer_2 = layers.Dense(embed_dim)

        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()

        # Built from the constructor arguments rather than from the global
        # EMBED_DIM, so the embedding width always matches the residual
        # connections and the restored config.
        self.embedding = PositionalEmbedding(
            embed_dim=embed_dim,
            sequence_length=self.sequence_length,
            vocab_size=self.vocab_size,
        )
        self.out = layers.Dense(self.vocab_size, activation="softmax")

        self.dropout_1 = layers.Dropout(0.3)
        self.dropout_2 = layers.Dropout(0.5)
        self.supports_masking = True

    def get_config(self):
        """Return the constructor arguments on top of the base layer config."""
        base_config = super().get_config()
        config = {
            "embed_dim": self.embed_dim,
            "ff_dim": self.ff_dim,
            "num_heads": self.num_heads,
            "sequence_length": self.sequence_length,
            "vocab_size": self.vocab_size,
        }
        return {**base_config, **config}

    def call(self, inputs, encoder_outputs, training=None, mask=None):
        """Predict a distribution over the vocabulary for every position.

        Args:
            inputs: Integer token ids of the caption.
            encoder_outputs: Output of the image encoder, used as key and value
                of the cross-attention.
            training: Forwarded to the attention and dropout layers.
            mask: Accepted for interface compatibility; not used by this
                implementation.

        Returns:
            Softmax probabilities over the vocabulary for each position.
        """
        inputs = self.embedding(inputs)

        # Self-attention; the causal mask keeps a position from attending to
        # later positions.
        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            training=training,
            use_causal_mask=True,
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)

        # Cross-attention: caption positions query the encoded image.
        attention_output_2 = self.attention_2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
            training=training,
        )
        out_2 = self.layernorm_2(out_1 + attention_output_2)

        ffn_out = self.ffn_layer_1(out_2)
        ffn_out = self.dropout_1(ffn_out, training=training)
        ffn_out = self.ffn_layer_2(ffn_out)

        ffn_out = self.layernorm_3(ffn_out + out_2, training=training)
        ffn_out = self.dropout_2(ffn_out, training=training)
        preds = self.out(ffn_out)
        return preds


@keras.saving.register_keras_serializable()
class ImageCaptioningModel(keras.Model):
    """End-to-end captioning model: CNN features -> encoder -> decoder.

    Args:
        cnn_model: Model that turns an image batch into feature sequences.
        encoder: Layer applied to the CNN features.
        decoder: Layer that receives the caption tokens and the encoder output.
        image_aug: Optional augmentation model applied to the images first.
    """

    def __init__(
        self,
        cnn_model,
        encoder,
        decoder,
        image_aug=None,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.cnn_model = cnn_model
        self.encoder = encoder
        self.decoder = decoder
        self.image_aug = image_aug

    def get_config(self):
        """Return a config in which every sub-model is serialized explicitly."""
        base_config = super().get_config()
        config = {
            "cnn_model": keras.saving.serialize_keras_object(self.cnn_model),
            "encoder": keras.saving.serialize_keras_object(self.encoder),
            "decoder": keras.saving.serialize_keras_object(self.decoder),
            # serialize_keras_object passes None through for a missing augmenter.
            "image_aug": keras.saving.serialize_keras_object(self.image_aug),
        }
        return {**base_config, **config}

    @classmethod
    def from_config(cls, config):
        """Rebuild the model, deserializing the nested sub-models first."""
        # Work on a copy so the caller's dictionary is left untouched.
        config = dict(config)
        for key in ("cnn_model", "encoder", "decoder"):
            config[key] = keras.saving.deserialize_keras_object(config[key])
        # `image_aug` is optional in the constructor, so tolerate its absence.
        config["image_aug"] = keras.saving.deserialize_keras_object(
            config.get("image_aug")
        )
        return cls(**config)

    def call(self, inputs, training=None):
        """Run the full pipeline on an (images, caption tokens) pair.

        Args:
            inputs: Tuple of (image batch, caption token ids).
            training: Forwarded to the augmentation, encoder and decoder.

        Returns:
            The decoder output for the given captions.
        """
        img, caption = inputs
        # Compare against None instead of relying on the truthiness of a model.
        if self.image_aug is not None:
            img = self.image_aug(img, training=training)
        img_embed = self.cnn_model(img)
        encoder_out = self.encoder(img_embed, training=training)
        pred = self.decoder(caption, encoder_out, training=training)
        return pred




In [None]:
# Assemble the captioning model: CNN feature extractor, an encoder block with
# one attention head, a decoder block with two attention heads, and the image
# augmentation pipeline.
cnn_model = get_cnn_model()
encoder = TransformerEncoderBlock(embed_dim=EMBED_DIM, dense_dim=FF_DIM, num_heads=1)
decoder = TransformerDecoderBlock(embed_dim=EMBED_DIM, ff_dim=FF_DIM, num_heads=2)
caption_model = ImageCaptioningModel(
 cnn_model=cnn_model,
 encoder=encoder,
 decoder=decoder,
 image_aug=image_augmentation,
)

In [None]:

early_stopping = keras.callbacks.EarlyStopping(patience=3, restore_best_weights=True)


@keras.saving.register_keras_serializable()
class LRSchedule(keras.optimizers.schedules.LearningRateSchedule):
    """Linear warm-up followed by a constant learning rate.

    Args:
        post_warmup_learning_rate: Rate used once the warm-up is over.
        warmup_steps: Number of steps over which the rate grows linearly from
            zero to `post_warmup_learning_rate`.
    """

    def __init__(self, post_warmup_learning_rate, warmup_steps, **kwargs):
        super().__init__(**kwargs)
        self.post_warmup_learning_rate = post_warmup_learning_rate
        self.warmup_steps = warmup_steps

    def get_config(self):
        """Return the two constructor arguments."""
        return {
            "post_warmup_learning_rate": self.post_warmup_learning_rate,
            "warmup_steps": self.warmup_steps,
        }

    def __call__(self, step):
        """Return the learning rate for optimizer step `step`."""
        step_f = tf.cast(step, tf.float32)
        warmup_f = tf.cast(self.warmup_steps, tf.float32)
        # Fraction of the warm-up completed, scaled to the target rate.
        ramped_rate = self.post_warmup_learning_rate * (step_f / warmup_f)
        return tf.cond(
            step_f < warmup_f,
            lambda: ramped_rate,
            lambda: self.post_warmup_learning_rate,
        )


# Total number of optimizer steps: batches per epoch (train_dataset is batched)
# times the number of epochs. The first 1/15 of them are used for warm-up.
num_train_steps = len(train_dataset) * EPOCHS
num_warmup_steps = num_train_steps // 15
lr_schedule = LRSchedule(post_warmup_learning_rate=1e-4, warmup_steps=num_warmup_steps)

# Adam with the warm-up schedule; targets are integer token ids, hence the
# sparse categorical cross-entropy.
caption_model.compile(optimizer=keras.optimizers.Adam(lr_schedule), loss='sparse_categorical_crossentropy',
 metrics=['accuracy'])

# Train for at most EPOCHS epochs, validating on valid_dataset and using the
# early_stopping callback.
caption_model.fit(
 train_dataset,
 epochs=EPOCHS,
 validation_data=valid_dataset,
 callbacks=[early_stopping],
)

In [None]:
caption_model.save("caption_model.keras")

In [None]:
loaded_model = keras.models.load_model("caption_model.keras", compile=True)

In [None]:
caption_model.summary()

In [None]:
loaded_model.summary()

In [None]:
caption_model.evaluate(valid_dataset)

In [None]:
loaded_model.evaluate(valid_dataset)

In [None]:
import matplotlib.pyplot as plt

In [None]:


vocab = vectorization.get_vocabulary()
index_lookup = dict(zip(range(len(vocab)), vocab))
max_decoded_sentence_length = SEQ_LENGTH - 1
# Sample from the validation split (the training split was used here before,
# although the function is meant to show unseen images).
valid_images = list(X_valid_en)

# Use explicit sequence markers when the vocabulary contains them; otherwise
# start from an empty caption and stop at the padding token "".
start_token = "<start>" if "<start>" in vocab else ""
end_token = "<end>" if "<end>" in vocab else ""


def generate_caption(model=None):
    """Show a random validation image and print a greedily decoded caption.

    Args:
        model: Captioning model exposing `cnn_model`, `encoder` and `decoder`.
            Defaults to `caption_model`.
    """
    if model is None:
        model = caption_model

    # Select a random image from the validation dataset
    image_path = np.random.choice(valid_images)

    # Read the image from the disk and display it
    sample_img = decode_and_resize(image_path)
    img = sample_img.numpy().clip(0, 255).astype(np.uint8)
    plt.imshow(img)
    plt.show()

    # Pass the image to the CNN
    img = tf.expand_dims(sample_img, 0)
    img = model.cnn_model(img)

    # Pass the image features to the Transformer encoder
    encoded_img = model.encoder(img, training=False)

    # Generate the caption word by word with the Transformer decoder
    words = []
    for i in range(max_decoded_sentence_length):
        decoded_caption = " ".join([start_token] + words)
        tokenized_caption = vectorization([decoded_caption])
        mask = tf.math.not_equal(tokenized_caption, 0)
        predictions = model.decoder(
            tokenized_caption, encoded_img, training=False, mask=mask
        )
        # Greedy decoding: take the most probable token at position i.
        sampled_token_index = int(np.argmax(predictions[0, i, :]))
        sampled_token = index_lookup[sampled_token_index]
        # Stop at the end marker or at the padding token.
        if sampled_token in ("", end_token):
            break
        words.append(sampled_token)

    # Join with single spaces; the words themselves are kept intact.
    decoded_caption = " ".join(words)
    print("Predicted Caption: ", decoded_caption)


# Check predictions for a few samples
generate_caption()


In [None]:


vocab = vectorization.get_vocabulary()
index_lookup = dict(zip(range(len(vocab)), vocab))
max_decoded_sentence_length = SEQ_LENGTH - 1
# Sample from the validation split (the training split was used here before,
# although the function is meant to show unseen images).
valid_images = list(X_valid_en)

# Use explicit sequence markers when the vocabulary contains them; otherwise
# start from an empty caption and stop at the padding token "".
start_token = "<start>" if "<start>" in vocab else ""
end_token = "<end>" if "<end>" in vocab else ""


def generate_caption(model=None):
    """Show a random validation image and print a greedily decoded caption.

    Args:
        model: Captioning model exposing `cnn_model`, `encoder` and `decoder`.
            Defaults to `loaded_model`, the model restored from disk.
    """
    if model is None:
        model = loaded_model

    # Select a random image from the validation dataset
    image_path = np.random.choice(valid_images)

    # Read the image from the disk and display it
    sample_img = decode_and_resize(image_path)
    img = sample_img.numpy().clip(0, 255).astype(np.uint8)
    plt.imshow(img)
    plt.show()

    # Pass the image to the CNN
    img = tf.expand_dims(sample_img, 0)
    img = model.cnn_model(img)

    # Pass the image features to the Transformer encoder
    encoded_img = model.encoder(img, training=False)

    # Generate the caption word by word with the Transformer decoder
    words = []
    for i in range(max_decoded_sentence_length):
        decoded_caption = " ".join([start_token] + words)
        tokenized_caption = vectorization([decoded_caption])
        mask = tf.math.not_equal(tokenized_caption, 0)
        predictions = model.decoder(
            tokenized_caption, encoded_img, training=False, mask=mask
        )
        # Greedy decoding: take the most probable token at position i.
        sampled_token_index = int(np.argmax(predictions[0, i, :]))
        sampled_token = index_lookup[sampled_token_index]
        # Stop at the end marker or at the padding token.
        if sampled_token in ("", end_token):
            break
        words.append(sampled_token)

    # Join with single spaces; the words themselves are kept intact.
    decoded_caption = " ".join(words)
    print("Predicted Caption: ", decoded_caption)


# Check predictions for a few samples
generate_caption()
