# Pokemon Battle MCP server: exposes battle tools (challenge, move, switch,
# status, replay) through FastMCP over the SSE transport.
import asyncio
import os
from typing import Optional

from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP

from utils.pokemon_utils import (
    start_ladder_battle, start_battle_against_player,
    submit_move_for_battle, get_battle_state, list_active_battles,
    download_battle_replay, cleanup_completed_battles, format_battle_state,
    check_recent_battles, debug_move_attributes, active_battles, player_instances
)
from utils.pokemon_utils import player_instances_by_unique

# Load settings from a .env file (if any) before the server is configured.
load_dotenv()

# --- MCP Server Setup ---
# The server binds to every interface (0.0.0.0) on port 7860.
mcp = FastMCP(name="PokemonBattleAgent", host="0.0.0.0", port=7860)


# Process-wide session state read and written by the tool functions:
#   username         - last username used to issue a challenge
#   active_battle_id - battle used when a tool is called without battle_id
current_session = dict(username=None, active_battle_id=None)


@mcp.tool()
def battle_player(opponent_username: str, username: str = "MCPTrainer") -> dict:
    """
    Queue a challenge against a specific player.
    The battle only starts once the opponent accepts, so no battle ID is
    returned here; call find_recent_battles() or get_player_status() a few
    seconds later to obtain the battle ID and viewing link.
    Args:
        opponent_username (str): Username of the opponent player
        username (str): Username for the MCP-controlled player
    Returns:
        dict: On success, status "challenge_queued" with the opponent name and
            the base/unique usernames reported for the challenging player;
            on failure, status "error" with a message
    """
    try:
        challenge_result = start_battle_against_player(username, opponent_username)

        # Remember who issued the challenge so later tool calls can default
        # to this username. The dict is mutated in place, so no `global`
        # declaration is required.
        current_session['username'] = username

        return {
            "status": "challenge_queued",
            "message": f"Challenge to {opponent_username} queued. Battle will start when accepted.",
            "opponent": opponent_username,
            "player_base_username": challenge_result.get('player_base_username'),
            "player_unique_username": challenge_result.get('player_unique_username'),
            "instructions": "Use find_recent_battles() or get_player_status() in a few seconds to get the battle ID and viewing link once the battle starts.",
            "note": "Check server console for 'Challenge sent' confirmation."
        }
    except Exception as e:
        # Tool boundary: report the failure to the client instead of raising.
        return {
            "status": "error",
            "message": f"Failed to challenge player: {e}"
        }

@mcp.tool()
async def choose_move(move_name: str, battle_id: Optional[str] = None) -> dict:
    """
    Choose and execute a move in a battle.
    Args:
        move_name (str): Name or ID of the move to use
        battle_id (str, optional): Specific battle ID; when omitted or null,
            the session's current active battle is used
    Returns:
        dict: status "success" with the submission result in "message" plus
            "battle_state" and "waiting_for_move"; status "error" with a
            message when no battle is known or the move fails
    """
    try:
        # An explicit battle_id wins; otherwise fall back to the session.
        target_battle_id = battle_id or current_session.get('active_battle_id')
        if not target_battle_id:
            return {
                "status": "error",
                "message": "No active battle. Start a battle first."
            }

        result = await submit_move_for_battle(target_battle_id, move_name=move_name)

        # Re-read the battle after submitting so the caller sees fresh state.
        battle_info = get_battle_state(target_battle_id)

        return {
            "status": "success",
            "message": result,
            "battle_state": battle_info['battle_state'],
            "waiting_for_move": battle_info['waiting_for_move']
        }
    except Exception as e:
        # Tool boundary: report the failure to the client instead of raising.
        return {
            "status": "error",
            "message": f"Failed to execute move: {e}"
        }

