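"""Self-organizing transformer demo: trains a small Italian language model on
public corpora and synthetic sentences, with a Gradio UI for training and
text generation."""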
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import requests
import re
import json
import os
import sys
from collections import Counter
from typing import List, Tuple, Dict
import random
import math
try:
from datasets import load_dataset
except ImportError:
print("datasets non disponibile, usando solo dati sintetici")
load_dataset = None
try:
from transformers import AutoTokenizer
except ImportError:
print("transformers non disponibile, usando tokenizer personalizzato")
AutoTokenizer = None
import gradio as gr
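# Word-level tokenizer: builds its vocabulary from corpus frequencies and
# reserves ids 0-3 for the <PAD>, <UNK>, <BOS> and <EOS> special tokens.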
class SelfOrganizingTokenizer:
def __init__(self, vocab_size=30000):
self.vocab_size = vocab_size
self.token_to_id = {'<PAD>': 0, '<UNK>': 1, '<BOS>': 2, '<EOS>': 3}
self.id_to_token = {0: '<PAD>', 1: '<UNK>', 2: '<BOS>', 3: '<EOS>'}
self.word_freq = Counter()
def build_vocab(self, texts):
for text in texts:
words = re.findall(r'\w+|[^\w\s]', text.lower())
self.word_freq.update(words)
most_common = self.word_freq.most_common(self.vocab_size - 4)
for i, (word, _) in enumerate(most_common):
idx = i + 4
self.token_to_id[word] = idx
self.id_to_token[idx] = word
def encode(self, text):
words = re.findall(r'\w+|[^\w\s]', text.lower())
return [self.token_to_id.get(word, 1) for word in words]
def decode(self, ids):
return ' '.join([self.id_to_token.get(id, '<UNK>') for id in ids])
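# Multi-head self-attention with a learned multiplicative gate: the projected
# output is scaled by 1 + 0.1 * tanh(W_a x), the file's "self-organizing" tweak.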
class SelfOrganizingAttention(nn.Module):
def __init__(self, embed_dim, num_heads):
super().__init__()
self.embed_dim = embed_dim
self.num_heads = num_heads
self.head_dim = embed_dim // num_heads
self.qkv = nn.Linear(embed_dim, embed_dim * 3)
self.proj = nn.Linear(embed_dim, embed_dim)
self.adaptation_layer = nn.Linear(embed_dim, embed_dim)
    def forward(self, x):
        B, T, C = x.shape
        qkv = self.qkv(x).reshape(B, T, 3, self.num_heads, self.head_dim)
        q, k, v = qkv.permute(2, 0, 3, 1, 4)
        att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1)))
        # Causal mask: each position may only attend to itself and earlier
        # tokens, as required for autoregressive next-token prediction
        causal_mask = torch.triu(torch.ones(T, T, dtype=torch.bool, device=x.device), diagonal=1)
        att = att.masked_fill(causal_mask, float('-inf'))
        att = torch.softmax(att, dim=-1)
        y = att @ v
        y = y.transpose(1, 2).reshape(B, T, C)
        y = self.proj(y)
        # Self-organization: scale the output by an input-dependent adaptation gate
        adaptation = torch.tanh(self.adaptation_layer(x))
        y = y * (1 + 0.1 * adaptation)
        return y
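# Decoder-only transformer whose learnable per-layer "plasticity" scalar
# controls how strongly each block's adaptation gate reshapes the residual path.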
class SelfOrganizingTransformer(nn.Module):
def __init__(self, vocab_size, embed_dim=512, num_heads=8, num_layers=6, max_len=1024):
super().__init__()
self.embed_dim = embed_dim
self.tok_embed = nn.Embedding(vocab_size, embed_dim)
self.pos_embed = nn.Embedding(max_len, embed_dim)
self.layers = nn.ModuleList([
nn.ModuleDict({
'attn': SelfOrganizingAttention(embed_dim, num_heads),
'norm1': nn.LayerNorm(embed_dim),
'mlp': nn.Sequential(
nn.Linear(embed_dim, 4 * embed_dim),
nn.GELU(),
nn.Linear(4 * embed_dim, embed_dim),
),
'norm2': nn.LayerNorm(embed_dim),
'adaptation': nn.Linear(embed_dim, embed_dim)
}) for _ in range(num_layers)
])
self.ln_f = nn.LayerNorm(embed_dim)
self.head = nn.Linear(embed_dim, vocab_size)
        # Learnable per-layer plasticity parameters for self-organization
self.plasticity = nn.Parameter(torch.ones(num_layers) * 0.01)
def forward(self, x):
B, T = x.shape
pos = torch.arange(0, T, dtype=torch.long, device=x.device)
x = self.tok_embed(x) + self.pos_embed(pos)
for i, layer in enumerate(self.layers):
residual = x
x = layer['norm1'](x)
x = layer['attn'](x)
            # Adaptive self-organization: plasticity-scaled gating of the attention output
adaptation = torch.tanh(layer['adaptation'](x))
x = residual + x * (1 + self.plasticity[i] * adaptation)
residual = x
x = layer['norm2'](x)
x = layer['mlp'](x)
x = residual + x
x = self.ln_f(x)
logits = self.head(x)
return logits
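# Wraps raw texts as (input, target) pairs for next-token prediction:
# sequences are padded/truncated to max_len, then shifted by one position.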
class TextDataset(Dataset):
def __init__(self, texts, tokenizer, max_len=512):
self.texts = texts
self.tokenizer = tokenizer
self.max_len = max_len
def __len__(self):
return len(self.texts)
def __getitem__(self, idx):
text = self.texts[idx]
tokens = self.tokenizer.encode(text)
if len(tokens) < self.max_len:
tokens = tokens + [0] * (self.max_len - len(tokens))
else:
tokens = tokens[:self.max_len]
return torch.tensor(tokens[:-1]), torch.tensor(tokens[1:])
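# Orchestrates the pipeline: dataset loading, tokenizer and model setup,
# training, checkpointing, and sampling.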
class AITrainer:
def __init__(self):
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
self.tokenizer = None
self.model = None
self.datasets = []
    def load_public_datasets(self):
        """Load public datasets that require no API key"""
        datasets = []
        if load_dataset:
            try:
                # Italian Wikipedia
                wiki = load_dataset("wikipedia", "20220301.it", split="train[:1000]", trust_remote_code=True)
                for item in wiki:
                    if len(item['text']) > 100:
                        datasets.append(item['text'])
                print(f"Loaded {len(datasets)} examples from Wikipedia")
            except Exception as e:
                print(f"Wikipedia unavailable: {e}")
            try:
                # Common Crawl (CC-100, Italian split)
                cc = load_dataset("cc100", lang="it", split="train[:500]", trust_remote_code=True)
                for item in cc:
                    if len(item['text']) > 100:
                        datasets.append(item['text'])
                print("Loaded examples from Common Crawl")
            except Exception as e:
                print(f"Common Crawl unavailable: {e}")
        # Plain-text corpora from public URLs
        urls = [
            "https://www.gutenberg.org/files/2000/2000-0.txt",  # Project Gutenberg ebook #2000 (Don Quijote, Spanish)
        ]
        for url in urls:
            try:
                response = requests.get(url, timeout=10)
                if response.status_code == 200:
                    text = response.text
                    # Keep only reasonably long lines of content
                    lines = text.split('\n')
                    filtered_lines = [line.strip() for line in lines if len(line.strip()) > 50]
                    chunks = filtered_lines[:1000]  # first 1000 chunks
                    datasets.extend(chunks)
                    print(f"Loaded {len(chunks)} chunks from {url}")
            except Exception as e:
                print(f"Error loading {url}: {e}")
                continue
        # Generate synthetic data
        print("Generating synthetic data...")
        synthetic_texts = self.generate_synthetic_data(8000)
        datasets.extend(synthetic_texts)
        self.datasets = datasets[:10000]  # cap at 10k examples
        print(f"Final dataset: {len(self.datasets)} examples")
def generate_synthetic_data(self, num_samples):
"""Genera dati sintetici per il training"""
templates = [
"Il {sostantivo} {verbo} nel {luogo} durante {tempo}.",
"La {sostantivo} è molto {aggettivo} e {verbo} sempre.",
"Quando {verbo}, il {sostantivo} diventa {aggettivo}.",
"Nel {luogo}, la {sostantivo} {verbo} con {sostantivo}.",
"Il {aggettivo} {sostantivo} {verbo} ogni {tempo}."
]
sostantivi = ["gatto", "cane", "casa", "albero", "fiume", "montagna", "libro", "sole"]
verbi = ["corre", "salta", "vola", "nuota", "dorme", "mangia", "gioca", "legge"]
aggettivi = ["bello", "grande", "piccolo", "veloce", "lento", "intelligente", "forte"]
luoghi = ["parco", "giardino", "bosco", "città", "mare", "cielo", "campo"]
tempi = ["giorno", "notte", "mattina", "sera", "inverno", "estate", "primavera"]
texts = []
for _ in range(num_samples):
template = random.choice(templates)
text = template.format(
sostantivo=random.choice(sostantivi),
verbo=random.choice(verbi),
aggettivo=random.choice(aggettivi),
luogo=random.choice(luoghi),
tempo=random.choice(tempi)
)
texts.append(text)
return texts
def setup_model(self, vocab_size=30000):
"""Configura il modello transformer auto-organizzante"""
self.model = SelfOrganizingTransformer(
vocab_size=vocab_size,
embed_dim=512,
num_heads=8,
num_layers=6,
max_len=512
).to(self.device)
        # Report the parameter count
        total_params = sum(p.numel() for p in self.model.parameters())
        print(f"Model created with {total_params:,} parameters")
def train(self, epochs=5, batch_size=16, lr=3e-4):
"""Training del modello"""
print("Inizializzazione tokenizer...")
self.tokenizer = SelfOrganizingTokenizer()
self.tokenizer.build_vocab(self.datasets)
print("Configurazione modello...")
self.setup_model(len(self.tokenizer.token_to_id))
print("Preparazione dataset...")
dataset = TextDataset(self.datasets, self.tokenizer)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
optimizer = optim.AdamW(self.model.parameters(), lr=lr, weight_decay=0.01)
criterion = nn.CrossEntropyLoss(ignore_index=0)
print("Inizio training...")
self.model.train()
for epoch in range(epochs):
total_loss = 0
num_batches = 0
for batch_idx, (input_ids, target_ids) in enumerate(dataloader):
input_ids = input_ids.to(self.device)
target_ids = target_ids.to(self.device)
optimizer.zero_grad()
logits = self.model(input_ids)
loss = criterion(logits.reshape(-1, logits.size(-1)), target_ids.reshape(-1))
loss.backward()
torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
optimizer.step()
total_loss += loss.item()
num_batches += 1
if batch_idx % 50 == 0:
print(f"Epoch {epoch+1}/{epochs}, Batch {batch_idx}, Loss: {loss.item():.4f}")
avg_loss = total_loss / num_batches
print(f"Epoch {epoch+1}/{epochs} completata. Loss media: {avg_loss:.4f}")
# Test generazione
if epoch % 2 == 0:
self.test_generation("Il gatto")
print("Training completato!")
self.save_model()
def test_generation(self, prompt, max_length=50):
"""Test di generazione testo"""
self.model.eval()
with torch.no_grad():
tokens = self.tokenizer.encode(prompt)
input_ids = torch.tensor([tokens]).to(self.device)
for _ in range(max_length):
logits = self.model(input_ids)
next_token = torch.argmax(logits[0, -1, :], dim=-1)
input_ids = torch.cat([input_ids, next_token.unsqueeze(0).unsqueeze(0)], dim=1)
if next_token.item() == self.tokenizer.token_to_id.get('<EOS>', 3):
break
            generated = self.tokenizer.decode(input_ids[0].tolist())
            print(f"Generated: {generated}")
self.model.train()
return generated
def save_model(self):
"""Salva il modello"""
torch.save({
'model_state_dict': self.model.state_dict(),
'tokenizer': self.tokenizer,
'vocab_size': len(self.tokenizer.token_to_id)
}, 'ai_model.pth')
print("Modello salvato in ai_model.pth")
def load_model(self):
"""Carica il modello"""
if os.path.exists('ai_model.pth'):
checkpoint = torch.load('ai_model.pth', map_location=self.device)
self.tokenizer = checkpoint['tokenizer']
self.setup_model(checkpoint['vocab_size'])
self.model.load_state_dict(checkpoint['model_state_dict'])
print("Modello caricato da ai_model.pth")
return True
return False
def generate_text(self, prompt, max_length=100, temperature=0.8):
"""Genera testo dal prompt"""
if not self.model or not self.tokenizer:
return "Modello non caricato. Esegui prima il training."
self.model.eval()
with torch.no_grad():
tokens = self.tokenizer.encode(prompt)
input_ids = torch.tensor([tokens]).to(self.device)
            for _ in range(max_length):
                # Keep the context inside the model's positional range (max_len=512)
                logits = self.model(input_ids[:, -512:])
                logits = logits[0, -1, :] / temperature
probs = torch.softmax(logits, dim=-1)
next_token = torch.multinomial(probs, 1)
input_ids = torch.cat([input_ids, next_token.unsqueeze(0)], dim=1)
if next_token.item() == self.tokenizer.token_to_id.get('<EOS>', 3):
break
            generated = self.tokenizer.decode(input_ids[0].tolist())
return generated
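# Gradio UI: one tab to launch training, one to sample from the trained model.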
def create_interface():
"""Crea interfaccia Gradio"""
trainer = AITrainer()
def start_training():
try:
trainer.load_public_datasets()
trainer.train(epochs=3)
return "Training completato con successo!"
except Exception as e:
return f"Errore durante il training: {str(e)}"
    def generate(prompt, max_len, temp):
        try:
            if not trainer.load_model():
                return "Model not found. Run training first."
            # Gradio sliders return floats; the sampling loop expects an int length
            result = trainer.generate_text(prompt, int(max_len), temp)
            return result
        except Exception as e:
            return f"Generation error: {str(e)}"
    with gr.Blocks(title="AI Token Trainer") as demo:
        gr.Markdown("# AI Training System - Token Prediction")
        with gr.Tab("Training"):
            train_btn = gr.Button("Start Training", variant="primary")
            train_output = gr.Textbox(label="Training Status", lines=5)
            train_btn.click(start_training, outputs=train_output)
        with gr.Tab("Generation"):
            prompt_input = gr.Textbox(label="Prompt", placeholder="Enter the starting text...")
            max_len_slider = gr.Slider(10, 200, value=50, label="Maximum length")
            temp_slider = gr.Slider(0.1, 2.0, value=0.8, label="Temperature")
            generate_btn = gr.Button("Generate Text", variant="primary")
            output_text = gr.Textbox(label="Generated Text", lines=10)
generate_btn.click(
generate,
inputs=[prompt_input, max_len_slider, temp_slider],
outputs=output_text
)
return demo
if __name__ == "__main__":
    # Run training directly when invoked as "python app.py train"
    if len(sys.argv) > 1 and sys.argv[1] == "train":
trainer = AITrainer()
trainer.load_public_datasets()
trainer.train()
else:
        # Launch the Gradio interface
demo = create_interface()
demo.launch(share=True)