from pathlib import Path
from typing import Optional

import chess
import chess.engine
import chess.pgn
import chess.svg

from src.llm.sambanova_wrapper import SambaNovaWrapper
from src.util.pgn_util import add_variation, format_pv


class ThinkSquareEngine:
    """Static helpers around a UCI engine: best move, game annotation, rendering.

    Every engine call opens a fresh engine process from ``_ENGINE`` and closes
    it when the call finishes.
    """

    # Path to the engine binary, resolved against the working directory at
    # import time.
    _ENGINE = str(Path("bin/stockfish").resolve())

    # Shared commentator used by ``annotate`` when an ``llm_character`` is given.
    llm_commentator = SambaNovaWrapper()

    # Centipawn value substituted for forced-mate scores so that every
    # evaluation can be compared as a plain integer.
    _MATE_SCORE = 100000

    # ------------------------------------------------------------------ #
    # Internal helpers
    # ------------------------------------------------------------------ #

    @staticmethod
    def _board_from_fen(fen: Optional[str] = None) -> chess.Board:
        """Build a board from *fen*, defaulting to the standard start position."""
        return chess.Board(chess.STARTING_FEN if fen is None else fen)

    @staticmethod
    def _white_score(info):
        """Return the evaluation in *info* as centipawns from White's side."""
        return info["score"].white().score(mate_score=ThinkSquareEngine._MATE_SCORE)

    @staticmethod
    def _classify_eval_drop(eval_drop) -> Optional[str]:
        """Map an evaluation loss (positive = the mover lost ground) to a label.

        Returns ``None`` for moves that are neither notably bad nor notably good.
        """
        if eval_drop > 200:
            return "Blunder"
        if eval_drop > 100:
            return "Mistake"
        if eval_drop > 50:
            return "Inaccuracy"
        if eval_drop < -150:
            return "Brilliant"
        if eval_drop < -60:
            return "Very Good"
        return None

    @staticmethod
    def _describe_situation(post_eval) -> Optional[str]:
        """Summarise a White-perspective evaluation in words (``None`` if unknown)."""
        if post_eval is None:
            return None
        if post_eval > 200:
            return "White is better"
        if post_eval > 100:
            return "White has a slight advantage"
        if post_eval < -200:
            return "Black is better"
        if post_eval < -100:
            return "Black has a slight advantage"
        return "No side has a significant advantage"

    # ------------------------------------------------------------------ #
    # Engine access
    # ------------------------------------------------------------------ #

    @staticmethod
    def get_best_move(fen: Optional[str] = None, time_limit=0.1) -> Optional[str]:
        """Return the engine's preferred move for *fen* in SAN.

        Args:
            fen: Position to search; the standard start position when ``None``.
            time_limit: Search time in seconds.

        Returns:
            The move in SAN, or ``None`` when the engine reports no move
            (for example, the position has no legal moves).
        """
        board = ThinkSquareEngine._board_from_fen(fen)
        with chess.engine.SimpleEngine.popen_uci(ThinkSquareEngine._ENGINE) as engine:
            result = engine.play(board, chess.engine.Limit(time=time_limit))

        best_move = result.move
        if best_move is None:
            # board.san(None) would crash; there is simply nothing to suggest.
            return None
        return board.san(best_move)

    @staticmethod
    def get_engine_analysis(board, analysis_time=0.1):
        """Analyse *board* for *analysis_time* seconds and return the engine info."""
        with chess.engine.SimpleEngine.popen_uci(ThinkSquareEngine._ENGINE) as engine:
            return engine.analyse(board, chess.engine.Limit(time=analysis_time))

    # ------------------------------------------------------------------ #
    # Annotation
    # ------------------------------------------------------------------ #

    @staticmethod
    def _perform_post_analysis_and_add_comment(
        analysis_time,
        board,
        played_node,
        pre_eval,
        engine_best_move_san,
        pv,
    ):
        """Evaluate the position after a move and draft a comment for it.

        Args:
            analysis_time: Engine time in seconds for the post-move analysis.
            board: Board with the played move ALREADY pushed.
            played_node: Game node of the played move.
            pre_eval: White-perspective evaluation before the move.
            engine_best_move_san: Engine's preferred move (SAN) before the
                move was played, or ``None``.
            pv: Engine principal variation from the pre-move position.

        Returns:
            ``(node_reference, comment, variation, variation_san, post_eval)``.
            ``node_reference`` is ``played_node`` only when the move earned a
            label; otherwise it is ``None``.
        """
        post_info = ThinkSquareEngine.get_engine_analysis(board, analysis_time)
        post_eval = ThinkSquareEngine._white_score(post_info)

        # Both evaluations are from White's side. A fall in that number is a
        # loss for White but a GAIN for Black, so flip the sign when Black was
        # the mover; otherwise Black's blunders would be praised as brilliant.
        if pre_eval is not None and post_eval is not None:
            eval_drop = pre_eval - post_eval
            if board.turn == chess.WHITE:
                # White is to move now, hence Black just moved.
                eval_drop = -eval_drop
        else:
            eval_drop = 0

        label = ThinkSquareEngine._classify_eval_drop(eval_drop)
        overall_situation = ThinkSquareEngine._describe_situation(post_eval)

        node_reference = None
        comment = None
        variation = None
        variation_san = None

        if label is not None:
            node_reference = played_node
            comment = f"{label}. "
            if eval_drop > 0 and engine_best_move_san is not None:
                comment += f"Better was {engine_best_move_san} "
                if pv:
                    # Offer the engine line as an alternative to the played move.
                    variation = pv
                    variation_san = format_pv(pv, played_node.parent.board())

        if overall_situation is not None:
            if comment is not None:
                comment += f"\n Overall, {overall_situation}."
            else:
                comment = f"Overall, {overall_situation}."

        return node_reference, comment, variation, variation_san, post_eval

    @staticmethod
    def annotate(game, analysis_time: float = 0.1, llm_character: Optional[str] = None):
        """Annotate the main line of *game* with engine-based comments.

        Each main-line move is compared against the engine's choice. Moves
        that match it, or that earn a label, receive a comment and, where
        applicable, the engine's line as a side variation. When
        *llm_character* is given, the drafted comments are passed to
        ``llm_commentator`` and replaced by whatever it returns.

        Args:
            game: The game to annotate; it is modified in place.
            analysis_time: Engine time in seconds per analysed position.
            llm_character: Persona forwarded to the commentator, or ``None``
                to keep the engine-drafted comments.

        Returns:
            The same *game* object, now annotated.

        Raises:
            ValueError: If *game* is not a ``chess.pgn.Game``, has no moves,
                *analysis_time* is not positive, or the commentator returns an
                unknown comment reference.
        """
        if not isinstance(game, chess.pgn.Game):
            raise ValueError("Input must be a chess.pgn.Game object")
        if not game.variations:
            raise ValueError("Game must have at least one variation")
        if analysis_time <= 0:
            raise ValueError("Analysis time must be greater than 0")

        # Parallel lists, one entry per commented move.
        comment_refs = []
        node_refs = []
        comments = []
        variations = []
        variation_sans = []
        move_numbers = []
        played_moves = []
        played_by = []
        pre_eval_scores = []
        post_eval_scores = []

        node = game
        while node.variations:
            board = node.board()
            played_node = node.variation(0)
            played_move = played_node.move

            # Engine's view of the position BEFORE the played move.
            pre_info = ThinkSquareEngine.get_engine_analysis(board, analysis_time)
            pre_eval = ThinkSquareEngine._white_score(pre_info)

            # The engine may omit the PV or return an empty one.
            pv = pre_info.get("pv") or []
            engine_best_move = pv[0] if pv else None
            engine_best_move_san = (
                board.san(engine_best_move) if engine_best_move else None
            )

            # Capture pre-move facts now; pushing the move changes them.
            move_number = board.fullmove_number
            mover = "white" if board.turn else "black"
            played_move_san = board.san(played_move) if played_move else None
            board.push(played_move)

            if played_move_san != engine_best_move_san:
                node_reference, comment, variation, variation_san, post_eval_score = (
                    ThinkSquareEngine._perform_post_analysis_and_add_comment(
                        analysis_time,
                        board,
                        played_node,
                        pre_eval,
                        engine_best_move_san,
                        pv,
                    )
                )
            else:
                node_reference = played_node
                comment = "Best move played."
                variation = None
                variation_san = None
                post_info = ThinkSquareEngine.get_engine_analysis(board, analysis_time)
                post_eval_score = ThinkSquareEngine._white_score(post_info)

            if node_reference is not None:
                node_refs.append(node_reference)
                comments.append(comment)
                variations.append(variation)
                variation_sans.append(variation_san)
                move_numbers.append(move_number)
                played_moves.append(played_move_san)
                played_by.append(mover)
                pre_eval_scores.append(pre_eval)
                post_eval_scores.append(post_eval_score)
                comment_refs.append(len(comment_refs) + 1)

            node = played_node

        if llm_character is not None:
            formatted_comments = ThinkSquareEngine.llm_commentator.comment(
                character=llm_character,
                game=str(game),
                comment_refs=comment_refs,
                move_nums=move_numbers,
                comments=comments,
                move_suggestions=variation_sans,
                played_moves=played_moves,
                played_by=played_by,
                pre_eval_scores=pre_eval_scores,
                post_eval_scores=post_eval_scores,
            )
            index_by_ref = {ref: idx for idx, ref in enumerate(comment_refs)}
            for entry in formatted_comments["comments"]:
                comment_ref = entry["comment_ref"]
                if comment_ref not in index_by_ref:
                    raise ValueError(
                        f"Comment reference {comment_ref} not found in comment_refs."
                    )
                comments[index_by_ref[comment_ref]] = entry["comment"]

        # Write to the game tree only after the walk, so that added side
        # variations cannot disturb the main-line traversal above.
        for node_ref, comment, variation in zip(node_refs, comments, variations):
            node_ref.comment = comment
            if variation is not None:
                add_variation(node_ref.parent, variation)

        return game

    # ------------------------------------------------------------------ #
    # Move helpers
    # ------------------------------------------------------------------ #

    @staticmethod
    def is_valid_move(
        move_san: str,
        fen: Optional[str] = None,
    ) -> bool:
        """Return whether *move_san* is a legal move in the position *fen*.

        The standard start position is used when *fen* is ``None``. SAN that
        cannot be parsed for the position yields ``False``.
        """
        board = ThinkSquareEngine._board_from_fen(fen)
        try:
            move = board.parse_san(move_san)
        except ValueError:
            return False
        return board.is_legal(move)

    @staticmethod
    def get_fen_after_move(
        move_san: str,
        fen: Optional[str] = None,
    ) -> Optional[str]:
        """Return the FEN reached by playing *move_san* from *fen*.

        The standard start position is used when *fen* is ``None``. Returns
        ``None`` when the move cannot be parsed or is not legal.
        """
        board = ThinkSquareEngine._board_from_fen(fen)
        try:
            move = board.parse_san(move_san)
        except ValueError:
            return None
        if not board.is_legal(move):
            return None
        board.push(move)
        return board.fen()

    # ------------------------------------------------------------------ #
    # Rendering
    # ------------------------------------------------------------------ #

    @staticmethod
    def render_board_ascii(fen: Optional[str] = None) -> str:
        """Render the position as ASCII text, seen from the side to move."""
        board = ThinkSquareEngine._board_from_fen(fen)
        rows = str(board).split("\n")
        if board.turn == chess.BLACK:
            # Flip both vertically and horizontally to show Black's view.
            rows = [row[::-1] for row in rows[::-1]]
        return "\n".join(rows)

    @staticmethod
    def render_board_svg(fen: Optional[str] = None):
        """Render the position as a 400px SVG, oriented to the side to move."""
        board = ThinkSquareEngine._board_from_fen(fen)
        return chess.svg.board(
            board=board, orientation=board.turn, size=400, coordinates=True
        )

    @staticmethod
    def render_board_unicode(fen: Optional[str] = None) -> str:
        """Render the position with Unicode pieces, oriented to the side to move."""
        board = ThinkSquareEngine._board_from_fen(fen)
        return board.unicode(
            invert_color=False,
            borders=True,
            empty_square=".",
            orientation=board.turn,
        )