|
|
|
""" |
|
Advanced Magnus Model Backend Integration |
|
Loads and serves the latest trained advanced Magnus model for FastAPI |
|
""" |
|
|
|
import sys |
|
import pickle |
|
import torch |
|
import torch.nn as nn |
|
import torch.nn.functional as F |
|
import numpy as np |
|
from pathlib import Path |
|
from typing import Dict, List, Tuple, Optional, Any |
|
from collections import Counter |
|
import chess |
|
import chess.pgn |
|
import yaml |
|
import json |
|
import warnings |
|
|
|
# Silence every Python warning process-wide. This is a module-level side
# effect that also affects any code importing this module.
warnings.filterwarnings("ignore")

# Directory two levels above the directory that contains this file.
# NOTE(review): presumably the repository root - confirm against the layout.
project_root = Path(__file__).parent.parent.parent

# Make packages under project_root importable. The membership check keeps
# repeated imports/reloads of this module from piling up duplicate entries.
if str(project_root) not in sys.path:
    sys.path.append(str(project_root))
|
|
|
|
|
class AdvancedChessFeatureExtractor:
    """Turn a textual position into 8 material / game-phase features."""

    def __init__(self):
        # Point values per piece letter; upper-case letters mirror the
        # lower-case ones. Kings are worth 0.
        lower_values = {"p": 1, "n": 3, "b": 3, "r": 5, "q": 9, "k": 0}
        self.piece_values = dict(lower_values)
        for letter, value in lower_values.items():
            self.piece_values[letter.upper()] = value

    def extract_features(self, position_data):
        """Return a float32 array of 8 features computed from ``str(position_data)``.

        Every upper-case character is scored towards "white" and every
        lower-case character towards "black" using ``piece_values``
        (unknown characters score 0).

        NOTE(review): the scan covers the *whole* string. When a full FEN is
        passed, letters in the side-to-move / castling / en-passant fields
        (e.g. ``b``, ``K``, ``Q``) are counted as pieces too. This is kept
        as-is because the trained model may expect exactly these values -
        confirm against the training pipeline before changing it.

        Layout of the result:
            [0] white material / 39        [4] game phase (total / 78)
            [1] black material / 39        [5] 1 - game phase
            [2] balance / 39               [6] min(2 * phase, 1)
            [3] |balance| / 39             [7] clamp((phase - 0.3) * 2, 0, 1)
        """
        white_material = 0
        black_material = 0
        for symbol in str(position_data):
            worth = self.piece_values.get(symbol, 0)
            if symbol.isupper():
                white_material += worth
            elif symbol.islower():
                black_material += worth

        material_balance = white_material - black_material
        game_phase = (white_material + black_material) / 78.0

        material_block = [
            white_material / 39.0,
            black_material / 39.0,
            material_balance / 39.0,
            abs(material_balance) / 39.0,
        ]
        phase_block = [
            game_phase,
            1 - game_phase,
            min(game_phase * 2, 1),
            max(0, min((game_phase - 0.3) * 2, 1)),
        ]

        return np.array(material_block + phase_block, dtype=np.float32)
|
|
|
|
|
class MultiHeadAttention(nn.Module):
    """Scaled dot-product self-attention with a mean-pooled output."""

    def __init__(self, d_model, num_heads):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        # Per-head width; integer division, so d_model should be a multiple
        # of num_heads for the reshapes in forward() to line up.
        self.d_k = d_model // num_heads

        # Attribute names are the state_dict keys, so they must not change.
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def _split_heads(self, projected, batch_size):
        """Reshape (batch, seq, d_model) into (batch, heads, seq, d_k)."""
        per_head = projected.view(batch_size, -1, self.num_heads, self.d_k)
        return per_head.transpose(1, 2)

    def forward(self, x):
        """Self-attend over ``x`` and average the result over dimension 1.

        ``x`` is projected to queries/keys/values, split into heads,
        combined with softmax(QK^T / sqrt(d_k)) weights, merged back to
        ``d_model`` width, passed through ``W_o`` and finally averaged over
        dim 1, which removes that dimension from the returned tensor.
        """
        batch_size = x.size(0)

        queries = self._split_heads(self.W_q(x), batch_size)
        keys = self._split_heads(self.W_k(x), batch_size)
        values = self._split_heads(self.W_v(x), batch_size)

        scale = np.sqrt(self.d_k)
        weights = F.softmax(
            torch.matmul(queries, keys.transpose(-2, -1)) / scale, dim=-1
        )
        mixed = torch.matmul(weights, values)

        # Undo the head split: (batch, heads, seq, d_k) -> (batch, seq, d_model)
        merged = mixed.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)

        return self.W_o(merged).mean(dim=1)
