import json import uuid import logging from typing import Dict, Any, List # Assume GameState is a TypedDict or similar for clarity # from typing import TypedDict # class GameState(TypedDict): # description: str # project_json: Dict[str, Any] # action_plan: Dict[str, Any] # sprite_initial_positions: Dict[str, Any] # Placeholder for actual GameState in your application GameState = Dict[str, Any] logger = logging.getLogger(__name__) # --- Mock Agent for demonstration --- class MockAgent: def invoke(self, payload: Dict[str, Any]) -> Dict[str, Any]: """ Mocks an LLM agent invocation. In a real scenario, this would call your actual LLM API (e.g., through LangChain, LlamaIndex, etc.). """ user_message = payload["messages"][-1]["content"] print(f"\n--- Mock Agent Received Prompt (partial) ---\n{user_message[:500]}...\n------------------------------------------") # Simplified mock responses for demonstration purposes # In a real scenario, the LLM would generate actual Scratch block JSON if "Propose a high-level action flow" in user_message: return { "messages": [{ "content": json.dumps({ "action_overall_flow": { "Sprite1": { "description": "Basic movement and interaction", "plans": [ { "event": "when flag clicked", "logic": "forever loop: move 10 steps, if touching Edge then turn 15 degrees" }, { "event": "when space key pressed", "logic": "say Hello! for 2 seconds" } ] }, "Ball": { "description": "Simple bouncing behavior", "plans": [ { "event": "when flag clicked", "logic": "move 5 steps, if on edge bounce" } ] } } }) }] } elif "You are an AI assistant generating Scratch 3.0 block JSON" in user_message: # This mock response is highly simplified. A real LLM would generate # valid Scratch blocks based on the provided relevant catalog and plan. # We're just demonstrating the *mechanism* of filtering the catalog. if "Sprite1" in user_message: return { "messages": [{ "content": json.dumps({ f"block_id_{generate_block_id()}": { "opcode": "event_whenflagclicked", "next": f"block_id_{generate_block_id()}_forever", "parent": None, "inputs": {}, "fields": {}, "shadow": False, "topLevel": True, "x": 100, "y": 100 }, f"block_id_{generate_block_id()}_forever": { "opcode": "control_forever", "next": None, "parent": f"block_id_{generate_block_id()}", "inputs": { "SUBSTACK": [2, f"block_id_{generate_block_id()}_move"] }, "fields": {}, "shadow": False, "topLevel": False }, f"block_id_{generate_block_id()}_move": { "opcode": "motion_movesteps", "next": f"block_id_{generate_block_id()}_if", "parent": f"block_id_{generate_block_id()}_forever", "inputs": { "STEPS": [1, [4, "10"]] }, "fields": {}, "shadow": False, "topLevel": False }, f"block_id_{generate_block_id()}_if": { "opcode": "control_if", "next": None, "parent": f"block_id_{generate_block_id()}_forever", "inputs": { "CONDITION": [2, f"block_id_{generate_block_id()}_touching"], "SUBSTACK": [2, f"block_id_{generate_block_id()}_turn"] }, "fields": {}, "shadow": False, "topLevel": False }, f"block_id_{generate_block_id()}_touching": { "opcode": "sensing_touchingobject", "next": None, "parent": f"block_id_{generate_block_id()}_if", "inputs": { "TOUCHINGOBJECTMENU": [1, f"block_id_{generate_block_id()}_touching_menu"] }, "fields": {}, "shadow": False, "topLevel": False }, f"block_id_{generate_block_id()}_touching_menu": { "opcode": "sensing_touchingobjectmenu", "next": None, "parent": f"block_id_{generate_block_id()}_touching", "inputs": {}, "fields": {"TOUCHINGOBJECTMENU": ["_edge_", None]}, "shadow": True, "topLevel": False }, f"block_id_{generate_block_id()}_turn": { "opcode": "motion_turnright", "next": None, "parent": f"block_id_{generate_block_id()}_if", "inputs": { "DEGREES": [1, [4, "15"]] }, "fields": {}, "shadow": False, "topLevel": False }, f"block_id_{generate_block_id()}_say": { "opcode": "looks_sayforsecs", "next": None, "parent": None, # This block would typically be part of a separate script "inputs": { "MESSAGE": [1, [10, "Hello!"]], "SECS": [1, [4, "2"]] }, "fields": {}, "shadow": False, "topLevel": True, "x": 300, "y": 100 } }) }] } elif "Ball" in user_message: return { "messages": [{ "content": json.dumps({ f"block_id_{generate_block_id()}": { "opcode": "event_whenflagclicked", "next": f"block_id_{generate_block_id()}_moveball", "parent": None, "inputs": {}, "fields": {}, "shadow": False, "topLevel": True, "x": 100, "y": 100 }, f"block_id_{generate_block_id()}_moveball": { "opcode": "motion_movesteps", "next": f"block_id_{generate_block_id()}_edgebounce", "parent": f"block_id_{generate_block_id()}", "inputs": { "STEPS": [1, [4, "5"]] }, "fields": {}, "shadow": False, "topLevel": False }, f"block_id_{generate_block_id()}_edgebounce": { "opcode": "motion_ifonedgebounce", "next": None, "parent": f"block_id_{generate_block_id()}_moveball", "inputs": {}, "fields": {}, "shadow": False, "topLevel": False } }) }] } return {"messages": [{"content": "[]"}]} # Default empty response agent = MockAgent() # Helper function to generate a unique block ID def generate_block_id(): return str(uuid.uuid4())[:10].replace('-', '') # Shorten for readability, ensure uniqueness # Placeholder for your extract_json_from_llm_response function def extract_json_from_llm_response(response_string): try: # Assuming the LLM response is ONLY the JSON string within triple backticks json_match = response_string.strip().replace("```json", "").replace("```", "").strip() return json.loads(json_match) except json.JSONDecodeError as e: logger.error(f"Failed to decode JSON from LLM response: {e}") logger.error(f"Raw response: {response_string}") raise ValueError("Invalid JSON response from LLM") # --- GLOBAL CATALOG OF ALL SCRATCH BLOCKS --- # This is where you would load your block_content.json # For demonstration, I'm using your provided snippets and adding some common ones. # In a real application, you'd load this once at startup. ALL_SCRATCH_BLOCKS_CATALOG = { "motion_movesteps": { "opcode": "motion_movesteps", "next": None, "parent": None, "inputs": {"STEPS": [1, [4, "10"]]}, "fields": {}, "shadow": False, "topLevel": True, "x": 464, "y": -416 }, "motion_turnright": { "opcode": "motion_turnright", "next": None, "parent": None, "inputs": {"DEGREES": [1, [4, "15"]]}, "fields": {}, "shadow": False, "topLevel": True, "x": 467, "y": -316 }, "motion_ifonedgebounce": { "opcode": "motion_ifonedgebounce", "next": None, "parent": None, "inputs": {}, "fields": {}, "shadow": False, "topLevel": True, "x": 467, "y": -316 }, "event_whenflagclicked": { "opcode": "event_whenflagclicked", "next": None, "parent": None, "inputs": {}, "fields": {}, "shadow": False, "topLevel": True, "x": 10, "y": 10 }, "event_whenkeypressed": { "opcode": "event_whenkeypressed", "next": None, "parent": None, "inputs": {}, "fields": {"KEY_OPTION": ["space", None]}, "shadow": False, "topLevel": True, "x": 10, "y": 10 }, "control_forever": { "opcode": "control_forever", "next": None, "parent": None, "inputs": {"SUBSTACK": [2, "some_id"]}, "fields": {}, "shadow": False, "topLevel": True, "x": 10, "y": 10 }, "control_if": { "opcode": "control_if", "next": None, "parent": None, "inputs": {"CONDITION": [2, "some_id"], "SUBSTACK": [2, "some_id_2"]}, "fields": {}, "shadow": False, "topLevel": True, "x": 10, "y": 10 }, "looks_sayforsecs": { "opcode": "looks_sayforsecs", "next": None, "parent": None, "inputs": {"MESSAGE": [1, [10, "Hello!"]], "SECS": [1, [4, "2"]]}, "fields": {}, "shadow": False, "topLevel": True, "x": 10, "y": 10 }, "looks_say": { "opcode": "looks_say", "next": None, "parent": None, "inputs": {"MESSAGE": [1, [10, "Hello!"]]}, "fields": {}, "shadow": False, "topLevel": True, "x": 10, "y": 10 }, "sensing_touchingobject": { "opcode": "sensing_touchingobject", "next": None, "parent": None, "inputs": {"TOUCHINGOBJECTMENU": [1, "some_id"]}, "fields": {}, "shadow": False, "topLevel": True, "x": 10, "y": 10 }, "sensing_touchingobjectmenu": { "opcode": "sensing_touchingobjectmenu", "next": None, "parent": None, "inputs": {}, "fields": {"TOUCHINGOBJECTMENU": ["_mouse_", None]}, "shadow": True, "topLevel": True, "x": 10, "y": 10 }, # Add more blocks from your block_content.json here... } # --- Heuristic-based block selection --- def get_relevant_blocks_for_plan(action_plan: Dict[str, Any], all_blocks_catalog: Dict[str, Any]) -> Dict[str, Any]: """ Analyzes the natural language action plan and selects relevant Scratch blocks from the comprehensive catalog. This is a heuristic approach and might need to be refined based on your specific use cases and LLM capabilities. """ relevant_opcodes = set() # Always include common event blocks relevant_opcodes.add("event_whenflagclicked") relevant_opcodes.add("event_whenkeypressed") # Could be more specific if key is mentioned # Keyword to opcode mapping (can be expanded) keyword_map = { "move": "motion_movesteps", "steps": "motion_movesteps", "turn": "motion_turnright", "rotate": "motion_turnright", "bounce": "motion_ifonedgebounce", "edge": "motion_ifonedgebounce", "forever": "control_forever", "loop": "control_forever", "if": "control_if", "condition": "control_if", "say": "looks_say", "hello": "looks_say", # Simple example, might need more context "touching": "sensing_touchingobject", "mouse pointer": "sensing_touchingobjectmenu", "edge": "sensing_touchingobjectmenu", # For touching edge } # Iterate through the action plan to find keywords for sprite_name, sprite_actions in action_plan.get("action_overall_flow", {}).items(): for plan in sprite_actions.get("plans", []): event_logic = plan.get("event", "").lower() + " " + plan.get("logic", "").lower() # Check for direct opcode matches (if the LLM somehow outputs opcodes in its plan) for opcode in all_blocks_catalog.keys(): if opcode in event_logic: relevant_opcodes.add(opcode) # Check for keywords for keyword, opcode in keyword_map.items(): if keyword in event_logic: relevant_opcodes.add(opcode) # Add associated shadow blocks if known if opcode == "sensing_touchingobject": relevant_opcodes.add("sensing_touchingobjectmenu") if opcode == "event_whenkeypressed": relevant_opcodes.add("event_whenkeypressed") # It's already there but good to be explicit # Construct the filtered catalog relevant_blocks_catalog = { opcode: all_blocks_catalog[opcode] for opcode in relevant_opcodes if opcode in all_blocks_catalog } return relevant_blocks_catalog # --- New Action Planning Node --- def plan_sprite_actions(state: GameState): logger.info("--- Running PlanSpriteActionsNode ---") planning_prompt = ( f"You are an AI assistant tasked with planning Scratch 3.0 block code for a game. " f"The game description is: '{state['description']}'.\n\n" f"Here are the sprites currently in the project: {', '.join(target['name'] for target in state['project_json']['targets'] if not target['isStage']) if len(state['project_json']['targets']) > 1 else 'None'}.\n" f"Initial positions: {json.dumps(state.get('sprite_initial_positions', {}), indent=2)}\n\n" f"Consider the main actions and interactions required for each sprite. " f"Think step-by-step about what each sprite needs to *do*, *when* it needs to do it (events), " f"and if any actions need to *repeat* or depend on *conditions*.\n\n" f"Propose a high-level action flow for each sprite in the following JSON format. " f"Do NOT generate Scratch block JSON yet. Only describe the logic using natural language or simplified pseudo-code.\n\n" f"Example format:\n" f"```json\n" f"{{\n" f" \"action_overall_flow\": {{\n" f" \"Sprite1\": {{\n" f" \"description\": \"Main character actions\",\n" f" \"plans\": [\n" f" {{\n" f" \"event\": \"when flag clicked\",\n" f" \"logic\": \"forever loop: move 10 steps, if on edge bounce\"\n" f" }},\n" f" {{\n" f" \"event\": \"when space key pressed\",\n" f" \"logic\": \"change y by 10, wait 0.1 seconds, change y by -10\"\n" f" }}\n" f" ]\n" f" }},\n" f" \"Ball\": {{\n" f" \"description\": \"Projectile movement\",\n" f" \"plans\": [\n" f" {{\n" f" \"event\": \"when I start as a clone\",\n" f" \"logic\": \"glide 1 sec to random position, if touching Sprite1 then stop this script\"\n" f" }}\n" f" ]\n" f" }}\n" f" }}\n" f"}}\n" f"```\n\n" f"Return ONLY the JSON object for the action overall flow." ) try: response = agent.invoke({"messages": [{"role": "user", "content": planning_prompt}]}) raw_response = response["messages"][-1].content print("Raw response from LLM [PlanSpriteActionsNode]:", raw_response) action_plan = extract_json_from_llm_response(raw_response) logger.info("Sprite action plan generated by PlanSpriteActionsNode.") return {"action_plan": action_plan} except Exception as e: logger.error(f"Error in PlanSpriteActionsNode: {e}") raise # --- Updated Action Node Builder (to consume the plan and build blocks) --- def build_action_nodes(state: GameState): logger.info("--- Running ActionNodeBuilder ---") action_plan = state.get("action_plan", {}) if not action_plan: raise ValueError("No action plan found in state. Run PlanSpriteActionsNode first.") # Convert the Scratch project JSON to a mutable Python object project_json = state["project_json"] targets = project_json["targets"] # We need a way to map sprite names to their actual target objects in project_json sprite_map = {target["name"]: target for target in targets if not target["isStage"]} # --- NEW: Get only the relevant blocks for the entire action plan --- relevant_scratch_blocks_catalog = get_relevant_blocks_for_plan(action_plan, ALL_SCRATCH_BLOCKS_CATALOG) logger.info(f"Filtered {len(relevant_scratch_blocks_catalog)} relevant blocks out of {len(ALL_SCRATCH_BLOCKS_CATALOG)} total.") # Iterate through the planned actions for each sprite for sprite_name, sprite_actions in action_plan.get("action_overall_flow", {}).items(): if sprite_name in sprite_map: current_sprite_target = sprite_map[sprite_name] # Ensure 'blocks' field exists for the sprite if "blocks" not in current_sprite_target: current_sprite_target["blocks"] = {} # Generate block JSON based on the detailed action plan for this sprite # This is where the LLM's role becomes crucial: translating logic to blocks llm_block_generation_prompt = ( f"You are an AI assistant generating Scratch 3.0 block JSON based on a provided plan. " f"The current sprite is '{sprite_name}'.\n" f"Its planned actions are:\n" f"```json\n{json.dumps(sprite_actions, indent=2)}\n```\n\n" f"Here is a **curated catalog of only the most relevant Scratch 3.0 blocks** for this plan:\n" f"```json\n{json.dumps(relevant_scratch_blocks_catalog, indent=2)}\n```\n\n" f"Current Scratch project JSON (for context, specifically this sprite's existing blocks if any):\n" f"```json\n{json.dumps(current_sprite_target, indent=2)}\n```\n\n" f"**Instructions:**\n" f"1. For each planned event and its associated logic, generate the corresponding Scratch 3.0 block JSON.\n" f"2. **Generate unique block IDs** for every new block. Use a format like 'block_id_abcdef12'.\n" # Updated ID format hint f"3. Properly link blocks using `next` and `parent` fields to form execution stacks. Hat blocks (`topLevel: true`, `parent: null`).\n" f"4. Correctly fill `inputs` and `fields` based on the catalog and the plan's logic (e.g., specific values for motion, keys for events, conditions for controls).\n" f"5. For C-blocks (like `control_repeat`, `control_forever`, `control_if`), use the `SUBSTACK` input to link to the first block inside its loop/conditional.\n" f"6. If the plan involves operators (e.g., 'if touching Sprite1'), use the appropriate operator blocks from the catalog and link them correctly as `CONDITION` inputs.\n" f"7. Ensure that any shadow blocks (e.g., for dropdowns like `motion_goto_menu`, `sensing_touchingobjectmenu`) are generated with `shadow: true` and linked correctly as inputs to their parent block.\n" f"8. Return ONLY the **updated 'blocks' dictionary** for this specific sprite. Do NOT return the full project JSON. ONLY the `blocks` dictionary." ) try: response = agent.invoke({"messages": [{"role": "user", "content": llm_block_generation_prompt}]}) raw_response = response["messages"][-1].content print(f"Raw response from LLM [ActionNodeBuilder - {sprite_name}]:", raw_response) generated_blocks = extract_json_from_llm_response(raw_response) current_sprite_target["blocks"].update(generated_blocks) # Merge new blocks logger.info(f"Action blocks added for sprite '{sprite_name}' by ActionNodeBuilder.") except Exception as e: logger.error(f"Error generating blocks for sprite '{sprite_name}': {e}") # Depending on robustness needed, you might continue or re-raise raise return {"project_json": project_json} # --- Example Usage (to demonstrate the flow) --- if __name__ == "__main__": # Initialize a mock game state initial_game_state = { "description": "A simple game where a sprite moves and says hello.", "project_json": { "targets": [ {"isStage": True, "name": "Stage", "blocks": {}}, {"isStage": False, "name": "Sprite1", "blocks": {}}, {"isStage": False, "name": "Ball", "blocks": {}} ] }, "sprite_initial_positions": {} } # Step 1: Plan Sprite Actions try: state_after_planning = plan_sprite_actions(initial_game_state) initial_game_state.update(state_after_planning) print("\n--- Game State After Planning ---") print(json.dumps(initial_game_state, indent=2)) except Exception as e: print(f"Planning failed: {e}") exit() # Step 2: Build Action Nodes (Generate Blocks) try: state_after_building = build_action_nodes(initial_game_state) initial_game_state.update(state_after_building) print("\n--- Game State After Building Blocks ---") # Print only the blocks for a specific sprite to keep output manageable for target in initial_game_state["project_json"]["targets"]: if not target["isStage"]: print(f"\nBlocks for {target['name']}:") print(json.dumps(target.get('blocks', {}), indent=2)) except Exception as e: print(f"Building blocks failed: {e}")