# Extracted from a Hugging Face Spaces file view (Space status: Runtime error).
# File size: 24,387 bytes; commit a522962.
import json
import uuid
import logging
from typing import Dict, Any, List
# GameState is the state mapping handed to the planning and block-building nodes.
# A stricter schema could be declared with a TypedDict, for example:
#     from typing import TypedDict
#     class GameState(TypedDict):
#         description: str
#         project_json: Dict[str, Any]
#         action_plan: Dict[str, Any]
#         sprite_initial_positions: Dict[str, Any]
# Until then this plain alias is a placeholder for the application's real type.
GameState = Dict[str, Any]
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# --- Mock Agent for demonstration ---
class _MockMessage(dict):
    """Dict-shaped chat message that also exposes ``.content`` as an attribute.

    The pipeline nodes in this file read ``response["messages"][-1].content``.
    A plain dict has no such attribute, so mock replies are wrapped in this
    class; key access (``message["content"]``) keeps working as well.
    """

    @property
    def content(self) -> str:
        return self["content"]


class MockAgent:
    """Stand-in for an LLM agent that answers with canned JSON replies.

    In a real scenario ``invoke`` would call the actual LLM API (e.g. through
    LangChain, LlamaIndex, etc.). The canned block scripts are deliberately
    simple; they only demonstrate the plan -> filtered catalog -> blocks flow.
    """

    # Phrase the block-generation prompt uses to name the sprite being built.
    _SPRITE_MARKER = "The current sprite is '"

    def invoke(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Return a canned reply chosen from the last message in *payload*.

        Args:
            payload: ``{"messages": [...]}`` where the last message is a dict
                with a ``"content"`` string (the prompt).

        Returns:
            ``{"messages": [message]}`` where ``message`` offers the reply text
            both as ``message["content"]`` and ``message.content``. The text is
            a JSON document: an action plan, a blocks dictionary, or ``"[]"``
            when the prompt is not recognised.
        """
        user_message = payload["messages"][-1]["content"]
        print(f"\n--- Mock Agent Received Prompt (partial) ---\n{user_message[:500]}...\n------------------------------------------")

        if "Propose a high-level action flow" in user_message:
            return self._reply(self._action_plan())
        if "You are an AI assistant generating Scratch 3.0 block JSON" in user_message:
            sprite = self._target_sprite(user_message)
            if sprite == "Sprite1":
                return self._reply(self._sprite1_blocks())
            if sprite == "Ball":
                return self._reply(self._ball_blocks())
        return {"messages": [_MockMessage(content="[]")]}  # Default empty response

    @staticmethod
    def _reply(data: Any) -> Dict[str, Any]:
        """Wrap *data* as a JSON-encoded single-message agent response."""
        return {"messages": [_MockMessage(content=json.dumps(data))]}

    @classmethod
    def _target_sprite(cls, prompt: str):
        """Return the sprite the prompt targets, or None if it names neither.

        The prompt's own instructions mention 'Sprite1' as an example, so a
        bare substring test would send every sprite down the Sprite1 branch.
        The explicit marker is therefore consulted first.
        """
        _, marker, tail = prompt.partition(cls._SPRITE_MARKER)
        if marker:
            return tail.partition("'")[0]
        # Prompts without the marker: keep the original substring heuristic.
        for name in ("Sprite1", "Ball"):
            if name in prompt:
                return name
        return None

    @staticmethod
    def _block(opcode, next_id=None, parent_id=None, inputs=None, fields=None,
               shadow=False, position=None) -> Dict[str, Any]:
        """Build one block entry; *position* (x, y) marks a top-level block."""
        block = {
            "opcode": opcode,
            "next": next_id,
            "parent": parent_id,
            "inputs": inputs or {},
            "fields": fields or {},
            "shadow": shadow,
            "topLevel": position is not None,
        }
        if position is not None:
            block["x"], block["y"] = position
        return block

    @staticmethod
    def _action_plan() -> Dict[str, Any]:
        """Return the canned high-level plan for Sprite1 and Ball."""
        return {
            "action_overall_flow": {
                "Sprite1": {
                    "description": "Basic movement and interaction",
                    "plans": [
                        {
                            "event": "when flag clicked",
                            "logic": "forever loop: move 10 steps, if touching Edge then turn 15 degrees",
                        },
                        {
                            "event": "when space key pressed",
                            "logic": "say Hello! for 2 seconds",
                        },
                    ],
                },
                "Ball": {
                    "description": "Simple bouncing behavior",
                    "plans": [
                        {
                            "event": "when flag clicked",
                            "logic": "move 5 steps, if on edge bounce",
                        }
                    ],
                },
            }
        }

    @classmethod
    def _sprite1_blocks(cls) -> Dict[str, Any]:
        """Return Sprite1's canned blocks with mutually consistent ids."""
        # One base id per script: every reference below must name a key that
        # is actually emitted, so ids are generated once and then reused.
        base = f"block_id_{generate_block_id()}"
        forever = f"{base}_forever"
        move = f"{base}_move"
        cond = f"{base}_if"
        touching = f"{base}_touching"
        touching_menu = f"{base}_touching_menu"
        turn = f"{base}_turn"
        say = f"block_id_{generate_block_id()}_say"
        return {
            base: cls._block("event_whenflagclicked", next_id=forever, position=(100, 100)),
            forever: cls._block("control_forever", parent_id=base,
                                inputs={"SUBSTACK": [2, move]}),
            move: cls._block("motion_movesteps", next_id=cond, parent_id=forever,
                             inputs={"STEPS": [1, [4, "10"]]}),
            # A stacked block's parent is the block directly above it.
            cond: cls._block("control_if", parent_id=move,
                             inputs={"CONDITION": [2, touching], "SUBSTACK": [2, turn]}),
            touching: cls._block("sensing_touchingobject", parent_id=cond,
                                 inputs={"TOUCHINGOBJECTMENU": [1, touching_menu]}),
            touching_menu: cls._block("sensing_touchingobjectmenu", parent_id=touching,
                                      fields={"TOUCHINGOBJECTMENU": ["_edge_", None]},
                                      shadow=True),
            turn: cls._block("motion_turnright", parent_id=cond,
                             inputs={"DEGREES": [1, [4, "15"]]}),
            # This block would typically be part of a separate script.
            say: cls._block("looks_sayforsecs",
                            inputs={"MESSAGE": [1, [10, "Hello!"]], "SECS": [1, [4, "2"]]},
                            position=(300, 100)),
        }

    @classmethod
    def _ball_blocks(cls) -> Dict[str, Any]:
        """Return Ball's canned blocks with mutually consistent ids."""
        base = f"block_id_{generate_block_id()}"
        move = f"{base}_moveball"
        bounce = f"{base}_edgebounce"
        return {
            base: cls._block("event_whenflagclicked", next_id=move, position=(100, 100)),
            move: cls._block("motion_movesteps", next_id=bounce, parent_id=base,
                             inputs={"STEPS": [1, [4, "5"]]}),
            bounce: cls._block("motion_ifonedgebounce", parent_id=move),
        }


agent = MockAgent()


# Helper function to generate a short random block ID
def generate_block_id():
    """Return the first 9 hex digits of a random UUID4.

    Shortened for readability; the truncation makes collisions unlikely
    rather than impossible.
    """
    return uuid.uuid4().hex[:9]
# Placeholder for your extract_json_from_llm_response function
def extract_json_from_llm_response(response_string):
    """Parse the JSON document contained in an LLM reply.

    Two readings of the reply are tried in order:

    1. The whole reply with every ``` / ```json fence marker removed (the
       original behaviour, which covers bare JSON and fence-only replies).
    2. The body of the first fenced block, which covers replies where the
       model wrapped the JSON in explanatory prose.

    Args:
        response_string: Raw text returned by the LLM.

    Returns:
        The decoded JSON value (typically a dict).

    Raises:
        ValueError: If neither reading is valid JSON. The first
            ``json.JSONDecodeError`` is attached as ``__cause__``.
    """
    stripped = response_string.strip().replace("```json", "").replace("```", "").strip()
    candidates = [stripped]

    # Isolate the first fenced block so surrounding prose cannot break parsing.
    _, opening, remainder = response_string.partition("```")
    fenced, closing, _ = remainder.partition("```")
    if opening and closing:
        if fenced[:4].lower() == "json":  # drop the optional language tag
            fenced = fenced[4:]
        fenced = fenced.strip()
        if fenced != stripped:
            candidates.append(fenced)

    first_error = None
    for candidate in candidates:
        try:
            return json.loads(candidate)
        except json.JSONDecodeError as e:
            if first_error is None:
                first_error = e

    logger.error("Failed to decode JSON from LLM response: %s", first_error)
    logger.error("Raw response: %s", response_string)
    raise ValueError("Invalid JSON response from LLM") from first_error
# --- GLOBAL CATALOG OF ALL SCRATCH BLOCKS ---
# This is where you would load your block_content.json
# For demonstration, I'm using your provided snippets and adding some common ones.
# In a real application, you'd load this once at startup.
def _catalog_entry(opcode, inputs=None, fields=None, shadow=False, x=10, y=10):
    """Build one catalog template: an unlinked, top-level block for *opcode*."""
    return {
        "opcode": opcode,
        "next": None,
        "parent": None,
        "inputs": {} if inputs is None else inputs,
        "fields": {} if fields is None else fields,
        "shadow": shadow,
        "topLevel": True,
        "x": x,
        "y": y,
    }


# Catalog keyed by opcode; each value is a template block for that opcode.
ALL_SCRATCH_BLOCKS_CATALOG = {
    template["opcode"]: template
    for template in (
        _catalog_entry("motion_movesteps", inputs={"STEPS": [1, [4, "10"]]}, x=464, y=-416),
        _catalog_entry("motion_turnright", inputs={"DEGREES": [1, [4, "15"]]}, x=467, y=-316),
        _catalog_entry("motion_ifonedgebounce", x=467, y=-316),
        _catalog_entry("event_whenflagclicked"),
        _catalog_entry("event_whenkeypressed", fields={"KEY_OPTION": ["space", None]}),
        _catalog_entry("control_forever", inputs={"SUBSTACK": [2, "some_id"]}),
        _catalog_entry(
            "control_if",
            inputs={"CONDITION": [2, "some_id"], "SUBSTACK": [2, "some_id_2"]},
        ),
        _catalog_entry(
            "looks_sayforsecs",
            inputs={"MESSAGE": [1, [10, "Hello!"]], "SECS": [1, [4, "2"]]},
        ),
        _catalog_entry("looks_say", inputs={"MESSAGE": [1, [10, "Hello!"]]}),
        _catalog_entry("sensing_touchingobject", inputs={"TOUCHINGOBJECTMENU": [1, "some_id"]}),
        _catalog_entry(
            "sensing_touchingobjectmenu",
            fields={"TOUCHINGOBJECTMENU": ["_mouse_", None]},
            shadow=True,
        ),
        # Add more blocks from your block_content.json here...
    )
}
# --- Heuristic-based block selection ---
def get_relevant_blocks_for_plan(action_plan: Dict[str, Any], all_blocks_catalog: Dict[str, Any]) -> Dict[str, Any]:
    """Select the catalog blocks that a natural-language action plan hints at.

    This is a substring heuristic: each plan's ``event`` and ``logic`` text is
    lower-cased and scanned for literal opcode names and for a small set of
    keywords. It may need refining for specific use cases.

    Args:
        action_plan: Plan of the form
            ``{"action_overall_flow": {sprite: {"plans": [{"event", "logic"}]}}}``.
            Missing keys are treated as empty.
        all_blocks_catalog: Mapping of opcode -> block template.

    Returns:
        The subset of *all_blocks_catalog* judged relevant, in catalog order
        (so the result is stable from run to run). The two common event hats
        are always included when the catalog has them.
    """
    # Keyword -> opcodes. A keyword may imply several blocks: "edge" is used
    # both by "if on edge, bounce" and by the "touching edge" dropdown, and
    # "say" has a timed and an untimed variant.
    keyword_map = {
        "move": ("motion_movesteps",),
        "steps": ("motion_movesteps",),
        "turn": ("motion_turnright",),
        "rotate": ("motion_turnright",),
        "bounce": ("motion_ifonedgebounce",),
        "edge": ("motion_ifonedgebounce", "sensing_touchingobjectmenu"),
        "forever": ("control_forever",),
        "loop": ("control_forever",),
        "if": ("control_if",),
        "condition": ("control_if",),
        "say": ("looks_say", "looks_sayforsecs"),
        "hello": ("looks_say",),  # Simple example, might need more context
        # The touching block needs its dropdown shadow block as well.
        "touching": ("sensing_touchingobject", "sensing_touchingobjectmenu"),
        "mouse pointer": ("sensing_touchingobjectmenu",),
    }

    # Always include common event blocks.
    relevant_opcodes = {"event_whenflagclicked", "event_whenkeypressed"}

    for sprite_actions in action_plan.get("action_overall_flow", {}).values():
        for plan in sprite_actions.get("plans", []):
            # `or ""` tolerates explicit nulls coming back from the LLM.
            text = f"{plan.get('event') or ''} {plan.get('logic') or ''}".lower()

            # Direct opcode mentions (if the LLM outputs opcodes in its plan).
            relevant_opcodes.update(opcode for opcode in all_blocks_catalog if opcode in text)

            for keyword, opcodes in keyword_map.items():
                if keyword in text:
                    relevant_opcodes.update(opcodes)

    # Filter in catalog order rather than set order to keep output deterministic.
    return {
        opcode: block
        for opcode, block in all_blocks_catalog.items()
        if opcode in relevant_opcodes
    }
# --- New Action Planning Node ---
def plan_sprite_actions(state: GameState):
    """Ask the agent for a high-level, natural-language action plan per sprite.

    Args:
        state: Game state; reads ``description``, ``project_json["targets"]``
            (each target needs ``name`` and ``isStage``) and, optionally,
            ``sprite_initial_positions``.

    Returns:
        ``{"action_plan": <decoded JSON plan>}``, ready to be merged into the
        state.

    Raises:
        Exception: Any error from the agent call or from JSON extraction is
            logged and re-raised unchanged.
    """
    logger.info("--- Running PlanSpriteActionsNode ---")

    # List the non-stage targets; say 'None' whenever there are none at all.
    sprite_names = [
        target["name"]
        for target in state["project_json"]["targets"]
        if not target["isStage"]
    ]
    sprite_listing = ", ".join(sprite_names) or "None"

    planning_prompt = (
        f"You are an AI assistant tasked with planning Scratch 3.0 block code for a game. "
        f"The game description is: '{state['description']}'.\n\n"
        f"Here are the sprites currently in the project: {sprite_listing}.\n"
        f"Initial positions: {json.dumps(state.get('sprite_initial_positions', {}), indent=2)}\n\n"
        "Consider the main actions and interactions required for each sprite. "
        "Think step-by-step about what each sprite needs to *do*, *when* it needs to do it (events), "
        "and if any actions need to *repeat* or depend on *conditions*.\n\n"
        "Propose a high-level action flow for each sprite in the following JSON format. "
        "Do NOT generate Scratch block JSON yet. Only describe the logic using natural language or simplified pseudo-code.\n\n"
        # The example is static text, so plain (non-f) strings avoid brace escaping.
        "Example format:\n"
        "```json\n"
        "{\n"
        ' "action_overall_flow": {\n'
        ' "Sprite1": {\n'
        ' "description": "Main character actions",\n'
        ' "plans": [\n'
        " {\n"
        ' "event": "when flag clicked",\n'
        ' "logic": "forever loop: move 10 steps, if on edge bounce"\n'
        " },\n"
        " {\n"
        ' "event": "when space key pressed",\n'
        ' "logic": "change y by 10, wait 0.1 seconds, change y by -10"\n'
        " }\n"
        " ]\n"
        " },\n"
        ' "Ball": {\n'
        ' "description": "Projectile movement",\n'
        ' "plans": [\n'
        " {\n"
        ' "event": "when I start as a clone",\n'
        ' "logic": "glide 1 sec to random position, if touching Sprite1 then stop this script"\n'
        " }\n"
        " ]\n"
        " }\n"
        " }\n"
        "}\n"
        "```\n\n"
        "Return ONLY the JSON object for the action overall flow."
    )

    try:
        response = agent.invoke({"messages": [{"role": "user", "content": planning_prompt}]})
        # Agents may hand back message objects (``.content``) or plain dicts.
        last_message = response["messages"][-1]
        raw_response = last_message["content"] if isinstance(last_message, dict) else last_message.content
        print("Raw response from LLM [PlanSpriteActionsNode]:", raw_response)
        action_plan = extract_json_from_llm_response(raw_response)
    except Exception as e:
        logger.error(f"Error in PlanSpriteActionsNode: {e}")
        raise

    logger.info("Sprite action plan generated by PlanSpriteActionsNode.")
    return {"action_plan": action_plan}
# --- Updated Action Node Builder (to consume the plan and build blocks) ---
def build_action_nodes(state: GameState):
    """Turn the stored action plan into Scratch blocks, one agent call per sprite.

    For every sprite in ``state["action_plan"]["action_overall_flow"]`` that
    also exists as a non-stage target in the project, the agent is prompted
    with the sprite's plan plus a filtered block catalog, and the returned
    blocks are merged into that target's ``blocks`` dictionary.

    Args:
        state: Game state; reads ``action_plan`` and ``project_json``.

    Returns:
        ``{"project_json": project_json}``. The project is modified in place,
        so this is the same object that was passed in via *state*.

    Raises:
        ValueError: If the state has no (or an empty) action plan.
        Exception: Any error from the agent call or JSON extraction is logged
            and re-raised.
    """
    logger.info("--- Running ActionNodeBuilder ---")
    action_plan = state.get("action_plan", {})
    if not action_plan:
        raise ValueError("No action plan found in state. Run PlanSpriteActionsNode first.")

    # The project JSON is edited in place; no copy is taken.
    project_json = state["project_json"]
    targets = project_json["targets"]
    # Map sprite names to their target objects in project_json.
    sprite_map = {target["name"]: target for target in targets if not target["isStage"]}

    # Get only the relevant blocks for the entire action plan.
    relevant_scratch_blocks_catalog = get_relevant_blocks_for_plan(action_plan, ALL_SCRATCH_BLOCKS_CATALOG)
    logger.info(f"Filtered {len(relevant_scratch_blocks_catalog)} relevant blocks out of {len(ALL_SCRATCH_BLOCKS_CATALOG)} total.")

    for sprite_name, sprite_actions in action_plan.get("action_overall_flow", {}).items():
        current_sprite_target = sprite_map.get(sprite_name)
        if current_sprite_target is None:
            # The plan may name sprites the project does not have; make that visible.
            logger.warning("Planned sprite '%s' is not in the project; skipping.", sprite_name)
            continue

        # Ensure 'blocks' field exists for the sprite.
        sprite_blocks = current_sprite_target.setdefault("blocks", {})

        # This is where the LLM's role becomes crucial: translating logic to blocks.
        llm_block_generation_prompt = (
            f"You are an AI assistant generating Scratch 3.0 block JSON based on a provided plan. "
            f"The current sprite is '{sprite_name}'.\n"
            f"Its planned actions are:\n"
            f"```json\n{json.dumps(sprite_actions, indent=2)}\n```\n\n"
            f"Here is a **curated catalog of only the most relevant Scratch 3.0 blocks** for this plan:\n"
            f"```json\n{json.dumps(relevant_scratch_blocks_catalog, indent=2)}\n```\n\n"
            f"Current Scratch project JSON (for context, specifically this sprite's existing blocks if any):\n"
            f"```json\n{json.dumps(current_sprite_target, indent=2)}\n```\n\n"
            "**Instructions:**\n"
            "1. For each planned event and its associated logic, generate the corresponding Scratch 3.0 block JSON.\n"
            "2. **Generate unique block IDs** for every new block. Use a format like 'block_id_abcdef12'.\n"
            "3. Properly link blocks using `next` and `parent` fields to form execution stacks. Hat blocks (`topLevel: true`, `parent: null`).\n"
            "4. Correctly fill `inputs` and `fields` based on the catalog and the plan's logic (e.g., specific values for motion, keys for events, conditions for controls).\n"
            "5. For C-blocks (like `control_repeat`, `control_forever`, `control_if`), use the `SUBSTACK` input to link to the first block inside its loop/conditional.\n"
            "6. If the plan involves operators (e.g., 'if touching Sprite1'), use the appropriate operator blocks from the catalog and link them correctly as `CONDITION` inputs.\n"
            "7. Ensure that any shadow blocks (e.g., for dropdowns like `motion_goto_menu`, `sensing_touchingobjectmenu`) are generated with `shadow: true` and linked correctly as inputs to their parent block.\n"
            "8. Return ONLY the **updated 'blocks' dictionary** for this specific sprite. Do NOT return the full project JSON. ONLY the `blocks` dictionary."
        )

        try:
            response = agent.invoke({"messages": [{"role": "user", "content": llm_block_generation_prompt}]})
            # Agents may hand back message objects (``.content``) or plain dicts.
            last_message = response["messages"][-1]
            raw_response = last_message["content"] if isinstance(last_message, dict) else last_message.content
            print(f"Raw response from LLM [ActionNodeBuilder - {sprite_name}]:", raw_response)
            generated_blocks = extract_json_from_llm_response(raw_response)
            sprite_blocks.update(generated_blocks)  # Merge new blocks
            logger.info(f"Action blocks added for sprite '{sprite_name}' by ActionNodeBuilder.")
        except Exception as e:
            logger.error(f"Error generating blocks for sprite '{sprite_name}': {e}")
            # Depending on robustness needed, you might continue or re-raise
            raise

    return {"project_json": project_json}
# --- Example Usage (to demonstrate the flow) ---
def main() -> int:
    """Run the plan -> build demo against a mock game state.

    Returns:
        0 on success, 1 if either the planning or the block-building step
        raised.
    """
    # Initialize a mock game state
    initial_game_state = {
        "description": "A simple game where a sprite moves and says hello.",
        "project_json": {
            "targets": [
                {"isStage": True, "name": "Stage", "blocks": {}},
                {"isStage": False, "name": "Sprite1", "blocks": {}},
                {"isStage": False, "name": "Ball", "blocks": {}},
            ]
        },
        "sprite_initial_positions": {},
    }

    # Step 1: Plan Sprite Actions
    try:
        state_after_planning = plan_sprite_actions(initial_game_state)
    except Exception as e:
        print(f"Planning failed: {e}")
        return 1
    initial_game_state.update(state_after_planning)
    print("\n--- Game State After Planning ---")
    print(json.dumps(initial_game_state, indent=2))

    # Step 2: Build Action Nodes (Generate Blocks)
    try:
        state_after_building = build_action_nodes(initial_game_state)
    except Exception as e:
        print(f"Building blocks failed: {e}")
        return 1
    initial_game_state.update(state_after_building)
    print("\n--- Game State After Building Blocks ---")
    # Print only the sprites' blocks to keep output manageable
    for target in initial_game_state["project_json"]["targets"]:
        if not target["isStage"]:
            print(f"\nBlocks for {target['name']}:")
            print(json.dumps(target.get("blocks", {}), indent=2))
    return 0


# --- Example Usage (to demonstrate the flow) ---
if __name__ == "__main__":
    # SystemExit rather than the site-provided exit(), which is not always
    # available, and so that a failed step yields a non-zero exit status.
    raise SystemExit(main())