|
|
|
|
|
class AdvancedMagnusModel(nn.Module):
    """Two-headed network: move logits plus a tanh-bounded evaluation.

    Sub-module names and the positions of layers inside each
    ``nn.Sequential`` define the ``state_dict`` keys of saved checkpoints,
    so both are kept exactly as in the trained architecture.
    """

    def __init__(self, vocab_size: int, feature_dim: int = 8):
        super().__init__()
        self.vocab_size = vocab_size

        def dense(n_in, n_out, dropout=None):
            """Linear -> BatchNorm1d -> ReLU, optionally followed by Dropout."""
            layers = [nn.Linear(n_in, n_out), nn.BatchNorm1d(n_out), nn.ReLU()]
            if dropout is not None:
                layers.append(nn.Dropout(dropout))
            return layers

        # 768 inputs: the flattened 8 x 8 x 12 board planes.
        self.board_encoder = nn.Sequential(
            *dense(768, 1024, 0.2),
            *dense(1024, 512, 0.2),
            *dense(512, 256),
        )

        self.board_attention = MultiHeadAttention(256, 8)

        self.feature_encoder = nn.Sequential(
            *dense(feature_dim, 64, 0.1),
            nn.Linear(64, 32),
            nn.ReLU(),
        )

        # Board branch (256) concatenated with feature branch (32).
        combined_dim = 256 + 32
        self.feature_combiner = nn.Sequential(
            *dense(combined_dim, 512, 0.3),
            *dense(512, 256, 0.2),
        )

        self.move_predictor = nn.Sequential(
            nn.Linear(256, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, vocab_size),
        )

        self.eval_predictor = nn.Sequential(
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
            nn.Tanh(),
        )

    def forward(self, position, features):
        """Return ``(move_logits, eval_pred)`` for a batch.

        ``position`` feeds ``board_encoder`` (768 inputs per row) and
        ``features`` feeds ``feature_encoder`` (``feature_dim`` inputs per
        row). ``eval_pred`` passes through ``Tanh`` and is therefore in
        [-1, 1].
        """
        encoded_board = self.board_encoder(position)

        # The attention layer averages over dim 1, so a 2-D encoding is
        # given a length-1 middle dimension first.
        if encoded_board.dim() == 2:
            attention_input = encoded_board.unsqueeze(1)
        else:
            attention_input = encoded_board
        attended_board = self.board_attention(attention_input)

        encoded_features = self.feature_encoder(features)

        fused = self.feature_combiner(
            torch.cat([attended_board, encoded_features], dim=1)
        )

        return self.move_predictor(fused), self.eval_predictor(fused)
|
|
|
|
|
class AdvancedMagnusPredictor: |
|
"""Advanced Magnus model predictor for FastAPI backend""" |
|
|
|
def __init__(self, model_path: Optional[str] = None):
    """Set up empty predictor state and try to load a model.

    Args:
        model_path: Directory holding the model artifacts. When ``None``
            the path is looked up with ``_get_latest_mlflow_model``.

    If no usable path exists a warning is printed and ``self.model``
    stays ``None``.
    """
    self.device = self._get_device()
    self.model = None
    self.move_to_idx = {}
    self.idx_to_move = {}
    self.vocab_size = 0
    self.model_config = {}
    self.feature_extractor = AdvancedChessFeatureExtractor()

    resolved_path = model_path
    if resolved_path is None:
        resolved_path = self._get_latest_mlflow_model()

    # Guard clause: nothing to load when the path is empty or missing.
    if not resolved_path or not Path(resolved_path).exists():
        print(f"⚠️ Model path not found: {resolved_path}")
        return

    self.load_model(resolved_path)
