import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import Dataset, DataLoader import numpy as np import requests import re import json import os from collections import Counter from typing import List, Tuple, Dict import random import math try: from datasets import load_dataset except ImportError: print("datasets non disponibile, usando solo dati sintetici") load_dataset = None try: from transformers import AutoTokenizer except ImportError: print("transformers non disponibile, usando tokenizer personalizzato") AutoTokenizer = None import gradio as gr class SelfOrganizingTokenizer: def __init__(self, vocab_size=30000): self.vocab_size = vocab_size self.token_to_id = {'': 0, '': 1, '': 2, '': 3} self.id_to_token = {0: '', 1: '', 2: '', 3: ''} self.word_freq = Counter() def build_vocab(self, texts): for text in texts: words = re.findall(r'\w+|[^\w\s]', text.lower()) self.word_freq.update(words) most_common = self.word_freq.most_common(self.vocab_size - 4) for i, (word, _) in enumerate(most_common): idx = i + 4 self.token_to_id[word] = idx self.id_to_token[idx] = word def encode(self, text): words = re.findall(r'\w+|[^\w\s]', text.lower()) return [self.token_to_id.get(word, 1) for word in words] def decode(self, ids): return ' '.join([self.id_to_token.get(id, '') for id in ids]) class SelfOrganizingAttention(nn.Module): def __init__(self, embed_dim, num_heads): super().__init__() self.embed_dim = embed_dim self.num_heads = num_heads self.head_dim = embed_dim // num_heads self.qkv = nn.Linear(embed_dim, embed_dim * 3) self.proj = nn.Linear(embed_dim, embed_dim) self.adaptation_layer = nn.Linear(embed_dim, embed_dim) def forward(self, x): B, T, C = x.shape qkv = self.qkv(x).reshape(B, T, 3, self.num_heads, self.head_dim) q, k, v = qkv.permute(2, 0, 3, 1, 4) att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1))) att = torch.softmax(att, dim=-1) y = att @ v y = y.transpose(1, 2).reshape(B, T, C) y = self.proj(y) # Auto-organizzazione adaptation = torch.tanh(self.adaptation_layer(x)) y = y * (1 + 0.1 * adaptation) return y class SelfOrganizingTransformer(nn.Module): def __init__(self, vocab_size, embed_dim=512, num_heads=8, num_layers=6, max_len=1024): super().__init__() self.embed_dim = embed_dim self.tok_embed = nn.Embedding(vocab_size, embed_dim) self.pos_embed = nn.Embedding(max_len, embed_dim) self.layers = nn.ModuleList([ nn.ModuleDict({ 'attn': SelfOrganizingAttention(embed_dim, num_heads), 'norm1': nn.LayerNorm(embed_dim), 'mlp': nn.Sequential( nn.Linear(embed_dim, 4 * embed_dim), nn.GELU(), nn.Linear(4 * embed_dim, embed_dim), ), 'norm2': nn.LayerNorm(embed_dim), 'adaptation': nn.Linear(embed_dim, embed_dim) }) for _ in range(num_layers) ]) self.ln_f = nn.LayerNorm(embed_dim) self.head = nn.Linear(embed_dim, vocab_size) # Parametri per auto-organizzazione self.plasticity = nn.Parameter(torch.ones(num_layers) * 0.01) def forward(self, x): B, T = x.shape pos = torch.arange(0, T, dtype=torch.long, device=x.device) x = self.tok_embed(x) + self.pos_embed(pos) for i, layer in enumerate(self.layers): residual = x x = layer['norm1'](x) x = layer['attn'](x) # Auto-organizzazione adattiva adaptation = torch.tanh(layer['adaptation'](x)) x = residual + x * (1 + self.plasticity[i] * adaptation) residual = x x = layer['norm2'](x) x = layer['mlp'](x) x = residual + x x = self.ln_f(x) logits = self.head(x) return logits class TextDataset(Dataset): def __init__(self, texts, tokenizer, max_len=512): self.texts = texts self.tokenizer = tokenizer self.max_len = max_len def __len__(self): return len(self.texts) def __getitem__(self, idx): text = self.texts[idx] tokens = self.tokenizer.encode(text) if len(tokens) < self.max_len: tokens = tokens + [0] * (self.max_len - len(tokens)) else: tokens = tokens[:self.max_len] return torch.tensor(tokens[:-1]), torch.tensor(tokens[1:]) class AITrainer: def __init__(self): self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') self.tokenizer = None self.model = None self.datasets = [] def load_public_datasets(self): """Carica dataset pubblici senza API key""" datasets = [] if load_dataset: try: # Wikipedia in italiano wiki = load_dataset("wikipedia", "20220301.it", split="train[:1000]", trust_remote_code=True) for item in wiki: if len(item['text']) > 100: datasets.append(item['text']) print(f"Caricati {len(datasets)} esempi da Wikipedia") except Exception as e: print(f"Wikipedia non disponibile: {e}") try: # Common Crawl cc = load_dataset("cc100", lang="it", split="train[:500]", trust_remote_code=True) for item in cc: if len(item['text']) > 100: datasets.append(item['text']) print(f"Caricati esempi da Common Crawl") except Exception as e: print(f"Common Crawl non disponibile: {e}") # Dataset di testo semplice da URL pubblici urls = [ "https://www.gutenberg.org/files/2000/2000-0.txt", # Divina Commedia ] for url in urls: try: response = requests.get(url, timeout=10) if response.status_code == 200: text = response.text # Filtra contenuto utile lines = text.split('\n') filtered_lines = [line.strip() for line in lines if len(line.strip()) > 50] chunks = filtered_lines[:1000] # Primi 1000 chunk datasets.extend(chunks) print(f"Caricati {len(chunks)} chunk da {url}") except Exception as e: print(f"Errore caricamento {url}: {e}") continue # Genera dati sintetici print("Generazione dati sintetici...") synthetic_texts = self.generate_synthetic_data(8000) datasets.extend(synthetic_texts) self.datasets = datasets[:10000] # Limita a 10k esempi print(f"Dataset finale: {len(self.datasets)} esempi") def generate_synthetic_data(self, num_samples): """Genera dati sintetici per il training""" templates = [ "Il {sostantivo} {verbo} nel {luogo} durante {tempo}.", "La {sostantivo} è molto {aggettivo} e {verbo} sempre.", "Quando {verbo}, il {sostantivo} diventa {aggettivo}.", "Nel {luogo}, la {sostantivo} {verbo} con {sostantivo}.", "Il {aggettivo} {sostantivo} {verbo} ogni {tempo}." ] sostantivi = ["gatto", "cane", "casa", "albero", "fiume", "montagna", "libro", "sole"] verbi = ["corre", "salta", "vola", "nuota", "dorme", "mangia", "gioca", "legge"] aggettivi = ["bello", "grande", "piccolo", "veloce", "lento", "intelligente", "forte"] luoghi = ["parco", "giardino", "bosco", "città", "mare", "cielo", "campo"] tempi = ["giorno", "notte", "mattina", "sera", "inverno", "estate", "primavera"] texts = [] for _ in range(num_samples): template = random.choice(templates) text = template.format( sostantivo=random.choice(sostantivi), verbo=random.choice(verbi), aggettivo=random.choice(aggettivi), luogo=random.choice(luoghi), tempo=random.choice(tempi) ) texts.append(text) return texts def setup_model(self, vocab_size=30000): """Configura il modello transformer auto-organizzante""" self.model = SelfOrganizingTransformer( vocab_size=vocab_size, embed_dim=512, num_heads=8, num_layers=6, max_len=512 ).to(self.device) # Calcola parametri total_params = sum(p.numel() for p in self.model.parameters()) print(f"Modello creato con {total_params:,} parametri") def train(self, epochs=5, batch_size=16, lr=3e-4): """Training del modello""" print("Inizializzazione tokenizer...") self.tokenizer = SelfOrganizingTokenizer() self.tokenizer.build_vocab(self.datasets) print("Configurazione modello...") self.setup_model(len(self.tokenizer.token_to_id)) print("Preparazione dataset...") dataset = TextDataset(self.datasets, self.tokenizer) dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True) optimizer = optim.AdamW(self.model.parameters(), lr=lr, weight_decay=0.01) criterion = nn.CrossEntropyLoss(ignore_index=0) print("Inizio training...") self.model.train() for epoch in range(epochs): total_loss = 0 num_batches = 0 for batch_idx, (input_ids, target_ids) in enumerate(dataloader): input_ids = input_ids.to(self.device) target_ids = target_ids.to(self.device) optimizer.zero_grad() logits = self.model(input_ids) loss = criterion(logits.reshape(-1, logits.size(-1)), target_ids.reshape(-1)) loss.backward() torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0) optimizer.step() total_loss += loss.item() num_batches += 1 if batch_idx % 50 == 0: print(f"Epoch {epoch+1}/{epochs}, Batch {batch_idx}, Loss: {loss.item():.4f}") avg_loss = total_loss / num_batches print(f"Epoch {epoch+1}/{epochs} completata. Loss media: {avg_loss:.4f}") # Test generazione if epoch % 2 == 0: self.test_generation("Il gatto") print("Training completato!") self.save_model() def test_generation(self, prompt, max_length=50): """Test di generazione testo""" self.model.eval() with torch.no_grad(): tokens = self.tokenizer.encode(prompt) input_ids = torch.tensor([tokens]).to(self.device) for _ in range(max_length): logits = self.model(input_ids) next_token = torch.argmax(logits[0, -1, :], dim=-1) input_ids = torch.cat([input_ids, next_token.unsqueeze(0).unsqueeze(0)], dim=1) if next_token.item() == self.tokenizer.token_to_id.get('', 3): break generated = self.tokenizer.decode(input_ids[0].cpu().numpy()) print(f"Generazione: {generated}") self.model.train() return generated def save_model(self): """Salva il modello""" torch.save({ 'model_state_dict': self.model.state_dict(), 'tokenizer': self.tokenizer, 'vocab_size': len(self.tokenizer.token_to_id) }, 'ai_model.pth') print("Modello salvato in ai_model.pth") def load_model(self): """Carica il modello""" if os.path.exists('ai_model.pth'): checkpoint = torch.load('ai_model.pth', map_location=self.device) self.tokenizer = checkpoint['tokenizer'] self.setup_model(checkpoint['vocab_size']) self.model.load_state_dict(checkpoint['model_state_dict']) print("Modello caricato da ai_model.pth") return True return False def generate_text(self, prompt, max_length=100, temperature=0.8): """Genera testo dal prompt""" if not self.model or not self.tokenizer: return "Modello non caricato. Esegui prima il training." self.model.eval() with torch.no_grad(): tokens = self.tokenizer.encode(prompt) input_ids = torch.tensor([tokens]).to(self.device) for _ in range(max_length): logits = self.model(input_ids) logits = logits[0, -1, :] / temperature probs = torch.softmax(logits, dim=-1) next_token = torch.multinomial(probs, 1) input_ids = torch.cat([input_ids, next_token.unsqueeze(0)], dim=1) if next_token.item() == self.tokenizer.token_to_id.get('', 3): break generated = self.tokenizer.decode(input_ids[0].cpu().numpy()) return generated def create_interface(): """Crea interfaccia Gradio""" trainer = AITrainer() def start_training(): try: trainer.load_public_datasets() trainer.train(epochs=3) return "Training completato con successo!" except Exception as e: return f"Errore durante il training: {str(e)}" def generate(prompt, max_len, temp): try: if not trainer.load_model(): return "Modello non trovato. Esegui prima il training." result = trainer.generate_text(prompt, max_len, temp) return result except Exception as e: return f"Errore nella generazione: {str(e)}" with gr.Blocks(title="AI Token Trainer") as demo: gr.Markdown("# AI Training System - Predizione Token") with gr.Tab("Training"): train_btn = gr.Button("Avvia Training", variant="primary") train_output = gr.Textbox(label="Stato Training", lines=5) train_btn.click(start_training, outputs=train_output) with gr.Tab("Generazione"): prompt_input = gr.Textbox(label="Prompt", placeholder="Inserisci il testo di partenza...") max_len_slider = gr.Slider(10, 200, value=50, label="Lunghezza massima") temp_slider = gr.Slider(0.1, 2.0, value=0.8, label="Temperatura") generate_btn = gr.Button("Genera Testo", variant="primary") output_text = gr.Textbox(label="Testo Generato", lines=10) generate_btn.click( generate, inputs=[prompt_input, max_len_slider, temp_slider], outputs=output_text ) return demo if __name__ == "__main__": # Training automatico se richiesto if len(os.sys.argv) > 1 and os.sys.argv[1] == "train": trainer = AITrainer() trainer.load_public_datasets() trainer.train() else: # Interfaccia Gradio demo = create_interface() demo.launch(share=True)