|
The 155,000-step version has been trained on roughly 158,100,000 prompt samples using
|
|
|
AbstractPhil/T5-Small-Human-Attentive-Try2-Pass3 |
|
|
|
This T5-small model is fried to echo and interpolate math in complex, intentional ways. I haven't given it the full robust check yet, but it's definitely well fed.
|
|
|
The adapter here is trained on T5 inputs using the code below.
|
|
|
This isn't a bad first test. I'll be improving the adapter with common LoRA techniques (including approaches borrowed from training LLM-style LoRAs), adding further loss methodologies, and curating more advanced response formulas based on how the adapter responded to training and on the extrapolative math from the CLIP_L-adapted response.
|
|
|
Given time I'm certain this will work, whether that means building a layered LoRA structure to interpolate differences layer by layer within CLIP_L, or something much more direct like neuron-level interpolation. Time will tell, and I'm definitely enjoying this sort of thing.
|
|
|
Errors to address in the next version:
|
* There is a clamping index error that tends to rear its head which I haven't had time to track down. It causes solid black images when the velocity sigmas come out too heavy (see the guard sketch after this list for a stopgap).
|
* Occasionally the entire structure of a generation collapses, which means the sigmas aren't lined up correctly, producing malformed sigma responses.
|
* Occasionally the substructure interprets the request incorrectly; this is due to the tokenization being less accurately attuned for some spaces than others. The next version will have node weighting for specific attention-head sectors to account for it.
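
As a stopgap for the black-image failure in the first bullet, here is a minimal sketch of a defensive guard that could be applied to the adapter's sigma output before it feeds the noise scaling; `guard_sigma` is a hypothetical helper, not part of the released code, and the clamp range is an assumption:

```
import torch

def guard_sigma(sigma: torch.Tensor, lo: float = 1e-3, hi: float = 2.0) -> torch.Tensor:
    # Scrub non-finite values and keep per-token sigmas in a sane band so an
    # overly heavy velocity sigma can't blow the noise scaling into a solid black image.
    sigma = torch.nan_to_num(sigma, nan=1.0, posinf=hi, neginf=lo)
    return sigma.clamp(lo, hi)
```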
|
|
|
There are many challenges ahead to reach the interpolation endpoint, but it's definitely an adaptive journey.
|
|
|
This is stage 1 of multiple stages aimed at producing the reproducible, pragmatic outcomes needed to build the proofs required to recreate the Beatrix interpolation model, and to turn it into something useful outside of diffusion.
|
|
|
This process adapts several of the same methods I used to create the Beatrix model, but it's not 1:1 by any stretch.
|
|
|
I will be slowly releasing parts of Beatrix as training diagrams and staging out the methodologies behind how she works, so interested experts can reason about why this model does what it does.
|
|
|
Because I really don't know why Beatrix works the way she does, and I'm not going to just release something like that until I understand WHY it skips and hops past entropy. |
|
|
|
77 tokens, not 64: there's no need to upscale the most recent 77-token version; it's built to the same plane as CLIP_L now.
|
|
|
|
|
``` |
|
import math

import torch
import torch.nn as nn
import torch.optim as optim
from torch.cuda.amp import autocast, GradScaler
from torch.utils.data import DataLoader
from tqdm import tqdm
from transformers import (
    T5TokenizerFast, T5EncoderModel,
    CLIPTokenizerFast, CLIPTextModel
)
from safetensors.torch import save_file
# from huggingface_hub import upload_file  # only needed if pushing checkpoints to the Hub

# RobustVelocityAdapter is defined in the inference script further down;
# hetero_loss and ParsedMultiCharDataset are assumed to be defined elsewhere
# (a sketch of hetero_loss follows after this block).

def main():
|
device = "cuda" if torch.cuda.is_available() else "cpu" |
|
|
|
# HF Hub settings |
|
hf_repo_id = "AbstractPhil/t5-to-vit-l-14-velocity-adapter-v3-100m-77tok" |
|
push_every_n_steps = 5000 |
|
|
|
# Tokenizers & frozen models |
|
t5_tok = T5TokenizerFast.from_pretrained("t5-small") |
|
t5_mod = T5EncoderModel.from_pretrained( |
|
"AbstractPhil/T5-Small-Human-Attentive-Try2-Pass3" |
|
).to(device).eval() |
|
clip_tok = CLIPTokenizerFast.from_pretrained("openai/clip-vit-large-patch14") |
|
clip_mod = CLIPTextModel.from_pretrained( |
|
"openai/clip-vit-large-patch14" |
|
).to(device).eval() |
|
|
|
# Adapter & optimizer |
|
adapter = RobustVelocityAdapter(out_tokens=77).to(device) |
|
optimizer = optim.AdamW(adapter.parameters(), lr=5e-4) |
|
|
|
# Compile models for speed |
|
t5_mod = torch.compile(t5_mod) |
|
clip_mod = torch.compile(clip_mod) |
|
adapter = torch.compile(adapter) |
|
|
|
scaler = GradScaler() # for mixed precision |
|
|
|
# Data |
|
dataset = ParsedMultiCharDataset("AbstractPhil/human-templated-captions-1b", |
|
num_files=12) |
|
loader = DataLoader(dataset, |
|
batch_size=None, |
|
num_workers=4, |
|
pin_memory=True) |
|
iterator = iter(loader) |
|
|
|
batch_size = 256 |
|
accum_steps = 4 # effective BS = 256 * 4 = 1024 |
|
max_steps = math.ceil(dataset.total_rows / batch_size) |
|
pbar = tqdm(total=max_steps, desc="Adapter training") |
|
|
|
for step in range(1, max_steps+1): |
|
# zero grads on actual step |
|
if (step-1) % accum_steps == 0: |
|
optimizer.zero_grad() |
|
|
|
# 1) Collect batch |
|
texts = [] |
|
for _ in range(batch_size): |
|
try: |
|
_, txt = next(iterator) |
|
except StopIteration: |
|
iterator = iter(loader) |
|
_, txt = next(iterator) |
|
texts.append(txt) |
|
|
|
# 2) Tokenize |
|
t5_inputs = t5_tok(texts, |
|
padding=True, |
|
truncation=True, |
|
max_length=77, |
|
return_tensors="pt").to(device) |
|
clip_inputs = clip_tok(texts, |
|
padding="max_length", |
|
truncation=True, |
|
max_length=77, |
|
return_tensors="pt").to(device) |
|
|
|
# 3) Forward + loss in mixed precision |
|
with autocast(): |
|
t5_seq = t5_mod(**t5_inputs).last_hidden_state # [B,77,512] |
|
clip_seq = clip_mod(**clip_inputs).last_hidden_state # [B,77,768] |
|
|
|
anchor_pred, delta_pred, sigma_pred = adapter(t5_seq) |
|
delta_target = clip_seq - anchor_pred |
|
|
|
loss_delta = hetero_loss(delta_pred, delta_target, sigma_pred) |
|
# cosine anchor alignment |
|
cos_sim = nn.functional.cosine_similarity( |
|
anchor_pred.reshape(-1,768), |
|
clip_seq.reshape(-1,768), |
|
dim=-1 |
|
).mean() |
|
loss_anchor = (1 - cos_sim) * 0.1 |
|
|
|
loss = loss_delta + loss_anchor |
|
loss = loss / accum_steps # scale for accumulation |
|
|
|
# 4) Backward + optimizer step |
|
scaler.scale(loss).backward() |
|
if step % accum_steps == 0: |
|
scaler.unscale_(optimizer) |
|
torch.nn.utils.clip_grad_norm_(adapter.parameters(), 1.0) |
|
scaler.step(optimizer) |
|
scaler.update() |
|
|
|
pbar.update(1) |
|
pbar.set_postfix(loss=(loss.item() * accum_steps)) |
|
|
|
# 5) Save & push every N steps |
|
if step % push_every_n_steps == 0: |
|
ckpt = f"/content/drive/MyDrive/t5-adapter/t5-to-vit-l-14-velocity-adapter-v3-100m-77tok_step_{step}.safetensors" |
|
save_file(adapter.state_dict(), ckpt) |
|
#upload_file(ckpt, ckpt, repo_id=hf_repo_id) |
|
|
|
|
|
pbar.close() |
|
|
|
``` |
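
`hetero_loss` isn't shown above. Here is a minimal sketch, assuming a standard heteroscedastic (Gaussian NLL-style) regression loss that matches how it's called with `(delta_pred, delta_target, sigma_pred)`; the exact formulation used in training may differ:

```
import torch

def hetero_loss(pred: torch.Tensor, target: torch.Tensor, sigma: torch.Tensor, eps: float = 1e-6) -> torch.Tensor:
    # Heteroscedastic regression: tokens with larger predicted sigma are down-weighted
    # in the squared-error term but pay a log-variance penalty, so the model can't
    # cheat by inflating sigma everywhere.
    var = sigma.clamp_min(eps) ** 2
    nll = 0.5 * ((pred - target) ** 2 / var + torch.log(var))
    return nll.mean()
```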
|
|
|
### You'll need to snip out the `_orig_mod.` key prefixes that got snapped into the state dict when I saved.
|
That prefix comes from `torch.compile` wrapping the model; I'm still not quite sure how to avoid it other than editing the state dict before saving, and I suspect it's causing side effects I'm not aware of.
|
I don't want to save as .pt because pickle checkpoints are considered unsafe, and I don't want this model to be flagged as unsafe for use.
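
One way to avoid shipping those keys is to strip the prefix before saving, mirroring the `k.replace("_orig_mod.", "")` cleanup done at load time in the inference script below. A minimal sketch (assuming the prefix really does come only from `torch.compile` wrapping the module):

```
from safetensors.torch import save_file

def save_without_compile_prefix(model, path):
    # torch.compile wraps the module, so its state_dict keys pick up an "_orig_mod."
    # prefix; strip it so the checkpoint loads cleanly into the plain nn.Module.
    state = {k.replace("_orig_mod.", ""): v for k, v in model.state_dict().items()}
    save_file(state, path)
```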
|
|
|
You can run inference with the test version using Stable Diffusion 1.5 as an example.
|
The CLIP_L responses fall apart when too many nodes hit those guidance bells, but it's definitely a powerful first test using divergent systems. |
|
|
|
It should run cleanly on Colab using an L4 GPU.
|
|
|
``` |
|
# Optimized inference_adapter.py |
|
|
|
import torch |
|
import math |
|
from PIL import Image |
|
from torchvision.transforms import ToPILImage |
|
from safetensors.torch import load_file as load_safetensors |
|
|
|
from transformers import ( |
|
T5TokenizerFast, T5EncoderModel, |
|
CLIPTokenizerFast, CLIPTextModel |
|
) |
|
from diffusers import ( |
|
AutoencoderKL, |
|
UNet2DConditionModel, |
|
EulerAncestralDiscreteScheduler |
|
) |
|
from typing import Optional |
|
|
|
# ─────────────────────────────────────────────────────────────

# 1) GLOBAL SETUP: load once, cast, eval, move

# ─────────────────────────────────────────────────────────────
|
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
|
DTYPE = torch.float16  # fp16 for this first setup (overridden to fp32 by the second setup below)
|
|
|
# 1a) CLIP text encoder (cond + uncond) |
|
clip_tok = CLIPTokenizerFast.from_pretrained( |
|
"runwayml/stable-diffusion-v1-5", subfolder="tokenizer" |
|
) |
|
clip_mod = CLIPTextModel.from_pretrained( |
|
"runwayml/stable-diffusion-v1-5", subfolder="text_encoder", |
|
torch_dtype=DTYPE |
|
).to(DEVICE).eval() |
|
|
|
# 1b) T5 encoder |
|
t5_tok = T5TokenizerFast.from_pretrained("t5-small") |
|
t5_mod = T5EncoderModel.from_pretrained( |
|
"AbstractPhil/T5-Small-Human-Attentive-Try2-Pass3", |
|
torch_dtype=DTYPE |
|
).to(DEVICE).eval() |
|
|
|
# 1c) Velocity adapter checkpoint path

local_adapter_directory = "roba_adapter_step_19500.safetensors"  # loaded into the adapter further below
|
|
|
|
|
# 1c) Adapter |
|
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
|
|
|
class RobustVelocityAdapter(nn.Module): |
|
""" |
|
Fixed version: manual multi-head cross-attention emits [B, heads, Q, K] scores |
|
so that _add_rel_pos_bias can unpack them correctly. |
|
""" |
|
def __init__( |
|
self, |
|
t5_dim: int = 512, |
|
clip_dim: int = 768, |
|
hidden_dim: int = 1024, |
|
out_tokens: int = 77, # now aligned with your T5 finetune |
|
self_attn_layers: int = 2, |
|
cross_heads: int = 8, |
|
max_rel_pos: int = 128, |
|
): |
|
super().__init__() |
|
self.out_tokens = out_tokens |
|
self.cross_heads = cross_heads |
|
self.head_dim = t5_dim // cross_heads |
|
self.max_rel_pos = max_rel_pos |
|
|
|
# 1) Self-attention stack |
|
self.self_attn = nn.ModuleList() |
|
self.self_norm = nn.ModuleList() |
|
for _ in range(self_attn_layers): |
|
self.self_attn.append(nn.MultiheadAttention(t5_dim, cross_heads, batch_first=True)) |
|
self.self_norm.append(nn.LayerNorm(t5_dim)) |
|
|
|
# 2) Residual blocks |
|
def resblock(): |
|
return nn.Sequential( |
|
nn.LayerNorm(t5_dim), |
|
nn.Linear(t5_dim, t5_dim), |
|
nn.GELU(), |
|
nn.Linear(t5_dim, t5_dim), |
|
) |
|
self.res1 = resblock() |
|
self.res2 = resblock() |
|
|
|
# 3) Learned queries for cross-attn |
|
self.query_pos = nn.Parameter(torch.randn(out_tokens, t5_dim)) |
|
|
|
# 4) Projection heads |
|
self.anchor_proj = nn.Sequential( |
|
nn.Linear(t5_dim, hidden_dim), nn.ReLU(), nn.Linear(hidden_dim, clip_dim) |
|
) |
|
self.delta_proj = nn.Sequential( |
|
nn.Linear(t5_dim, hidden_dim), nn.ReLU(), nn.Linear(hidden_dim, clip_dim) |
|
) |
|
self.var_proj = nn.Sequential( |
|
nn.Linear(t5_dim, hidden_dim), nn.ReLU(), nn.Linear(hidden_dim, clip_dim) |
|
) |
|
self.gate_proj = nn.Sequential( |
|
nn.Linear(t5_dim, hidden_dim), nn.ReLU(), nn.Linear(hidden_dim, clip_dim), nn.Sigmoid() |
|
) |
|
|
|
# 5) Relative-position bias table |
|
self.rel_bias = nn.Parameter(torch.zeros(2*max_rel_pos-1, cross_heads)) |
|
|
|
# 6) Norm after cross-attn |
|
self.cross_norm = nn.LayerNorm(t5_dim) |
|
|
|
def _add_rel_pos_bias(self, attn_scores: torch.Tensor) -> torch.Tensor: |
|
""" |
|
attn_scores: [B, heads, Q, K] |
|
returns: attn_scores + bias where bias is [B, heads, Q, K] |
|
""" |
|
B, H, Q, K = attn_scores.shape |
|
device = attn_scores.device |
|
|
|
# 1) Query & key position indices |
|
idx_q = torch.arange(Q, device=device) # [Q] |
|
idx_k = torch.arange(K, device=device) # [K] |
|
|
|
# 2) Compute relative distances for every (q, k) pair |
|
# rel[i,j] = idx_q[i] - idx_k[j] |
|
rel = idx_q.unsqueeze(1) - idx_k.unsqueeze(0) # [Q, K] |
|
|
|
# 3) Clamp & shift into bias table range [0, 2*max_rel-2] |
|
max_rel = self.max_rel_pos |
|
rel = rel.clamp(-max_rel+1, max_rel-1) + (max_rel - 1) |
|
|
|
# 4) Lookup per-head biases |
|
# self.rel_bias has shape [2*max_rel-1, H] |
|
bias = self.rel_bias[rel] # [Q, K, H] |
|
bias = bias.permute(2, 0, 1) # [H, Q, K] |
|
|
|
# 5) Broadcast to [B, H, Q, K] and add |
|
bias = bias.unsqueeze(0).expand(B, -1, -1, -1) |
|
return attn_scores + bias |
|
|
|
|
|
def forward(self, t5_seq: torch.Tensor): |
|
""" |
|
t5_seq: [B, L, t5_dim] |
|
returns: |
|
anchor: [B, out_tokens, clip_dim] |
|
delta: [B, out_tokens, clip_dim] |
|
sigma: [B, out_tokens, clip_dim] |
|
""" |
|
x = t5_seq |
|
B, L, D = x.shape |
|
|
|
# 1) Self-attention + residual |
|
for attn, norm in zip(self.self_attn, self.self_norm): |
|
res, _ = attn(x, x, x) |
|
x = norm(x + res) |
|
|
|
# 2) Residual blocks |
|
x = x + self.res1(x) |
|
x = x + self.res2(x) |
|
|
|
# 3) Prepare queries & split heads |
|
queries = self.query_pos.unsqueeze(0).expand(B, -1, -1) # [B, Q, D] |
|
# reshape into heads |
|
q = queries.view(B, self.out_tokens, self.cross_heads, self.head_dim).permute(0,2,1,3) |
|
k = x.view(B, L, self.cross_heads, self.head_dim).permute(0,2,1,3) |
|
v = k |
|
|
|
# 4) Scaled dot-product to get [B, heads, Q, K] |
|
scores = (q @ k.transpose(-2,-1)) / math.sqrt(self.head_dim) |
|
scores = self._add_rel_pos_bias(scores) |
|
probs = F.softmax(scores, dim=-1) # [B, H, Q, K] |
|
|
|
        # 5) Attend & merge heads → [B, Q, D]
|
ctx = probs @ v # [B, H, Q, head_dim] |
|
ctx = ctx.permute(0,2,1,3).reshape(B, self.out_tokens, D) |
|
ctx = self.cross_norm(ctx) |
|
|
|
# 6) Project to anchor, delta_mean, delta_logvar, gate |
|
anchor = self.anchor_proj(ctx) |
|
delta_mean = self.delta_proj(ctx) |
|
delta_logvar = self.var_proj(ctx) |
|
gate = self.gate_proj(ctx) |
|
|
|
# 7) Compute sigma & gated delta |
|
sigma = torch.exp(0.5 * delta_logvar) |
|
delta = delta_mean * gate |
|
|
|
return anchor, delta, sigma |
|
|
|
# (torch, F, PIL, safetensors, transformers, and diffusers are already imported above;
# only the non-fast CLIPTokenizer used by this second setup is new.)
from transformers import CLIPTokenizer
|
|
|
# 1) GLOBAL SETUP (second pass: overrides the fp16 setup above; everything from here runs in fp32)
|
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
|
DTYPE = torch.float32 |
|
|
|
# 1a) CLIP tokenizer & text encoder |
|
clip_tok = CLIPTokenizer.from_pretrained( |
|
"runwayml/stable-diffusion-v1-5", subfolder="tokenizer" |
|
) |
|
clip_mod = CLIPTextModel.from_pretrained( |
|
"runwayml/stable-diffusion-v1-5", subfolder="text_encoder", |
|
torch_dtype=DTYPE |
|
).to(DEVICE).eval() |
|
|
|
# 1b) U-Net, VAE, Scheduler |
|
unet = UNet2DConditionModel.from_pretrained( |
|
"runwayml/stable-diffusion-v1-5", subfolder="unet", |
|
torch_dtype=DTYPE |
|
).to(DEVICE).eval() |
|
vae = AutoencoderKL.from_pretrained( |
|
"runwayml/stable-diffusion-v1-5", subfolder="vae", |
|
torch_dtype=DTYPE |
|
).to(DEVICE).eval() |
|
scheduler = EulerAncestralDiscreteScheduler.from_pretrained( |
|
"runwayml/stable-diffusion-v1-5", subfolder="scheduler" |
|
) |
|
|
|
# 1c) T5 |
|
t5_tok = T5TokenizerFast.from_pretrained("t5-small") |
|
t5_mod = T5EncoderModel.from_pretrained( |
|
"AbstractPhil/T5-Small-Human-Attentive-Try2-Pass3", |
|
torch_dtype=DTYPE |
|
).to(DEVICE).eval() |
|
|
|
# 1d) velocity prediction adapter |
|
adapter = RobustVelocityAdapter(out_tokens=77).to(DEVICE).eval() |
|
state = load_safetensors(local_adapter_directory, device="cpu") |
|
clean = {k.replace("_orig_mod.", ""): v for k, v in state.items()} |
|
adapter.load_state_dict(clean, strict=False) |
|
adapter.to(DEVICE).eval() |
|
|
|
|
|
|
|
# 2) GENERATION FUNCTION |
|
@torch.no_grad() |
|
def generate_image_with_adapter( |
|
prompt: str, |
|
seed: int = 42, |
|
steps: int = 50, |
|
adapter_scale: float = 0.5, |
|
guidance_scale: float = 7.5, |
|
height: int = 512, |
|
width: int = 512, |
|
): |
|
gen = torch.Generator(device=DEVICE).manual_seed(seed) |
|
|
|
# 2.1) CLIP embeddings |
|
clip_in = clip_tok([prompt], |
|
max_length=clip_tok.model_max_length, |
|
padding="max_length", truncation=True, |
|
return_tensors="pt").to(DEVICE) |
|
clip_cond = clip_mod(**clip_in).last_hidden_state # [1,77,768] |
|
|
|
empty_in = clip_tok([""], |
|
max_length=clip_tok.model_max_length, |
|
padding="max_length", truncation=True, |
|
return_tensors="pt").to(DEVICE) |
|
clip_uncond= clip_mod(**empty_in).last_hidden_state # [1,77,768] |
|
|
|
    # 2.2) T5 → adapter → anchor, delta, sigma (77 tokens)
|
t5_in = t5_tok(prompt, |
|
max_length=77, padding="max_length", |
|
truncation=True, return_tensors="pt").to(DEVICE) |
|
t5_seq = t5_mod(**t5_in).last_hidden_state # [1,77,512] |
|
anchor, delta, sigma = adapter(t5_seq) # each [1,77,768] |
|
|
|
    # 2.3) Align token count with CLIP's 77 (a no-op for this 77-token adapter, kept for safety)
|
T_clip = clip_cond.shape[1] # 77 |
|
def up(x): |
|
return F.interpolate( |
|
x.permute(0,2,1), |
|
size=T_clip, mode="linear", align_corners=False |
|
).permute(0,2,1) |
|
anchor = up(anchor) |
|
delta = up(delta) |
|
sigma = up(sigma) |
|
|
|
# 2.4) Ο-based noise scaling |
|
raw_ns = sigma.mean().clamp(0.1, 2.0).item() |
|
noise_scale = 1.0 + adapter_scale * (raw_ns - 1.0) |
|
|
|
# 2.5) Initialize latents |
|
latents = torch.randn( |
|
(1, unet.config.in_channels, height//8, width//8), |
|
generator=gen, device=DEVICE, dtype=DTYPE |
|
) * scheduler.init_noise_sigma * noise_scale |
|
scheduler.set_timesteps(steps, device=DEVICE) |
|
|
|
# 2.6) Denoising with adapter guidance |
|
for i, t in enumerate(scheduler.timesteps): |
|
alpha = i / (len(scheduler.timesteps)-1) |
|
aw = adapter_scale * alpha |
|
cw = 1.0 - aw |
|
|
|
# blend anchors |
|
blended = clip_cond * cw + anchor * aw |
|
|
|
# per-token confidence |
|
eps = 1e-6 |
|
conf = 1.0 / (sigma + eps) |
|
conf = conf / conf.amax(dim=(1,2), keepdim=True) |
|
|
|
# gated delta |
|
gated_delta = delta * aw * conf |
|
|
|
# final cond embedding |
|
cond_embed = blended + gated_delta # [1,77,768] |
|
|
|
# UNet forward |
|
lat_in = scheduler.scale_model_input(latents, t) |
|
lat_in = torch.cat([lat_in, lat_in], dim=0) |
|
embeds = torch.cat([clip_uncond, cond_embed], dim=0) |
|
noise = unet(lat_in, t, encoder_hidden_states=embeds).sample |
|
u, c = noise.chunk(2) |
|
guided = u + guidance_scale * (c - u) |
|
latents= scheduler.step(guided, t, latents, generator=gen).prev_sample |
|
|
|
# 2.7) Decode |
|
dec_lat = latents / vae.config.scaling_factor |
|
image_t = vae.decode(dec_lat).sample |
|
image_t = (image_t.clamp(-1,1) + 1) / 2 |
|
return ToPILImage()(image_t[0]) |
|
|
|
# 3) RUN EXAMPLE |
|
if __name__ == "__main__": |
|
out = generate_image_with_adapter( |
|
"silly dog wearing a batman costume, high resolution, studio lighting", |
|
seed=1234, steps=50, |
|
adapter_scale=0.5, guidance_scale=7.5 |
|
) |
|
out.save("sd15_with_adapter.png") |
|
print("Saved sd15_with_adapter.png") |
|
```