|
|
|
def _get_device(self): |
|
"""Get the best available device""" |
|
if torch.backends.mps.is_available(): |
|
return torch.device("mps") |
|
elif torch.cuda.is_available(): |
|
return torch.device("cuda") |
|
else: |
|
return torch.device("cpu") |
|
|
|
def _get_latest_mlflow_model(self):
    """Return the first existing candidate artifact directory, else ``None``.

    The candidates all point at one fixed MLflow experiment/run id,
    resolved relative to ``project_root``, relative to the grandparent
    directory of this file, and as one absolute developer-machine path.
    Every outcome is reported with ``print``.
    """
    run_artifacts = (
        Path("mlruns")
        / "427589957554434254"
        / "cbb3fccf10b64db5a8985add8bcac5ef"
        / "artifacts"
        / "model_artifacts"
    )
    possible_paths = [
        project_root / run_artifacts,
        Path(__file__).parent.parent / run_artifacts,
        Path(
            "/Users/levandalbashvili/Documents/GitHub/What-Would---DO/mlruns/427589957554434254/cbb3fccf10b64db5a8985add8bcac5ef/artifacts/model_artifacts"
        ),
    ]

    found = next((candidate for candidate in possible_paths if candidate.exists()), None)
    if found is not None:
        print(f"✅ Found model at: {found}")
        return str(found)

    print("❌ Model not found in any of these paths:")
    for candidate in possible_paths:
        print(f" - {candidate}")
    return None
|
|
|
def load_model(self, model_path: str):
    """Load config, move vocabulary and weights from an artifact directory.

    Reads, from ``model_path``:
      * ``config.yaml`` (optional) into ``self.model_config``
      * ``version.json`` (optional), only to print its ``model_id``
      * ``model.pth`` (required): either a dict containing
        ``model_state_dict`` (plus optional ``move_to_idx``,
        ``idx_to_move`` and ``vocab_size``) or a bare state dict.

    The vocabulary size used to build the network is taken, in order, from
    the output layer stored in the state dict
    (``move_predictor.3.weight``), then ``data.vocab_size`` in the config,
    then the checkpoint's own value, then 945. Using the stored layer
    shape first avoids a size-mismatch failure in ``load_state_dict`` when
    the config and the checkpoint disagree.

    Any exception is caught and printed, and ``self.model`` is set to
    ``None``; callers check ``is_loaded()`` rather than catching errors.

    Args:
        model_path: Directory that contains the files listed above.
    """
    try:
        model_dir = Path(model_path)

        config_file = model_dir / "config.yaml"
        if config_file.exists():
            with open(config_file, "r") as f:
                # safe_load returns None for an empty document; keep a
                # dict so the .get() lookups below cannot fail.
                self.model_config = yaml.safe_load(f) or {}
            print(f"✅ Loaded model config: {config_file}")

        version_file = model_dir / "version.json"
        if version_file.exists():
            with open(version_file, "r") as f:
                version_info = json.load(f)
            print(f"✅ Model version: {version_info.get('model_id', 'unknown')}")

        model_file = model_dir / "model.pth"
        if not model_file.exists():
            raise FileNotFoundError(f"Model file not found: {model_file}")

        # NOTE(security): torch.load unpickles the file's contents; only
        # point this at checkpoints from a trusted source.
        checkpoint = torch.load(model_file, map_location=self.device)

        if "model_state_dict" in checkpoint:
            model_state = checkpoint["model_state_dict"]
            self.move_to_idx = checkpoint.get("move_to_idx") or {}
            self.idx_to_move = checkpoint.get("idx_to_move") or {}
            checkpoint_vocab = checkpoint.get("vocab_size") or len(self.move_to_idx)
        else:
            # The file is a bare state dict without any metadata.
            model_state = checkpoint
            checkpoint_vocab = 0

        # Index 3 is the final Linear(512, vocab_size) of move_predictor.
        config_vocab = (self.model_config.get("data") or {}).get("vocab_size")
        output_weight = model_state.get("move_predictor.3.weight")
        if output_weight is not None:
            self.vocab_size = int(output_weight.shape[0])
        else:
            self.vocab_size = int(config_vocab or checkpoint_vocab or 945)

        if not self.move_to_idx:
            print(
                "⚠️ Move vocabulary not found in checkpoint, creating from chess games"
            )
            # Relies on self.vocab_size being final at this point.
            self._create_vocabulary_from_games()

        # AdvancedChessFeatureExtractor.extract_features yields 8 values.
        feature_dim = 8
        self.model = AdvancedMagnusModel(self.vocab_size, feature_dim).to(
            self.device
        )

        self.model.load_state_dict(model_state)
        self.model.eval()

        total_params = sum(p.numel() for p in self.model.parameters())
        print("✅ Advanced Magnus model loaded successfully!")
        print(f" Device: {self.device}")
        print(f" Parameters: {total_params:,}")
        print(f" Vocabulary size: {self.vocab_size}")
        print(f" Model path: {model_dir}")

    except Exception as e:
        # Deliberate best-effort: the service keeps running without a model.
        print(f"❌ Error loading model: {e}")
        self.model = None