@mcp.tool()
async def switch_pokemon(pokemon_name: str, battle_id: Optional[str] = None) -> dict:
    """
    Switch to a different Pokemon in a battle.
    Args:
        pokemon_name (str): Name of the Pokemon to switch to
        battle_id (str, optional): Specific battle ID; when omitted or null,
            the session's current active battle is used
    Returns:
        dict: status "success" with the submission result in "message" plus
            "battle_state" and "waiting_for_move"; status "error" with a
            message when no battle is known or the switch fails
    """
    try:
        # An explicit battle_id wins; otherwise fall back to the session.
        target_battle_id = battle_id or current_session.get('active_battle_id')
        if not target_battle_id:
            return {
                "status": "error",
                "message": "No active battle. Start a battle first."
            }

        # A switch goes through the same submission helper as a move, keyed
        # by pokemon_name instead of move_name.
        result = await submit_move_for_battle(target_battle_id, pokemon_name=pokemon_name)

        # Re-read the battle after submitting so the caller sees fresh state.
        battle_info = get_battle_state(target_battle_id)

        return {
            "status": "success",
            "message": result,
            "battle_state": battle_info['battle_state'],
            "waiting_for_move": battle_info['waiting_for_move']
        }
    except Exception as e:
        # Tool boundary: report the failure to the client instead of raising.
        return {
            "status": "error",
            "message": f"Failed to switch Pokemon: {e}"
        }

@mcp.tool()
def get_current_battle_state(battle_id: Optional[str] = None) -> dict:
    """
    Get the current state of a battle.
    Args:
        battle_id (str, optional): Specific battle ID; when omitted or null,
            the session's current active battle is used
    Returns:
        dict: status "success" and "battle_id" merged with the fields
            returned by the battle-state lookup; status "error" with a
            message when no battle is known or the lookup fails
    """
    try:
        # An explicit battle_id wins; otherwise fall back to the session.
        target_battle_id = battle_id or current_session.get('active_battle_id')
        if not target_battle_id:
            return {
                "status": "error",
                "message": "No active battle. Start a battle first."
            }

        battle_info = get_battle_state(target_battle_id)

        # battle_info is unpacked last, so any "status"/"battle_id" keys it
        # carries take precedence over the ones set here.
        return {
            "status": "success",
            "battle_id": target_battle_id,
            **battle_info
        }
    except Exception as e:
        # Tool boundary: report the failure to the client instead of raising.
        return {
            "status": "error",
            "message": f"Failed to get battle state: {e}"
        }

@mcp.tool()
def list_battles() -> dict:
    """
    List all active battles.
    Returns:
        dict: status "success" with "active_battles" and their "count", or
            status "error" with a message if the listing fails
    """
    try:
        found = list_active_battles()
        how_many = len(found)
    except Exception as err:
        # Tool boundary: surface the failure as a result, not an exception.
        return {"status": "error", "message": f"Failed to list battles: {err}"}

    return {"status": "success", "active_battles": found, "count": how_many}

@mcp.tool()
async def download_replay(battle_id: Optional[str] = None) -> dict:
    """
    Download the replay for a battle.
    Args:
        battle_id (str, optional): Battle ID; when omitted or null, the
            session's current active battle is used
    Returns:
        dict: status "success" with "replay_path" and "battle_id", or status
            "error" with a message when no battle is known or the download
            fails
    """
    try:
        # An explicit battle_id wins; otherwise fall back to the session.
        target_battle_id = battle_id or current_session.get('active_battle_id')
        if not target_battle_id:
            return {
                "status": "error",
                "message": "No battle ID provided. Specify a battle_id or start a battle first."
            }

        replay_path = await download_battle_replay(target_battle_id)

        return {
            "status": "success",
            "message": "Replay downloaded successfully",
            "replay_path": replay_path,
            "battle_id": target_battle_id
        }
    except Exception as e:
        # Tool boundary: report the failure to the client instead of raising.
        return {
            "status": "error",
            "message": f"Failed to download replay: {e}"
        }

