# Source path: scratch_agent/utils/action_node.py
# Uploaded by: WebashalarForML
# Commit message: Upload 24 files
# Commit: 3d3703f (verified)
import json
import logging
import re
import uuid
from typing import Dict, Any, List
# GameState is treated as a plain dict throughout this module. It could be
# declared as a TypedDict for clarity, along these lines:
#
#     from typing import TypedDict
#
#     class GameState(TypedDict):
#         description: str
#         project_json: Dict[str, Any]
#         action_plan: Dict[str, Any]
#         sprite_initial_positions: Dict[str, Any]
#
# Placeholder for the actual GameState type used by the application.
GameState = Dict[str, Any]
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# --- Mock Agent for demonstration ---
class _MockMessage(dict):
    """Dict-backed chat message that also exposes ``content`` as an attribute.

    The nodes in this module read ``response["messages"][-1].content``, whereas
    a plain dict only supports ``["content"]``. This wrapper supports both
    access styles, so existing dict-style consumers keep working.
    """

    @property
    def content(self):
        return self["content"]


class MockAgent:
    """Stand-in for an LLM agent that answers with canned JSON payloads."""

    # Marker emitted by the block-generation prompt to name the sprite being built.
    _SPRITE_MARKER = "The current sprite is '"

    def invoke(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """
        Mocks an LLM agent invocation. In a real scenario, this would call your
        actual LLM API (e.g., through LangChain, LlamaIndex, etc.).

        Args:
            payload: Mapping with a ``"messages"`` list; the last entry's
                ``"content"`` is taken as the user prompt.

        Returns:
            ``{"messages": [message]}`` where ``message`` offers the JSON text
            both as ``message["content"]`` and ``message.content``.
        """
        user_message = payload["messages"][-1]["content"]
        print(f"\n--- Mock Agent Received Prompt (partial) ---\n{user_message[:500]}...\n------------------------------------------")

        # Simplified mock responses for demonstration purposes.
        # In a real scenario, the LLM would generate actual Scratch block JSON.
        if "Propose a high-level action flow" in user_message:
            return self._reply(json.dumps(self._action_plan()))

        if "You are an AI assistant generating Scratch 3.0 block JSON" in user_message:
            # This mock response is highly simplified. A real LLM would generate
            # valid Scratch blocks based on the provided relevant catalog and plan.
            target_sprite = self._target_sprite(user_message)
            if target_sprite == "Sprite1":
                return self._reply(json.dumps(self._sprite1_blocks()))
            if target_sprite == "Ball":
                return self._reply(json.dumps(self._ball_blocks()))

        return self._reply("[]")  # Default empty response

    @staticmethod
    def _reply(content: str) -> Dict[str, Any]:
        """Wrap *content* in the response envelope the nodes expect."""
        return {"messages": [_MockMessage(content=content)]}

    @classmethod
    def _target_sprite(cls, user_message: str):
        """Return the sprite a block-generation prompt is about, or None.

        The prompt's instructions mention 'Sprite1' as an example for every
        sprite, so a bare substring test would always select Sprite1. The
        explicit "The current sprite is '<name>'" marker is preferred; the
        substring test is kept only as a fallback for prompts without it.
        """
        start = user_message.find(cls._SPRITE_MARKER)
        if start != -1:
            start += len(cls._SPRITE_MARKER)
            end = user_message.find("'", start)
            if end != -1:
                return user_message[start:end]
        for name in ("Sprite1", "Ball"):
            if name in user_message:
                return name
        return None

    @staticmethod
    def _action_plan() -> Dict[str, Any]:
        """Canned high-level plan returned for the planning prompt."""
        return {
            "action_overall_flow": {
                "Sprite1": {
                    "description": "Basic movement and interaction",
                    "plans": [
                        {
                            "event": "when flag clicked",
                            "logic": "forever loop: move 10 steps, if touching Edge then turn 15 degrees"
                        },
                        {
                            "event": "when space key pressed",
                            "logic": "say Hello! for 2 seconds"
                        }
                    ]
                },
                "Ball": {
                    "description": "Simple bouncing behavior",
                    "plans": [
                        {
                            "event": "when flag clicked",
                            "logic": "move 5 steps, if on edge bounce"
                        }
                    ]
                }
            }
        }

    @staticmethod
    def _sprite1_blocks() -> Dict[str, Any]:
        """Canned block dictionary for Sprite1 with internally consistent IDs."""
        # Generate the random part once: every next/parent/input reference must
        # point at a key that actually exists in the returned dictionary.
        hat = f"block_id_{generate_block_id()}"
        forever = f"{hat}_forever"
        move = f"{hat}_move"
        if_block = f"{hat}_if"
        touching = f"{hat}_touching"
        touching_menu = f"{hat}_touching_menu"
        turn = f"{hat}_turn"
        say = f"{hat}_say"
        return {
            hat: {
                "opcode": "event_whenflagclicked",
                "next": forever,
                "parent": None,
                "inputs": {}, "fields": {}, "shadow": False, "topLevel": True, "x": 100, "y": 100
            },
            forever: {
                "opcode": "control_forever",
                "next": None,
                "parent": hat,
                "inputs": {
                    "SUBSTACK": [2, move]
                }, "fields": {}, "shadow": False, "topLevel": False
            },
            move: {
                "opcode": "motion_movesteps",
                "next": if_block,
                "parent": forever,
                "inputs": {
                    "STEPS": [1, [4, "10"]]
                }, "fields": {}, "shadow": False, "topLevel": False
            },
            if_block: {
                "opcode": "control_if",
                "next": None,
                # A stacked block's parent is the block directly above it.
                "parent": move,
                "inputs": {
                    "CONDITION": [2, touching],
                    "SUBSTACK": [2, turn]
                }, "fields": {}, "shadow": False, "topLevel": False
            },
            touching: {
                "opcode": "sensing_touchingobject",
                "next": None,
                "parent": if_block,
                "inputs": {
                    "TOUCHINGOBJECTMENU": [1, touching_menu]
                }, "fields": {}, "shadow": False, "topLevel": False
            },
            touching_menu: {
                "opcode": "sensing_touchingobjectmenu",
                "next": None,
                "parent": touching,
                "inputs": {},
                "fields": {"TOUCHINGOBJECTMENU": ["_edge_", None]},
                "shadow": True, "topLevel": False
            },
            turn: {
                "opcode": "motion_turnright",
                "next": None,
                "parent": if_block,
                "inputs": {
                    "DEGREES": [1, [4, "15"]]
                }, "fields": {}, "shadow": False, "topLevel": False
            },
            say: {
                "opcode": "looks_sayforsecs",
                "next": None,
                "parent": None,  # This block would typically be part of a separate script
                "inputs": {
                    "MESSAGE": [1, [10, "Hello!"]],
                    "SECS": [1, [4, "2"]]
                }, "fields": {}, "shadow": False, "topLevel": True, "x": 300, "y": 100
            }
        }

    @staticmethod
    def _ball_blocks() -> Dict[str, Any]:
        """Canned block dictionary for Ball with internally consistent IDs."""
        hat = f"block_id_{generate_block_id()}"
        move_ball = f"{hat}_moveball"
        edge_bounce = f"{hat}_edgebounce"
        return {
            hat: {
                "opcode": "event_whenflagclicked",
                "next": move_ball,
                "parent": None,
                "inputs": {}, "fields": {}, "shadow": False, "topLevel": True, "x": 100, "y": 100
            },
            move_ball: {
                "opcode": "motion_movesteps",
                "next": edge_bounce,
                "parent": hat,
                "inputs": {
                    "STEPS": [1, [4, "5"]]
                }, "fields": {}, "shadow": False, "topLevel": False
            },
            edge_bounce: {
                "opcode": "motion_ifonedgebounce",
                "next": None,
                "parent": move_ball,
                "inputs": {}, "fields": {}, "shadow": False, "topLevel": False
            }
        }


# Shared agent instance used by the planning and block-building nodes.
agent = MockAgent()
# Helper function to generate a unique block ID
def generate_block_id():
    """Return a short random token to embed in a Scratch block ID.

    The token is the first nine hexadecimal digits of a fresh UUID4, i.e. it
    is kept short for readability. It is random, so collisions are unlikely
    but not strictly impossible.
    """
    full_hex = uuid.uuid4().hex
    return full_hex[:9]
# Placeholder for your extract_json_from_llm_response function
def extract_json_from_llm_response(response_string):
    """Parse the JSON payload contained in an LLM response.

    Several extraction strategies are tried in order and the first one that
    parses wins:

    1. the contents of the first triple-backtick fence (optionally tagged
       ``json``), so prose before/after the fence is tolerated;
    2. the whole response with fence markers stripped (the original
       behaviour, which also covers a missing closing fence);
    3. the span from the first ``{``/``[`` to the last ``}``/``]``, for JSON
       embedded in unfenced prose.

    Args:
        response_string: Raw text returned by the LLM.

    Returns:
        The decoded JSON value (typically a dict or list).

    Raises:
        ValueError: If the input is not a string or no strategy yields valid
            JSON. The last decoding error is attached as the cause.
    """
    if not isinstance(response_string, str):
        raise ValueError("Invalid JSON response from LLM")

    text = response_string.strip()
    candidates = []

    fenced = re.search(r"```(?:json)?\s*(.*?)\s*```", text, re.DOTALL | re.IGNORECASE)
    if fenced:
        candidates.append(fenced.group(1))

    candidates.append(text.replace("```json", "").replace("```", "").strip())

    opening_positions = [pos for pos in (text.find("{"), text.find("[")) if pos != -1]
    closing_position = max(text.rfind("}"), text.rfind("]"))
    if opening_positions and closing_position > min(opening_positions):
        candidates.append(text[min(opening_positions):closing_position + 1])

    last_error = None
    for candidate in candidates:
        try:
            return json.loads(candidate)
        except json.JSONDecodeError as e:
            last_error = e

    logger.error("Failed to decode JSON from LLM response: %s", last_error)
    logger.error("Raw response: %s", response_string)
    raise ValueError("Invalid JSON response from LLM") from last_error
# --- GLOBAL CATALOG OF ALL SCRATCH BLOCKS ---
# This is where you would load your block_content.json
# For demonstration, I'm using your provided snippets and adding some common ones.
# In a real application, you'd load this once at startup.
def _catalog_entry(opcode, x, y, inputs=None, fields=None, shadow=False):
    """Build one catalog template: a detached, top-level block for *opcode*.

    Keys are emitted in the order opcode, next, parent, inputs, fields,
    shadow, topLevel, x, y so that serialising the catalog is stable.
    """
    return {
        "opcode": opcode,
        "next": None,
        "parent": None,
        "inputs": {} if inputs is None else inputs,
        "fields": {} if fields is None else fields,
        "shadow": shadow,
        "topLevel": True,
        "x": x,
        "y": y,
    }


# Catalog of block templates keyed by opcode, in declaration order.
ALL_SCRATCH_BLOCKS_CATALOG = {
    template["opcode"]: template
    for template in (
        # Motion
        _catalog_entry("motion_movesteps", 464, -416, inputs={"STEPS": [1, [4, "10"]]}),
        _catalog_entry("motion_turnright", 467, -316, inputs={"DEGREES": [1, [4, "15"]]}),
        _catalog_entry("motion_ifonedgebounce", 467, -316),
        # Events
        _catalog_entry("event_whenflagclicked", 10, 10),
        _catalog_entry("event_whenkeypressed", 10, 10, fields={"KEY_OPTION": ["space", None]}),
        # Control
        _catalog_entry("control_forever", 10, 10, inputs={"SUBSTACK": [2, "some_id"]}),
        _catalog_entry(
            "control_if", 10, 10,
            inputs={"CONDITION": [2, "some_id"], "SUBSTACK": [2, "some_id_2"]},
        ),
        # Looks
        _catalog_entry(
            "looks_sayforsecs", 10, 10,
            inputs={"MESSAGE": [1, [10, "Hello!"]], "SECS": [1, [4, "2"]]},
        ),
        _catalog_entry("looks_say", 10, 10, inputs={"MESSAGE": [1, [10, "Hello!"]]}),
        # Sensing
        _catalog_entry("sensing_touchingobject", 10, 10, inputs={"TOUCHINGOBJECTMENU": [1, "some_id"]}),
        _catalog_entry(
            "sensing_touchingobjectmenu", 10, 10,
            fields={"TOUCHINGOBJECTMENU": ["_mouse_", None]},
            shadow=True,
        ),
        # Add more blocks from your block_content.json here...
    )
}
# --- Heuristic-based block selection ---
def get_relevant_blocks_for_plan(action_plan: Dict[str, Any], all_blocks_catalog: Dict[str, Any]) -> Dict[str, Any]:
    """
    Select the catalog blocks that look relevant to a natural-language plan.

    This is a heuristic: the ``event`` and ``logic`` text of every plan entry
    is lower-cased and scanned for (a) opcodes that appear verbatim and
    (b) keywords mapped to one or more opcodes. Matching is by substring, so
    it errs on the side of including a block rather than omitting it.

    Args:
        action_plan: Plan of the form
            ``{"action_overall_flow": {sprite: {"plans": [{"event", "logic"}]}}}``.
            Missing or malformed parts are ignored.
        all_blocks_catalog: Mapping of opcode to block template.

    Returns:
        The subset of ``all_blocks_catalog`` whose opcodes were selected, in
        catalog order (so the result is deterministic between runs). The two
        common event hats are always included when present in the catalog.
    """
    # Always include common event blocks.
    relevant_opcodes = {"event_whenflagclicked", "event_whenkeypressed"}

    # Keyword -> opcodes. Values are tuples because one keyword may imply
    # several blocks (a plain dict literal cannot repeat the "edge" key).
    keyword_map = {
        "move": ("motion_movesteps",),
        "steps": ("motion_movesteps",),
        "turn": ("motion_turnright",),
        "rotate": ("motion_turnright",),
        "bounce": ("motion_ifonedgebounce",),
        # "edge" covers both "if on edge, bounce" and "touching edge".
        "edge": ("motion_ifonedgebounce", "sensing_touchingobjectmenu"),
        "forever": ("control_forever",),
        "loop": ("control_forever",),
        "if": ("control_if",),
        "condition": ("control_if",),
        # Offer both the plain and the timed variant of "say".
        "say": ("looks_say", "looks_sayforsecs"),
        "hello": ("looks_say",),  # Simple example, might need more context
        # The touching reporter needs its dropdown shadow block as well.
        "touching": ("sensing_touchingobject", "sensing_touchingobjectmenu"),
        "mouse pointer": ("sensing_touchingobjectmenu",),
    }

    overall_flow = action_plan.get("action_overall_flow") if isinstance(action_plan, dict) else None
    if not isinstance(overall_flow, dict):
        overall_flow = {}

    for sprite_actions in overall_flow.values():
        if not isinstance(sprite_actions, dict):
            continue
        for plan in sprite_actions.get("plans") or []:
            if not isinstance(plan, dict):
                continue
            # `or ""` guards against explicit nulls in LLM-produced JSON.
            event_logic = f"{plan.get('event') or ''} {plan.get('logic') or ''}".lower()

            # Direct opcode mentions (if the LLM outputs opcodes in its plan).
            relevant_opcodes.update(
                opcode for opcode in all_blocks_catalog if opcode in event_logic
            )
            # Keyword matches.
            for keyword, opcodes in keyword_map.items():
                if keyword in event_logic:
                    relevant_opcodes.update(opcodes)

    # Iterate the catalog (not the set) to keep a stable, catalog-defined order.
    return {
        opcode: block
        for opcode, block in all_blocks_catalog.items()
        if opcode in relevant_opcodes
    }
# --- New Action Planning Node ---
def plan_sprite_actions(state: GameState):
    """Ask the agent for a high-level, natural-language action plan per sprite.

    Args:
        state: Game state providing ``description`` and ``project_json``
            (with a ``targets`` list); ``sprite_initial_positions`` is
            optional and defaults to an empty mapping in the prompt.

    Returns:
        ``{"action_plan": <parsed JSON from the agent>}``.

    Raises:
        Exception: Any error from the agent call or from JSON extraction is
            logged and re-raised unchanged.
    """
    logger.info("--- Running PlanSpriteActionsNode ---")

    # List every non-stage target; a target without an 'isStage' flag is
    # treated as a sprite rather than raising KeyError.
    sprite_names = [
        target["name"]
        for target in state["project_json"]["targets"]
        if not target.get("isStage")
    ]
    sprite_list = ", ".join(sprite_names) if sprite_names else "None"

    planning_prompt = (
        f"You are an AI assistant tasked with planning Scratch 3.0 block code for a game. "
        f"The game description is: '{state['description']}'.\n\n"
        f"Here are the sprites currently in the project: {sprite_list}.\n"
        f"Initial positions: {json.dumps(state.get('sprite_initial_positions', {}), indent=2)}\n\n"
        f"Consider the main actions and interactions required for each sprite. "
        f"Think step-by-step about what each sprite needs to *do*, *when* it needs to do it (events), "
        f"and if any actions need to *repeat* or depend on *conditions*.\n\n"
        f"Propose a high-level action flow for each sprite in the following JSON format. "
        f"Do NOT generate Scratch block JSON yet. Only describe the logic using natural language or simplified pseudo-code.\n\n"
        f"Example format:\n"
        f"```json\n"
        f"{{\n"
        f" \"action_overall_flow\": {{\n"
        f" \"Sprite1\": {{\n"
        f" \"description\": \"Main character actions\",\n"
        f" \"plans\": [\n"
        f" {{\n"
        f" \"event\": \"when flag clicked\",\n"
        f" \"logic\": \"forever loop: move 10 steps, if on edge bounce\"\n"
        f" }},\n"
        f" {{\n"
        f" \"event\": \"when space key pressed\",\n"
        f" \"logic\": \"change y by 10, wait 0.1 seconds, change y by -10\"\n"
        f" }}\n"
        f" ]\n"
        f" }},\n"
        f" \"Ball\": {{\n"
        f" \"description\": \"Projectile movement\",\n"
        f" \"plans\": [\n"
        f" {{\n"
        f" \"event\": \"when I start as a clone\",\n"
        f" \"logic\": \"glide 1 sec to random position, if touching Sprite1 then stop this script\"\n"
        f" }}\n"
        f" ]\n"
        f" }}\n"
        f" }}\n"
        f"}}\n"
        f"```\n\n"
        f"Return ONLY the JSON object for the action overall flow."
    )
    try:
        response = agent.invoke({"messages": [{"role": "user", "content": planning_prompt}]})
        last_message = response["messages"][-1]
        # Accept both dict-style messages and message objects exposing `.content`.
        raw_response = last_message["content"] if isinstance(last_message, dict) else last_message.content
        print("Raw response from LLM [PlanSpriteActionsNode]:", raw_response)
        action_plan = extract_json_from_llm_response(raw_response)
        logger.info("Sprite action plan generated by PlanSpriteActionsNode.")
        return {"action_plan": action_plan}
    except Exception as e:
        logger.error(f"Error in PlanSpriteActionsNode: {e}")
        raise
# --- Updated Action Node Builder (to consume the plan and build blocks) ---
def build_action_nodes(state: GameState):
    """Generate Scratch block JSON for every planned sprite and merge it in.

    For each sprite named in ``state["action_plan"]["action_overall_flow"]``
    that also exists as a non-stage target, the agent is prompted with the
    sprite's plan, a filtered block catalog and the sprite's current target
    JSON. The returned blocks are merged into that target's ``blocks`` dict.

    Note that ``state["project_json"]`` is updated in place; the same object
    is also returned.

    Args:
        state: Game state containing ``action_plan`` and ``project_json``.

    Returns:
        ``{"project_json": <the updated project JSON>}``.

    Raises:
        ValueError: If the state holds no (or an empty) action plan.
        Exception: Any error from the agent call or JSON extraction is logged
            and re-raised unchanged.
    """
    logger.info("--- Running ActionNodeBuilder ---")
    action_plan = state.get("action_plan", {})
    if not action_plan:
        raise ValueError("No action plan found in state. Run PlanSpriteActionsNode first.")

    project_json = state["project_json"]
    targets = project_json["targets"]
    # Map sprite names to their target objects; targets lacking an 'isStage'
    # flag are treated as sprites rather than raising KeyError.
    sprite_map = {target["name"]: target for target in targets if not target.get("isStage")}

    # Get only the relevant blocks for the entire action plan.
    relevant_scratch_blocks_catalog = get_relevant_blocks_for_plan(action_plan, ALL_SCRATCH_BLOCKS_CATALOG)
    logger.info(f"Filtered {len(relevant_scratch_blocks_catalog)} relevant blocks out of {len(ALL_SCRATCH_BLOCKS_CATALOG)} total.")

    # Iterate through the planned actions for each sprite.
    for sprite_name, sprite_actions in action_plan.get("action_overall_flow", {}).items():
        if sprite_name not in sprite_map:
            # Previously skipped silently; surface plan/project mismatches.
            logger.warning(f"Sprite '{sprite_name}' is in the action plan but not in the project; skipping.")
            continue

        current_sprite_target = sprite_map[sprite_name]
        # Ensure 'blocks' field exists for the sprite.
        current_sprite_target.setdefault("blocks", {})

        # This is where the LLM's role becomes crucial: translating logic to blocks.
        llm_block_generation_prompt = (
            f"You are an AI assistant generating Scratch 3.0 block JSON based on a provided plan. "
            f"The current sprite is '{sprite_name}'.\n"
            f"Its planned actions are:\n"
            f"```json\n{json.dumps(sprite_actions, indent=2)}\n```\n\n"
            f"Here is a **curated catalog of only the most relevant Scratch 3.0 blocks** for this plan:\n"
            f"```json\n{json.dumps(relevant_scratch_blocks_catalog, indent=2)}\n```\n\n"
            f"Current Scratch project JSON (for context, specifically this sprite's existing blocks if any):\n"
            f"```json\n{json.dumps(current_sprite_target, indent=2)}\n```\n\n"
            f"**Instructions:**\n"
            f"1. For each planned event and its associated logic, generate the corresponding Scratch 3.0 block JSON.\n"
            f"2. **Generate unique block IDs** for every new block. Use a format like 'block_id_abcdef12'.\n"  # Updated ID format hint
            f"3. Properly link blocks using `next` and `parent` fields to form execution stacks. Hat blocks (`topLevel: true`, `parent: null`).\n"
            f"4. Correctly fill `inputs` and `fields` based on the catalog and the plan's logic (e.g., specific values for motion, keys for events, conditions for controls).\n"
            f"5. For C-blocks (like `control_repeat`, `control_forever`, `control_if`), use the `SUBSTACK` input to link to the first block inside its loop/conditional.\n"
            f"6. If the plan involves operators (e.g., 'if touching Sprite1'), use the appropriate operator blocks from the catalog and link them correctly as `CONDITION` inputs.\n"
            f"7. Ensure that any shadow blocks (e.g., for dropdowns like `motion_goto_menu`, `sensing_touchingobjectmenu`) are generated with `shadow: true` and linked correctly as inputs to their parent block.\n"
            f"8. Return ONLY the **updated 'blocks' dictionary** for this specific sprite. Do NOT return the full project JSON. ONLY the `blocks` dictionary."
        )
        try:
            response = agent.invoke({"messages": [{"role": "user", "content": llm_block_generation_prompt}]})
            last_message = response["messages"][-1]
            # Accept both dict-style messages and message objects exposing `.content`.
            raw_response = last_message["content"] if isinstance(last_message, dict) else last_message.content
            print(f"Raw response from LLM [ActionNodeBuilder - {sprite_name}]:", raw_response)
            generated_blocks = extract_json_from_llm_response(raw_response)
        except Exception as e:
            logger.error(f"Error generating blocks for sprite '{sprite_name}': {e}")
            # Depending on robustness needed, you might continue or re-raise
            raise

        if not isinstance(generated_blocks, dict):
            # A blocks payload must be an ID -> block mapping; anything else
            # (e.g. an empty list) carries nothing that can be merged.
            logger.warning(f"Ignoring non-dict blocks payload for sprite '{sprite_name}'.")
            continue

        current_sprite_target["blocks"].update(generated_blocks)  # Merge new blocks
        logger.info(f"Action blocks added for sprite '{sprite_name}' by ActionNodeBuilder.")

    return {"project_json": project_json}
# --- Example Usage (to demonstrate the flow) ---
def _run_demo() -> int:
    """Run the plan-then-build demo flow and return a process exit status.

    Returns:
        0 when both steps succeed, 1 when either step raises.
    """
    # Initialize a mock game state
    initial_game_state = {
        "description": "A simple game where a sprite moves and says hello.",
        "project_json": {
            "targets": [
                {"isStage": True, "name": "Stage", "blocks": {}},
                {"isStage": False, "name": "Sprite1", "blocks": {}},
                {"isStage": False, "name": "Ball", "blocks": {}}
            ]
        },
        "sprite_initial_positions": {}
    }

    # Step 1: Plan Sprite Actions
    try:
        state_after_planning = plan_sprite_actions(initial_game_state)
        initial_game_state.update(state_after_planning)
        print("\n--- Game State After Planning ---")
        print(json.dumps(initial_game_state, indent=2))
    except Exception as e:
        print(f"Planning failed: {e}")
        return 1

    # Step 2: Build Action Nodes (Generate Blocks)
    try:
        state_after_building = build_action_nodes(initial_game_state)
        initial_game_state.update(state_after_building)
        print("\n--- Game State After Building Blocks ---")
        # Print only the blocks of each sprite to keep output manageable
        for target in initial_game_state["project_json"]["targets"]:
            if not target["isStage"]:
                print(f"\nBlocks for {target['name']}:")
                print(json.dumps(target.get('blocks', {}), indent=2))
    except Exception as e:
        print(f"Building blocks failed: {e}")
        return 1

    return 0


# --- Example Usage (to demonstrate the flow) ---
if __name__ == "__main__":
    # SystemExit is used instead of the site-provided exit() helper, and it
    # carries a non-zero status when a step fails.
    raise SystemExit(_run_demo())