|
|
|
def _create_vocabulary_from_games(self): |
|
"""Create vocabulary from actual chess games (like the training data)""" |
|
print("🔧 Creating vocabulary from Magnus Carlsen games...") |
|
|
|
moves = set() |
|
|
|
|
|
pgn_paths = [ |
|
Path(__file__).parent / "data_processing" / "carlsen-games-quarter.pgn", |
|
Path(__file__).parent / "data_processing" / "carlsen-games.pgn", |
|
] |
|
|
|
games_processed = 0 |
|
for pgn_path in pgn_paths: |
|
if pgn_path.exists(): |
|
print(f"📖 Reading moves from {pgn_path.name}...") |
|
try: |
|
with open(pgn_path, "r") as f: |
|
while True: |
|
game = chess.pgn.read_game(f) |
|
if game is None: |
|
break |
|
|
|
|
|
board = game.board() |
|
for move in game.mainline_moves(): |
|
moves.add(move.uci()) |
|
board.push(move) |
|
|
|
games_processed += 1 |
|
if games_processed % 100 == 0: |
|
print( |
|
f" Processed {games_processed} games, {len(moves)} unique moves" |
|
) |
|
|
|
|
|
if games_processed >= 500: |
|
break |
|
|
|
if moves: |
|
break |
|
|
|
except Exception as e: |
|
print(f" ⚠️ Error reading {pgn_path}: {e}") |
|
continue |
|
|
|
|
|
if not moves: |
|
print("📝 Falling back to comprehensive UCI move generation...") |
|
moves = self._generate_comprehensive_uci_moves() |
|
|
|
|
|
moves_list = sorted(list(moves)) |
|
if len(moves_list) > self.vocab_size: |
|
|
|
basic_moves = [] |
|
promotion_moves = [] |
|
other_moves = [] |
|
|
|
for move in moves_list: |
|
if len(move) == 5 and move[4] in "qrbn": |
|
promotion_moves.append(move) |
|
elif len(move) == 4: |
|
basic_moves.append(move) |
|
else: |
|
other_moves.append(move) |
|
|
|
|
|
moves_list = (basic_moves + promotion_moves + other_moves)[ |
|
: self.vocab_size |
|
] |
|
|
|
|
|
while len(moves_list) < self.vocab_size: |
|
moves_list.append(f"null_move_{len(moves_list)}") |
|
|
|
self.move_to_idx = {move: idx for idx, move in enumerate(moves_list)} |
|
self.idx_to_move = {idx: move for move, idx in self.move_to_idx.items()} |
|
|
|
print( |
|
f"✅ Created vocabulary with {len(self.move_to_idx)} moves from {games_processed} games" |
|
) |
|
print(f" Sample moves: {moves_list[:10]}") |
|
print(f" Last moves: {moves_list[-10:]}") |
|
|
|
def _generate_comprehensive_uci_moves(self): |
|
"""Generate comprehensive UCI moves as fallback""" |
|
moves = set() |
|
files = "abcdefgh" |
|
ranks = "12345678" |
|
|
|
|
|
for from_file in files: |
|
for from_rank in ranks: |
|
for to_file in files: |
|
for to_rank in ranks: |
|
from_sq = from_file + from_rank |
|
to_sq = to_file + to_rank |
|
if from_sq != to_sq: |
|
moves.add(from_sq + to_sq) |
|
|
|
|
|
promotion_pieces = ["q", "r", "b", "n"] |
|
for from_file in files: |
|
for to_file in files: |
|
|
|
for piece in promotion_pieces: |
|
moves.add(f"{from_file}7{to_file}8{piece}") |
|
|
|
for piece in promotion_pieces: |
|
moves.add(f"{from_file}2{to_file}1{piece}") |
|
|
|
return moves |
|
|
|
def board_to_tensor(self, board: chess.Board) -> torch.Tensor:
    """Encode ``board`` as a flat one-hot tensor of 8 * 8 * 12 = 768 floats.

    The array is indexed ``[rank, file, channel]``. White pieces occupy
    channels 0-5 and black pieces 6-11, in the order pawn, rook, knight,
    bishop, queen, king within each colour.
    """
    channel_of = {
        chess.PAWN: 0,
        chess.ROOK: 1,
        chess.KNIGHT: 2,
        chess.BISHOP: 3,
        chess.QUEEN: 4,
        chess.KING: 5,
    }

    planes = np.zeros((8, 8, 12), dtype=np.float32)

    # piece_map() lists only occupied squares, which matches scanning all
    # 64 squares and skipping the empty ones.
    for square, piece in board.piece_map().items():
        colour_shift = 0 if piece.color == chess.WHITE else 6
        channel = channel_of[piece.piece_type] + colour_shift
        planes[chess.square_rank(square), chess.square_file(square), channel] = 1.0

    return torch.FloatTensor(planes.flatten())
