"""MCP server exposing Pokemon Showdown battle controls as tools.

Each ``@mcp.tool()`` function below is registered on a FastMCP server and
returns a plain ``dict`` whose ``"status"`` key tells the caller whether the
operation succeeded.  Failures are reported in the returned dict rather than
raised, so a client always receives a structured answer.

NOTE: FastMCP publishes each tool's docstring as the tool description that
clients see, so the docstrings of the tool functions are part of the
user-facing surface -- edit them with that in mind.
"""
import os
import asyncio
from typing import Optional

from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP

from utils.pokemon_utils import (
    start_ladder_battle,
    start_battle_against_player,
    submit_move_for_battle,
    get_battle_state,
    list_active_battles,
    download_battle_replay,
    cleanup_completed_battles,
    format_battle_state,
    check_recent_battles,
    debug_move_attributes,
    active_battles,
    player_instances,
)
from utils.pokemon_utils import player_instances_by_unique

load_dotenv()

# --- MCP Server Setup ---
SERVER_HOST = "0.0.0.0"
SERVER_PORT = 7860

mcp = FastMCP(
    name="PokemonBattleAgent",
    host=SERVER_HOST,
    port=SERVER_PORT,
)

# Process-wide session shared by every tool call: remembers the last username
# that issued a challenge and the battle that tools default to when no
# battle_id is passed.  It is only ever mutated in place, never rebound.
current_session = {
    'username': None,
    'active_battle_id': None
}


# --- Internal helpers (not registered as tools) ---
def _error(message: str) -> dict:
    """Build the error payload shape shared by every tool."""
    return {
        "status": "error",
        "message": message
    }


def _resolve_battle_id(battle_id: Optional[str]) -> Optional[str]:
    """Return the explicit battle_id, else the session's active battle (may be None)."""
    return battle_id or current_session.get('active_battle_id')


async def _submit_action(battle_id: Optional[str], failure_prefix: str, **action) -> dict:
    """Submit one action (move or switch) and report the refreshed battle state.

    ``action`` is forwarded verbatim to ``submit_move_for_battle`` -- callers
    pass either ``move_name=...`` or ``pokemon_name=...``.  ``failure_prefix``
    is the lead-in of the error message used when anything raises.
    """
    try:
        target_battle_id = _resolve_battle_id(battle_id)
        if not target_battle_id:
            return _error("No active battle. Start a battle first.")

        result = await submit_move_for_battle(target_battle_id, **action)

        # Re-read the state after the action so the caller sees its effect.
        battle_info = get_battle_state(target_battle_id)
        return {
            "status": "success",
            "message": result,
            "battle_state": battle_info['battle_state'],
            "waiting_for_move": battle_info['waiting_for_move']
        }
    except Exception as e:
        return _error(f"{failure_prefix}: {e}")


# --- Tools ---
@mcp.tool()
def battle_player(opponent_username: str, username: str = "MCPTrainer") -> dict:
    """
    Challenge a specific player to a battle.

    The challenge is only queued; the battle starts once the opponent accepts.

    Args:
        opponent_username (str): Username of the opponent player
        username (str): Username for the MCP-controlled player

    Returns:
        dict: Challenge status and the player's usernames. The battle ID is
            not included; use find_recent_battles() or get_player_status()
            once the battle has started.
    """
    try:
        challenge_result = start_battle_against_player(username, opponent_username)
        # Remember who is playing so later tools can default to this user.
        current_session['username'] = username
        return {
            "status": "challenge_queued",
            "message": f"Challenge to {opponent_username} queued. Battle will start when accepted.",
            "opponent": opponent_username,
            "player_base_username": challenge_result.get('player_base_username'),
            "player_unique_username": challenge_result.get('player_unique_username'),
            "instructions": "Use find_recent_battles() or get_player_status() in a few seconds to get the battle ID and viewing link once the battle starts.",
            "note": "Check server console for 'Challenge sent' confirmation."
        }
    except Exception as e:
        return _error(f"Failed to challenge player: {e}")


@mcp.tool()
async def choose_move(move_name: str, battle_id: Optional[str] = None) -> dict:
    """
    Choose and execute a move in the current battle.

    Args:
        move_name (str): Name or ID of the move to use
        battle_id (str, optional): Specific battle ID (uses current active battle if not provided)

    Returns:
        dict: Result of the move and updated battle state
    """
    return await _submit_action(battle_id, "Failed to execute move", move_name=move_name)


@mcp.tool()
async def switch_pokemon(pokemon_name: str, battle_id: Optional[str] = None) -> dict:
    """
    Switch to a different Pokemon in the current battle.

    Args:
        pokemon_name (str): Name of the Pokemon to switch to
        battle_id (str, optional): Specific battle ID (uses current active battle if not provided)

    Returns:
        dict: Result of the switch and updated battle state
    """
    return await _submit_action(battle_id, "Failed to switch Pokemon", pokemon_name=pokemon_name)


@mcp.tool()
def get_current_battle_state(battle_id: Optional[str] = None) -> dict:
    """
    Get the current state of a battle.

    Args:
        battle_id (str, optional): Specific battle ID (uses current active battle if not provided)

    Returns:
        dict: Current battle state with all relevant information
    """
    try:
        target_battle_id = _resolve_battle_id(battle_id)
        if not target_battle_id:
            return _error("No active battle. Start a battle first.")

        battle_info = get_battle_state(target_battle_id)
        # battle_info is spread last, so any "status"/"battle_id" key it
        # carries takes precedence over the defaults set here.
        return {
            "status": "success",
            "battle_id": target_battle_id,
            **battle_info
        }
    except Exception as e:
        return _error(f"Failed to get battle state: {e}")