@mcp.tool()
def get_player_status(username: Optional[str] = None) -> dict:
    """
    Get the current status of a player including any known battles.
    The username is looked up first as a base username (which may map to
    several player instances) and then as a unique username (a single
    instance).
    Args:
        username (str, optional): Username to check; when omitted or null,
            the current session's username is used
    Returns:
        dict: status "success" with the matched players, their battles and
            aggregated win/finish counters; status "no_player" when the
            username matches no instance; status "error" with a message
            otherwise
    """
    try:
        target_username = username or current_session.get('username')
        if not target_username:
            return {
                "status": "error",
                "message": "No username provided and no active session."
            }

        # Resolve the scope once so the reported value always matches the
        # lookup that actually produced the players.
        if target_username in player_instances:
            resolved_scope = "base"
            players_to_check = list(player_instances[target_username].values())
        elif target_username in player_instances_by_unique:
            resolved_scope = "unique"
            players_to_check = [player_instances_by_unique[target_username]]
        else:
            return {
                "status": "no_player",
                "username": target_username,
                "message": f"No player instance found for {target_username}"
            }

        players = []
        all_battles = []
        total_battles = 0
        total_won = 0
        total_finished = 0

        for p in players_to_check:
            base_username = getattr(p, 'base_username', target_username)
            n_battles = len(p.battles)

            total_battles += n_battles
            # Counters may be missing on a player object; count them as 0.
            total_won += getattr(p, 'n_won_battles', 0)
            total_finished += getattr(p, 'n_finished_battles', 0)

            players.append({
                'player_unique_username': p.username,
                'player_base_username': base_username,
                'n_battles': n_battles
            })

            for battle_id, battle in p.battles.items():
                all_battles.append({
                    'battle_id': battle_id,
                    'battle_url': f"https://jofthomas.com/play.pokemonshowdown.com/testclient.html#battle-{battle_id}",
                    'turn': getattr(battle, 'turn', 0),
                    'finished': getattr(battle, 'finished', False),
                    'won': getattr(battle, 'won', None),
                    'player_unique_username': p.username,
                    'player_base_username': base_username
                })

        return {
            "status": "success",
            "query_username": target_username,
            "resolved_scope": resolved_scope,
            "players": players,
            "total_battles": total_battles,
            "battles": all_battles,
            "n_won_battles": total_won,
            "n_finished_battles": total_finished
        }
    except Exception as e:
        # Tool boundary: report the failure to the client instead of raising.
        return {
            "status": "error",
            "message": f"Failed to get player status: {e}"
        }

@mcp.tool()
def find_recent_battles(username: Optional[str] = None) -> dict:
    """
    Check for recent battles that may have started after a timeout.
    Useful when battle requests time out but battles actually started.
    When any battles are found, the first one returned becomes the session's
    active battle, so later tools can be called without a battle_id.
    Args:
        username (str, optional): Username to check; when omitted or null,
            the current session's username is used
    Returns:
        dict: status "success" with "recent_battles", their "count" and a
            summary message; status "error" with a message otherwise
    """
    try:
        target_username = username or current_session.get('username')
        if not target_username:
            return {
                "status": "error",
                "message": "No username provided and no active session."
            }

        recent_battles = check_recent_battles(target_username)

        if recent_battles:
            # The first entry is treated as the most recent battle
            # (presumably the helper returns newest first - verify there).
            current_session['active_battle_id'] = recent_battles[0]['battle_id']

        return {
            "status": "success",
            "username": target_username,
            "recent_battles": recent_battles,
            "count": len(recent_battles),
            "message": f"Found {len(recent_battles)} recent battles" if recent_battles else "No recent battles found"
        }
    except Exception as e:
        # Tool boundary: report the failure to the client instead of raising.
        return {
            "status": "error",
            "message": f"Failed to check recent battles: {e}"
        }
@mcp.tool()
def cleanup_battles() -> dict:
    """
    Clean up completed battles to free memory.
    Returns:
        dict: status "success" with a confirmation message, or status
            "error" with a message if the cleanup fails
    """
    try:
        cleanup_completed_battles()
    except Exception as err:
        # Tool boundary: surface the failure as a result, not an exception.
        return {"status": "error", "message": f"Failed to cleanup battles: {err}"}

    return {
        "status": "success",
        "message": "Completed battles cleaned up successfully",
    }

@mcp.tool()
async def wait_30_seconds() -> dict:
    """
    Wait for 30 seconds. Useful for giving processes time to complete or timing operations.
    Returns:
        dict: status "success" once the wait has elapsed, or status "error"
            with a message if the wait fails
    """
    try:
        # Non-blocking pause: other tasks keep running on the event loop.
        await asyncio.sleep(30)
    except Exception as err:
        return {"status": "error", "message": f"Failed to wait: {err}"}
    else:
        return {"status": "success", "message": "Waited 30 seconds successfully"}


# --- Server Execution ---
if __name__ == "__main__":
    # Print the startup banner, then hand control to the MCP server, which
    # serves over the SSE transport.
    startup_banner = (
        "Pokemon Battle MCP Server starting on port 7860...",
        "Available battle types:",
        "- Ladder battles",
        "- AI agent battles (OpenAI, Gemini, Mistral, MaxDamage) - NO RANDOM FALLBACKS",
        "- Human player battles",
        "Running Pokemon Battle MCP server with SSE transport",
    )
    for banner_line in startup_banner:
        print(banner_line)
    mcp.run(transport="sse")