|
|
|
def extract_features(self, board: chess.Board) -> torch.Tensor:
    """Return the feature extractor's output for ``board`` as a float tensor.

    The board is handed to ``self.feature_extractor`` as its FEN string.
    """
    feature_vector = self.feature_extractor.extract_features(board.fen())
    return torch.FloatTensor(feature_vector)
|
|
|
def predict_moves(
    self,
    board: chess.Board,
    top_k: int = 5,
    engine_path: str = "/opt/homebrew/bin/stockfish",
) -> List[Dict[str, Any]]:
    """Rank the legal moves of ``board`` and return the best ``top_k``.

    Scoring is engine-first: each legal move is evaluated with a UCI
    engine and blended with a small model ("Magnus style") bonus and a
    small heuristic bonus. If the engine cannot be used, a heuristics-first
    ranking is produced instead.

    Args:
        board: Position to analyse; it is not modified.
        top_k: Maximum number of entries to return.
        engine_path: Path of the UCI engine executable.

    Returns:
        A list of dicts sorted by descending ``"confidence"``. Every entry
        has ``"move"`` (UCI string), ``"confidence"`` and ``"evaluation"``;
        further keys depend on ``"approach"``. Special cases:
          * no model loaded: a single fixed ``e2e4`` placeholder entry;
          * no legal moves: an empty list;
          * unexpected error: low-confidence ``"fallback"`` entries for the
            first legal moves, carrying the error text under ``"error"``.
    """
    if self.model is None:
        return [{"move": "e2e4", "confidence": 0.5, "evaluation": 0.0}]

    try:
        legal_moves = list(board.legal_moves)
        if not legal_moves:
            return []

        # The network only sees the current position, so one forward pass
        # serves every candidate move.
        move_probs = self._magnus_move_probabilities(board)

        try:
            predictions = self._engine_primary_predictions(
                board, legal_moves, move_probs, engine_path
            )
        except Exception as e:
            print(f"Engine analysis failed, using heuristics only: {e}")
            # Build a fresh list so a half-finished engine pass cannot
            # leave duplicate entries behind.
            predictions = self._heuristic_primary_predictions(
                board, legal_moves, move_probs
            )

        predictions.sort(key=lambda x: x["confidence"], reverse=True)
        return predictions[:top_k]

    except Exception as e:
        print(f"❌ Prediction error: {e}")

        legal_moves = list(board.legal_moves)
        if legal_moves:
            return [
                {
                    "move": legal_moves[i].uci(),
                    "confidence": max(0.15 - i * 0.02, 0.05),
                    "evaluation": 0.0,
                    "error": str(e),
                    "approach": "fallback",
                }
                for i in range(min(top_k, len(legal_moves)))
            ]
        return [
            {
                "move": "e2e4",
                "confidence": 0.1,
                "evaluation": 0.0,
                "error": str(e),
            }
        ]

def _magnus_move_probabilities(self, board: chess.Board):
    """Return the model's softmax move distribution for ``board``, or ``None``.

    ``None`` means "no style bonus available": either there is no move
    vocabulary or the forward pass failed (the failure is printed).
    """
    if not self.move_to_idx:
        return None
    try:
        with torch.no_grad():
            position_tensor = self.board_to_tensor(board).unsqueeze(0).to(self.device)
            features_tensor = self.extract_features(board).unsqueeze(0).to(self.device)
            move_logits, _ = self.model(position_tensor, features_tensor)
            return F.softmax(move_logits, dim=1)[0]
    except Exception as e:
        print(f"⚠️ Magnus model inference failed, ignoring style bonus: {e}")
        return None