@mcp.tool()
def list_battles() -> dict:
    """
    List all active battles.

    Returns:
        dict: List of all active battles and their status
    """
    try:
        battles = list_active_battles()
        return {
            "status": "success",
            "active_battles": battles,
            "count": len(battles)
        }
    except Exception as e:
        return _error(f"Failed to list battles: {e}")


@mcp.tool()
async def download_replay(battle_id: Optional[str] = None) -> dict:
    """
    Download the replay for a completed battle.

    Args:
        battle_id (str, optional): Battle ID (uses current active battle if not provided)

    Returns:
        dict: Information about the downloaded replay
    """
    try:
        target_battle_id = _resolve_battle_id(battle_id)
        if not target_battle_id:
            return _error("No battle ID provided. Specify a battle_id or start a battle first.")

        replay_path = await download_battle_replay(target_battle_id)
        return {
            "status": "success",
            "message": "Replay downloaded successfully",
            "replay_path": replay_path,
            "battle_id": target_battle_id
        }
    except Exception as e:
        return _error(f"Failed to download replay: {e}")


@mcp.tool()
def get_player_status(username: Optional[str] = None) -> dict:
    """
    Get the current status of a player including any active battles.

    Args:
        username (str, optional): Username to check (uses current session if not provided)

    Returns:
        dict: Player status and battle information
    """
    try:
        target_username = username or current_session.get('username')
        if not target_username:
            return _error("No username provided and no active session.")

        # A name is looked up first as a base username (which maps to a
        # collection of player instances) and then as a unique username
        # (which maps to exactly one instance).
        is_base_username = target_username in player_instances
        if is_base_username:
            players_to_check = list(player_instances[target_username].values())
        elif target_username in player_instances_by_unique:
            players_to_check = [player_instances_by_unique[target_username]]
        else:
            return {
                "status": "no_player",
                "username": target_username,
                "message": f"No player instance found for {target_username}"
            }

        all_battles = []
        total_battles = 0
        total_won = 0
        total_finished = 0

        for p in players_to_check:
            base_username = getattr(p, 'base_username', target_username)
            total_battles += len(p.battles)
            total_won += getattr(p, 'n_won_battles', 0)
            total_finished += getattr(p, 'n_finished_battles', 0)

            for battle_id, battle in p.battles.items():
                # NOTE(review): if battle_id already carries a "battle-"
                # prefix this URL doubles it -- confirm against the ids
                # produced by utils.pokemon_utils.
                all_battles.append({
                    'battle_id': battle_id,
                    'battle_url': f"https://jofthomas.com/play.pokemonshowdown.com/testclient.html#battle-{battle_id}",
                    'turn': getattr(battle, 'turn', 0),
                    'finished': getattr(battle, 'finished', False),
                    'won': getattr(battle, 'won', None),
                    'player_unique_username': p.username,
                    'player_base_username': base_username
                })

        return {
            "status": "success",
            "query_username": target_username,
            "resolved_scope": "base" if is_base_username else "unique",
            "players": [
                {
                    'player_unique_username': p.username,
                    'player_base_username': getattr(p, 'base_username', target_username),
                    'n_battles': len(p.battles)
                }
                for p in players_to_check
            ],
            "total_battles": total_battles,
            "battles": all_battles,
            "n_won_battles": total_won,
            "n_finished_battles": total_finished
        }
    except Exception as e:
        return _error(f"Failed to get player status: {e}")


@mcp.tool()
def find_recent_battles(username: Optional[str] = None) -> dict:
    """
    Check for recent battles that may have started after a timeout.
    Useful when battle requests time out but battles actually started.

    Args:
        username (str, optional): Username to check (uses current session if not provided)

    Returns:
        dict: List of recent battles with viewing links
    """
    try:
        target_username = username or current_session.get('username')
        if not target_username:
            return _error("No username provided and no active session.")

        recent_battles = check_recent_battles(target_username)

        if recent_battles:
            # Make the first reported battle the session default so that
            # follow-up tools work without an explicit battle_id.
            current_session['active_battle_id'] = recent_battles[0]['battle_id']

        return {
            "status": "success",
            "username": target_username,
            "recent_battles": recent_battles,
            "count": len(recent_battles),
            "message": f"Found {len(recent_battles)} recent battles" if recent_battles else "No recent battles found"
        }
    except Exception as e:
        return _error(f"Failed to check recent battles: {e}")


@mcp.tool()
def cleanup_battles() -> dict:
    """
    Clean up completed battles to free memory.

    Returns:
        dict: Status of cleanup operation
    """
    try:
        cleanup_completed_battles()
        return {
            "status": "success",
            "message": "Completed battles cleaned up successfully"
        }
    except Exception as e:
        return _error(f"Failed to cleanup battles: {e}")


@mcp.tool()
async def wait_30_seconds() -> dict:
    """
    Wait for 30 seconds. Useful for giving processes time to complete or timing operations.

    Returns:
        dict: Status of the wait operation
    """
    try:
        # asyncio.sleep suspends only this coroutine while it waits.
        await asyncio.sleep(30)
        return {
            "status": "success",
            "message": "Waited 30 seconds successfully"
        }
    except Exception as e:
        return _error(f"Failed to wait: {e}")


# --- Server Execution ---
if __name__ == "__main__":
    print(f"Pokemon Battle MCP Server starting on port {SERVER_PORT}...")
    print("Available battle types:")
    print("- Ladder battles")
    print("- AI agent battles (OpenAI, Gemini, Mistral, MaxDamage) - NO RANDOM FALLBACKS")
    print("- Human player battles")
    print("Running Pokemon Battle MCP server with SSE transport")

    mcp.run(transport="sse")