import os

# ✅ Fix PermissionError on Hugging Face Spaces
os.environ["HF_HOME"] = "/tmp"
os.environ["HF_DATASETS_CACHE"] = "/tmp"

import streamlit as st
from datasets import load_dataset
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import defaultdict, Counter
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import GradientBoostingClassifier
import random
st.title("🧠 Language Model Explorer")
###################################
# Sidebar configuration
###################################
dataset_name = st.sidebar.selectbox(
    "Choose Dataset",
    ["squad", "tiny_shakespeare"]
)
tokenizer_type = st.sidebar.selectbox(
    "Choose Tokenizer",
    ["character", "word"]
)
model_type = st.sidebar.selectbox(
    "Choose Model",
    ["N-gram", "Feed Forward NN", "Decision Tree", "Gradient Boosted Tree", "RNN"]
)
temperature = st.sidebar.slider("Sampling Temperature", 0.1, 2.0, 1.0)
context_size = st.sidebar.slider(
    "Context Size (how many tokens to look back)",
    min_value=2, max_value=10, value=3, step=1
)
# Number of tokens from dataset to use for training (minimum 100 tokens)
num_train_tokens = st.sidebar.slider(
    "Number of tokens from dataset to train on",
    min_value=100, max_value=100000, value=1000, step=100
)
train_button = st.sidebar.button("Train Model")

device = torch.device("cpu")  # force CPU usage
###################################
# Load dataset
###################################
@st.cache_data  # cache the text across Streamlit reruns (Streamlit >= 1.18)
def load_text(dataset_name):
    if dataset_name == "squad":
        data = load_dataset("squad", split="train[:1%]")
        texts = [x['context'] for x in data]
    elif dataset_name == "tiny_shakespeare":
        data = load_dataset("tiny_shakespeare")
        texts = [data['train'][0]['text']]
    else:
        texts = ["hello world"]
    return " ".join(texts)

text_data = load_text(dataset_name)
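# Note: "tiny_shakespeare" is a script-based dataset; depending on the
# installed `datasets` version it may need load_dataset(...,
# trust_remote_code=True) or may be unavailable entirely.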
###################################
# Tokenization
###################################
def tokenize(text, tokenizer_type):
    if tokenizer_type == "character":
        tokens = list(text)
    elif tokenizer_type == "word":
        tokens = text.split()
    else:
        # Fallback so an unexpected option can't raise UnboundLocalError
        tokens = text.split()
    return tokens

tokens_all = tokenize(text_data, tokenizer_type)
# Cap tokens to the requested number for training
tokens = tokens_all[:num_train_tokens]

# The vocabulary comes only from the training slice; prompt tokens outside it
# are mapped to PAD at prediction time. Sorting keeps indices reproducible.
vocab = sorted(set(tokens))
PAD_TOKEN = "<PAD>"
if PAD_TOKEN not in vocab:
    vocab.append(PAD_TOKEN)
token_to_idx = {tok: i for i, tok in enumerate(vocab)}
idx_to_token = {i: tok for tok, i in token_to_idx.items()}
###################################
# Helper to pad context
###################################
def pad_context(context, size):
    pad_len = size - len(context)
    if pad_len > 0:
        return [PAD_TOKEN]*pad_len + context
    else:
        return context[-size:]
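# Sanity check of the padding behavior (illustrative, not executed by the app):
#   pad_context(["a"], 3)              -> ["<PAD>", "<PAD>", "a"]
#   pad_context(["a", "b", "c", "d"], 3) -> ["b", "c", "d"]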
###################################
# Models
###################################
class NGramModel:
    def __init__(self, tokens, n=3):
        self.n = n
        self.model = defaultdict(Counter)
        # Each window holds n tokens: n-1 of context plus one target,
        # so len(tokens) - n + 1 windows fit.
        for i in range(len(tokens) - n + 1):
            context = tuple(tokens[i:i+n-1])
            next_token = tokens[i+n-1]
            self.model[context][next_token] += 1

    def predict(self, context, temperature=1.0):
        context = tuple(context[-(self.n-1):])
        counts = self.model.get(context, None)
        if counts is None:
            # Unseen context: fall back to a uniformly random token.
            return random.choice(list(token_to_idx.keys()))
        items = list(counts.items())
        tokens_, freqs = zip(*items)
        probs = np.array(freqs, dtype=float)
        # Temperature < 1 sharpens the distribution, > 1 flattens it.
        probs = probs ** (1.0 / temperature)
        probs /= probs.sum()
        return np.random.choice(tokens_, p=probs)
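# Worked example of the temperature step (illustrative numbers): with counts
# {"a": 3, "b": 1} and temperature 1.0, p("a") = 3/4 = 0.75. At temperature
# 0.5 the exponent 1/T = 2 gives weights 9 and 1, so p("a") rises to 0.9.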
###################################
# Feed Forward NN
###################################
class FFNN(nn.Module):
    def __init__(self, vocab_size, context_size, hidden_size=128):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, hidden_size)
        self.fc1 = nn.Linear(hidden_size * context_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, vocab_size)

    def forward(self, x):
        x = self.embed(x)          # (batch, context) -> (batch, context, hidden)
        x = x.view(x.size(0), -1)  # flatten the context window
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)            # logits over the vocabulary
        return x
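# Quick shape sanity check (assumes a toy vocab of 10 tokens and a 2-token
# context; not part of the app flow):
#   m = FFNN(vocab_size=10, context_size=2)
#   m(torch.zeros(1, 2, dtype=torch.long)).shape  # -> torch.Size([1, 10])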
def train_ffnn(tokens, context_size=3, epochs=3):
    # Build (context, target) pairs; unknown tokens map to PAD.
    data = []
    for i in range(len(tokens) - (context_size - 1)):
        context = tokens[i : i + context_size - 1]
        context = pad_context(context, context_size - 1)
        target = tokens[i + context_size - 1]
        data.append((
            torch.tensor([token_to_idx.get(t, token_to_idx[PAD_TOKEN]) for t in context], device=device),
            token_to_idx.get(target, token_to_idx[PAD_TOKEN])
        ))
    if len(data) == 0:
        st.warning("No training data generated. Increase dataset size or reduce context size.")
        return None
    model = FFNN(len(vocab), context_size - 1).to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.01)
    criterion = nn.CrossEntropyLoss()
    progress_bar = st.progress(0)
    total_steps = len(data) * epochs
    step = 0
    model.train()
    for epoch in range(epochs):
        total_loss = 0
        random.shuffle(data)
        for x, y in data:
            x = x.unsqueeze(0)  # add a batch dimension: (1, context)
            y = torch.tensor([y], device=device)
            optimizer.zero_grad()
            out = model(x)
            loss = criterion(out, y)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            step += 1
            progress_bar.progress(step / total_steps)
        st.write(f"Epoch {epoch+1}, Loss: {total_loss/len(data):.4f}")
    progress_bar.empty()
    return model
def ffnn_predict(model, context, temperature=1.0):
    context = pad_context(context, context_size - 1)
    x = torch.tensor([token_to_idx.get(tok, token_to_idx[PAD_TOKEN]) for tok in context], device=device).unsqueeze(0)
    with torch.no_grad():
        logits = model(x).squeeze()
        probs = torch.softmax(logits / temperature, dim=0).cpu().numpy()
    # Renormalize: float32 softmax may not sum to exactly 1 for np.random.choice.
    # vocab's order matches token_to_idx, so probabilities align with tokens.
    probs = probs / probs.sum()
    return np.random.choice(vocab, p=probs)
###################################
# Decision Tree
###################################
def train_dt(tokens, context_size=3):
    X, y = [], []
    for i in range(len(tokens) - (context_size - 1)):
        context = tokens[i : i + context_size - 1]
        context = pad_context(context, context_size - 1)
        target = tokens[i + context_size - 1]
        X.append([token_to_idx.get(t, token_to_idx[PAD_TOKEN]) for t in context])
        y.append(token_to_idx.get(target, token_to_idx[PAD_TOKEN]))
    if len(X) == 0:
        st.warning("No training data generated. Increase dataset size or reduce context size.")
        return None
    with st.spinner("Training Decision Tree..."):
        # Token indices are treated as ordinal numeric features; splits on
        # them are arbitrary but workable for a toy model.
        model = DecisionTreeClassifier()
        model.fit(X, y)
    return model

def dt_predict(model, context):
    context = pad_context(context, context_size - 1)
    x = [token_to_idx.get(tok, token_to_idx[PAD_TOKEN]) for tok in context]
    pred = model.predict([x])[0]
    return idx_to_token[pred]
###################################
# Gradient Boosted Tree
###################################
def train_gbt(tokens, context_size=3):
    X, y = [], []
    for i in range(len(tokens) - (context_size - 1)):
        context = tokens[i : i + context_size - 1]
        context = pad_context(context, context_size - 1)
        target = tokens[i + context_size - 1]
        X.append([token_to_idx.get(t, token_to_idx[PAD_TOKEN]) for t in context])
        y.append(token_to_idx.get(target, token_to_idx[PAD_TOKEN]))
    if len(X) == 0:
        st.warning("No training data generated. Increase dataset size or reduce context size.")
        return None
    with st.spinner("Training Gradient Boosted Tree..."):
        # Slow for large vocabularies: sklearn fits one tree per class per
        # boosting stage.
        model = GradientBoostingClassifier()
        model.fit(X, y)
    return model

def gbt_predict(model, context):
    context = pad_context(context, context_size - 1)
    x = [token_to_idx.get(tok, token_to_idx[PAD_TOKEN]) for tok in context]
    pred = model.predict([x])[0]
    return idx_to_token[pred]
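# Hypothetical variant (not wired into the UI): the tree models above decode
# greedily via model.predict, so the temperature slider never affects them.
# scikit-learn's predict_proba enables temperature sampling instead; the
# helper name below is an assumption, not part of the app.
def tree_predict_sampled(model, context, temperature=1.0):
    x = [token_to_idx.get(tok, token_to_idx[PAD_TOKEN])
         for tok in pad_context(context, context_size - 1)]
    probs = model.predict_proba([x])[0] ** (1.0 / temperature)
    probs /= probs.sum()
    # model.classes_ holds the label (token index) for each probability column
    return idx_to_token[int(np.random.choice(model.classes_, p=probs))]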
###################################
# RNN
###################################
class RNNModel(nn.Module):
    def __init__(self, vocab_size, embed_size=64, hidden_size=128):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.rnn = nn.RNN(embed_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, h=None):
        x = self.embed(x)
        out, h = self.rnn(x, h)
        out = self.fc(out[:, -1, :])  # predict from the last time step only
        return out, h
def train_rnn(tokens, context_size=3, epochs=3):
    data = []
    for i in range(len(tokens) - (context_size - 1)):
        context = tokens[i : i + context_size - 1]
        context = pad_context(context, context_size - 1)
        target = tokens[i + context_size - 1]
        data.append((
            torch.tensor([token_to_idx.get(t, token_to_idx[PAD_TOKEN]) for t in context], device=device),
            token_to_idx.get(target, token_to_idx[PAD_TOKEN])
        ))
    if len(data) == 0:
        st.warning("No training data generated. Increase dataset size or reduce context size.")
        return None
    model = RNNModel(len(vocab)).to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.01)
    criterion = nn.CrossEntropyLoss()
    progress_bar = st.progress(0)
    total_steps = len(data) * epochs
    step = 0
    model.train()
    for epoch in range(epochs):
        total_loss = 0
        random.shuffle(data)
        for x, y in data:
            x = x.unsqueeze(0)
            y = torch.tensor([y], device=device)
            optimizer.zero_grad()
            # Windows are shuffled and independent, so each step starts from a
            # fresh hidden state. Reusing the previous step's hidden state here
            # would also make backward() fail once that graph has been freed.
            out, _ = model(x)
            loss = criterion(out, y)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            step += 1
            progress_bar.progress(step / total_steps)
        st.write(f"Epoch {epoch+1}, Loss: {total_loss/len(data):.4f}")
    progress_bar.empty()
    return model
def rnn_predict(model, context, temperature=1.0):
    context = pad_context(context, context_size - 1)
    x = torch.tensor([token_to_idx.get(tok, token_to_idx[PAD_TOKEN]) for tok in context], device=device).unsqueeze(0)
    with torch.no_grad():
        logits, _ = model(x)
        probs = torch.softmax(logits.squeeze() / temperature, dim=0).cpu().numpy()
    probs = probs / probs.sum()  # renormalize for np.random.choice (float32 rounding)
    return np.random.choice(vocab, p=probs)
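# Dividing logits by the temperature before softmax is the standard trick:
# as T -> 0 sampling approaches greedy argmax, and as T grows it approaches
# uniform sampling over the vocabulary.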
###################################
# Train and evaluate
###################################
if train_button:
    st.write(f"Training **{model_type}** model with context size {context_size} on {len(tokens)} tokens...")
    model = None  # guard so a missing branch can't leave `model` undefined
    if model_type == "N-gram":
        with st.spinner("Training N-gram model..."):
            model = NGramModel(tokens, n=context_size)
    elif model_type == "Feed Forward NN":
        model = train_ffnn(tokens, context_size=context_size)
    elif model_type == "Decision Tree":
        model = train_dt(tokens, context_size=context_size)
    elif model_type == "Gradient Boosted Tree":
        model = train_gbt(tokens, context_size=context_size)
    elif model_type == "RNN":
        model = train_rnn(tokens, context_size=context_size)
    if model is not None:
        st.session_state["model"] = model
        st.session_state["model_type"] = model_type
        st.session_state["context_size"] = context_size
        st.success(f"{model_type} model trained.")
    else:
        st.error("Training failed due to no data.")
###################################
# Chat interface
###################################
st.header("💬 Chat with the model")
if "model" in st.session_state:
    user_input = st.text_input("Type a prompt:")
    if user_input:
        context = tokenize(user_input, tokenizer_type)
        generated = context.copy()
        for _ in range(20):
            ctx = pad_context(generated, st.session_state["context_size"] - 1)
            if st.session_state["model_type"] == "N-gram":
                next_tok = st.session_state["model"].predict(ctx, temperature)
            elif st.session_state["model_type"] == "Feed Forward NN":
                next_tok = ffnn_predict(st.session_state["model"], ctx, temperature)
            elif st.session_state["model_type"] == "Decision Tree":
                next_tok = dt_predict(st.session_state["model"], ctx)
            elif st.session_state["model_type"] == "Gradient Boosted Tree":
                next_tok = gbt_predict(st.session_state["model"], ctx)
            elif st.session_state["model_type"] == "RNN":
                next_tok = rnn_predict(st.session_state["model"], ctx, temperature)
            generated.append(next_tok)
            if next_tok == "<END>":
                break
        if tokenizer_type == "character":
            output = "".join(generated)
        else:
            output = " ".join(generated)
        st.write("**Model Output:**")
        st.write(output)
else:
    st.info("Train a model to begin chatting.")