def _magnus_style_score(self, move_probs, move_uci: str) -> float:
    """Return the model probability of ``move_uci``, or 0.0 when unavailable."""
    if move_probs is None or move_uci not in self.move_to_idx:
        return 0.0
    try:
        return float(move_probs[self.move_to_idx[move_uci]].item())
    except (IndexError, TypeError):
        # Vocabulary index does not fit the model's output layer.
        return 0.0

def _engine_primary_predictions(
    self, board: chess.Board, legal_moves, move_probs, engine_path: str
) -> List[Dict[str, Any]]:
    """Score every legal move with the engine as the dominant signal.

    Raises whatever starting the engine raises, so the caller can fall
    back to heuristics. A failure while analysing a single move only
    resets that move to a neutral engine quality of 0.5.
    """
    import chess.engine

    predictions = []
    with chess.engine.SimpleEngine.popen_uci(engine_path) as engine:
        for legal_move in legal_moves:
            board_copy = board.copy()
            board_copy.push(legal_move)

            # Reset per move so no value leaks in from a previous iteration.
            evaluation = 0.0
            try:
                move_info = engine.analyse(
                    board_copy, chess.engine.Limit(time=0.03)
                )
                raw_score = move_info.get(
                    "score",
                    chess.engine.PovScore(chess.engine.Cp(0), board_copy.turn),
                )
                # Score from the point of view of the side making the move.
                mover_score = raw_score.pov(board.turn)
                evaluation = mover_score.score(mate_score=10000)

                if mover_score.is_mate():
                    # Positive: the mover delivers mate; otherwise the
                    # mover is the one getting mated.
                    engine_quality = 0.95 if evaluation > 0 else 0.05
                else:
                    engine_quality = max(0.1, min(0.9, 0.5 + evaluation / 300))
            except Exception:
                engine_quality = 0.5
                evaluation = 0.0

            move_uci = legal_move.uci()
            magnus_bonus = self._magnus_style_score(move_probs, move_uci) * 0.1
            heuristic_bonus = self._calculate_heuristic_bonus(board, legal_move)

            final_confidence = (
                0.8 * engine_quality
                + 0.1 * magnus_bonus
                + 0.1 * heuristic_bonus
            )

            predictions.append(
                {
                    "move": move_uci,
                    "confidence": final_confidence,
                    "evaluation": evaluation,
                    "engine_quality": engine_quality,
                    "magnus_bonus": magnus_bonus,
                    "heuristic_bonus": heuristic_bonus,
                    "is_legal": True,
                    "approach": "engine_primary",
                }
            )
    return predictions

def _heuristic_primary_predictions(
    self, board: chess.Board, legal_moves, move_probs
) -> List[Dict[str, Any]]:
    """Score every legal move from heuristics plus the model bonus (no engine)."""
    predictions = []
    for legal_move in legal_moves:
        move_uci = legal_move.uci()

        heuristic_score = self._calculate_comprehensive_heuristic_score(
            board, legal_move
        )
        magnus_bonus = self._magnus_style_score(move_probs, move_uci) * 0.2

        final_confidence = 0.8 * heuristic_score + 0.2 * magnus_bonus

        predictions.append(
            {
                "move": move_uci,
                "confidence": final_confidence,
                "evaluation": 0.0,
                "heuristic_score": heuristic_score,
                "magnus_bonus": magnus_bonus,
                "is_legal": True,
                "approach": "heuristic_primary",
            }
        )
    return predictions
|
|
|
def _calculate_heuristic_bonus(self, board: chess.Board, move: chess.Move) -> float:
    """Return a small bonus (capped at 0.15) for generally sound moves.

    Rewards, in this order: landing on one of the four centre squares,
    moving a knight or bishop up to full-move 10, capturing a piece worth
    at least as much as the capturing piece, giving check, and castling up
    to full-move 15. A move from an empty square gets no bonus.
    """
    mover = board.piece_at(move.from_square)
    if not mover:
        return 0.0

    bonus = 0.0

    if move.to_square in (chess.E4, chess.E5, chess.D4, chess.D5):
        bonus += 0.05

    is_minor_piece = mover.piece_type in (chess.KNIGHT, chess.BISHOP)
    if is_minor_piece and board.fullmove_number <= 10:
        bonus += 0.03

    if board.is_capture(move):
        victim = board.piece_at(move.to_square)
        if victim:
            # Kings are absent from the table and therefore count as 0.
            piece_values = {
                chess.PAWN: 1,
                chess.KNIGHT: 3,
                chess.BISHOP: 3,
                chess.ROOK: 5,
                chess.QUEEN: 9,
            }
            victim_value = piece_values.get(victim.piece_type, 0)
            mover_value = piece_values.get(mover.piece_type, 0)
            if victim_value >= mover_value:
                bonus += 0.04

    if board.gives_check(move):
        bonus += 0.02

    if board.is_castling(move) and board.fullmove_number <= 15:
        bonus += 0.06

    return min(bonus, 0.15)
