Spaces:
No application file
No application file
import json | |
import uuid | |
import logging | |
from typing import Dict, Any, List | |
# Assume GameState is a TypedDict or similar for clarity | |
# from typing import TypedDict | |
# class GameState(TypedDict): | |
# description: str | |
# project_json: Dict[str, Any] | |
# action_plan: Dict[str, Any] | |
# sprite_initial_positions: Dict[str, Any] | |
# Placeholder for actual GameState in your application | |
GameState = Dict[str, Any] | |
logger = logging.getLogger(__name__) | |
# --- Mock Agent for demonstration --- | |
class MockAgent: | |
def invoke(self, payload: Dict[str, Any]) -> Dict[str, Any]: | |
""" | |
Mocks an LLM agent invocation. In a real scenario, this would call your | |
actual LLM API (e.g., through LangChain, LlamaIndex, etc.). | |
""" | |
user_message = payload["messages"][-1]["content"] | |
print(f"\n--- Mock Agent Received Prompt (partial) ---\n{user_message[:500]}...\n------------------------------------------") | |
# Simplified mock responses for demonstration purposes | |
# In a real scenario, the LLM would generate actual Scratch block JSON | |
if "Propose a high-level action flow" in user_message: | |
return { | |
"messages": [{ | |
"content": json.dumps({ | |
"action_overall_flow": { | |
"Sprite1": { | |
"description": "Basic movement and interaction", | |
"plans": [ | |
{ | |
"event": "when flag clicked", | |
"logic": "forever loop: move 10 steps, if touching Edge then turn 15 degrees" | |
}, | |
{ | |
"event": "when space key pressed", | |
"logic": "say Hello! for 2 seconds" | |
} | |
] | |
}, | |
"Ball": { | |
"description": "Simple bouncing behavior", | |
"plans": [ | |
{ | |
"event": "when flag clicked", | |
"logic": "move 5 steps, if on edge bounce" | |
} | |
] | |
} | |
} | |
}) | |
}] | |
} | |
elif "You are an AI assistant generating Scratch 3.0 block JSON" in user_message: | |
# This mock response is highly simplified. A real LLM would generate | |
# valid Scratch blocks based on the provided relevant catalog and plan. | |
# We're just demonstrating the *mechanism* of filtering the catalog. | |
if "Sprite1" in user_message: | |
return { | |
"messages": [{ | |
"content": json.dumps({ | |
f"block_id_{generate_block_id()}": { | |
"opcode": "event_whenflagclicked", | |
"next": f"block_id_{generate_block_id()}_forever", | |
"parent": None, | |
"inputs": {}, "fields": {}, "shadow": False, "topLevel": True, "x": 100, "y": 100 | |
}, | |
f"block_id_{generate_block_id()}_forever": { | |
"opcode": "control_forever", | |
"next": None, | |
"parent": f"block_id_{generate_block_id()}", | |
"inputs": { | |
"SUBSTACK": [2, f"block_id_{generate_block_id()}_move"] | |
}, "fields": {}, "shadow": False, "topLevel": False | |
}, | |
f"block_id_{generate_block_id()}_move": { | |
"opcode": "motion_movesteps", | |
"next": f"block_id_{generate_block_id()}_if", | |
"parent": f"block_id_{generate_block_id()}_forever", | |
"inputs": { | |
"STEPS": [1, [4, "10"]] | |
}, "fields": {}, "shadow": False, "topLevel": False | |
}, | |
f"block_id_{generate_block_id()}_if": { | |
"opcode": "control_if", | |
"next": None, | |
"parent": f"block_id_{generate_block_id()}_forever", | |
"inputs": { | |
"CONDITION": [2, f"block_id_{generate_block_id()}_touching"], | |
"SUBSTACK": [2, f"block_id_{generate_block_id()}_turn"] | |
}, "fields": {}, "shadow": False, "topLevel": False | |
}, | |
f"block_id_{generate_block_id()}_touching": { | |
"opcode": "sensing_touchingobject", | |
"next": None, | |
"parent": f"block_id_{generate_block_id()}_if", | |
"inputs": { | |
"TOUCHINGOBJECTMENU": [1, f"block_id_{generate_block_id()}_touching_menu"] | |
}, "fields": {}, "shadow": False, "topLevel": False | |
}, | |
f"block_id_{generate_block_id()}_touching_menu": { | |
"opcode": "sensing_touchingobjectmenu", | |
"next": None, | |
"parent": f"block_id_{generate_block_id()}_touching", | |
"inputs": {}, | |
"fields": {"TOUCHINGOBJECTMENU": ["_edge_", None]}, | |
"shadow": True, "topLevel": False | |
}, | |
f"block_id_{generate_block_id()}_turn": { | |
"opcode": "motion_turnright", | |
"next": None, | |
"parent": f"block_id_{generate_block_id()}_if", | |
"inputs": { | |
"DEGREES": [1, [4, "15"]] | |
}, "fields": {}, "shadow": False, "topLevel": False | |
}, | |
f"block_id_{generate_block_id()}_say": { | |
"opcode": "looks_sayforsecs", | |
"next": None, | |
"parent": None, # This block would typically be part of a separate script | |
"inputs": { | |
"MESSAGE": [1, [10, "Hello!"]], | |
"SECS": [1, [4, "2"]] | |
}, "fields": {}, "shadow": False, "topLevel": True, "x": 300, "y": 100 | |
} | |
}) | |
}] | |
} | |
elif "Ball" in user_message: | |
return { | |
"messages": [{ | |
"content": json.dumps({ | |
f"block_id_{generate_block_id()}": { | |
"opcode": "event_whenflagclicked", | |
"next": f"block_id_{generate_block_id()}_moveball", | |
"parent": None, | |
"inputs": {}, "fields": {}, "shadow": False, "topLevel": True, "x": 100, "y": 100 | |
}, | |
f"block_id_{generate_block_id()}_moveball": { | |
"opcode": "motion_movesteps", | |
"next": f"block_id_{generate_block_id()}_edgebounce", | |
"parent": f"block_id_{generate_block_id()}", | |
"inputs": { | |
"STEPS": [1, [4, "5"]] | |
}, "fields": {}, "shadow": False, "topLevel": False | |
}, | |
f"block_id_{generate_block_id()}_edgebounce": { | |
"opcode": "motion_ifonedgebounce", | |
"next": None, | |
"parent": f"block_id_{generate_block_id()}_moveball", | |
"inputs": {}, "fields": {}, "shadow": False, "topLevel": False | |
} | |
}) | |
}] | |
} | |
return {"messages": [{"content": "[]"}]} # Default empty response | |
agent = MockAgent() | |
# Helper function to generate a unique block ID | |
def generate_block_id(): | |
return str(uuid.uuid4())[:10].replace('-', '') # Shorten for readability, ensure uniqueness | |
# Placeholder for your extract_json_from_llm_response function | |
def extract_json_from_llm_response(response_string): | |
try: | |
# Assuming the LLM response is ONLY the JSON string within triple backticks | |
json_match = response_string.strip().replace("```json", "").replace("```", "").strip() | |
return json.loads(json_match) | |
except json.JSONDecodeError as e: | |
logger.error(f"Failed to decode JSON from LLM response: {e}") | |
logger.error(f"Raw response: {response_string}") | |
raise ValueError("Invalid JSON response from LLM") | |
# --- GLOBAL CATALOG OF ALL SCRATCH BLOCKS --- | |
# This is where you would load your block_content.json | |
# For demonstration, I'm using your provided snippets and adding some common ones. | |
# In a real application, you'd load this once at startup. | |
ALL_SCRATCH_BLOCKS_CATALOG = { | |
"motion_movesteps": { | |
"opcode": "motion_movesteps", "next": None, "parent": None, | |
"inputs": {"STEPS": [1, [4, "10"]]}, "fields": {}, "shadow": False, "topLevel": True, "x": 464, "y": -416 | |
}, | |
"motion_turnright": { | |
"opcode": "motion_turnright", "next": None, "parent": None, | |
"inputs": {"DEGREES": [1, [4, "15"]]}, "fields": {}, "shadow": False, "topLevel": True, "x": 467, "y": -316 | |
}, | |
"motion_ifonedgebounce": { | |
"opcode": "motion_ifonedgebounce", "next": None, "parent": None, | |
"inputs": {}, "fields": {}, "shadow": False, "topLevel": True, "x": 467, "y": -316 | |
}, | |
"event_whenflagclicked": { | |
"opcode": "event_whenflagclicked", "next": None, "parent": None, | |
"inputs": {}, "fields": {}, "shadow": False, "topLevel": True, "x": 10, "y": 10 | |
}, | |
"event_whenkeypressed": { | |
"opcode": "event_whenkeypressed", "next": None, "parent": None, | |
"inputs": {}, "fields": {"KEY_OPTION": ["space", None]}, "shadow": False, "topLevel": True, "x": 10, "y": 10 | |
}, | |
"control_forever": { | |
"opcode": "control_forever", "next": None, "parent": None, | |
"inputs": {"SUBSTACK": [2, "some_id"]}, "fields": {}, "shadow": False, "topLevel": True, "x": 10, "y": 10 | |
}, | |
"control_if": { | |
"opcode": "control_if", "next": None, "parent": None, | |
"inputs": {"CONDITION": [2, "some_id"], "SUBSTACK": [2, "some_id_2"]}, "fields": {}, "shadow": False, "topLevel": True, "x": 10, "y": 10 | |
}, | |
"looks_sayforsecs": { | |
"opcode": "looks_sayforsecs", "next": None, "parent": None, | |
"inputs": {"MESSAGE": [1, [10, "Hello!"]], "SECS": [1, [4, "2"]]}, "fields": {}, "shadow": False, "topLevel": True, "x": 10, "y": 10 | |
}, | |
"looks_say": { | |
"opcode": "looks_say", "next": None, "parent": None, | |
"inputs": {"MESSAGE": [1, [10, "Hello!"]]}, "fields": {}, "shadow": False, "topLevel": True, "x": 10, "y": 10 | |
}, | |
"sensing_touchingobject": { | |
"opcode": "sensing_touchingobject", "next": None, "parent": None, | |
"inputs": {"TOUCHINGOBJECTMENU": [1, "some_id"]}, "fields": {}, "shadow": False, "topLevel": True, "x": 10, "y": 10 | |
}, | |
"sensing_touchingobjectmenu": { | |
"opcode": "sensing_touchingobjectmenu", "next": None, "parent": None, | |
"inputs": {}, "fields": {"TOUCHINGOBJECTMENU": ["_mouse_", None]}, "shadow": True, "topLevel": True, "x": 10, "y": 10 | |
}, | |
# Add more blocks from your block_content.json here... | |
} | |
# --- Heuristic-based block selection --- | |
def get_relevant_blocks_for_plan(action_plan: Dict[str, Any], all_blocks_catalog: Dict[str, Any]) -> Dict[str, Any]: | |
""" | |
Analyzes the natural language action plan and selects relevant Scratch blocks | |
from the comprehensive catalog. This is a heuristic approach and might need | |
to be refined based on your specific use cases and LLM capabilities. | |
""" | |
relevant_opcodes = set() | |
# Always include common event blocks | |
relevant_opcodes.add("event_whenflagclicked") | |
relevant_opcodes.add("event_whenkeypressed") # Could be more specific if key is mentioned | |
# Keyword to opcode mapping (can be expanded) | |
keyword_map = { | |
"move": "motion_movesteps", | |
"steps": "motion_movesteps", | |
"turn": "motion_turnright", | |
"rotate": "motion_turnright", | |
"bounce": "motion_ifonedgebounce", | |
"edge": "motion_ifonedgebounce", | |
"forever": "control_forever", | |
"loop": "control_forever", | |
"if": "control_if", | |
"condition": "control_if", | |
"say": "looks_say", | |
"hello": "looks_say", # Simple example, might need more context | |
"touching": "sensing_touchingobject", | |
"mouse pointer": "sensing_touchingobjectmenu", | |
"edge": "sensing_touchingobjectmenu", # For touching edge | |
} | |
# Iterate through the action plan to find keywords | |
for sprite_name, sprite_actions in action_plan.get("action_overall_flow", {}).items(): | |
for plan in sprite_actions.get("plans", []): | |
event_logic = plan.get("event", "").lower() + " " + plan.get("logic", "").lower() | |
# Check for direct opcode matches (if the LLM somehow outputs opcodes in its plan) | |
for opcode in all_blocks_catalog.keys(): | |
if opcode in event_logic: | |
relevant_opcodes.add(opcode) | |
# Check for keywords | |
for keyword, opcode in keyword_map.items(): | |
if keyword in event_logic: | |
relevant_opcodes.add(opcode) | |
# Add associated shadow blocks if known | |
if opcode == "sensing_touchingobject": | |
relevant_opcodes.add("sensing_touchingobjectmenu") | |
if opcode == "event_whenkeypressed": | |
relevant_opcodes.add("event_whenkeypressed") # It's already there but good to be explicit | |
# Construct the filtered catalog | |
relevant_blocks_catalog = { | |
opcode: all_blocks_catalog[opcode] | |
for opcode in relevant_opcodes if opcode in all_blocks_catalog | |
} | |
return relevant_blocks_catalog | |
# --- New Action Planning Node --- | |
def plan_sprite_actions(state: GameState): | |
logger.info("--- Running PlanSpriteActionsNode ---") | |
planning_prompt = ( | |
f"You are an AI assistant tasked with planning Scratch 3.0 block code for a game. " | |
f"The game description is: '{state['description']}'.\n\n" | |
f"Here are the sprites currently in the project: {', '.join(target['name'] for target in state['project_json']['targets'] if not target['isStage']) if len(state['project_json']['targets']) > 1 else 'None'}.\n" | |
f"Initial positions: {json.dumps(state.get('sprite_initial_positions', {}), indent=2)}\n\n" | |
f"Consider the main actions and interactions required for each sprite. " | |
f"Think step-by-step about what each sprite needs to *do*, *when* it needs to do it (events), " | |
f"and if any actions need to *repeat* or depend on *conditions*.\n\n" | |
f"Propose a high-level action flow for each sprite in the following JSON format. " | |
f"Do NOT generate Scratch block JSON yet. Only describe the logic using natural language or simplified pseudo-code.\n\n" | |
f"Example format:\n" | |
f"```json\n" | |
f"{{\n" | |
f" \"action_overall_flow\": {{\n" | |
f" \"Sprite1\": {{\n" | |
f" \"description\": \"Main character actions\",\n" | |
f" \"plans\": [\n" | |
f" {{\n" | |
f" \"event\": \"when flag clicked\",\n" | |
f" \"logic\": \"forever loop: move 10 steps, if on edge bounce\"\n" | |
f" }},\n" | |
f" {{\n" | |
f" \"event\": \"when space key pressed\",\n" | |
f" \"logic\": \"change y by 10, wait 0.1 seconds, change y by -10\"\n" | |
f" }}\n" | |
f" ]\n" | |
f" }},\n" | |
f" \"Ball\": {{\n" | |
f" \"description\": \"Projectile movement\",\n" | |
f" \"plans\": [\n" | |
f" {{\n" | |
f" \"event\": \"when I start as a clone\",\n" | |
f" \"logic\": \"glide 1 sec to random position, if touching Sprite1 then stop this script\"\n" | |
f" }}\n" | |
f" ]\n" | |
f" }}\n" | |
f" }}\n" | |
f"}}\n" | |
f"```\n\n" | |
f"Return ONLY the JSON object for the action overall flow." | |
) | |
try: | |
response = agent.invoke({"messages": [{"role": "user", "content": planning_prompt}]}) | |
raw_response = response["messages"][-1].content | |
print("Raw response from LLM [PlanSpriteActionsNode]:", raw_response) | |
action_plan = extract_json_from_llm_response(raw_response) | |
logger.info("Sprite action plan generated by PlanSpriteActionsNode.") | |
return {"action_plan": action_plan} | |
except Exception as e: | |
logger.error(f"Error in PlanSpriteActionsNode: {e}") | |
raise | |
# --- Updated Action Node Builder (to consume the plan and build blocks) --- | |
def build_action_nodes(state: GameState): | |
logger.info("--- Running ActionNodeBuilder ---") | |
action_plan = state.get("action_plan", {}) | |
if not action_plan: | |
raise ValueError("No action plan found in state. Run PlanSpriteActionsNode first.") | |
# Convert the Scratch project JSON to a mutable Python object | |
project_json = state["project_json"] | |
targets = project_json["targets"] | |
# We need a way to map sprite names to their actual target objects in project_json | |
sprite_map = {target["name"]: target for target in targets if not target["isStage"]} | |
# --- NEW: Get only the relevant blocks for the entire action plan --- | |
relevant_scratch_blocks_catalog = get_relevant_blocks_for_plan(action_plan, ALL_SCRATCH_BLOCKS_CATALOG) | |
logger.info(f"Filtered {len(relevant_scratch_blocks_catalog)} relevant blocks out of {len(ALL_SCRATCH_BLOCKS_CATALOG)} total.") | |
# Iterate through the planned actions for each sprite | |
for sprite_name, sprite_actions in action_plan.get("action_overall_flow", {}).items(): | |
if sprite_name in sprite_map: | |
current_sprite_target = sprite_map[sprite_name] | |
# Ensure 'blocks' field exists for the sprite | |
if "blocks" not in current_sprite_target: | |
current_sprite_target["blocks"] = {} | |
# Generate block JSON based on the detailed action plan for this sprite | |
# This is where the LLM's role becomes crucial: translating logic to blocks | |
llm_block_generation_prompt = ( | |
f"You are an AI assistant generating Scratch 3.0 block JSON based on a provided plan. " | |
f"The current sprite is '{sprite_name}'.\n" | |
f"Its planned actions are:\n" | |
f"```json\n{json.dumps(sprite_actions, indent=2)}\n```\n\n" | |
f"Here is a **curated catalog of only the most relevant Scratch 3.0 blocks** for this plan:\n" | |
f"```json\n{json.dumps(relevant_scratch_blocks_catalog, indent=2)}\n```\n\n" | |
f"Current Scratch project JSON (for context, specifically this sprite's existing blocks if any):\n" | |
f"```json\n{json.dumps(current_sprite_target, indent=2)}\n```\n\n" | |
f"**Instructions:**\n" | |
f"1. For each planned event and its associated logic, generate the corresponding Scratch 3.0 block JSON.\n" | |
f"2. **Generate unique block IDs** for every new block. Use a format like 'block_id_abcdef12'.\n" # Updated ID format hint | |
f"3. Properly link blocks using `next` and `parent` fields to form execution stacks. Hat blocks (`topLevel: true`, `parent: null`).\n" | |
f"4. Correctly fill `inputs` and `fields` based on the catalog and the plan's logic (e.g., specific values for motion, keys for events, conditions for controls).\n" | |
f"5. For C-blocks (like `control_repeat`, `control_forever`, `control_if`), use the `SUBSTACK` input to link to the first block inside its loop/conditional.\n" | |
f"6. If the plan involves operators (e.g., 'if touching Sprite1'), use the appropriate operator blocks from the catalog and link them correctly as `CONDITION` inputs.\n" | |
f"7. Ensure that any shadow blocks (e.g., for dropdowns like `motion_goto_menu`, `sensing_touchingobjectmenu`) are generated with `shadow: true` and linked correctly as inputs to their parent block.\n" | |
f"8. Return ONLY the **updated 'blocks' dictionary** for this specific sprite. Do NOT return the full project JSON. ONLY the `blocks` dictionary." | |
) | |
try: | |
response = agent.invoke({"messages": [{"role": "user", "content": llm_block_generation_prompt}]}) | |
raw_response = response["messages"][-1].content | |
print(f"Raw response from LLM [ActionNodeBuilder - {sprite_name}]:", raw_response) | |
generated_blocks = extract_json_from_llm_response(raw_response) | |
current_sprite_target["blocks"].update(generated_blocks) # Merge new blocks | |
logger.info(f"Action blocks added for sprite '{sprite_name}' by ActionNodeBuilder.") | |
except Exception as e: | |
logger.error(f"Error generating blocks for sprite '{sprite_name}': {e}") | |
# Depending on robustness needed, you might continue or re-raise | |
raise | |
return {"project_json": project_json} | |
# --- Example Usage (to demonstrate the flow) --- | |
if __name__ == "__main__": | |
# Initialize a mock game state | |
initial_game_state = { | |
"description": "A simple game where a sprite moves and says hello.", | |
"project_json": { | |
"targets": [ | |
{"isStage": True, "name": "Stage", "blocks": {}}, | |
{"isStage": False, "name": "Sprite1", "blocks": {}}, | |
{"isStage": False, "name": "Ball", "blocks": {}} | |
] | |
}, | |
"sprite_initial_positions": {} | |
} | |
# Step 1: Plan Sprite Actions | |
try: | |
state_after_planning = plan_sprite_actions(initial_game_state) | |
initial_game_state.update(state_after_planning) | |
print("\n--- Game State After Planning ---") | |
print(json.dumps(initial_game_state, indent=2)) | |
except Exception as e: | |
print(f"Planning failed: {e}") | |
exit() | |
# Step 2: Build Action Nodes (Generate Blocks) | |
try: | |
state_after_building = build_action_nodes(initial_game_state) | |
initial_game_state.update(state_after_building) | |
print("\n--- Game State After Building Blocks ---") | |
# Print only the blocks for a specific sprite to keep output manageable | |
for target in initial_game_state["project_json"]["targets"]: | |
if not target["isStage"]: | |
print(f"\nBlocks for {target['name']}:") | |
print(json.dumps(target.get('blocks', {}), indent=2)) | |
except Exception as e: | |
print(f"Building blocks failed: {e}") |