import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import requests
import re
import os
import sys
from collections import Counter
import random
import math

try:
    from datasets import load_dataset
except ImportError:
    print("datasets not available, using synthetic data only")
    load_dataset = None

try:
    from transformers import AutoTokenizer
except ImportError:
    print("transformers not available, using the custom tokenizer")
    AutoTokenizer = None

import gradio as gr


class SelfOrganizingTokenizer:
    """Word-level tokenizer whose vocabulary is built from corpus frequencies."""

    def __init__(self, vocab_size=30000):
        self.vocab_size = vocab_size
        self.token_to_id = {'<PAD>': 0, '<UNK>': 1, '<BOS>': 2, '<EOS>': 3}
        self.id_to_token = {0: '<PAD>', 1: '<UNK>', 2: '<BOS>', 3: '<EOS>'}
        self.word_freq = Counter()

    def build_vocab(self, texts):
        for text in texts:
            words = re.findall(r'\w+|[^\w\s]', text.lower())
            self.word_freq.update(words)

        # The first 4 IDs are reserved for the special tokens above.
        most_common = self.word_freq.most_common(self.vocab_size - 4)
        for i, (word, _) in enumerate(most_common):
            idx = i + 4
            self.token_to_id[word] = idx
            self.id_to_token[idx] = word

    def encode(self, text):
        words = re.findall(r'\w+|[^\w\s]', text.lower())
        return [self.token_to_id.get(word, 1) for word in words]  # 1 = <UNK>

    def decode(self, ids):
        return ' '.join(self.id_to_token.get(i, '<UNK>') for i in ids)
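
# Usage sketch (illustrative; `corpus` stands in for any list of strings):
#
#   tok = SelfOrganizingTokenizer(vocab_size=1000)
#   tok.build_vocab(corpus)
#   ids = tok.encode("Il gatto dorme.")  # out-of-vocab words map to 1 (<UNK>)
#   tok.decode(ids)                      # -> "il gatto dorme ."
#
# decode() space-joins tokens, so original casing and punctuation spacing
# are not recovered exactly.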


class SelfOrganizingAttention(nn.Module):
    def __init__(self, embed_dim, num_heads):
        super().__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads

        self.qkv = nn.Linear(embed_dim, embed_dim * 3)
        self.proj = nn.Linear(embed_dim, embed_dim)
        self.adaptation_layer = nn.Linear(embed_dim, embed_dim)

    def forward(self, x):
        B, T, C = x.shape
        qkv = self.qkv(x).reshape(B, T, 3, self.num_heads, self.head_dim)
        q, k, v = qkv.permute(2, 0, 3, 1, 4)  # each (B, num_heads, T, head_dim)

        att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1)))
        # Causal mask: for next-token prediction, each position may only
        # attend to itself and earlier positions.
        causal_mask = torch.triu(torch.ones(T, T, device=x.device), diagonal=1).bool()
        att = att.masked_fill(causal_mask, float('-inf'))
        att = torch.softmax(att, dim=-1)

        y = att @ v
        y = y.transpose(1, 2).reshape(B, T, C)
        y = self.proj(y)

        # "Self-organizing" gating: modulate the attention output with an
        # input-dependent adaptation signal, bounded by tanh.
        adaptation = torch.tanh(self.adaptation_layer(x))
        y = y * (1 + 0.1 * adaptation)

        return y
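
# Shape sanity check (a minimal sketch, separate from the training flow):
#
#   attn = SelfOrganizingAttention(embed_dim=512, num_heads=8)
#   out = attn(torch.randn(2, 16, 512))  # (batch=2, seq=16, dim=512)
#   assert out.shape == (2, 16, 512)
#
# Because tanh is bounded in [-1, 1], the adaptation term keeps the gated
# output within +/-10% of the plain attention output.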


class SelfOrganizingTransformer(nn.Module):
    def __init__(self, vocab_size, embed_dim=512, num_heads=8, num_layers=6, max_len=1024):
        super().__init__()
        self.embed_dim = embed_dim
        self.tok_embed = nn.Embedding(vocab_size, embed_dim)
        self.pos_embed = nn.Embedding(max_len, embed_dim)

        self.layers = nn.ModuleList([
            nn.ModuleDict({
                'attn': SelfOrganizingAttention(embed_dim, num_heads),
                'norm1': nn.LayerNorm(embed_dim),
                'mlp': nn.Sequential(
                    nn.Linear(embed_dim, 4 * embed_dim),
                    nn.GELU(),
                    nn.Linear(4 * embed_dim, embed_dim),
                ),
                'norm2': nn.LayerNorm(embed_dim),
                'adaptation': nn.Linear(embed_dim, embed_dim)
            }) for _ in range(num_layers)
        ])

        self.ln_f = nn.LayerNorm(embed_dim)
        self.head = nn.Linear(embed_dim, vocab_size)

        # Learnable per-layer "plasticity" scalars that control how strongly
        # each layer's adaptation signal modulates the residual stream.
        self.plasticity = nn.Parameter(torch.ones(num_layers) * 0.01)

    def forward(self, x):
        B, T = x.shape
        pos = torch.arange(0, T, dtype=torch.long, device=x.device)

        x = self.tok_embed(x) + self.pos_embed(pos)

        for i, layer in enumerate(self.layers):
            # Pre-norm attention block with a plasticity-gated residual.
            residual = x
            x = layer['norm1'](x)
            x = layer['attn'](x)

            adaptation = torch.tanh(layer['adaptation'](x))
            x = residual + x * (1 + self.plasticity[i] * adaptation)

            # Standard pre-norm MLP block.
            residual = x
            x = layer['norm2'](x)
            x = layer['mlp'](x)
            x = residual + x

        x = self.ln_f(x)
        logits = self.head(x)
        return logits
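
# Instantiation sketch (illustrative sizes, not the training configuration):
#
#   model = SelfOrganizingTransformer(vocab_size=1000, embed_dim=128,
#                                     num_heads=4, num_layers=2, max_len=128)
#   logits = model(torch.randint(0, 1000, (2, 32)))  # -> (2, 32, 1000)
#
# Sequences longer than max_len would index past pos_embed and raise an
# error, so callers must truncate inputs to max_len tokens.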


class TextDataset(Dataset):
    def __init__(self, texts, tokenizer, max_len=512):
        self.texts = texts
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        tokens = self.tokenizer.encode(text)

        # Pad with <PAD> (ID 0) or truncate to a fixed length.
        if len(tokens) < self.max_len:
            tokens = tokens + [0] * (self.max_len - len(tokens))
        else:
            tokens = tokens[:self.max_len]

        # Shift by one position: each input token predicts the next token.
        return torch.tensor(tokens[:-1]), torch.tensor(tokens[1:])
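
# Usage sketch (assumes `texts` and `tokenizer` as prepared by AITrainer below):
#
#   ds = TextDataset(texts, tokenizer, max_len=512)
#   inputs, targets = ds[0]  # both tensors of length max_len - 1
#   # targets[t] == tokens[t + 1], the standard next-token shift.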


class AITrainer:
    def __init__(self):
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.tokenizer = None
        self.model = None
        self.datasets = []

    def load_public_datasets(self):
        """Load public datasets that require no API key"""
        datasets = []

        if load_dataset:
            try:
                wiki = load_dataset("wikipedia", "20220301.it", split="train[:1000]", trust_remote_code=True)
                for item in wiki:
                    if len(item['text']) > 100:
                        datasets.append(item['text'])
                print(f"Loaded {len(datasets)} examples from Wikipedia")
            except Exception as e:
                print(f"Wikipedia not available: {e}")

            try:
                cc = load_dataset("cc100", lang="it", split="train[:500]", trust_remote_code=True)
                for item in cc:
                    if len(item['text']) > 100:
                        datasets.append(item['text'])
                print("Loaded examples from Common Crawl")
            except Exception as e:
                print(f"Common Crawl not available: {e}")

        urls = [
            "https://www.gutenberg.org/files/2000/2000-0.txt",
        ]

        for url in urls:
            try:
                response = requests.get(url, timeout=10)
                if response.status_code == 200:
                    text = response.text
                    # Keep only reasonably long lines as training chunks.
                    lines = text.split('\n')
                    filtered_lines = [line.strip() for line in lines if len(line.strip()) > 50]
                    chunks = filtered_lines[:1000]
                    datasets.extend(chunks)
                    print(f"Loaded {len(chunks)} chunks from {url}")
            except Exception as e:
                print(f"Error loading {url}: {e}")
                continue

        print("Generating synthetic data...")
        synthetic_texts = self.generate_synthetic_data(8000)
        datasets.extend(synthetic_texts)

        self.datasets = datasets[:10000]
        print(f"Final dataset: {len(self.datasets)} examples")

    def generate_synthetic_data(self, num_samples):
        """Generate synthetic Italian sentences for training"""
        templates = [
            "Il {sostantivo} {verbo} nel {luogo} durante {tempo}.",
            "La {sostantivo} è molto {aggettivo} e {verbo} sempre.",
            "Quando {verbo}, il {sostantivo} diventa {aggettivo}.",
            "Nel {luogo}, la {sostantivo} {verbo} con {sostantivo}.",
            "Il {aggettivo} {sostantivo} {verbo} ogni {tempo}."
        ]

        # Italian word lists (nouns, verbs, adjectives, places, times),
        # kept in Italian to match the Italian-language corpus above.
        sostantivi = ["gatto", "cane", "casa", "albero", "fiume", "montagna", "libro", "sole"]
        verbi = ["corre", "salta", "vola", "nuota", "dorme", "mangia", "gioca", "legge"]
        aggettivi = ["bello", "grande", "piccolo", "veloce", "lento", "intelligente", "forte"]
        luoghi = ["parco", "giardino", "bosco", "città", "mare", "cielo", "campo"]
        tempi = ["giorno", "notte", "mattina", "sera", "inverno", "estate", "primavera"]

        texts = []
        for _ in range(num_samples):
            template = random.choice(templates)
            text = template.format(
                sostantivo=random.choice(sostantivi),
                verbo=random.choice(verbi),
                aggettivo=random.choice(aggettivi),
                luogo=random.choice(luoghi),
                tempo=random.choice(tempi)
            )
            texts.append(text)

        return texts
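
    # Example outputs (template 1 with randomly chosen words):
    #   "Il gatto corre nel parco durante giorno."
    #   "Il sole vola nel bosco durante notte."
    # The sentences are deliberately simplistic; they only give the model
    # basic token co-occurrence statistics to learn from.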

    def setup_model(self, vocab_size=30000):
        """Configure the self-organizing transformer model"""
        self.model = SelfOrganizingTransformer(
            vocab_size=vocab_size,
            embed_dim=512,
            num_heads=8,
            num_layers=6,
            max_len=512
        ).to(self.device)

        total_params = sum(p.numel() for p in self.model.parameters())
        print(f"Model created with {total_params:,} parameters")

    def train(self, epochs=5, batch_size=16, lr=3e-4):
        """Train the model"""
        print("Initializing tokenizer...")
        self.tokenizer = SelfOrganizingTokenizer()
        self.tokenizer.build_vocab(self.datasets)

        print("Configuring model...")
        self.setup_model(len(self.tokenizer.token_to_id))

        print("Preparing dataset...")
        dataset = TextDataset(self.datasets, self.tokenizer)
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

        optimizer = optim.AdamW(self.model.parameters(), lr=lr, weight_decay=0.01)
        criterion = nn.CrossEntropyLoss(ignore_index=0)  # skip <PAD> positions

        print("Starting training...")
        self.model.train()

        for epoch in range(epochs):
            total_loss = 0
            num_batches = 0

            for batch_idx, (input_ids, target_ids) in enumerate(dataloader):
                input_ids = input_ids.to(self.device)
                target_ids = target_ids.to(self.device)

                optimizer.zero_grad()

                logits = self.model(input_ids)
                loss = criterion(logits.reshape(-1, logits.size(-1)), target_ids.reshape(-1))

                loss.backward()
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
                optimizer.step()

                total_loss += loss.item()
                num_batches += 1

                if batch_idx % 50 == 0:
                    print(f"Epoch {epoch+1}/{epochs}, Batch {batch_idx}, Loss: {loss.item():.4f}")

            avg_loss = total_loss / num_batches
            print(f"Epoch {epoch+1}/{epochs} complete. Average loss: {avg_loss:.4f}")

            # Periodically sample from the model to monitor progress.
            if epoch % 2 == 0:
                self.test_generation("Il gatto")

        print("Training complete!")
        self.save_model()

    def test_generation(self, prompt, max_length=50):
        """Greedy text-generation test"""
        self.model.eval()
        with torch.no_grad():
            tokens = self.tokenizer.encode(prompt)
            input_ids = torch.tensor([tokens]).to(self.device)

            for _ in range(max_length):
                logits = self.model(input_ids)
                next_token = torch.argmax(logits[0, -1, :], dim=-1)
                input_ids = torch.cat([input_ids, next_token.unsqueeze(0).unsqueeze(0)], dim=1)

                if next_token.item() == self.tokenizer.token_to_id.get('<EOS>', 3):
                    break

            generated = self.tokenizer.decode(input_ids[0].cpu().tolist())
            print(f"Generated: {generated}")

        self.model.train()
        return generated

    def save_model(self):
        """Save the model"""
        torch.save({
            'model_state_dict': self.model.state_dict(),
            'tokenizer': self.tokenizer,
            'vocab_size': len(self.tokenizer.token_to_id)
        }, 'ai_model.pth')
        print("Model saved to ai_model.pth")

    def load_model(self):
        """Load the model"""
        if os.path.exists('ai_model.pth'):
            # weights_only=False because the checkpoint also pickles the
            # tokenizer object (the kwarg requires PyTorch >= 1.13).
            checkpoint = torch.load('ai_model.pth', map_location=self.device, weights_only=False)
            self.tokenizer = checkpoint['tokenizer']
            self.setup_model(checkpoint['vocab_size'])
            self.model.load_state_dict(checkpoint['model_state_dict'])
            print("Model loaded from ai_model.pth")
            return True
        return False

    def generate_text(self, prompt, max_length=100, temperature=0.8):
        """Generate text from a prompt via temperature sampling"""
        if not self.model or not self.tokenizer:
            return "Model not loaded. Run training first."

        self.model.eval()
        with torch.no_grad():
            tokens = self.tokenizer.encode(prompt)
            input_ids = torch.tensor([tokens]).to(self.device)

            # int() guards against float values coming from the Gradio slider.
            for _ in range(int(max_length)):
                logits = self.model(input_ids)
                logits = logits[0, -1, :] / temperature
                probs = torch.softmax(logits, dim=-1)
                next_token = torch.multinomial(probs, 1)

                input_ids = torch.cat([input_ids, next_token.unsqueeze(0)], dim=1)

                if next_token.item() == self.tokenizer.token_to_id.get('<EOS>', 3):
                    break

            generated = self.tokenizer.decode(input_ids[0].cpu().tolist())
            return generated
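
# Usage sketch (after training once, or with an existing checkpoint on disk):
#
#   trainer = AITrainer()
#   if trainer.load_model():
#       print(trainer.generate_text("Il gatto", max_length=30, temperature=0.7))
#
# Lower temperatures sharpen the distribution toward the greedy choice;
# values above 1.0 flatten it and produce more varied (noisier) text.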


def create_interface():
    """Create the Gradio interface"""
    trainer = AITrainer()

    def start_training():
        try:
            trainer.load_public_datasets()
            trainer.train(epochs=3)
            return "Training completed successfully!"
        except Exception as e:
            return f"Error during training: {str(e)}"

    def generate(prompt, max_len, temp):
        try:
            # Load the saved checkpoint only if no model is in memory yet.
            if trainer.model is None and not trainer.load_model():
                return "Model not found. Run training first."
            result = trainer.generate_text(prompt, max_len, temp)
            return result
        except Exception as e:
            return f"Error during generation: {str(e)}"

    with gr.Blocks(title="AI Token Trainer") as demo:
        gr.Markdown("# AI Training System - Token Prediction")

        with gr.Tab("Training"):
            train_btn = gr.Button("Start Training", variant="primary")
            train_output = gr.Textbox(label="Training Status", lines=5)
            train_btn.click(start_training, outputs=train_output)

        with gr.Tab("Generation"):
            prompt_input = gr.Textbox(label="Prompt", placeholder="Enter the starting text...")
            max_len_slider = gr.Slider(10, 200, value=50, step=1, label="Maximum length")
            temp_slider = gr.Slider(0.1, 2.0, value=0.8, label="Temperature")
            generate_btn = gr.Button("Generate Text", variant="primary")
            output_text = gr.Textbox(label="Generated Text", lines=10)

            generate_btn.click(
                generate,
                inputs=[prompt_input, max_len_slider, temp_slider],
                outputs=output_text
            )

    return demo


if __name__ == "__main__":
    # CLI mode: `python <script> train` runs training directly;
    # otherwise launch the Gradio web interface.
    if len(sys.argv) > 1 and sys.argv[1] == "train":
        trainer = AITrainer()
        trainer.load_public_datasets()
        trainer.train()
    else:
        demo = create_interface()
        demo.launch(share=True)