|
|
|
def _calculate_comprehensive_heuristic_score(
    self, board: chess.Board, move: chess.Move
) -> float:
    """Score ``move`` from heuristics alone, for use without an engine.

    Starts at 0.5 and adjusts for centre control, early development,
    captures (by material difference), castling, checks, and a penalty for
    moving the same non-pawn piece again during the first 8 full moves.

    Returns:
        The score clamped to the range [0.1, 0.9].
    """
    score = 0.5
    piece = board.piece_at(move.from_square)

    if piece:
        piece_values = {
            chess.PAWN: 1,
            chess.KNIGHT: 3,
            chess.BISHOP: 3,
            chess.ROOK: 5,
            chess.QUEEN: 9,
            chess.KING: 0,
        }

        center_squares = [chess.E4, chess.E5, chess.D4, chess.D5]
        extended_center = [
            chess.C3,
            chess.C4,
            chess.C5,
            chess.C6,
            chess.D3,
            chess.D6,
            chess.E3,
            chess.E6,
            chess.F3,
            chess.F4,
            chess.F5,
            chess.F6,
        ]

        if move.to_square in center_squares:
            score += 0.15
        elif move.to_square in extended_center:
            score += 0.08

        if board.fullmove_number <= 10:
            if piece.piece_type in [chess.KNIGHT, chess.BISHOP]:
                score += 0.12
            elif (
                piece.piece_type == chess.PAWN and move.to_square in center_squares
            ):
                score += 0.10

        if board.is_capture(move):
            captured = board.piece_at(move.to_square)
            if captured:
                material_gain = piece_values.get(
                    captured.piece_type, 0
                ) - piece_values.get(piece.piece_type, 0)
                if material_gain >= 0:
                    score += min(0.2, 0.05 + material_gain * 0.02)
                else:
                    score -= 0.1

        if board.is_castling(move):
            score += 0.15

        if board.gives_check(move):
            score += 0.05

        if board.fullmove_number <= 8:
            # A piece that already moved recently is the one that *arrived*
            # on this move's origin square, so earlier destination squares
            # are compared (comparing origin squares never matches the
            # piece that is standing there now).
            recent_moves = board.move_stack[-6:]
            piece_moved_before = any(
                m.to_square == move.from_square for m in recent_moves
            )
            if piece_moved_before and piece.piece_type != chess.PAWN:
                score -= 0.08

    return max(0.1, min(0.9, score))
|
|
|
def predict_moves_with_engine_guidance(
    self,
    board: chess.Board,
    top_k: int = 5,
    engine_path: str = "/opt/homebrew/bin/stockfish",
) -> List[Dict[str, Any]]:
    """Re-rank ``predict_moves`` candidates with a second engine evaluation.

    Takes ``2 * top_k`` candidates from ``predict_moves`` and blends each
    candidate's confidence (weight 0.6) with an engine-derived confidence
    for the position after the move (weight 0.4).

    Args:
        board: Position to analyse; it is not modified.
        top_k: Maximum number of entries to return.
        engine_path: Path of the UCI engine executable.

    Returns:
        Up to ``top_k`` dicts sorted by descending ``"confidence"``. A
        candidate whose engine analysis fails is kept as returned by
        ``predict_moves``. If the engine cannot be used at all, the plain
        ``predict_moves(board, top_k)`` result is returned.
    """
    try:
        import chess.engine

        magnus_predictions = self.predict_moves(board, top_k * 2)

        magnus_weight = 0.6
        engine_weight = 0.4
        enhanced_predictions = []

        with chess.engine.SimpleEngine.popen_uci(engine_path) as engine:
            for pred in magnus_predictions:
                move_uci = pred["move"]
                try:
                    move = chess.Move.from_uci(move_uci)
                except ValueError:
                    # Not a parseable UCI move; skip it.
                    continue
                if move not in board.legal_moves:
                    continue

                board_copy = board.copy()
                board_copy.push(move)

                try:
                    eval_info = engine.analyse(
                        board_copy, chess.engine.Limit(time=0.05)
                    )
                    score = eval_info.get("score")

                    engine_confidence = 0.5
                    if score is not None:
                        # View the score from the side that makes the move.
                        mover_score = score.pov(board.turn)
                        cp_score = mover_score.score(mate_score=10000)
                        if mover_score.is_mate():
                            # Positive: the mover mates; otherwise the
                            # mover gets mated.
                            engine_confidence = 0.95 if cp_score > 0 else 0.05
                        else:
                            engine_confidence = max(
                                0.1, min(0.9, 0.5 + cp_score / 500)
                            )

                    blended_confidence = (
                        magnus_weight * pred["confidence"]
                        + engine_weight * engine_confidence
                    )

                    enhanced_predictions.append(
                        {
                            "move": move_uci,
                            "confidence": blended_confidence,
                            "evaluation": pred.get("evaluation", 0.0),
                            "magnus_confidence": pred["confidence"],
                            "engine_confidence": engine_confidence,
                            "style": "magnus_engine_hybrid",
                        }
                    )

                except Exception:
                    # Best effort: keep the unblended candidate.
                    enhanced_predictions.append(pred)

        enhanced_predictions.sort(key=lambda x: x["confidence"], reverse=True)
        return enhanced_predictions[:top_k]

    except Exception as e:
        print(f"Engine guidance failed, falling back to Magnus-only: {e}")
        return self.predict_moves(board, top_k)
|
|
|
def _apply_chess_heuristics(
    self, board: chess.Board, predictions: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """Raise each prediction's confidence by a small heuristic boost, in place.

    Every entry gets a ``"heuristic_boost"`` key, and ``"confidence"`` is
    raised by that boost and capped at 0.95. If processing an entry
    raises, only its boost is set to 0.0 and its confidence is left
    untouched. The same list object is returned.
    """
    central_squares = [chess.E4, chess.E5, chess.D4, chess.D5]
    minor_pieces = [chess.KNIGHT, chess.BISHOP]
    # Kings are absent from the table and therefore count as 0.
    capture_values = {
        chess.PAWN: 1,
        chess.KNIGHT: 3,
        chess.BISHOP: 3,
        chess.ROOK: 5,
        chess.QUEEN: 9,
    }

    for entry in predictions:
        uci_text = entry["move"]
        try:
            candidate = chess.Move.from_uci(uci_text)
            boost = 0.0

            mover = board.piece_at(candidate.from_square)
            if mover:
                if candidate.to_square in central_squares:
                    boost += 0.02

                if mover.piece_type in minor_pieces and board.fullmove_number <= 10:
                    boost += 0.03

                if board.is_capture(candidate):
                    victim = board.piece_at(candidate.to_square)
                    if victim:
                        gained = capture_values.get(victim.piece_type, 0)
                        risked = capture_values.get(mover.piece_type, 0)
                        if gained >= risked:
                            boost += 0.04

                if board.gives_check(candidate):
                    boost += 0.02

                if board.is_castling(candidate) and board.fullmove_number <= 15:
                    boost += 0.05

            entry["confidence"] = min(0.95, entry["confidence"] + boost)
            entry["heuristic_boost"] = boost

        except Exception:
            entry["heuristic_boost"] = 0.0

    return predictions
|
|
|
def is_loaded(self) -> bool:
    """Return ``True`` when ``self.model`` holds a model, ``False`` when it is ``None``."""
    model_missing = self.model is None
    return not model_missing
|
|
|
|
|
|
|
# Module-level cache for the shared predictor; filled on first request.
_magnus_predictor = None


def get_magnus_predictor() -> AdvancedMagnusPredictor:
    """Return the shared predictor, constructing it on the first call."""
    global _magnus_predictor
    if _magnus_predictor is not None:
        return _magnus_predictor
    _magnus_predictor = AdvancedMagnusPredictor()
    return _magnus_predictor
|
|
|
|
|
def test_predictor():
    """Smoke test: print the top three predictions for the starting position."""
    predictor = AdvancedMagnusPredictor()

    if not predictor.is_loaded():
        print("❌ Predictor not loaded")
        return

    predictions = predictor.predict_moves(chess.Board(), top_k=3)

    print("🧪 Test Predictions:")
    for rank, pred in enumerate(predictions, start=1):
        print(f" {rank}. {pred['move']} (confidence: {pred['confidence']:.3f})")


# Run the smoke test when this file is executed directly as a script.
if __name__ == "__main__":
    test_predictor()
|
|