# Scraped page header (not code): author "WebashalarForML",
# commit message "Update app.py", revision 1b95385 (verified).
from flask import Flask, request, jsonify, render_template, send_from_directory
from langgraph.prebuilt import create_react_agent
from langchain_groq import ChatGroq
#from langgraph.graph import draw
from langchain.chat_models import ChatOpenAI
from dotenv import load_dotenv
from langchain_core.utils.utils import secret_from_env
from langchain_openai import ChatOpenAI
from pydantic import Field, SecretStr
from PIL import Image
import os, json, re
import shutil
import uuid
from langgraph.graph import StateGraph, END
import logging
from typing import Dict, TypedDict, Optional, Any, List
from pathlib import Path
# --- Configure logging ---
# INFO-level logging; every record carries timestamp, logger name and level.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
app = Flask(__name__)

# --- Configurations ---
# String forms of the working folders.
# NOTE(review): these three are not referenced in the visible part of the
# file (the Path constants below are used instead) - confirm before removing.
BLOCKS_FOLDER = "blocks/"
GEN_PROJECT_FOLDER = "generated_projects/"
STATIC_FOLDER = "static/"

# Every directory is resolved relative to this file, not to the current
# working directory of the process.
BASE_DIR = Path(__file__).parent
BLOCKS_DIR = BASE_DIR / "blocks"
STATIC_DIR = BASE_DIR / "static"
GEN_PROJECT_DIR = BASE_DIR / "generated_projects"
ASSET_DIR = STATIC_DIR / "assets"
BACKDROP_DIR = ASSET_DIR / "backdrops"
SPRITE_DIR = ASSET_DIR / "sprites"
SOUND_DIR = ASSET_DIR / "sounds"

# Create them if they're missing (runs at import time):
for d in (BLOCKS_DIR, STATIC_DIR, GEN_PROJECT_DIR, ASSET_DIR, BACKDROP_DIR, SPRITE_DIR, SOUND_DIR):
    d.mkdir(parents=True, exist_ok=True)

# --- LLM / Vision model setup ---
# Each setting is written back into os.environ, falling back to a placeholder
# string when the variable is not set.
# NOTE(review): load_dotenv is imported but not called before these reads in
# the visible source - confirm a .env file is loaded somewhere if intended.
# NOTE(review): OPENROUTER_BASE_URL falls back to the same
# "default_key_or_placeholder" text, which is not a URL - confirm.
#os.environ["GROQ_API_KEY"] = os.getenv("GROQ_API_KEY_2", "default_key_or_placeholder")
os.environ["GROQ_API_KEY"] = os.getenv("GROQ_API_KEY", "default_key_or_placeholder")
os.environ["OPENROUTER_API_KEY"] = os.getenv("OPENROUTER_API_KEY", "default_key_or_placeholder")
os.environ["OPENROUTER_BASE_URL"] = os.getenv("OPENROUTER_BASE_URL", "default_key_or_placeholder")
groq_key = os.environ["GROQ_API_KEY"]
# Fail fast at import time: without a real Groq key the module refuses to load.
if not groq_key or groq_key == "default_key_or_placeholder":
    logger.critical("GROQ_API_KEY environment variable is not set or invalid. Please set it to proceed.")
    raise ValueError("GROQ_API_KEY environment variable is not set or invalid.")
# Main LLM for the SCRATCH 3.0 agent (Groq chat model, temperature 0.0).
llm = ChatGroq(
    # Alternative models kept for quick switching:
    #model="deepseek-r1-distill-llama-70b",
    #model="llama-3.3-70b-versatile",
    #model="gemma2-9b-it",
    #model="compound-beta-mini",
    #model="meta-llama/llama-4-maverick-17b-128e-instruct",
    model="meta-llama/llama-4-scout-17b-16e-instruct",
    temperature=0.0,
)
# JSON debugger model [temporary].
# NOTE(review): llm2 is not referenced anywhere else in the visible source;
# the JSON resolver agent is built on `llm` - confirm which one is intended.
llm2 = ChatGroq(
    model="deepseek-r1-distill-llama-70b",
    #model="llama-3.3-70b-versatile",
    #model="gemma2-9b-it",
    #model="compound-beta-mini",
    #model="meta-llama/llama-4-maverick-17b-128e-instruct",
    #model="meta-llama/llama-4-scout-17b-16e-instruct",
    temperature=0.0,
)
class ChatOpenRouter(ChatOpenAI):
    """ChatOpenAI subclass that always talks to the OpenRouter endpoint.

    The API key is taken from the ``openai_api_key`` argument when it is
    truthy, otherwise from the ``OPENROUTER_API_KEY`` environment variable.
    """

    # Declared with the alias "api_key"; the default factory reads
    # OPENROUTER_API_KEY from the environment (None when it is unset).
    openai_api_key: Optional[SecretStr] = Field(
        alias="api_key",
        default_factory=secret_from_env("OPENROUTER_API_KEY", default=None),
    )

    @property
    def lc_secrets(self) -> dict[str, str]:
        """Map the secret constructor argument to the env variable holding it."""
        return {"openai_api_key": "OPENROUTER_API_KEY"}

    def __init__(self,
                 openai_api_key: Optional[str] = None,
                 **kwargs):
        """Build the client with the OpenRouter base URL.

        Args:
            openai_api_key: Explicit API key; when falsy the value of the
                ``OPENROUTER_API_KEY`` environment variable is used.
            **kwargs: Forwarded unchanged to ``ChatOpenAI.__init__``.
        """
        openai_api_key = (
            openai_api_key or os.environ.get("OPENROUTER_API_KEY")
        )
        # base_url is fixed here, so a `base_url` entry in kwargs would be a
        # duplicate keyword argument.
        # NOTE(review): the OPENROUTER_BASE_URL environment variable set at
        # module level is not consulted here - confirm that is intended.
        super().__init__(
            base_url="https://openrouter.ai/api/v1",
            openai_api_key=openai_api_key,
            **kwargs
        )
# llm = ChatOpenRouter(
# #model_name="deepseek/deepseek-r1-0528:free",
# #model_name="google/gemini-2.0-flash-exp:free",
# #model_name="deepseek/deepseek-v3-base:free",
# model_name="deepseek/deepseek-r1:free"
# )
# Refined SYSTEM_PROMPT with more explicit Scratch JSON rules, especially for variables.
# The text is sent verbatim to the model as the system prompt of the main agent.
# Citation artifacts ("[cite_start]", "[cite: ...]") left over from a copy/paste
# were removed because they carry no meaning for the model.
# NOTE(review): the Scratch file format documents input primitive type 7 as
# "integer" and 10 as "string"; the prompt below describes 7 as string/text.
# Confirm against the format reference before relying on that rule.
SYSTEM_PROMPT = """
You are an expert AI assistant named GameScratchAgent, specialized in generating and modifying Scratch-VM 3.x game project JSON.
Your core task is to process game descriptions and existing Scratch JSON structures, then produce or update JSON segments accurately.
You possess deep knowledge of Scratch 3.0 project schema, informed by comprehensive reference materials. When generating or modifying the `blocks` section, pay extremely close attention to the following:
**Scratch Project JSON Schema Rules:**
1. **Target Structure (`project.json`'s `targets` array):**
* Each object in the `targets` array represents a Stage or a Sprite.
* `isStage`: A boolean indicating if the target is the Stage (`true`) or a Sprite (`false`).
* `name`: The name of the Stage (e.g., `"Stage"`) or the Sprite (e.g., `"Cat"`). This property replaces `objName` found in older Scratch versions.
* `variables` dictionary: This dictionary maps unique variable IDs to arrays `[variable_name, initial_value, isCloudVariable?]`.
* `variable_name`: The user-defined name of the variable.
* `initial_value`: The variable's initial value, which can be a number or a string.
* `isCloudVariable?`: (Optional) A boolean indicating if it's a cloud variable (`true`) or a local variable (`false` or absent for regular variables).
* Example: `"myVarId123": ["score", 0]`, `"cloudVarId456": ["☁ High Score", "54", true]`
* `lists` dictionary: This dictionary maps unique list IDs to arrays `[list_name, [item1, item2, ...]]`.
* Example: `"myListId789": ["my list", ["apple", "banana"]]`
* `broadcasts` dictionary: This dictionary maps unique broadcast IDs to their names.
* Example: `"myBroadcastId": "Game Over"`
* `blocks` dictionary: This dictionary contains all the blocks belonging to this target. Keys are block IDs, values are block objects.
2. **Block Structure (within a `target`'s `blocks` dictionary):**
* Every block object must have the following core properties:
* `opcode`: A unique internal identifier for the block's specific functionality (e.g., `"motion_movesteps"`, `"event_whenflagclicked"`).
* `parent`: The ID of the block directly above it in the script stack (or `null` for a top-level block).
* `next`: The ID of the block directly below it in the script stack (or `null` for the end of a stack).
* `inputs`: An object defining values or blocks plugged into the block's input slots. Values are **arrays**.
* `fields`: An object defining dropdown menu selections or direct internal values within the block. Values are **arrays**.
* `shadow`: `true` if it's a shadow block (e.g., a default number input that can be replaced by another block), `false` otherwise.
* `topLevel`: `true` if it's a hat block or a standalone block (not connected to a parent), `false` otherwise.
3. **`inputs` Property Details (for blocks plugged into input slots):**
* **Direct Block Connection (Reporter/Boolean block plugged in):**
* Format: `"<INPUT_NAME>": [1, "<blockId_of_plugged_block>"]`
* Example: `"CONDITION": [1, "someBooleanBlockId"]` (e.g., for an `if` block).
* **Literal Value Input (Shadow block with a literal):**
* Format: `"<INPUT_NAME>": [1, [<type_code>, "<value_string>"]]`
* `type_code`: A numeric code representing the data type. Common codes include: `4` for number, `7` for string/text, `10` for string/message.
* `value_string`: The literal value as a string.
* Examples:
* Number: `"STEPS": [1, [4, "10"]]` (for `move 10 steps` block).
* String/Text: `"MESSAGE": [1, [7, "Hello"]]` (for `say Hello` block).
* String/Message (common for text inputs): `"MESSAGE": [1, [10, "Hello!"]]` (for `say Hello! for 2 secs`).
* **C-Block Substack (blocks within a loop or conditional):**
* Format: `"<SUBSTACK_NAME>": [2, "<blockId_of_first_block_in_substack>"]`
* Common `SUBSTACK_NAME` values are `SUBSTACK` (for `if`, `forever`, `repeat`) and `SUBSTACK2` (for `else` in `if else`).
* Example: `"SUBSTACK": [2, "firstBlockInLoopId"]`
4. **`fields` Property Details (for dropdowns or direct internal values):**
* Used for dropdown menus, variable names, list names, or other static selections directly within the block.
* Format: `"<FIELD_NAME>": ["<selected_value>", null]`
* Examples:
* Dropdown: `"KEY_OPTION": ["space", null]` (for `when space key pressed`).
* Variable Name: `"VARIABLE": ["score", null]` (for `set score to 0`).
* Direction (specific motion block): `"FORWARD_BACKWARD": ["forward", null]` (for `go forward layers`).
5. **Unique IDs:**
* All block IDs, variable IDs, and list IDs must be unique strings (e.g., "myBlock123", "myVarId456", "myListId789"). Do NOT use placeholder strings like "block_id_here".
6. **No Nested `blocks` Dictionary:**
* The `blocks` dictionary should only appear once per `target` (sprite/stage). Do NOT nest a `blocks` dictionary inside an individual block definition. Blocks that are part of a substack are linked via the `SUBSTACK` input.
7. **Asset Properties (for Costumes/Sounds):**
* `assetId`, `md5ext`, `bitmapResolution`, `rotationCenterX`/`rotationCenterY` should be correctly associated with costume and sound objects within the `costumes` and `sounds` arrays.
**General Principles and Important Considerations:**
* **Backward Compatibility:** Adhere strictly to existing Scratch 3.0 opcodes and schema to ensure backward compatibility with older projects. Opcodes must remain consistent to prevent previously saved projects from failing to load or behaving unexpectedly.
* **Forgiving Inputs:** Recognize that Scratch is designed to be "forgiving in its interpretation of inputs." The Scratch VM handles potentially "invalid" inputs gracefully (e.g., converting a number to a string if expected, returning default values like zero or empty strings, or performing no action) rather than crashing. This implies that precise type matching for inputs might be handled internally by Scratch, allowing for some flexibility in how values are provided, but the agent should aim for the most common and logical type.
"""
# System prompt for the JSON-repair agent: asks for valid JSON only, wrapped
# in triple backticks.
# NOTE(review): the text refers to "the schema below", but no schema is
# included in this prompt - confirm whether one is meant to be appended.
SYSTEM_PROMPT_JSON_CORRECTOR ="""
You are an assistant that outputs JSON responses strictly following the given schema.
If the JSON you produce has any formatting errors, missing required fields, or invalid structure, you must identify the problems and correct them.
Always return only valid JSON that fully conforms to the schema below, enclosed in triple backticks (```), without any extra text or explanation.
If you receive an invalid or incomplete JSON response, fix it by:
- Adding any missing required fields with appropriate values.
- Correcting syntax errors such as missing commas, brackets, or quotes.
- Ensuring the JSON structure matches the schema exactly.
Remember: Your output must be valid JSON only, ready to be parsed without errors.
"""
# Main agent of the system for Scratch 3.0, driven by SYSTEM_PROMPT.
agent = create_react_agent(
    model=llm,
    tools=[],  # No specific tools are defined here, but could be added later
    prompt=SYSTEM_PROMPT
)
# Debugger and resolver agent: used by the nodes to repair JSON that failed
# to parse.
# NOTE(review): this agent is built on `llm`, not on `llm2` (labelled
# "Json debugger" where it is defined) - confirm which model is intended.
agent_json_resolver = create_react_agent(
    model=llm,
    tools=[],  # No specific tools are defined here, but could be added later
    prompt=SYSTEM_PROMPT_JSON_CORRECTOR
)
### LangGraph Workflow Definition
# Define a state for your graph using TypedDict
class GameState(TypedDict):
    """State dictionary handed from node to node in the workflow."""
    project_json: dict  # Scratch project JSON; nodes read its "targets" list
    description: str  # Game description; replaced by the detailed one in node 1
    project_id: str  # NOTE(review): not used in the visible nodes - presumably the project identifier
    sprite_initial_positions: dict  # Declared as part of the state; sprite name -> position presumably
    action_plan: Optional[Dict]
    # behavior_plan: Optional[Dict]  (currently disabled)
    improvement_plan: Optional[Dict]
    needs_improvement: bool
    plan_validation_feedback: Optional[Dict]
    iteration_count: int  # Track the number of iterations for improvements
    review_block_feedback: Optional[Dict]  # Feedback from the agent on the blocks after verification
    declaration_plan: Optional[Dict]  # Parsed output of the declaration planner node
    temporary_node: Optional[Dict]
# Helper function to update project JSON with sprite positions
import copy
def update_project_with_sprite_positions(project_json: dict, sprite_positions: dict) -> dict:
    """
    Return a copy of the Scratch project JSON with sprite coordinates updated.

    The input project is deep-copied and never mutated. Only non-stage
    targets whose name appears in ``sprite_positions`` are touched, and only
    when the entry provides both ``"x"`` and ``"y"``.

    Args:
        project_json (dict): Original Scratch project JSON.
        sprite_positions (dict): Dict mapping sprite names to
            {'x': int, 'y': int}. Because it is derived from LLM output, a
            missing/non-dict mapping and non-dict entries are tolerated and
            simply ignored.

    Returns:
        dict: Updated project JSON with new sprite positions.
    """
    updated_project = copy.deepcopy(project_json)
    # Nothing usable to apply: hand back the untouched copy instead of raising.
    if not isinstance(sprite_positions, dict):
        return updated_project
    for target in updated_project.get("targets", []):
        if target.get("isStage", False):
            continue  # the Stage has no position
        pos = sprite_positions.get(target.get("name"))
        # Skip unknown sprites and malformed entries (e.g. null or a string).
        if isinstance(pos, dict) and "x" in pos and "y" in pos:
            target["x"] = pos["x"]
            target["y"] = pos["y"]
    return updated_project
# Helper function to load the block catalog from a JSON file
def _load_block_catalog(block_type: str) -> Dict:
    """
    Load the Scratch block catalog ``<block_type>.json`` from ``BLOCKS_DIR``.

    Args:
        block_type: File stem of the catalog inside the blocks folder, for
            example ``"hat_blocks"`` (the ``.json`` suffix is appended here).

    Returns:
        The parsed JSON content, or ``{}`` when the file is missing, cannot
        be read/decoded, or does not contain valid JSON. Every failure is
        logged; none is raised, so the rest of the app can continue.
    """
    catalog_path = BLOCKS_DIR / f"{block_type}.json"
    try:
        # Read as UTF-8 explicitly so the result does not depend on the
        # platform's locale encoding.
        text = catalog_path.read_text(encoding="utf-8")  # FileNotFoundError if missing
        catalog = json.loads(text)  # JSONDecodeError if malformed
    except FileNotFoundError:
        logger.error(f"Error: Block catalog file not found at {catalog_path}")
    except json.JSONDecodeError as e:
        logger.error(f"Error decoding JSON from {catalog_path}: {e}")
    except Exception as e:
        logger.error(f"Unexpected error loading {catalog_path}: {e}")
    else:
        logger.info(f"Successfully loaded block catalog from {catalog_path}")
        return catalog
    # In all error cases, return an empty dict so the rest of your app can continue
    return {}
# --- Global variable for the block catalog ---
ALL_SCRATCH_BLOCKS_CATALOG = {}

# Catalog names: file stems (without ".json") handed to _load_block_catalog.
BLOCK_CATALOG_PATH = "blocks"  # Full block catalog
HAT_BLOCKS_PATH = "hat_blocks"  # Hat blocks
STACK_BLOCKS_PATH = "stack_blocks"  # Stack blocks
REPORTER_BLOCKS_PATH = "reporter_blocks"  # Reporter blocks
BOOLEAN_BLOCKS_PATH = "boolean_blocks"  # Boolean blocks
C_BLOCKS_PATH = "c_blocks"  # C blocks
CAP_BLOCKS_PATH = "cap_blocks"  # Cap blocks


def _summarize_block_catalog(block_data: Dict) -> tuple[str, str]:
    """Return ``(description, opcode listing)`` for one loaded block catalog.

    The listing has one line per entry of ``block_data["blocks"]``; missing
    keys fall back to placeholder text so an empty catalog ({}) is fine.
    """
    description = block_data.get("description", "No description available")
    blocks = block_data.get("blocks")
    if not isinstance(blocks, list):
        return description, " No blocks information available."
    listing = "\n".join(
        f" - Opcode: {block.get('op_code', 'N/A')}, functionality: {block.get('functionality', 'N/A')}"
        for block in blocks
    )
    return description, listing


# Load the block catalogs from their respective JSON files.
# The load order is kept as-is so the log output stays in the same sequence.
# ================== HAT BLOCK ===============================================
hat_block_data = _load_block_catalog(HAT_BLOCKS_PATH)
hat_description, hat_opcodes_functionalities = _summarize_block_catalog(hat_block_data)
# NOTE(review): printed even when the catalog failed to load (empty dict).
print("Hat blocks loaded successfully.", hat_description)
# ================== BOOLEAN BLOCK ===========================================
boolean_block_data = _load_block_catalog(BOOLEAN_BLOCKS_PATH)
boolean_description, boolean_opcodes_functionalities = _summarize_block_catalog(boolean_block_data)
# ================== C BLOCK =================================================
c_block_data = _load_block_catalog(C_BLOCKS_PATH)
c_description, c_opcodes_functionalities = _summarize_block_catalog(c_block_data)
# ================== CAP BLOCK ===============================================
cap_block_data = _load_block_catalog(CAP_BLOCKS_PATH)
cap_description, cap_opcodes_functionalities = _summarize_block_catalog(cap_block_data)
# ================== REPORTER BLOCK ==========================================
reporter_block_data = _load_block_catalog(REPORTER_BLOCKS_PATH)
reporter_description, reporter_opcodes_functionalities = _summarize_block_catalog(reporter_block_data)
# ================== STACK BLOCK =============================================
stack_block_data = _load_block_catalog(STACK_BLOCKS_PATH)
stack_description, stack_opcodes_functionalities = _summarize_block_catalog(stack_block_data)
# This makes ALL_SCRATCH_BLOCKS_CATALOG available globally
ALL_SCRATCH_BLOCKS_CATALOG = _load_block_catalog(BLOCK_CATALOG_PATH)
# Helper function to generate a unique block ID
def generate_block_id():
    """Return a short random identifier for a Scratch block.

    The ID is the first nine hexadecimal characters of a fresh UUID4, so it
    never contains a dash.
    """
    return uuid.uuid4().hex[:9]
# Helper function to extract JSON from LLM response
def extract_json_from_llm_response(raw_response: str) -> dict:
    """
    Extract and parse the JSON object embedded in an LLM reply.

    The text inside the first fenced code block (or the whole reply when
    there is none) is trimmed to its outermost ``{ ... }``. If that already
    parses, it is returned untouched. Only when parsing fails are the
    heuristic, regex-based repairs applied, because they are lossy: they can
    rewrite perfectly valid string values (for example text containing
    ``word {``).

    Args:
        raw_response: Raw text returned by the model.

    Returns:
        The parsed JSON value (a dict for the object-shaped replies the
        callers expect).

    Raises:
        json.JSONDecodeError: If the text is still invalid after the repairs.
    """
    # --- 1) Pull out the JSON code-block if present ---
    md = re.search(r"```(?:json)?\s*([\s\S]*?)\s*```", raw_response)
    json_string = md.group(1).strip() if md else raw_response
    # --- 2) Trim to the outermost { ... } so we drop any prefix/suffix junk ---
    first, last = json_string.find('{'), json_string.rfind('}')
    if 0 <= first < last:
        json_string = json_string[first:last+1]
    # --- 2.1) Fast path: already valid JSON must not go through the repairs ---
    try:
        return json.loads(json_string)
    except json.JSONDecodeError:
        pass  # expected for malformed replies; fall through to the repairs
    # --- 3) PRE-CLEANUP: remove stray assistant{...}, rogue assistant keys, fix boolean quotes ---
    json_string = re.sub(r'\b\w+\s*{', '{', json_string)
    json_string = re.sub(r'"assistant"\s*:', '', json_string)
    json_string = re.sub(r'\b(false|true)"', r'\1', json_string)
    logger.debug("Ran pre‑cleanup for stray tokens and boolean quotes.")
    # --- 3.1) Fix stray inner quotes at start of name/list values ---
    # e.g., { "name": " \"recent_scoress\"", ... } -> "recent_scoress"
    json_string = re.sub(
        r'("name"\s*:\s*")\s*"',
        r'\1',
        json_string
    )

    # --- 4) Escape all embedded quotes in any `logic` value up to the next key ---
    def _esc(m):
        prefix, body = m.group(1), m.group(2)
        return prefix + body.replace('"', r'\"')

    json_string = re.sub(
        r'("logic"\s*:\s*")([\s\S]+?)(?=",\s*"[A-Za-z_]\w*"\s*:\s*)',
        _esc,
        json_string
    )
    logger.debug("Escaped embedded quotes in logic fields.")
    # (No key-quoting step exists; the former "Quoted unquoted keys." log
    # entry reported work that was never done and has been dropped.)
    # --- 5) Remove trailing commas before } or ] ---
    json_string = re.sub(r',\s*(?=[}\],])', '', json_string)
    json_string = re.sub(r',\s*,', ',', json_string)
    logger.debug("Removed trailing commas.")
    # --- 6) Balance braces: drop extra } at end if needed ---
    ob, cb = json_string.count('{'), json_string.count('}')
    if cb > ob:
        excess = cb - ob
        json_string = json_string.rstrip()[:-excess]
        logger.debug(f"Stripped {excess} extra closing brace(s).")
    # --- 7) Escape literal newlines in *all* string values ---
    json_string = re.sub(
        r'"((?:[^"\\]|\\.)*?)"',
        lambda m: '"' + m.group(1).replace('\n', '\\n').replace('\r', '\\r') + '"',
        json_string,
        flags=re.DOTALL
    )
    logger.debug("Escaped newlines in strings.")
    # --- 8) Final parse attempt ---
    try:
        return json.loads(json_string)
    except json.JSONDecodeError:
        logger.error("Sanitized JSON still invalid:\n%s", json_string)
        raise
def strip_noise(s: str) -> str:
    """Strip chat-template residue and a few known JSON glitches from LLM text.

    The substitutions run in a fixed order; the result is finally stripped
    of leading and trailing whitespace.
    """
    def _keep_first(match):
        # Of two adjacent identical keys, keep only the first key/array pair.
        return match.group(1)

    substitutions = (
        # <|...|> markers
        (r"<\|.*?\|>", ""),
        # 'assistant<' together with the non-whitespace run that follows
        (r"assistant<\S*", ""),
        # any remaining angle brackets
        (r"[<>]", ""),
        # malformed empty keys such as sensing"": -> sensing":
        (r'(\w+)""', r'\1"'),
        # an immediately repeated JSON key with an array value
        (r'("(?P<key>\w+)":\s*(?P<first>\[[^\]]*\]))\s*,\s*"\2":\s*\[[^\]]*\]', _keep_first),
        # runs of blank lines, then runs of spaces
        (r"\n\s*\n+", "\n\n"),
        (r" {2,}", " "),
    )
    cleaned = s
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.strip()
# Node 1: Detailed description generator.
def game_description_node(state: GameState):
    """
    Ask the main agent for a detailed narrative description of the game.

    The prompt is built from the initial description in the state, the
    current project JSON and the names of all targets. The reply replaces
    ``state["description"]`` and the same state object is returned. Any
    error is logged and re-raised.
    """
    logger.info("--- Running GameDescriptionNode ---")
    initial_description = state.get("description", "A simple game.")
    project_json = state["project_json"]
    # name -> name mapping of every target; its repr is embedded in the prompt
    sprite_name = {target["name"]: target["name"] for target in project_json["targets"]}
    project_dump = json.dumps(project_json, indent=2)
    description_prompt = "".join([
        f"You are an AI assistant tasked with generating a detailed narrative description for a Scratch 3.0 game.\n",
        f"The initial high-level description is: '{initial_description}'.\n",
        f"The current Scratch project JSON is:\n```json\n{project_dump}\n```\n",
        f"Make sure you donot change Sprite and Stage name. Here are all the name: {sprite_name} \n",
        f"Create a rich, engaging, and detailed description, including potential gameplay elements, objectives, and overall feel with the available resources\n",
        f"The output should be a plain text description.",
    ])
    request = {"messages": [{"role": "user", "content": description_prompt}]}
    try:
        reply = agent.invoke(request)
        detailed_game_description = reply["messages"][-1].content
        # The reply is stored as-is (strip_noise is not applied here).
        state["description"] = detailed_game_description
        logger.info("Detailed game description generated by GameDescriptionNode.")
        print(f"Detailed Game Description: {detailed_game_description}")
        return state
    except Exception as e:
        logger.error(f"Error in GameDescriptionNode: {e}")
        raise
# Node 2: Analysis User Query and Initial Positions
def parse_query_and_set_initial_positions(state: GameState):
    """
    Ask the main agent for initial sprite coordinates and apply them.

    The reply is parsed as JSON; when parsing fails, the JSON resolver agent
    is asked once to repair it. The positions found under
    ``sprite_initial_positions`` are written into a copy of the project JSON,
    which replaces ``state["project_json"]``. Errors are logged and re-raised.
    """
    logger.info("--- Running ParseQueryNode ---")
    # The prompt deliberately describes a reduced stage area (130 units each
    # way from the centre) rather than larger bounds.
    llm_query_prompt = f"""Based on the user's game description: '{state['description']}', \
Stage description: the Stage's center is `(0,0)` and height is from `(0,-130)` to `(0,130)` and width is `(-130,0)` to `(130,0)`,
and the current Scratch project JSON below, \
determine the most appropriate initial 'x' and 'y' coordinates for each sprite. \
Return ONLY a JSON object with a single key 'sprite_initial_positions' mapping sprite names to their {{'x': int, 'y': int}} coordinates.
The current Scratch project JSON is:
```json
{json.dumps(state['project_json'], indent=2)}
```
Example Json output:
```json
{{
"sprite_initial_positions": {{
"Sprite1": {{"x": -150, "y": -120}},
"Sprite2": {{"x": 150, "y": -120}}
}}
}}
```
"""
    try:
        reply = agent.invoke({"messages": [{"role": "user", "content": llm_query_prompt}]})
        raw_response = reply["messages"][-1].content
        print("Raw response from LLM:", raw_response)
        # First try the reply as it is; fall back to the resolver agent.
        try:
            parsed = extract_json_from_llm_response(raw_response)
        except json.JSONDecodeError as error_json:
            logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.")
            correction_prompt = "".join([
                "Your task is to correct the provided JSON string to ensure it is **syntactically perfect and adheres strictly to JSON rules**.\n",
                "Carefully review the JSON for any errors, especially focusing on the reported error at:\n",
                f"- **Error Details**: {error_json}\n\n",
                "**Strict Instructions for your response:**\n",
                "1. **ONLY** output the corrected JSON. Do not include any other text, comments, or explanations outside the JSON.\n",
                "2. Ensure all property names (keys) are enclosed in **double quotes**.\n",
                "3. Ensure string values are correctly enclosed in **double quotes** and any internal special characters (like newlines `\\n`, tabs `\\t`, backslashes `\\\\`, or double quotes `\\\"`) are properly **escaped**.\n",
                "4. Verify that there are **no extra commas**, especially between key-value pairs or after the last element in an object or array.\n",
                "5. Ensure proper nesting and matching of curly braces `{}` and square brackets `[]`.\n",
                "6. The corrected JSON must be a **complete and valid** JSON object.\n\n",
                "Here is the problematic JSON string to correct:\n",
                "```json\n",  # markdown fence keeps the JSON clearly separated
                f"{raw_response}\n",
                "```\n",
                "Corrected JSON:\n",  # marks where the corrected JSON should start
            ])
            correction_response = agent_json_resolver.invoke(
                {"messages": [{"role": "user", "content": correction_prompt}]}
            )
            corrected_text = correction_response["messages"][-1].content
            print(f"[JSON CORRECTOR RESPONSE AT PARSER]: {corrected_text}")
            parsed = extract_json_from_llm_response(corrected_text)
        sprite_positions = parsed.get("sprite_initial_positions", {})
        new_project_json = update_project_with_sprite_positions(state["project_json"], sprite_positions)
        state["project_json"] = new_project_json
        print("Updated project JSON with sprite positions:", json.dumps(new_project_json, indent=2))
        return state
    except Exception as e:
        logger.error(f"Error in ParseQueryNode: {e}")
        raise
# Node 3: Declaration Planner Node (Updated)
def declaration_planner_node(state: GameState):
    """
    Ask the main agent for a complete declaration plan and store it.

    The plan defines:
    - variables and cloud variables
    - lists
    - broadcast messages
    - monitors (usually tied to reporter blocks or variables)

    The reply is parsed as JSON; when parsing fails, the JSON resolver agent
    is asked once to repair it. The parsed plan is stored under
    ``state["declaration_plan"]`` and the same state object is returned.
    Errors are logged and re-raised.
    """
    logger.info("--- Running DeclarationPlannerNode ---")
    description = state.get("description", "")
    # The example below is what the model is told to follow exactly, so it
    # must itself be valid JSON: each monitor appears once, comma-separated.
    planning_prompt = f"""Game Description:
{description}
You are given the game design and are tasked to prepare a full declaration plan for Scratch 3.0.
Here is the description to have opcode in the monitor which are mostly reporting block and the target is always **Stage**
--- Scratch 3.0 Block Reference ---
### Reporter Blocks
Description: {reporter_description}
Blocks:
{reporter_opcodes_functionalities}
Your output must be valid JSON and follow this exact structure given as the example:
```json
{{
"declaration_plan": {{
"variables": [
{{ "name": "score", "default": 0, "cloud": false }},
{{ "name": "☁ High Score", "default": 0, "cloud": true }},
{{ "name": "speed", "default": 0, "cloud": false }}
],
"lists": [
{{ "name": "recent_scores", "default": [] }},
{{ "name": "tools", "default": [] }}
],
"broadcasts": [
"Game Over",
"Game Start"
],
"monitors": [
{{ "target": "Stage", "variable": "score", "visible": true, "opcode_hint": "data_variable" }},
{{ "target": "Stage", "variable": "☁ High Score", "visible": true, "opcode_hint": "data_variable" }},
{{ "target": "Stage", "reporter_name": "costume #", "visible": false, "opcode_hint": "looks_costumenumbername", "param_name": "NUMBER_NAME" }}
]
}}
}}
```
Guidelines:
- Add meaningful variables to represent game mechanics (e.g., score, health, level, etc).
- If a variable is shared across devices or saved to the cloud, mark it as `cloud: true`.
- Lists should reflect repeated values or game logs like `recent_scores`, `highscore_history` etc.
- Broadcasts should represent game state changes (e.g., Game Over, Game Start, Reset Game).
- Monitors include on-screen displayed data like variable reports, costume index, or stage backdrop number.
- For monitors, include an `opcode_hint` to suggest the Scratch block opcode that will report the value (e.g., `data_variable`, `motion_xposition`, `looks_costumenumbername`).
- If a monitor needs a specific parameter (like 'number' or 'name' for costume), add a `param_name` field (e.g., `"param_name": "NUMBER_NAME"`).
- Use variable and broadcast names that are clean, human-readable, and contextually relevant.
- Use the actual names given in description as the json is case sensitive **do not** assume the names.
- Avoid duplicating variable names. Always use the same case-sensitive name.
Start generating the `declaration_plan` for this game."""
    try:
        response = agent.invoke({"messages": [{"role": "user", "content": planning_prompt}]})
        raw_response = response["messages"][-1].content
        print("Raw response from LLM [DeclarationPlannerNode]:", raw_response)
        # First try the reply as it is; fall back to the resolver agent.
        try:
            declaration_plan = extract_json_from_llm_response(raw_response)
        except json.JSONDecodeError as error_json:
            logger.warning("Malformed JSON detected. Attempting to correct...")
            correction_prompt = (
                "Your task is to correct the provided JSON string to ensure it is **syntactically perfect and adheres strictly to JSON rules**.\n"
                "Carefully review the JSON for any errors, especially focusing on the reported error at:\n"
                f"- **Error Details**: {error_json}\n\n"
                "**Strict Instructions for your response:**\n"
                "1. **ONLY** output the corrected JSON. Do not include any other text, comments, or explanations outside the JSON.\n"
                "2. Ensure all property names (keys) are enclosed in **double quotes**.\n"
                "3. Ensure string values are correctly enclosed in **double quotes** and any internal special characters (like newlines `\\n`, tabs `\\t`, backslashes `\\\\`, or double quotes `\\\"`) are properly **escaped**.\n"
                "4. Verify that there are **no extra commas**, especially between key-value pairs or after the last element in an object or array.\n"
                "5. Ensure proper nesting and matching of curly braces `{}` and square brackets `[]`.\n"
                "6. **Crucially, remove any extraneous characters or duplicate closing braces outside the main JSON object.**\n"
                "7. The corrected JSON must be a **complete and valid** JSON object.\n\n"
                "Here is the problematic JSON string to correct:\n"
                "```json\n"
                f"{raw_response}\n"
                "```\n"
                "Corrected JSON:\n"
            )
            correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]})
            declaration_plan = extract_json_from_llm_response(correction_response["messages"][-1].content)
        state["declaration_plan"] = declaration_plan
        logger.info("Declaration plan successfully generated.")
        return state
    except Exception as e:
        logger.error(f"DeclarationPlannerNode failed: {e}")
        raise  # Re-raise to ensure error is propagated
# Node 4: Declaration Builder Node
def _declared_id_for_name(entries, name):
    """Return the ID under which *name* is already declared, or ``None``.

    ``entries`` is a ``{id: [name, value, ...]}`` mapping (the shape used for
    the Stage's ``variables`` and ``lists``); the first element of each entry
    is compared against *name*.
    """
    for entry_id, entry in entries.items():
        if entry[0] == name:
            return entry_id
    return None


def declaration_builder_node(state: GameState):
    """Apply the declaration plan (variables, lists, broadcasts, monitors) to the project JSON.

    Reads ``state["declaration_plan"]`` and ``state["project_json"]``. Variables,
    lists and broadcasts are added to (or updated on) the Stage target; each
    planned monitor is turned into a monitor block by asking the module-level
    ``agent`` LLM, with ``agent_json_resolver`` used as a repair pass when the
    answer is not parseable JSON.

    Args:
        state: Graph state. ``declaration_plan`` may be wrapped in an outer
            ``"declaration_plan"`` key or be the bare plan itself.

    Returns:
        The same ``state`` object, with ``state["project_json"]`` updated in
        place. If there is no plan or no Stage target, the state is returned
        unchanged.

    Notes:
        A failure while generating a single monitor is logged and that monitor
        is skipped; it does not abort the node.
    """
    logger.info("--- Running DeclarationBuilderNode ---")
    declaration_plan = state.get("declaration_plan")
    if not declaration_plan:
        logger.warning("No declaration plan found in state. Skipping DeclarationBuilderNode.")
        return state

    project_json = state["project_json"]
    targets = project_json["targets"]

    # Find the Stage target, which holds global variables, lists, and broadcasts.
    # `.get` so a target that lacks the `isStage` key is treated as a non-stage target.
    stage_target = next((target for target in targets if target.get("isStage")), None)
    if not stage_target:
        logger.error("Stage target not found in project_json. Cannot build declarations.")
        return state

    # Initialize sections if they don't exist
    variables = stage_target.setdefault("variables", {})
    lists = stage_target.setdefault("lists", {})
    broadcasts = stage_target.setdefault("broadcasts", {})
    monitors = project_json.setdefault("monitors", [])  # Monitors live at the project root

    # Accept both {"declaration_plan": {...}} and a bare plan without the wrapper key.
    plan_sections = declaration_plan.get("declaration_plan") or declaration_plan

    # 1. Build Variables
    logger.info("Building variables...")
    for var_data in plan_sections.get("variables") or []:
        var_name = var_data.get("name")
        if not var_name:
            logger.warning(f"Skipping variable declaration without a name: {var_data}")
            continue
        var_default = str(var_data.get("default", 0))  # Stored as a string, as in the original code
        var_cloud = var_data.get("cloud", False)

        existing_var_id = _declared_id_for_name(variables, var_name)
        if existing_var_id is not None:
            # Update the existing declaration instead of creating a duplicate
            existing_var = variables[existing_var_id]
            existing_var[1] = var_default
            if len(existing_var) > 2:
                existing_var[2] = var_cloud
            elif var_cloud:  # Only add the cloud flag when it is actually set
                existing_var.append(True)
            logger.info(f"Updated variable: {var_name}")
        else:
            # Name plus a short UUID fragment keeps the ID unique
            var_id = f"{var_name}_{str(uuid.uuid4())[:8]}"
            variable_definition = [var_name, var_default]
            if var_cloud:
                variable_definition.append(True)
            variables[var_id] = variable_definition
            logger.info(f"Added new variable: {var_name}")

    # 2. Build Lists
    logger.info("Building lists...")
    for list_data in plan_sections.get("lists") or []:
        list_name = list_data.get("name")
        if not list_name:
            logger.warning(f"Skipping list declaration without a name: {list_data}")
            continue
        list_default = list_data.get("default", [])

        existing_list_id = _declared_id_for_name(lists, list_name)
        if existing_list_id is not None:
            lists[existing_list_id][1] = list_default
            logger.info(f"Updated list: {list_name}")
        else:
            list_id = f"{list_name}_{str(uuid.uuid4())[:8]}"
            lists[list_id] = [list_name, list_default]
            logger.info(f"Added new list: {list_name}")

    # 3. Build Broadcasts
    logger.info("Building broadcasts...")
    for broadcast_name in plan_sections.get("broadcasts") or []:
        # Broadcasts are stored as {id: name}; compare against the stored names
        if broadcast_name in broadcasts.values():
            logger.info(f"Broadcast '{broadcast_name}' already exists.")
        else:
            broadcast_id = f"{broadcast_name}_{str(uuid.uuid4())[:8]}"
            broadcasts[broadcast_id] = broadcast_name
            logger.info(f"Added new broadcast: {broadcast_name}")

    # 4. Build Monitors
    logger.info("Building monitors...")
    monitor_index = 0  # Number of monitors placed so far; drives the x/y layout below
    reporter_catalog_json = None  # Serialized lazily, once, on the first monitor
    for monitor_data in plan_sections.get("monitors") or []:
        monitor_target = monitor_data.get("target")
        monitor_visible = monitor_data.get("visible", False)
        opcode_hint = monitor_data.get("opcode_hint")
        param_name = monitor_data.get("param_name")  # For things like costume # or name

        # The monitor must refer to a target that exists in the project
        if not any(t.get("name") == monitor_target for t in targets):
            logger.warning(f"Monitor target '{monitor_target}' not found. Skipping monitor.")
            continue

        if reporter_catalog_json is None:
            reporter_catalog_json = json.dumps(reporter_block_data, indent=2)

        llm_monitor_block_generation_prompt = (
            f"You are an AI assistant tasked with generating the Scratch block JSON for a monitor.\n"
            f"The monitor details are:\n"
            f"- Target Sprite/Stage: '{monitor_target}'\n"
            f"- Desired Opcode Hint: '{opcode_hint}'\n"
            f"- Visibility: {monitor_visible}\n"
        )
        # A monitor is described either by a variable or by a reporter, never both
        if "variable" in monitor_data:
            llm_monitor_block_generation_prompt += f"- Variable Name: '{monitor_data['variable']}'\n"
        elif "reporter_name" in monitor_data:
            llm_monitor_block_generation_prompt += f"- Reporter Name: '{monitor_data['reporter_name']}'\n"
        if param_name:
            llm_monitor_block_generation_prompt += f"- Parameter Name: '{param_name}'\n"
        # The project JSON is re-serialized per monitor on purpose: it already
        # contains the monitors appended in earlier iterations.
        llm_monitor_block_generation_prompt += (
            f"\nHere is a catalog of reporter blocks that might be relevant, including their `op_code` and parameters:\n"
            f"```json\n{reporter_catalog_json}\n```\n\n"
            f"Current Scratch project JSON (for context, especially existing variables/lists):\n"
            f"```json\n{json.dumps(project_json, indent=2)}\n```\n\n"
            f"**CRITICAL INSTRUCTIONS FOR GENERATING THE MONITOR BLOCK JSON:**\n"
            f"1. **Output ONLY the JSON object for the `monitor_block`.** Do NOT include any other text or markdown fences like ```json.\n"
            f"2. The top-level key in your output should be `monitor_block` and its value should be the block definition.\n"
            f"3. Generate a **globally unique ID** for the `monitor_block`.\n"
            f"4. Set the `opcode` field based on the `opcode_hint` provided (e.g., `data_variable`, `motion_xposition`, `looks_costumenumbername`).\n"
            f"5. The `mode` should typically be `\"default\"` for standard monitors.\n"
            f"6. For variable monitors (`data_variable` opcode), the `params` field MUST contain `\"VARIABLE\": \"{{variable_name}}\"`.\n"
            f"7. For reporter blocks that have parameters (like `looks_costumenumbername`), the `params` field should contain the correct parameter name (e.g., `\"NUMBER_NAME\": \"number\"` or `\"NUMBER_NAME\": \"name\"`). Refer to the `reporter_block_data` for parameter names.\n"
            f"8. Set `spriteName` to the actual target sprite's name, or `null` if it's a Stage variable/reporter.\n"
            f"9. Set `value` to a reasonable default (e.g., \"0\" for numbers, \"\" for strings, or the current value from project_json for existing variables).\n"
            f"10. Set `visible` to `true` or `false` as specified in the plan.\n"
            f"11. Provide sensible `x` and `y` coordinates. For demonstration, increment `y` for each new monitor.\n"
            f"12. `sliderMin`, `sliderMax`, and `isDiscrete` are typically for variables; set them appropriately if known or use defaults (0, 100, true).\n"
            f"13. Ensure all keys are double-quoted.\n"
        )

        try:
            response = agent.invoke({"messages": [{"role": "user", "content": llm_monitor_block_generation_prompt}]})
            raw_response = response["messages"][-1].content
            logger.info(f"Raw response from LLM [DeclarationBuilderNode - Monitor]: {raw_response[:500]}...")
            try:
                # Expecting a dict with a "monitor_block" key
                generated_monitor_json = extract_json_from_llm_response(raw_response)
                monitor_block = generated_monitor_json.get("monitor_block")
                if not monitor_block:
                    raise ValueError("LLM response for monitor block did not contain 'monitor_block' key.")
            except json.JSONDecodeError as error_json:
                # Only unparseable JSON goes through the repair pass; a parseable
                # answer without "monitor_block" falls through to the outer handler.
                logger.error("Failed to extract JSON for monitor block. Attempting to correct.")
                correction_prompt = (
                    "Your task is to correct the provided JSON string to ensure it is **syntactically perfect and adheres strictly to JSON rules**.\n"
                    "It must represent a single Scratch monitor block definition.\n"
                    f"- **Error Details**: {error_json}\n\n"
                    "**Strict Instructions for your response:**\n"
                    "1. **ONLY** output the corrected JSON object. Do not include any other text, comments, or explanations outside the JSON.\n"
                    "2. The top-level key must be `monitor_block`.\n"
                    "3. Ensure all property names (keys) are enclosed in **double quotes**.\n"
                    "4. Ensure string values are correctly enclosed in **double quotes** and any internal special characters (like newlines `\\n`, tabs `\\t`, backslashes `\\\\`, or double quotes `\\\"`) are properly **escaped**.\n"
                    "5. Verify that there are **no extra commas**, especially between key-value pairs or after the last element in an object or array.\n"
                    "6. Ensure proper nesting and matching of curly braces `{}` and square brackets `[]`.\n"
                    "7. **Crucially, remove any extraneous characters or duplicate closing braces outside the main JSON object.**\n"
                    "8. The corrected JSON must be a **complete and valid** JSON object.\n\n"
                    "Here is the problematic JSON string to correct:\n"
                    "```json\n"
                    f"{raw_response}\n"
                    "```\n"
                    "Corrected JSON:\n"
                )
                correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]})
                generated_monitor_json = extract_json_from_llm_response(correction_response["messages"][-1].content)
                monitor_block = generated_monitor_json.get("monitor_block")
                if not monitor_block:
                    raise ValueError("JSON corrector still did not produce a valid monitor_block.")

            # Lay monitors out in two columns so they do not overlap:
            # even index -> left column, odd index -> right column, new row every two.
            monitor_block["x"] = 5 + (monitor_index % 2) * 320
            monitor_block["y"] = 5 + (monitor_index // 2) * 50
            monitor_index += 1
            monitors.append(monitor_block)
            logger.info(f"Added monitor for '{monitor_data.get('variable', monitor_data.get('reporter_name'))}' to project.json.")
        except Exception as e:
            # Best effort: a single failed monitor must not abort the remaining declarations.
            logger.error(f"Error generating monitor block for {monitor_data}: {e}")

    state["project_json"] = project_json
    logger.info("Declaration builder node finished updating project JSON.")
    print("Updated project JSON after declarations:", json.dumps(project_json, indent=2))
    return state
# Node 5: Sprite Action Plan Builder
def overall_planner_node(state: GameState):
    """Ask the LLM for an overall action plan covering every sprite and the Stage.

    Builds a prompt from the game description, the target names and positions
    found in ``state["project_json"]``, the declaration plan, and the
    module-level block reference strings (``hat_description``,
    ``hat_opcodes_functionalities``, ...). The answer is parsed with
    ``extract_json_from_llm_response``; if that raises ``json.JSONDecodeError``
    the raw answer is sent once to ``agent_json_resolver`` for repair.

    Args:
        state: Graph state. Must contain ``project_json``; ``description`` and
            ``declaration_plan`` are optional.

    Returns:
        The same ``state`` object with ``state["action_plan"]`` set to the
        parsed plan.

    Raises:
        Exception: Any error from the LLM call or from parsing (including a
            failed repair attempt) is logged and re-raised.
    """
    logger.info("--- Running OverallPlannerNode ---")
    description = state.get("description", "")
    project_json = state["project_json"]

    # Every target is planned for, including the Stage.
    target_names = [t["name"] for t in project_json["targets"]]

    # Sprites report their x/y; the Stage gets a placeholder because it has no position.
    sprite_positions = {}
    for target in project_json["targets"]:
        if target["isStage"]:
            sprite_positions[target["name"]] = {"x": "N/A", "y": "N/A"}
        else:
            sprite_positions[target["name"]] = {"x": target.get("x", 0), "y": target.get("y", 0)}

    # A missing declaration plan is tolerated (declaration_builder_node also
    # treats it as optional) instead of raising KeyError here.
    declaration_plan = state.get("declaration_plan", {})

    # NOTE(review): this is a regular (non-raw) f-string, so the `\n` escapes in
    # the few-shot JSON below are expanded by Python and the model sees real
    # line breaks inside JSON string values -- confirm this is intended.
    planning_prompt = f"""Generate a detailed action plan for the game's sprites and stage based on the user query and sprite details.
**Game Description:** '{description}'
**Targets in Game (Sprites and Stage):** {', '.join(target_names)}
**Current Target Positions (Sprites have x/y, Stage is N/A):** {json.dumps(sprite_positions)}
Here is the overall declaration of variable, broadcast and monitors to look for and utilized as per requirment.
**Current Declaration Plan:** {json.dumps(declaration_plan)}
--- Scratch 3.0 Block Reference ---
This section provides a comprehensive reference of Scratch 3.0 blocks, categorized by shape, including their opcodes and functional descriptions. Use this to accurately identify block types and behavior.
### Hat Blocks
Description: {hat_description}
Blocks:
{hat_opcodes_functionalities}
### Boolean Blocks
Description: {boolean_description}
Blocks:
{boolean_opcodes_functionalities}
### C Blocks
Description: {c_description}
Blocks:
{c_opcodes_functionalities}
### Cap Blocks
Description: {cap_description}
Blocks:
{cap_opcodes_functionalities}
### Reporter Blocks
Description: {reporter_description}
Blocks:
{reporter_opcodes_functionalities}
### Stack Blocks
Description: {stack_description}
Blocks:
{stack_opcodes_functionalities}
-----------------------------------
Your task is to define the primary actions and movements for each sprite AND THE STAGE.
The output should be a JSON object with a single key 'action_overall_flow'. Each key inside this object should be a sprite or 'Stage' name (e.g., 'Player', 'Enemy', 'Stage'), and its value must include a 'description' and a list of 'plans'.
Each plan must include a **single Scratch Hat Block** (e.g., 'event_whenflagclicked') to start scratch project and should contain:
1. **'event'**: the exact `opcode` of the hat block that initiates the logic.
2. **'logic'**: a natural language breakdown of each step taken after the event, formatted as a multi-line string representing pseudo-code. Ensure clarity and granularity—each described action should map closely to a Scratch block or tight sequence.
- Use 'forever: ...' or 'repeat(10): ...' to prefix repeating logic suitable taking reference from the C blocks.
- Use Scratch-consistent verbs: 'move', 'change', 'wait', 'hide', 'show', 'say', 'glide', etc.
- For coordinates or numeric values within the logic string, use them directly (e.g., 'x: 240 y: -100' or 'change y by 10'). The entire 'logic' field itself must be a single string.
3. **Opcode Lists**: include relevant Scratch opcodes grouped under `motion`, `control`, `operator`, `sensing`, `looks`, `sounds`, `events`, and `data`. List only the non-empty categories. Use exact opcodes including shadow/helper blocks (e.g., 'math_number').
4. Few Example of content of logics inside for a specific plan:
- example 1[continues moving objects]: "when green flag clicked
go to x: 240 y: -100 // Start off to the right, on the ground
set [obstacle speed v] to -5
show
forever
change x by (obstacle speed)
if <(x position) < -240> then // If off-screen left
go to x: 240 y: -100 // Reset to right again
end
end
"
- example 2[jumping script of an plan]: "when [space v] key pressed
if <(y position) = -100> then // Only jump if on the ground
repeat (5)
change y by (100) // Wave paw
wait (0.1) seconds
change y by (-100) // Back to normal
wait (0.1) seconds
end
end
"
5. Use target names exactly as listed in `Targets in Game`. Do NOT rename or invent new targets.
6. Ensure the plan reflects accurate opcode usage derived strictly from the block reference above.
7. Few shot Example structure for 'action_overall_flow':
```json
{{
"action_overall_flow": {{
"Stage": {{
"description": "Background and global game state management, including broadcasts, rewards, and score.",
"plans": [
{{
"event": "event_whenflagclicked",
"logic": "when green flag clicked\n switch backdrop to [backdrop1 v]\n set [score v] to 0\n show variable [score v]\n broadcast [Game Start v]",
"motion": [],
"control": [],
"operator": [],
"sensing": [],
"looks": [
"looks_switchbackdropto"
],
"sounds": [],
"events": [
"event_broadcast"
],
"data": [
"data_setvariableto",
"data_showvariable"
]
}},
{{
"event": "event_whenbroadcastreceived",
"logic": "when I receive [Game Over v]\n if <(score) > (High Score)> then\n set [High Score v] to (score)\n end\n switch backdrop to [HighScore v]",
"motion": [],
"control": [
"control_if"
],
"operator": [
"operator_gt"
],
"sensing": [],
"looks": [
"looks_switchbackdropto"
],
"sounds": [],
"events": [],
"data": [
"data_setvariableto"
]
}}
]
}},
"Sprite1": {{
"description": "Main character (cat) actions",
"plans": [
{{
"event": "event_whenflagclicked",
"logic": "when green flag clicked\n go to x: 240 y: -100\n end\n.",
"motion": [
"motion_gotoxy"
],
"control": [],
"operator": [],
"sensing": [],
"looks": [],
"sounds": [],
"events": [],
"data": []
}},
{{
"event": "event_whenkeypressed",
"logic": "when [space v] key pressed\n repeat (10)\n change y by (20)\n wait (0.1) seconds\n change y by (-20)\n end",
"motion": [
"motion_changeyby"
],
"control": [
"control_repeat",
"control_wait"
],
"operator": [],
"sensing": [],
"looks": [],
"sounds": [],
"events": [],
"data": []
}}
]
}},
"soccer ball": {{
"description": "Obstacle movement and interaction",
"plans": [
{{
"event": "event_whenflagclicked",
"logic": "when green flag clicked\n go to x: 240 y: -135\n forever\n glide 2 seconds to x: -240 y: -135\n if <(x position) < -235> then\n set x to 240\n end\n if <touching [Sprite1 v]?> then\n broadcast [Game Over v]\n stop [all v]\n end\n end",
"motion": [
"motion_gotoxy",
"motion_glidesecstoxy",
"motion_xposition",
"motion_setx"
],
"control": [
"control_forever",
"control_if",
"control_stop"
],
"operator": [
"operator_lt"
],
"sensing": [
"sensing_istouching",
"sensing_touchingobjectmenu"
],
"looks": [],
"sounds": [],
"events": [
"event_broadcast"
],
"data": []
}}
]
}}
}}
}}
```
8. Based on the provided context, generate the `action_overall_flow`.
- Maintain the **exact JSON structure** shown above.
- All `logic` fields must be **clear and granular**.
- Only include opcode categories that contain relevant opcodes.
- Ensure that each opcode matches its intended Scratch functionality.
- If feedback suggests major change, **rethink the entire plan** for the affected sprite(s).
- If feedback is minor, make precise, minimal improvements only.
"""
    try:
        response = agent.invoke({"messages": [{"role": "user", "content": planning_prompt}]})
        print("Raw response from LLM [OverallPlannerNode 1]:",response)
        raw_response = response["messages"][-1].content
        print("Raw response from LLM [OverallPlannerNode 2]:", raw_response)
        try:
            overall_plan = extract_json_from_llm_response(raw_response)
        except json.JSONDecodeError as error_json:
            logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.")
            # One repair attempt through the JSON resolver agent; if this also
            # fails to parse, the error propagates to the outer handler.
            correction_prompt = (
                "Your task is to correct the provided JSON string to ensure it is **syntactically perfect and adheres strictly to JSON rules**.\n"
                "Carefully review the JSON for any errors, especially focusing on the reported error at:\n"
                f"- **Error Details**: {error_json}\n\n"
                "**Strict Instructions for your response:**\n"
                "1. **ONLY** output the corrected JSON. Do not include any other text, comments, or explanations outside the JSON.\n"
                "2. Ensure all property names (keys) are enclosed in **double quotes**.\n"
                # The double-quote example must render as \" (it previously rendered as a lone backslash).
                "3. Ensure string values are correctly enclosed in **double quotes** and any internal special characters (like newlines `\\n`, tabs `\\t`, backslashes `\\\\`, or double quotes `\\\"`) are properly **escaped**.\n"
                "4. Verify that there are **no extra commas**, especially between key-value pairs or after the last element in an object or array.\n"
                "5. Ensure proper nesting and matching of curly braces `{}` and square brackets `[]`.\n"
                "6. **Crucially, remove any extraneous characters or duplicate closing braces outside the main JSON object.**\n"
                "7. The corrected JSON must be a **complete and valid** JSON object.\n\n"
                "Here is the problematic JSON string to correct:\n"
                "```json\n"
                f"{raw_response}\n"
                "```\n"
                "Corrected JSON:\n"
            )
            correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]})
            print(f"[JSON CORRECTOR RESPONSE AT OVERALLPLANNERNODE ]: {correction_response['messages'][-1].content}")
            overall_plan = extract_json_from_llm_response(correction_response["messages"][-1].content)
        state["action_plan"] = overall_plan
        logger.info("Overall plan generated by OverallPlannerNode.")
        return state
    except Exception as e:
        logger.error(f"Error in OverallPlannerNode: {e}")
        raise
# Node 6: Plan Logic Aligner - refines the 'logic' text of every plan in the action plan
def _as_opcode_list(value):
    """Coerce an LLM-supplied opcode field to a list of strings.

    A bare string becomes a one-element list, a list/tuple keeps only its
    string members, and anything else (``None``, numbers, dicts) becomes ``[]``.
    """
    if isinstance(value, str):
        return [value]
    if isinstance(value, (list, tuple)):
        return [item for item in value if isinstance(item, str)]
    return []


def plan_logic_aligner_node(state: GameState):
    """Refine the ``logic`` text of every plan in ``state["action_plan"]``.

    For each target and each of its plans, the opcodes listed in the plan are
    looked up in the block catalogs, a refinement prompt is sent to the
    module-level ``agent``, and the plan's ``logic`` is replaced by the
    returned ``refined_logic``. If the answer is not parseable JSON,
    ``agent_json_resolver`` is asked once to repair it. On any failure the
    original logic is kept.

    Args:
        state: Graph state. ``action_plan`` is either
            ``{"action_overall_flow": {target: {...}}}`` or, as a fallback,
            a mapping with the targets at the top level.

    Returns:
        The same ``state`` object; ``state["action_plan"]["action_overall_flow"]``
        holds the refined plans. Plan dicts are updated in place.
    """
    logger.info("--- Running plan_logic_aligner_node ---")
    game_description = state.get("description", "No game description provided.")
    # `or {}` so a missing or None action plan yields an empty refinement instead of an error.
    action_plan = state.get("action_plan") or {}

    all_catalogs = [
        hat_block_data,
        boolean_block_data,
        c_block_data,
        cap_block_data,
        reporter_block_data,
        stack_block_data,
    ]

    overall_flow = action_plan.get("action_overall_flow", {})
    if overall_flow == {}:
        # Fall back to treating the top-level keys as targets when the wrapper
        # key is absent or empty; the (empty) wrapper key itself is not a target.
        plan_data = [
            (name, data) for name, data in action_plan.items()
            if name != "action_overall_flow"
        ]
    else:
        plan_data = list(overall_flow.items())

    refined_overall_flow = {}
    for target_name, target_data in plan_data:
        if not isinstance(target_data, dict):
            logger.warning(f"Skipping '{target_name}': expected a plan object, got {type(target_data).__name__}.")
            continue

        # Only dict-shaped plans can be refined; a null "plans" value counts as empty.
        plans = [p for p in (target_data.get("plans") or []) if isinstance(p, dict)]

        # All logic strings of this target are shown to the LLM with every
        # plan so a single refinement does not drift from the others.
        overall_plans_in_sprite = [plan.get("logic", "") for plan in plans]
        print(f"[overall_plans_in_sprite AT PLAN ALIGNER]: {overall_plans_in_sprite}")

        refined_target_plans = []
        for plan in plans:
            original_logic = plan.get("logic", "")
            event = plan.get("event", "unknown_event")

            # The LLM sometimes uses singular/plural variants of the category
            # names, and may emit null or a bare string instead of a list.
            opcodes = {
                "motion": _as_opcode_list(plan.get("motion", plan.get("motions", []))),
                "control": _as_opcode_list(plan.get("control", plan.get("controls", []))),
                "operator": _as_opcode_list(plan.get("operator", plan.get("operators", []))),
                "sensing": _as_opcode_list(plan.get("sensing", [])),
                "looks": _as_opcode_list(plan.get("looks", plan.get("look", []))),
                "sounds": _as_opcode_list(plan.get("sounds", plan.get("sound", []))),
                "events": _as_opcode_list(plan.get("events", [])),
                "data": _as_opcode_list(plan.get("data", [])),
            }
            hat_opcodes = _as_opcode_list(plan.get("event", []))

            # De-duplicate while keeping first-seen order so the prompt is
            # deterministic from run to run.
            needed_opcodes = list(dict.fromkeys(
                opcodes["motion"] + opcodes["control"] + opcodes["operator"]
                + opcodes["sensing"] + opcodes["looks"] + opcodes["sounds"]
                + opcodes["events"] + hat_opcodes + opcodes["data"]
            ))

            # Merge the catalog entry with the runtime entry for each opcode;
            # runtime fields override catalog fields of the same name.
            combined_blocks = {}
            for op in needed_opcodes:
                catalog_def = find_block_in_all(op, all_catalogs) or {}
                runtime_def = ALL_SCRATCH_BLOCKS_CATALOG.get(op, {})
                combined_blocks[op] = {**catalog_def, **runtime_def}
            print("Combined blocks for this script:", json.dumps(combined_blocks, indent=2))

            refinement_prompt = f"""
You are an expert in Scratch 3.0 game development, specializing in understanding block relationships (stacked, nested).
Review the following plan for '{target_name}' triggered by '{event}'.
Game Description:
{game_description}
--- Scratch 3.0 Block Reference ---
### Hat Blocks
Description: {hat_description}
Blocks:
{hat_opcodes_functionalities}
### Boolean Blocks
Description: {boolean_description}
Blocks:
{boolean_opcodes_functionalities}
### C Blocks
Description: {c_description}
Blocks:
{c_opcodes_functionalities}
### Cap Blocks
Description: {cap_description}
Blocks:
{cap_opcodes_functionalities}
### Reporter Blocks
Description: {reporter_description}
Blocks:
{reporter_opcodes_functionalities}
### Stack Blocks
Description: {stack_description}
Blocks:
{stack_opcodes_functionalities}
-----------------------------------
Current Plan Details:
- Event (Hat Block Opcode): {event}
- Overall plans made on {target_name} are in the list: {overall_plans_in_sprite}
- Associated Opcodes by Category: {json.dumps(opcodes, indent=2)}
- Reference code and functionality and logics script for to check is script correct: {combined_blocks}
- Current Logic Description to Update if required: "{original_logic}"
Your task is to:
1. **Refine the 'Logic'**: Make it more precise, accurate, and fully aligned with the game mechanics described in the 'Game Description'. Ensure it uses Scratch-consistent verbs and phrasing. **Do NOT** use double quotes within the 'logic' string itself for values.
2. The logic should build flow which can correctly describe motion or flow for e.g jumping, running, flying and other mechanics.
3. **'logic'**: a natural language breakdown of each step taken after the event, formatted as a multi-line string representing pseudo-code. Ensure clarity and granularity—each described action should map closely to a Scratch block or tight sequence.
4. Few Example of content of logics inside for a specific plan:
- example 1[continuos moving objects]: "when green flag clicked
go to x: 240 y: -100 // Start off to the right, on the ground
set [obstacle speed v] to -5
show
forever
change x by (obstacle speed)
if <(x position) < -240> then // If off-screen left
go to x: 240 y: -100 // Reset to right again
end
end
"
- example 2[jumping script of an plan]: " when [space v] key pressed
if <(y position) = -100> then // Only jump if on the ground
repeat (5)
change y by (100) // Wave paw
wait (0.1) seconds
change y by (-100) // Back to normal
wait (0.1) seconds
end
end
"
- example 3[score decrement on collision]: "when green flag clicked
set [score v] to 0
forever
if <touching [other sprite v] ?> then
change [score v] by (-1)
wait (0.1) seconds // To prevent rapid score decrement
end
5. Donot add any explaination of logic or comments to justify or explain just put the logic content in the json.
6. Output a JSON object and [NOTE: use double quates always ""]:
- `refined_logic`: The refined logic string.
Output ONLY the JSON object.
"""
            refined_logic = original_logic  # Kept unless the LLM returns a usable refinement
            try:
                response = agent.invoke({"messages": [{"role": "user", "content": refinement_prompt}]})
                llm_output_raw = response["messages"][-1].content
                parsed_llm_output = extract_json_from_llm_response(llm_output_raw)
                refined_logic = parsed_llm_output.get("refined_logic", original_logic)
                logger.info(f"Successfully processed plan for {target_name} - {event}.")
            except json.JSONDecodeError as error_json:
                logger.error(f"JSON Decode Error for {target_name} - {event}: {error_json}. Attempting correction.")
                # The expected schema matches what the refinement prompt asks
                # for: a single `refined_logic` string.
                correction_prompt = (
                    "Your task is to correct the provided JSON string to ensure it is **syntactically perfect and adheres strictly to JSON rules**.\n"
                    "It must be a JSON object with a single key `refined_logic` (string).\n"
                    f"- **Error Details**: {error_json}\n\n"
                    "**Strict Instructions for your response:**\n"
                    "1. **ONLY** output the corrected JSON. Do not include any other text or explanations.\n"
                    "2. Ensure all keys and string values are enclosed in **double quotes**. Escape internal quotes (`\\`).\n"
                    "3. No trailing commas. Correct nesting.\n\n"
                    "Here is the problematic JSON string to correct:\n"
                    f"```json\n{llm_output_raw}\n```\n"
                    "Corrected JSON:\n"
                )
                try:
                    correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]})
                    corrected_output = extract_json_from_llm_response(correction_response["messages"][-1].content)
                    refined_logic = corrected_output.get("refined_logic", original_logic)
                    logger.info(f"Successfully corrected JSON output for {target_name} - {event}.")
                except Exception as e_corr:
                    logger.error(f"Failed to correct JSON output for {target_name} - {event} even after retry: {e_corr}")
                    refined_logic = original_logic  # Final fallback if correction fails
            except Exception as e:
                logger.error(f"General error invoking LLM for {target_name} - {event}: {e}")
                refined_logic = original_logic  # Fall back to the original logic on any other error

            plan["logic"] = refined_logic
            refined_target_plans.append(plan)

        refined_overall_flow[target_name] = {"description": target_data.get("description"), "plans": refined_target_plans}

    # Store the refined version under the wrapper key. Assigning the plan back
    # covers the case where the state had no "action_plan" entry at all.
    action_plan["action_overall_flow"] = refined_overall_flow
    state["action_plan"] = action_plan
    print(f"[OVREALL REFINED LOGIC]: {refined_overall_flow}")
    logger.info("Plan refinement and block relation analysis completed for all plans.")
    return state
# Node 7: Sprite Plan Verification Node
def plan_verification_node(state: GameState):
    """
    Validate the current action plan with the LLM and decide whether to loop again.

    The node sends the game description, the action plan and the project JSON
    to ``agent`` and expects a JSON reply containing ``feedback``,
    ``needs_improvement`` and ``suggested_description_updates``. When the reply
    cannot be parsed (``json.JSONDecodeError``), ``agent_json_resolver`` is
    asked once to repair it.

    State keys read: ``iteration_count``, ``project_json`` (required),
    ``action_plan``, ``description``, ``plan_validation_feedback``.
    State keys written: ``needs_improvement``, ``plan_validation_feedback``,
    ``iteration_count`` and, when the validator suggests one, ``description``.

    Args:
        state: Mutable graph state; it is updated in place.

    Returns:
        The same ``state`` object.

    Raises:
        Exception: Any error raised while invoking the LLM or handling its
            reply is logged and re-raised after ``needs_improvement`` has been
            forced to ``False``.
    """
    current_iteration = state.get("iteration_count", 0)
    logger.info(f"--- Running VerificationNode (Iteration: {current_iteration}) ---")
    MAX_IMPROVEMENT_ITERATIONS = 1  # Upper bound on refinement passes; prevents an endless verify/refine loop
    project_json = state["project_json"]
    action_plan = state.get("action_plan", {})
    print(f"[action_plan before verification] on ({current_iteration}): {json.dumps(action_plan, indent=2)}")

    # Check the iteration budget BEFORE spending an LLM call.
    if current_iteration >= MAX_IMPROVEMENT_ITERATIONS:
        logger.warning(f"Max improvement iterations ({MAX_IMPROVEMENT_ITERATIONS}) reached. Skipping LLM call and forcing 'needs_improvement' to False.")
        state["needs_improvement"] = False
        state["plan_validation_feedback"] = "Max iterations reached, stopping further improvements."
        state["iteration_count"] = 0  # Reset because the improvement loop stops here
        print(f"[updated action_plan after verification] on ({current_iteration}): {json.dumps(state.get('action_plan', {}), indent=2)}")
        return state

    # Compact block reference embedded in the prompt. These locals shadow any
    # module-level names of the same spelling for the duration of this call.
    hat_description = "Blocks that start a script when an event happens."
    hat_opcodes_functionalities = "event_whenflagclicked (when green flag clicked), event_whenkeypressed (when key pressed), event_whenthisspriteclicked (when this sprite clicked)"
    boolean_description = "Blocks that report true or false."
    boolean_opcodes_functionalities = "operator_and, operator_or, operator_not, sensing_touchingobject, sensing_keypressed"
    c_description = "Blocks that create a control flow, like loops or conditionals."
    c_opcodes_functionalities = "control_if, control_if_else, control_repeat, control_forever"
    cap_description = "Blocks that stop a script."
    cap_opcodes_functionalities = "control_stop"
    reporter_description = "Blocks that report a value (numbers, strings)."
    reporter_opcodes_functionalities = "motion_xposition, motion_yposition, sensing_askandwait, data_variable"
    stack_description = "Blocks that perform actions sequentially."
    stack_opcodes_functionalities = "motion_movesteps, looks_sayforsecs, sound_playuntildone, control_wait"

    validation_prompt = f"""You are an AI validator for Scratch project plans and generated blocks. \
Your task is to review the current state of the game's action plan and block structure. \
Critically analyze if there are any missing logic, structural inconsistencies, or unclear intentions. \
Provide **precise** and **constructive** feedback for improvement.
**Game Description:**
{state.get('description', '')}
**Current Action Plan (High-Level Logic):**
```json
{json.dumps(action_plan, indent=2)}
```
**Current Project JSON (Generated Blocks):**
```json
{json.dumps(project_json, indent=2)}
```
**Previous Feedback (if any):**
{state.get('plan_validation_feedback', 'None')}
--- Scratch 3.0 Block Reference ---
This section provides a comprehensive reference of Scratch 3.0 blocks, categorized by shape, including their opcodes and functional descriptions. Use this to accurately identify block types and behavior.
### Hat Blocks
Description: {hat_description}
Blocks:
{hat_opcodes_functionalities}
### Boolean Blocks
Description: {boolean_description}
Blocks:
{boolean_opcodes_functionalities}
### C Blocks
Description: {c_description}
Blocks:
{c_opcodes_functionalities}
### Cap Blocks
Description: {cap_description}
Blocks:
{cap_opcodes_functionalities}
### Reporter Blocks
Description: {reporter_description}
Blocks:
{reporter_opcodes_functionalities}
### Stack Blocks
Description: {stack_description}
Blocks:
{stack_opcodes_functionalities}
**Check** if a any opcode related to logic inside the plans is missing or invalid opcode used, if any update it where required.
Based on the above, return a response strictly in the following JSON format:
```json
{{
"feedback": "Detailed comments on any missing logic, inconsistencies, or unclear intent. Be concise but specific. If everything is perfect, state that explicitly.",
"needs_improvement": true,
"suggested_description_updates": "Concise revision of the game description if needed. Use an empty string if no change is required."
}}
```
**Important:**
- The `needs_improvement` field must be strictly `true` or `false` (boolean). Do **not** include any other text or explanation inside the JSON.
- Be strict in evaluation. If **any** part of the plan or block logic appears incomplete, ambiguous, or incorrect, set `needs_improvement` to `true`."""

    try:
        response = agent.invoke({"messages": [{"role": "user", "content": validation_prompt}]})
        raw_response = response["messages"][-1].content
        logger.info(f"Raw response from LLM [VerificationNode]: {raw_response[:500]}...")

        try:
            validation_result = extract_json_from_llm_response(raw_response)
        except json.JSONDecodeError as error_json:
            logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.")
            # Ask the JSON resolver agent to repair the malformed reply.
            correction_prompt = (
                "Your task is to correct the provided JSON string to ensure it is **syntactically perfect and adheres strictly to JSON rules**.\n"
                "Carefully review the JSON for any errors, especially focusing on the reported error at:\n"
                f"- **Error Details**: {error_json}\n\n"
                "**Strict Instructions for your response:**\n"
                "1. **ONLY** output the corrected JSON. Do not include any other text, comments, or explanations outside the JSON.\n"
                "2. Ensure all property names (keys) are enclosed in **double quotes**.\n"
                "3. Ensure string values are correctly enclosed in **double quotes** and any internal special characters (like newlines `\\n`, tabs `\\t`, backslashes `\\\\`, or double quotes `\\`) are properly **escaped**.\n"
                "4. Verify that there are **no extra commas**, especially between key-value pairs or after the last element in an object or array.\n"
                "5. Ensure proper nesting and matching of curly braces `{}` and square brackets `[]`.\n"
                "6. **Crucially, remove any extraneous characters or duplicate closing braces outside the main JSON object.**\n"
                "7. The corrected JSON must be a **complete and valid** JSON object.\n\n"
                "Here is the problematic JSON string to correct:\n"
                "```json\n"
                f"{raw_response}\n"
                "```\n"
                "Corrected JSON:\n"
            )
            correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]})
            corrected_text = correction_response["messages"][-1].content
            print(f"[JSON CORRECTOR RESPONSE AT PLANVERIFICATIONNODE ]: {corrected_text}")
            validation_result = extract_json_from_llm_response(corrected_text)

        # The feedback is sliced and re-embedded in prompts later, so it must be a string.
        feedback = validation_result.get("feedback", "No specific feedback provided.")
        if not isinstance(feedback, str):
            feedback = str(feedback)
        state["plan_validation_feedback"] = feedback

        # LLMs sometimes return the flag as a string; "false" would be truthy
        # and wrongly trigger another refinement pass, so coerce it explicitly.
        raw_flag = validation_result.get("needs_improvement", False)
        if isinstance(raw_flag, str):
            needs_improvement = raw_flag.strip().lower() == "true"
        else:
            needs_improvement = bool(raw_flag)
        state["needs_improvement"] = needs_improvement

        suggested_description_updates = validation_result.get("suggested_description_updates", "")
        if suggested_description_updates:
            # The suggestion is appended to, not merged with, the existing description.
            current_description = state.get("description", "")
            state["description"] = f"{current_description}\n\nSuggested Update: {suggested_description_updates}"
            logger.info("Updated detailed game description based on validation feedback.")

        # Advance the loop counter only while improvements are still requested;
        # the cap itself is enforced at the top of this function on the next run.
        state["iteration_count"] = current_iteration + 1 if needs_improvement else 0

        logger.info(f"Verification completed. Needs Improvement: {state['needs_improvement']}. Feedback: {state['plan_validation_feedback'][:100]}...")
        print(f"[updated action_plan after verification] on ({current_iteration}): {json.dumps(state.get('action_plan', {}), indent=2)}")
        return state
    except Exception as e:
        logger.error(f"Error in VerificationNode: {e}")
        state["needs_improvement"] = False  # Force end loop on error
        state["plan_validation_feedback"] = f"Validation error: {e}"
        raise
# Node 8: Refine planner node
def refined_planner_node(state: GameState):
    """
    Refine the action plan using the validation feedback and game description.

    Builds a refinement prompt from the description, target names, target
    positions, declaration plan and validation feedback, sends it to ``agent``
    and parses the JSON reply. If parsing raises ``json.JSONDecodeError``,
    ``agent_json_resolver`` is asked once to repair the reply.

    State keys read: ``description``, ``action_plan``, ``iteration_count``,
    ``plan_validation_feedback``, ``project_json`` (required),
    ``declaration_plan`` (required).
    State keys written: ``action_plan`` - replaced by the reply's
    ``action_overall_flow`` mapping, but only when that mapping is present and
    non-empty; otherwise the previous plan is kept.

    Args:
        state: Mutable graph state; it is updated in place.

    Returns:
        The same ``state`` object.

    Raises:
        Exception: Any error during prompt handling or LLM invocation is
            logged and re-raised.
    """
    logger.info("--- Running RefinedPlannerNode ---")
    detailed_game_description = state.get("description", "")
    current_action_plan = state.get("action_plan", {})
    print(f"[current_action_plan before refinement] on ({state.get('iteration_count', 0)}): {json.dumps(current_action_plan, indent=2)}")
    plan_validation_feedback = state.get("plan_validation_feedback", "No specific feedback provided. Assume general refinement is needed.")
    project_json = state["project_json"]
    target_names = [t["name"] for t in project_json["targets"]]

    # Sprites report their x/y; the Stage has no position, so it gets "N/A".
    sprite_positions = {
        target["name"]: (
            {"x": "N/A", "y": "N/A"}
            if target["isStage"]
            else {"x": target.get("x", 0), "y": target.get("y", 0)}
        )
        for target in project_json["targets"]
    }
    declaration_plan = state["declaration_plan"]

    # NOTE(review): the *_description / *_opcodes_functionalities names used in
    # the prompt are not defined in this function; presumably they are
    # module-level values defined elsewhere in the file - confirm.
    refinement_prompt = f"""Refine the existing action plan for the game's sprites based on the detailed game description and the validation feedback provided.
**Game Description:** '{detailed_game_description}'
**Targets in Game (Sprites and Stage):** {', '.join(target_names)}
**Current Target Positions (Sprites have x/y, Stage is N/A):** {json.dumps(sprite_positions)}
Here is the overall declaration of variable, broadcast and monitors to look for and utilized as per requirment.
**Current Declaration Plan:** {json.dumps(declaration_plan)}
**Validation Feedback:**
'{plan_validation_feedback}'
--- Scratch 3.0 Block Reference ---
### Hat Blocks
Description: {hat_description}
Blocks:
{hat_opcodes_functionalities}
### Boolean Blocks
Description: {boolean_description}
Blocks:
{boolean_opcodes_functionalities}
### C Blocks
Description: {c_description}
Blocks:
{c_opcodes_functionalities}
### Cap Blocks
Description: {cap_description}
Blocks:
{cap_opcodes_functionalities}
### Reporter Blocks
Description: {reporter_description}
Blocks:
{reporter_opcodes_functionalities}
### Stack Blocks
Description: {stack_description}
Blocks:
{stack_opcodes_functionalities}
-----------------------------------
* **Your task is to align to description, refine and correct the JSON object 'action_overall_flow'.**
Use sprite names exactly as provided in `sprite_names` (e.g., 'Sprite1', 'soccer ball'); and also the stage, do **NOT** rename them.
1. **'event'**: the exact `opcode` of the hat block that initiates the logic.
2. **'logic'**: a natural language breakdown of each step taken after the event, formatted as a multi-line string representing pseudo-code. Ensure clarity and granularity—each described action should map closely to a Scratch block or tight sequence.
- Use 'forever: ...' or 'repeat(10): ...' to prefix repeating logic suitable taking reference from the C blocks.
- Use Scratch-consistent verbs: 'move', 'change', 'wait', 'hide', 'show', 'say', 'glide', etc.
- For coordinates or numeric values within the logic string, use them directly (e.g., 'x: 240 y: -100' or 'change y by 10'). The entire 'logic' field itself must be a single string.
3. **Opcode Lists**: include relevant Scratch opcodes grouped under `motion`, `control`, `operator`, `sensing`, `looks`, `sounds`, `events`, and `data`. List only the non-empty categories. Use exact opcodes including shadow/helper blocks (e.g., 'math_number').
4. Few Example of content of logics inside for a specific plan:
- example 1[continues moving objects]: "when green flag clicked
go to x: 240 y: -100 // Start off to the right, on the ground
set [obstacle speed v] to -5
show
forever
change x by (obstacle speed)
if <(x position) < -240> then // If off-screen left
go to x: 240 y: -100 // Reset to right again
end
end
"
- example 2[jumping script of an plan]: "when [space v] key pressed
if <(y position) = -100> then // Only jump if on the ground
repeat (5)
change y by (100) // Wave paw
wait (0.1) seconds
change y by (-100) // Back to normal
wait (0.1) seconds
end
end
"
5. Use target names exactly as listed in `Targets in Game`. Do NOT rename or invent new targets.
6. Ensure the plan reflects accurate opcode usage derived strictly from the block reference above.
7. Few shot Example structure for 'action_overall_flow':
```json
{{
"action_overall_flow": {{
"Stage": {{
"description": "Background and global game state management, including broadcasts, rewards, and score.",
"plans": [
{{
"event": "event_whenflagclicked",
"logic": "when green flag clicked\n switch backdrop to [backdrop1 v]\n set [score v] to 0\n show variable [score v]\n broadcast [Game Start v]",
"motion": [],
"control": [],
"operator": [],
"sensing": [],
"looks": [
"looks_switchbackdropto"
],
"sounds": [],
"events": [
"event_broadcast"
],
"data": [
"data_setvariableto",
"data_showvariable"
]
}},
{{
"event": "event_whenbroadcastreceived",
"logic": "when I receive [Game Over v]\n if <(score) > (High Score)> then\n set [High Score v] to (score)\n end\n switch backdrop to [HighScore v]",
"motion": [],
"control": [
"control_if"
],
"operator": [
"operator_gt"
],
"sensing": [],
"looks": [
"looks_switchbackdropto"
],
"sounds": [],
"events": [],
"data": [
"data_setvariableto"
]
}}
]
}},
"Sprite1": {{
"description": "Main character (cat) actions",
"plans": [
{{
"event": "event_whenflagclicked",
"logic": "when green flag clicked\n go to x: 240 y: -100\n end\n.",
"motion": [
"motion_gotoxy"
],
"control": [],
"operator": [],
"sensing": [],
"looks": [],
"sounds": [],
"events": [],
"data": []
}},
{{
"event": "event_whenkeypressed",
"logic": "when [space v] key pressed\n repeat (10)\n change y by (20)\n wait (0.1) seconds\n change y by (-20)\n end",
"motion": [
"motion_changeyby"
],
"control": [
"control_repeat",
"control_wait"
],
"operator": [],
"sensing": [],
"looks": [],
"sounds": [],
"events": [],
"data": []
}}
]
}},
"soccer ball": {{
"description": "Obstacle movement and interaction",
"plans": [
{{
"event": "event_whenflagclicked",
"logic": "when green flag clicked\n go to x: 240 y: -135\n forever\n glide 2 seconds to x: -240 y: -135\n if <(x position) < -235> then\n set x to 240\n end\n if <touching [Sprite1 v]?> then\n broadcast [Game Over v]\n stop [all v]\n end\n end",
"motion": [
"motion_gotoxy",
"motion_glidesecstoxy",
"motion_xposition",
"motion_setx"
],
"control": [
"control_forever",
"control_if",
"control_stop"
],
"operator": [
"operator_lt"
],
"sensing": [
"sensing_istouching",
"sensing_touchingobjectmenu"
],
"looks": [],
"sounds": [],
"events": [
"event_broadcast"
],
"data": []
}}
]
}}
}}
}}
```
8. Use the validation feedback to address errors, fill in missing logic, or enhance clarity.
example of few possible improvements: 1.event_whenflagclicked is used to control sprite but its used for actual start scratch project and reset scratch. 2. looping like forever used where we should use iterative. 3. missing of for variable we used in the block
- Maintain the **exact JSON structure** shown above.
- All `logic` fields must be **clear and granular**.
- Only include opcode categories that contain relevant opcodes.
- Ensure that each opcode matches its intended Scratch functionality.
- If feedback suggests major change, **rethink the entire plan** for the affected sprite(s).
- If feedback is minor, make precise, minimal improvements only.
"""

    try:
        response = agent.invoke({"messages": [{"role": "user", "content": refinement_prompt}]})
        raw_response = response["messages"][-1].content
        logger.info(f"Raw response from LLM [RefinedPlannerNode]: {raw_response[:500]}...")

        try:
            refined_plan = extract_json_from_llm_response(raw_response)
        except json.JSONDecodeError as error_json:
            logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.")
            # Ask the JSON resolver agent to repair the malformed reply.
            # Each numbered instruction ends with a newline and carries a
            # unique number so the list reads as intended.
            correction_prompt = (
                "Your task is to correct the provided JSON string to ensure it is **syntactically perfect and adheres strictly to JSON rules**.\n"
                "Carefully review the JSON for any errors, especially focusing on the reported error at:\n"
                f"- **Error Details**: {error_json}\n\n"
                "**Strict Instructions for your response:**\n"
                "1. **ONLY** output the corrected JSON. Do not include any other text, comments, or explanations outside the JSON.\n"
                "2. Ensure all property names (keys) are enclosed in **double quotes**.\n"
                "3. Ensure string values are correctly enclosed in **double quotes** and any internal special characters (like newlines `\\n`, tabs `\\t`, backslashes `\\\\`, or double quotes `\\`) are properly **escaped**.\n"
                "4. IN `logic` field make sure content enclosed in **double quotes** should not have invalid **double quotes**, **eliminate** all quotes inside the content if any.\n"
                "5. Verify that there are **no extra commas**, especially between key-value pairs or after the last element in an object or array.\n"
                "6. Ensure proper nesting and matching of curly braces `{}` and square brackets `[]`.\n"
                "7. **Crucially, remove any extraneous characters or duplicate closing braces outside the main JSON object.**\n"
                "8. The corrected JSON must be a **complete and valid** JSON object.\n\n"
                "Here is the problematic JSON string to correct:\n"
                "```json\n"
                f"{raw_response}\n"
                "```\n"
                "Corrected JSON:\n"
            )
            correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]})
            corrected_text = correction_response["messages"][-1].content
            print(f"[JSON CORRECTOR RESPONSE AT REFINEPLANNER ]: {corrected_text}")
            refined_plan = extract_json_from_llm_response(corrected_text)
            logger.info("Refined plan corrected by JSON resolver agent.")

        # Replace the plan only when the reply actually carries a non-empty
        # 'action_overall_flow'; otherwise the existing plan would be wiped
        # out by an empty dict while the log claims it was kept.
        refined_flow = refined_plan.get("action_overall_flow") if isinstance(refined_plan, dict) else None
        if refined_flow:
            # The state stores the inner flow mapping directly; downstream
            # nodes accept both the wrapped and the unwrapped layout.
            state["action_plan"] = refined_flow
            logger.info("Action plan refined by RefinedPlannerNode.")
        else:
            logger.warning("RefinedPlannerNode did not return a valid 'action_overall_flow' structure. Keeping previous plan.")

        print("[Refined Action Plan]:", json.dumps(state.get("action_plan", {}), indent=2))
        return state
    except Exception as e:
        logger.error(f"Error in RefinedPlannerNode: {e}")
        raise
# Node 9: Plan opcode counter node - determines the exact count of each opcode used per plan logic
def plan_opcode_counter_node(state: Dict[str, Any]) -> Dict[str, Any]:
    """
    Ask the LLM how many of each opcode every plan needs.

    For each plan found in ``state["action_plan"]`` (either directly or under
    its ``"action_overall_flow"`` key), the plan's ``logic`` string and its
    candidate opcodes are sent to ``agent``. The reply is expected to contain,
    per category, a list of ``{"opcode": ..., "count": ...}`` objects, which
    then replace the category lists on the plan (the plan dicts are mutated in
    place). If the reply cannot be parsed (``json.JSONDecodeError``),
    ``agent_json_resolver`` is asked once to repair it.

    State keys read: ``description``, ``action_plan``.
    State keys written: ``temporary_node`` - mapping of target name to
    ``{"description": ..., "plans": [...]}`` with the counted opcodes.

    Args:
        state: Mutable graph state; it is updated in place.

    Returns:
        The same ``state`` object. If the JSON repair attempt for any plan
        fails, the node returns early and ``temporary_node`` is NOT written.
    """
    logger.info("=== Running OPCODE COUTER LOGIC with LLM counts ===")
    game_description = state.get("description", "No game description provided.")
    action_flow = state.get("action_plan", {})

    # The plan may be stored wrapped ({"action_overall_flow": {...}}) or as the
    # bare target mapping; accept both layouts.
    if action_flow.get("action_overall_flow", {}) == {}:
        plan_data = action_flow.items()
    else:
        plan_data = action_flow.get("action_overall_flow", {}).items()

    refined_flow: Dict[str, Any] = {}
    for sprite, sprite_data in plan_data:
        refined_plans = []
        for plan in sprite_data.get("plans", []):
            logic = plan.get("logic", "")
            event = plan.get("event", "")
            # Candidate opcodes per category; the hat-block event is offered
            # as an additional "events" candidate when it is a plain string.
            opcodes = {
                "motion": plan.get("motion", []),
                "control": plan.get("control", []),
                "operator": plan.get("operator", []),
                "sensing": plan.get("sensing", []),
                "looks": plan.get("looks", []),
                "sounds": plan.get("sounds", []),
                "events": plan.get("events", []) + ([event] if isinstance(event, str) else []),
                "data": plan.get("data", []),
            }
            # NOTE(review): the *_description / *_opcodes_functionalities names
            # used in the prompt are not defined in this function; presumably
            # they are module-level values defined elsewhere - confirm.
            # The "Sprite:" line names the target currently being processed,
            # and the example output avoids trailing commas so that it is
            # itself valid JSON.
            refinement_prompt = f"""
You are a Scratch 3.0 expert with deep knowledge of block types, nesting and stack relationships.
Your job: read the plan logic below and decide exactly which blocks (and how many of each) are required to implement it.
Review the following plan for '{sprite}' triggered by '{event}'.
Game Description:
{game_description}
--- Scratch 3.0 Block Reference ---
### Hat Blocks
Description: {hat_description}
Blocks:
{hat_opcodes_functionalities}
### Boolean Blocks
Description: {boolean_description}
Blocks:
{boolean_opcodes_functionalities}
### C Blocks
Description: {c_description}
Blocks:
{c_opcodes_functionalities}
### Cap Blocks
Description: {cap_description}
Blocks:
{cap_opcodes_functionalities}
### Reporter Blocks
Description: {reporter_description}
Blocks:
{reporter_opcodes_functionalities}
### Stack Blocks
Description: {stack_description}
Blocks:
{stack_opcodes_functionalities}
-----------------------------------
Current Plan Details:
- Event (Hat Block Opcode): {event}
- Associated Opcodes by Category: {json.dumps(opcodes, indent=2)}
── Game Context ──
Sprite: "{sprite}"
Description: {game_description}
── Current Plan ──
Event (hat block): {event}
Logic (pseudo‑Scratch): {logic}
Plan : {plan}
── Opcode Candidates ──
Motion: {opcodes["motion"]}
Control: {opcodes["control"]}
Operator: {opcodes["operator"]}
Sensing: {opcodes["sensing"]}
Looks: {opcodes["looks"]}
Sounds: {opcodes["sounds"]}
Events: {opcodes["events"]}
Data: {opcodes["data"]}
── Your Task ──
1. Analyze the “Logic” steps and choose exactly which opcodes from each category are needed.
2. For each category, return a JSON list of objects:
{{ "opcode": "<opcode_name>", "count": <integer> }}
3. Return a top‑level JSON object with exactly these eight keys:
"motion", "control", "operator", "sensing", "looks", "sounds", "events", "data".
4. Use only double quotes and ensure valid JSON.
Example output:
```json
{{
"event": "event_whenflagclicked",
"logic": "when green flag clicked\n switch backdrop to [backdrop1 v]\n set [score v] to 0\n set [speed v] to 0\n show variable [score v]\n show variable [speed v]\n broadcast [Game Start v]",
"motion": [{{"opcode":"motion_gotoxy","count":1}}],
"control": [{{"opcode":"control_forever","count":1}},{{"opcode":"control_if","count":1}}],
"operator": [],
"sensing": [],
"looks": [{{"opcode":"looks_switchbackdropto","count":1}}],
"sounds": [],
"events": [{{"opcode":"event_broadcast","count":1}}],
"data": [{{"opcode":"data_setvariableto","count":2}},{{"opcode":"data_showvariable","count":2}}]
}},
{{
"event": [{{"opcode":"event_whenflagclicked","count":1}}],
"logic": "when green flag clicked\n go to x: 240 y: -135\n set [score v] to +1\n set [speed v] to +1\n show variable [score v]\n show variable [speed v]\n forever\n glide 2 seconds to x: -240 y: -135\n if <(x position) < -235> then\n set x to 240\n end\n if <touching [Sprite1 v]?> then\n broadcast [Game Over v]\n stop [all v]\n end\n end",
"motion": [
{{"opcode":"motion_gotoxy","count":1}},
{{"opcode":"motion_glidesecstoxy","count":1}},
{{"opcode":"motion_xposition","count":1}},
{{"opcode":"motion_setx","count":1}}
],
"control": [
{{"opcode":"control_forever","count":1}},
{{"opcode":"control_if","count":1}},
{{"opcode":"control_stop","count":1}}
],
"operator": [
{{"opcode":"operator_lt","count":1}}
],
"sensing": [
{{"opcode":"sensing_istouching","count":1}},
{{"opcode":"sensing_touchingobjectmenu","count":1}}
],
"looks": [],
"sounds": [],
"events": [
{{"opcode":"event_broadcast","count":1}}
],
"data": [
{{"opcode":"data_setvariableto","count":2}},
{{"opcode":"data_showvariable","count":2}}
]
}}
```
"""
            # Pre-bind so the JSONDecodeError handler can always reference it,
            # even if the decode error is raised before the reply is read.
            llm_output = ""
            try:
                response = agent.invoke({"messages": [{"role": "user", "content": refinement_prompt}]})
                llm_output = response["messages"][-1].content
                llm_json = extract_json_from_llm_response(llm_output)
                logger.info(f"Successfully analyse the opcode requirement for {sprite} - {event}.")
            except json.JSONDecodeError as error_json:
                logger.error(f"JSON Decode Error for {sprite} - {event}: {error_json}. Attempting correction.")
                # Ask the JSON resolver agent to repair the reply. The schema
                # described here must match what this node consumes below
                # (the eight opcode categories).
                correction_prompt = (
                    "Your task is to correct the provided JSON string to ensure it is **syntactically perfect and adheres strictly to JSON rules**.\n"
                    'It must be a JSON object with the keys `motion`, `control`, `operator`, `sensing`, `looks`, `sounds`, `events` and `data`, each an array of {"opcode": "<opcode_name>", "count": <integer>} objects.\n'
                    f"- **Error Details**: {error_json}\n\n"
                    "**Strict Instructions for your response:**\n"
                    "1. **ONLY** output the corrected JSON. Do not include any other text or explanations.\n"
                    "2. Ensure all keys and string values are enclosed in **double quotes**. Escape internal quotes (`\\`).\n"
                    "3. No trailing commas. Correct nesting.\n\n"
                    "Here is the problematic JSON string to correct:\n"
                    f"```json\n{llm_output}\n```\n"
                    "Corrected JSON:\n"
                )
                try:
                    correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]})
                    llm_json = extract_json_from_llm_response(correction_response["messages"][-1].content)
                    logger.info(f"Successfully corrected JSON output for {sprite} - {event}.")
                except Exception as e_corr:
                    logger.error(f"Failed to correct JSON output for {sprite} - {event} even after retry: {e_corr}")
                    # Give up on the whole node; 'temporary_node' stays unset.
                    return state

            # Replace each category with the LLM's counted list. Singular and
            # plural spellings are both accepted because the model is not
            # consistent about them; missing categories become empty lists.
            plan["motion"] = llm_json.get("motion", llm_json.get("motions", []))
            plan["control"] = llm_json.get("control", llm_json.get("controls", []))
            plan["operator"] = llm_json.get("operator", llm_json.get("operators", []))
            plan["sensing"] = llm_json.get("sensing", [])
            plan["looks"] = llm_json.get("looks", llm_json.get("look", []))
            plan["sounds"] = llm_json.get("sounds", llm_json.get("sound", []))
            plan["events"] = llm_json.get("events", [])
            plan["data"] = llm_json.get("data", [])
            refined_plans.append(plan)

        refined_flow[sprite] = {
            "description": sprite_data.get("description", ""),
            "plans": refined_plans
        }

    # The result is stored separately; state["action_plan"] itself is not reassigned here.
    state["temporary_node"] = refined_flow
    print(f"[OPCODE COUTER LOGIC]: {refined_flow}")
    logger.info("=== OPCODE COUTER LOGIC completed ===")
    return state
# Node 10: to refine plans and identify block relationships
def block_relationship_node(state: GameState):
    """
    Refine each plan's logic and annotate it with block relationships.

    For every plan in ``state["action_plan"]`` (either directly or under its
    ``"action_overall_flow"`` key), the plan's logic and opcodes are sent to
    ``agent``, which is asked for a ``refined_logic`` string and a
    ``block_relationships`` list. If the reply cannot be parsed
    (``json.JSONDecodeError``), ``agent_json_resolver`` is asked once to repair
    it. On any failure the plan keeps its original logic and receives an empty
    relationship list. Plan dicts are mutated in place.

    State keys read: ``description``, ``action_plan``.
    State keys written: ``action_plan["action_overall_flow"]``.

    Args:
        state: Mutable graph state; it is updated in place.

    Returns:
        The same ``state`` object. If no action plan is present the state is
        returned unchanged.
    """
    logger.info("--- Running PlanBlockRelationNode ---")
    game_description = state.get("description", "No game description provided.")
    action_plan = state.get("action_plan", {})

    # Without a plan there is nothing to annotate, and the final write-back
    # into state["action_plan"] would fail if the key were absent.
    if not action_plan:
        logger.warning("No action plan found in state. Skipping PlanBlockRelationNode.")
        return state

    # The plan may be stored wrapped ({"action_overall_flow": {...}}) or as the
    # bare target mapping; accept both layouts.
    if action_plan.get("action_overall_flow", {}) == {}:
        plan_data = action_plan.items()
    else:
        plan_data = action_plan.get("action_overall_flow", {}).items()

    refined_overall_flow = {}
    for target_name, target_data in plan_data:
        refined_target_plans = []
        for plan in target_data.get("plans", []):
            original_logic = plan.get("logic", "")
            event = plan.get("event", "unknown_event")
            opcodes = {
                "motion": plan.get("motion", []),
                "control": plan.get("control", []),
                "operator": plan.get("operator", []),
                "sensing": plan.get("sensing", []),
                "looks": plan.get("looks", []),
                "sounds": plan.get("sounds", []),
                "events": plan.get("events", []),
                "data": plan.get("data", [])
            }
            # NOTE(review): the *_description names used in the prompt are not
            # defined in this function; presumably they are module-level
            # values defined elsewhere - confirm.
            refinement_prompt = f"""
You are an expert in Scratch 3.0 game development, specializing in understanding block relationships (stacked, nested).
Review the following plan for '{target_name}' triggered by '{event}'.
Game Description:
{game_description}
--- Scratch 3.0 Block Reference ---
### Hat Blocks
Description: {hat_description}
### Boolean Blocks
Description: {boolean_description}
### C Blocks
Description: {c_description}
### Cap Blocks
Description: {cap_description}
### Reporter Blocks
Description: {reporter_description}
### Stack Blocks
Description: {stack_description}
-----------------------------------
Current Plan Details:
- Event (Hat Block Opcode): {event}
- Current Logic Description: "{original_logic}"
- Associated Opcodes by Category: {json.dumps(opcodes, indent=2)}
Your task is to:
1. **Identify Block Relationships**: Based on the 'Refined Logic' and 'Associated Opcodes', describe the relationships between the Scratch blocks implied by this plan. Identify if blocks are 'stacked' (one after another in a script) or 'nested' (one block plugged into another's input or within a C-block). For nested relationships, specify the 'parent' and 'child' or 'children_opcodes'.
2. Donot add any explaination of logic or relation or comments to justify or explain, just put the logic content in the json.
Output a JSON object and [NOTE: use double quates always ""]:
- `block_relationships`: An array of dictionaries, each describing a relationship.
- `refined_logic`: A concise, refined description of the plan's logic, focusing on the sequence and nesting of blocks.
Examples for `block_relationships`:
- **Example 1: Simple Stacked Blocks**
- Scenario: A sprite moves 10 steps, then says "Hello!" for 2 seconds.
- Opcodes: ["motion_movesteps", "looks_sayforsecs"]
- `refined_logic`: "The sprite moves forward and then says 'Hello!' for a duration."
- `block_relationships`: [
{{ "type": "stacked", "blocks": ["motion_movesteps", "looks_sayforsecs"] }}
]
- **Example 2: Nested C-block (Forever Loop) with Stacked Blocks inside**
- Scenario: A sprite continuously moves 10 steps and then bounces if on edge.
- Opcodes: ["control_forever", "motion_movesteps", "motion_ifonedgebounce"]
- `refined_logic`: "The sprite forever moves and bounces off the edge if it touches."
- `block_relationships`: [
{{ "type": "nested_c_block_body", "parent": "control_forever", "child_opcodes": ["motion_movesteps", "motion_ifonedgebounce"] }}
]
- **Example 3: Nested Conditional (If-Then) with Boolean Input and Stacked Blocks**
- Scenario: If the sprite is touching the mouse-pointer, it says "You found me!"
- Opcodes: ["control_if", "sensing_touchingobject", "looks_say"]
- `refined_logic`: "If the sprite touches the mouse pointer, it says a specific phrase."
- `block_relationships`: [
{{ "type": "nested_input", "parent_opcode": "control_if", "input_slot_name": "CONDITION", "child_opcode": "sensing_touchingobject" }},
{{ "type": "nested_c_block_body", "parent": "control_if", "child_opcodes": ["looks_say"] }}
]
- **Example 4: Nested Arithmetic Operation (Reporter Block) as Input**
- Scenario: A sprite moves steps equal to its x-position plus 50.
- Opcodes: ["motion_movesteps", "operator_add", "motion_xposition"]
- `refined_logic`: "The sprite moves a number of steps calculated by adding 50 to its current x-position."
- `block_relationships`: [
{{ "type": "nested_input", "parent_opcode": "motion_movesteps", "input_slot_name": "STEPS", "child_opcode": "operator_add" }},
{{ "type": "nested_input", "parent_opcode": "operator_add", "input_slot_name": "NUM1", "child_opcode": "motion_xposition" }},
{{ "type": "nested_literal_input", "parent_opcode": "operator_add", "input_slot_name": "NUM2", "value": "50" }}
]
- **Example 5: Nested If-Else Block with Boolean Input**
- Scenario: If score is greater than 10, say "High Score!", else say "Keep Playing!".
- Opcodes: ["control_if_else", "operator_gt", "data_variable", "looks_say"]
- `refined_logic`: "Checks if the score is high; provides different messages based on the condition."
- `block_relationships`: [
{{ "type": "nested_input", "parent_opcode": "control_if_else", "input_slot_name": "CONDITION", "child_opcode": "operator_gt" }},
{{ "type": "nested_input", "parent_opcode": "operator_gt", "input_slot_name": "OPERAND1", "child_opcode": "data_variable" }},
{{ "type": "nested_literal_input", "parent_opcode": "operator_gt", "input_slot_name": "OPERAND2", "value": "10" }},
{{ "type": "nested_c_block_body", "parent": "control_if_else", "child_opcodes": ["looks_say"] }},
{{ "type": "nested_c_block_body", "parent": "control_if_else", "child_opcodes": ["looks_say"] }}
]
Ensure the `block_relationships` array is comprehensive for this plan based on the detailed logic and opcodes.
Output ONLY the JSON object.
"""
            # Fallback values, used unless the LLM reply parses successfully.
            refined_logic = original_logic
            block_relationships = []
            # Pre-bind so the JSONDecodeError handler can always reference it,
            # even if the decode error is raised before the reply is read.
            llm_output_raw = ""
            try:
                response = agent.invoke({"messages": [{"role": "user", "content": refinement_prompt}]})
                llm_output_raw = response["messages"][-1].content
                parsed_llm_output = extract_json_from_llm_response(llm_output_raw)
                refined_logic = parsed_llm_output.get("refined_logic", original_logic)
                block_relationships = parsed_llm_output.get("block_relationships", [])
                logger.info(f"Successfully processed plan for {target_name} - {event}.")
            except json.JSONDecodeError as error_json:
                logger.error(f"JSON Decode Error for {target_name} - {event}: {error_json}. Attempting correction.")
                # Ask the JSON resolver agent to repair the malformed reply.
                correction_prompt = (
                    "Your task is to correct the provided JSON string to ensure it is **syntactically perfect and adheres strictly to JSON rules**.\n"
                    "It must be a JSON object with `refined_logic` (string) and `block_relationships` (array of objects).\n"
                    f"- **Error Details**: {error_json}\n\n"
                    "**Strict Instructions for your response:**\n"
                    "1. **ONLY** output the corrected JSON. Do not include any other text or explanations.\n"
                    "2. Ensure all keys and string values are enclosed in **double quotes**. Escape internal quotes (`\\`).\n"
                    "3. No trailing commas. Correct nesting.\n\n"
                    "Here is the problematic JSON string to correct:\n"
                    f"```json\n{llm_output_raw}\n```\n"
                    "Corrected JSON:\n"
                )
                try:
                    correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]})
                    corrected_output = extract_json_from_llm_response(correction_response["messages"][-1].content)
                    refined_logic = corrected_output.get("refined_logic", original_logic)
                    block_relationships = corrected_output.get("block_relationships", [])
                    logger.info(f"Successfully corrected JSON output for {target_name} - {event}.")
                except Exception as e_corr:
                    logger.error(f"Failed to correct JSON output for {target_name} - {event} even after retry: {e_corr}")
                    # Final fallback if the correction attempt fails as well.
                    refined_logic = original_logic
                    block_relationships = []
            except Exception as e:
                logger.error(f"General error invoking LLM for {target_name} - {event}: {e}")
                # Best effort: keep the original logic and no relationships.
                refined_logic = original_logic
                block_relationships = []

            plan["logic"] = refined_logic
            plan["block_relationships"] = block_relationships
            refined_target_plans.append(plan)

        refined_overall_flow[target_name] = {"description": target_data.get("description"), "plans": refined_target_plans}

    # Write the refined flow back under the wrapped key of the existing plan.
    state["action_plan"]["action_overall_flow"] = refined_overall_flow
    print(f"[OVREALL REFINED RELATIONSHIP]: {refined_overall_flow}")
    logger.info("Plan refinement and block relation analysis completed for all plans.")
    return state
# Helper function to get a block by its opcode from a single catalog
def get_block_by_opcode(catalog_data: dict, opcode: str) -> dict | None:
"""
Search a single catalog (with keys "description" and "blocks": List[dict])
for a block whose 'op_code' matches the given opcode.
Returns the block dict or None if not found.
"""
for block in catalog_data["blocks"]:
if block.get("op_code") == opcode:
return block
return None
# Helper function to find a block in all catalogs by opcode
def find_block_in_all(opcode: str, all_catalogs: list[dict]) -> dict | None:
"""
Search across multiple catalogs for a given opcode.
Returns the first matching block dict or None.
"""
for catalog in all_catalogs:
blk = get_block_by_opcode(catalog, opcode)
if blk is not None:
return blk
return None
def _collect_plan_opcodes(plan_entry: dict) -> list:
    """Return the de-duplicated opcodes a plan entry expects, in first-seen order.

    The LLM is inconsistent about singular/plural category keys, so both
    spellings are accepted; when the first spelling is present it wins.
    The hat opcode stored under "event" is a plain string: it is appended as
    one opcode (calling list.extend() on a string would add it one character
    at a time). Values that are neither a string nor a list/tuple/set of
    strings are ignored.
    """
    category_keys = (
        ("motion", "motions"),
        ("control", "controls"),
        ("operator", "operators"),
        ("sensing", None),
        ("looks", "look"),
        ("sounds", "sound"),
        ("events", None),
        ("event", None),  # hat block opcode, normally a single string
        ("data", None),
    )
    collected = []
    for primary, alternate in category_keys:
        if primary in plan_entry:
            value = plan_entry[primary]
        elif alternate is not None:
            value = plan_entry.get(alternate, [])
        else:
            value = []
        if isinstance(value, str):
            if value:
                collected.append(value)
        elif isinstance(value, (list, tuple, set)):
            collected.extend(item for item in value if isinstance(item, str) and item)
    # dict.fromkeys removes duplicates while keeping a deterministic order.
    return list(dict.fromkeys(collected))


# Node 11: Overall Block Builder Node
def overall_block_builder_node(state: dict):
    """Generate Scratch blocks for every planned script and merge them into the project.

    Reads ``state["project_json"]`` and ``state["action_plan"]``. For each
    sprite present in both the plan and the project targets, and for each
    plan entry of that sprite, it builds a prompt (logic, block relationships,
    project context, catalog definitions of the needed opcodes), asks ``agent``
    for the script's block dictionary, falls back to ``agent_json_resolver``
    when the reply raises ``json.JSONDecodeError`` during extraction, assigns
    canvas coordinates to top-level blocks and merges the result into the
    sprite's ``blocks``.

    Returns the (mutated) state. When no action plan is present the state is
    returned untouched. Any error while generating a script is logged and
    re-raised.
    """
    logger.info("--- Running OverallBlockBuilderNode ---")
    project_json = state["project_json"]
    targets = project_json["targets"]
    monitors = project_json["monitors"]  # not used below; kept so a project without "monitors" still fails fast as before
    # --- Sprite and Stage Target Mapping ---
    sprite_map = {target["name"]: target for target in targets if not target["isStage"]}
    stage_target = next((target for target in targets if target["isStage"]), None)
    if stage_target:
        sprite_map[stage_target["name"]] = stage_target
    # --- Pre-load all block-catalog JSONs once ---
    all_catalogs = [
        hat_block_data,
        boolean_block_data,
        c_block_data,
        cap_block_data,
        reporter_block_data,
        stack_block_data
    ]
    action_plan = state.get("action_plan", {})
    print("[Overall Action Plan received at the block generator]:", json.dumps(action_plan, indent=2))
    if not action_plan:
        logger.warning("No action plan found in state. Skipping OverallBlockBuilderNode.")
        return state
    # Initialize offsets for script placement on the Scratch canvas
    script_y_offset = {}
    script_x_offset_per_sprite = {name: 0 for name in sprite_map.keys()}
    # This handles potential variations in the action_plan structure.
    if action_plan.get("action_overall_flow", {}) == {}:
        plan_data = action_plan.items()
    else:
        plan_data = action_plan.get("action_overall_flow", {}).items()
    # --- Extract global project context for LLM ---
    all_sprite_names = list(sprite_map.keys())
    all_variable_names = {}
    all_list_names = {}
    all_broadcast_messages = {}
    for target in targets:
        for var_id, var_info in target.get("variables", {}).items():
            all_variable_names[var_info[0]] = var_id  # Store name -> ID mapping (e.g., "myVariable": "myVarId123")
        for list_id, list_info in target.get("lists", {}).items():
            all_list_names[list_info[0]] = list_id  # Store name -> ID mapping
        for broadcast_id, broadcast_name in target.get("broadcasts", {}).items():
            all_broadcast_messages[broadcast_name] = broadcast_id  # Store name -> ID mapping
    # --- Process each sprite's action plan ---
    for sprite_name, sprite_actions_data in plan_data:
        if sprite_name not in sprite_map:
            # Nothing to attach the blocks to; make the skip visible in the logs.
            logger.warning(f"Sprite '{sprite_name}' from the action plan is not a target of the project. Skipping.")
            continue
        current_sprite_target = sprite_map[sprite_name]
        if "blocks" not in current_sprite_target:
            current_sprite_target["blocks"] = {}
        if sprite_name not in script_y_offset:
            script_y_offset[sprite_name] = 0
        for plan_entry in sprite_actions_data.get("plans", []):
            event_opcode = plan_entry["event"]
            logic_sequence = plan_entry["logic"]
            # Relationships are only present when the refinement step produced them.
            block_relationship = plan_entry.get("block_relationships", [])
            # Gather all opcodes expected to be used in this script (hat opcode included)
            needed_opcodes = _collect_plan_opcodes(plan_entry)
            # Merge human-written catalog and runtime catalog for comprehensive block definitions
            combined_blocks = {}
            for op in needed_opcodes:
                catalog_def = find_block_in_all(op, all_catalogs) or {}
                runtime_def = ALL_SCRATCH_BLOCKS_CATALOG.get(op, {})
                # Runtime catalog entries override the human-written ones on key clashes.
                combined_blocks[op] = {**catalog_def, **runtime_def}
            print(f"[Combined blocks for this script for event {event_opcode}]: {json.dumps(combined_blocks, indent=2)}")
            # --- LLM Block Generation Prompt (Self-Contained and Explicit) ---
            # All necessary instructions and context are directly included in the prompt.
            llm_block_generation_prompt = f"""You are an AI assistant generating a **single complete Scratch 3.0 script** in JSON format.
The current sprite is '{sprite_name}'.
This script must start with the event block (Hat Block) for opcode '{event_opcode}'.
The sequential logic and the block relationship(e.g. stacking and nested logic) to implement for this script is:
Logic: {logic_sequence}
block relationships: {block_relationship}
**Project Context:**
Here is a summary of the current Scratch project's elements. You MUST reference these when generating blocks that interact with sprites, variables, lists, or broadcasts to use it in block where you requird. Do NOT invent new names if an existing one is suitable.
- **All Sprite Names:** {json.dumps(all_sprite_names)}
- **Existing Variables (Name: ID):** {json.dumps(all_variable_names)} **USE**: You can use this variable in blocks e.g. to update score & highscore, power ups, etc.
- **Existing Lists (Name: ID):** {json.dumps(all_list_names)} **USE**: You can use this List variable in blocks e.g. to show inventory, tools, etc.
- **Existing Broadcast Messages (Name: ID):** {json.dumps(all_broadcast_messages)} **USE** : you can use to broadcast messages in blocks e.g. game over, game start, next level, etc.
**Required Block Opcodes & Catalog:**
Based on the planning, the following specific Scratch block opcodes are expected to be used. You MUST use these opcodes where applicable. Here is the comprehensive catalog for these blocks, including their structure and required inputs/fields:
```json
{json.dumps(combined_blocks, indent=2)}
````
Current Scratch project JSON for this sprite (provided for context; you are generating a NEW, complete script):
```json
{json.dumps(current_sprite_target, indent=2)}
```
**CRITICAL INSTRUCTIONS FOR GENERATING THE BLOCK JSON (READ CAREFULLY AND FOLLOW PRECISELY):**
1. **Unique Block IDs:** Generate a **globally unique ID** for EVERY block (main and shadow blocks) within the entire JSON output for this script. Example: 'myBlockID123'. These IDs must be unique within the *entire* generated JSON for this script.
2. **Script Initiation (Hat Block - VERY IMPORTANT):**
* The **first block** of the script (the Hat block, opcode '{event_opcode}') MUST have `"topLevel": true` and `"parent": null`.
* ONLY this Hat block should have `"topLevel": true`. All other blocks in the script MUST have `"topLevel": false`.
* Set its `x` and `y` coordinates (e.g., `x: 0, y: 0` or similar for clear placement).
3. **Strict Block Chaining (`next` and `parent` - ABSOLUTELY CRITICAL):**
* The `next` field is used ONLY for connecting **Stack Blocks** (blocks that perform an action and can be stacked vertically).
* **NEVER** set the `next` field of a block to point to a **Reporter Block** (blocks that report a value, like `operator_gt`, `data_variable`, `math_number`). Reporter blocks (used as nested always) are placed *inside* the `inputs` of other blocks.
* Use the `next` field to point to the ID of the Stack Block DIRECTLY BELOW it in the stack. If a block has a `next`, its `next` block's `parent` MUST point back to the current block's ID.
* Use the `parent` field to point to the ID of the Stack Block DIRECTLY ABOVE it in the stack (or the C-block that contains it for substacks). If a block has a `parent`, then that `parent` block's `next` MUST point to the current block's ID (unless it's the first block in a substack, in which case the C-block's `SUBSTACK` input points to it).
* The **last stack block** in a linear stack MUST have `"next": null`.
* Blocks plugged into inputs (e.g., a reporter block into a number input) or substacks (e.g., inside a `control_forever` block) DO NOT use `next` to connect to their *parent* block; their connection is solely via the `inputs` field of their parent. They *do* use `next` for subsequent blocks *within their own stack*.
4. **`inputs` Field Structure (ABSOLUTELY CRITICAL - ADHERE TO THIS RIGIDLY):**
* The value for ANY key within the `inputs` dictionary MUST be an **array of EXACTLY two elements**: `[type_code, value_or_block_id]`.
* **`type_code` Guidance:**
* `1`: Use when connecting a block (e.g., a reporter block, boolean block, or stack block if the input expects it) to an input slot. This is the most common `type_code` when referencing a separate block ID.
* `2`: Use specifically for C-shaped substacks (e.g., the `SUBSTACK` input of `control_forever`, `control_if`, `control_repeat`). The second element is the ID of the first block within that substack.
* **NEVER embed direct primitive values or nested arrays** inside the second element of `inputs`. For example, **FORBID** `["STEPS": [1, ["math_number","10"]]]`. This is incorrect.
* **ALWAYS** generate a separate shadow block with its own unique ID for every literal value (numbers, strings, booleans) or dropdown/menu selection that is an input. Reference that shadow block ID in the parent `inputs`.
* **Correct Example for Numerical Input (e.g., for `motion_movesteps` STEPS, or `motion_gotoxy` X/Y):**
* If you need to input the number `10` into a block, you MUST create a separate `math_number` shadow block for it, and then reference its ID.
```json
// Main block using the number
"mainBlockID": {{
"opcode": "motion_movesteps",
"inputs": {{
"STEPS": [1, "shadowNumID"]
}},
"next": "nextBlockID",
"parent": "parentBlockID",
"shadow": false,
"topLevel": false
}},
// The separate shadow block for the number '10'
"shadowNumID": {{
"opcode": "math_number",
"fields": {{
"NUM": ["10", null]
}},
"shadow": true,
"parent": "mainBlockID",
"topLevel": false
}}
```
* **Correct Example for Dropdown/Menu Input (e.g., `sensing_touchingobject` with 'edge'):**
* If you need to select 'edge' for the `touching ()?` block, you MUST create a separate `sensing_touchingobjectmenu` shadow block and reference its ID.
```json
// Main block using the dropdown selection
"touchingBlockID": {{
"opcode": "sensing_touchingobject",
"inputs": {{
"TOUCHINGOBJECTMENU": [1, "shadowEdgeMenuID"]
}},
"next": null,
"parent": "someParentBlockID",
"shadow": false,
"topLevel": false
}},
// The separate shadow block for the 'edge' menu option
"shadowEdgeMenuID": {{
"opcode": "sensing_touchingobjectmenu",
"fields": {{
"TOUCHINGOBJECTMENU": ["_edge_", null]
}},
"shadow": true,
"parent": "touchingBlockID",
"topLevel": false
}}
```
* **Correct Example for C-block (e.g., `control_forever`):**
* The `control_forever` block MUST have a `SUBSTACK` input pointing to the first block inside its loop. The value for `SUBSTACK` must be an array: `[2, "FIRST_BLOCK_IN_FOREVER_LOOP_ID"]`.
```json
"foreverBlockID": {{
"opcode": "control_forever",
"inputs": {{
"SUBSTACK": [2, "firstBlockInsideForeverID"]
}},
"next": null,
"parent": "blockAboveForeverID",
"shadow": false,
"topLevel": false
}},
"firstBlockInsideForeverID": {{
// ... definition of the first block inside the forever loop
"parent": "foreverBlockID",
"next": "secondBlockInsideForeverID" // if there's another block
}}
```
* **Correct Example for C-block with condition (e.g., `control_if`):**
* The `control_if` block MUST have a `CONDITION` input (typically `type_code: 1` referencing a boolean reporter block) and a `SUBSTACK` input (`type_code: 2` referencing the first block inside the if-body).
```json
"ifBlockID": {{
"opcode": "control_if",
"inputs": {{
"CONDITION": [1, "conditionBlockID"],
"SUBSTACK": [2, "firstBlockInsideIfID"]
}},
"next": "blockAfterIfID",
"parent": "blockAboveIfID",
"shadow": false,
"topLevel": false
}},
"conditionBlockID": {{
"opcode": "sensing_touchingobject", // Example condition block
// ... definition for condition block, parent should be "ifBlockID"
"shadow": false,
"topLevel": false
}},
"firstBlockInsideIfID": {{
// ... definition of the first block inside the if body
"parent": "ifBlockID",
"next": null // or next block if more
}}
```
* **Correct Example for Broadcast Blocks (`event_broadcast`, `event_broadcast_and_wait`):**
* The main broadcast block (e.g., `event_broadcast`) has an input named `"BROADCAST_INPUT"`. Its value must be `[1, "ID_OF_BROADCAST_MENU_SHADOW_BLOCK"]`.
* The corresponding shadow block (`event_broadcast_menu`) for selecting the message has a `fields` dictionary with a key named `"BROADCAST"`. The value for `"BROADCAST"` must be an array `["Message Name", "MESSAGE_ID_FROM_TARGET_BROADCASTS"]`.
* If the broadcast message does not exist in `all_broadcast_messages`, generate a *new unique ID* for it within your generated blocks and use the desired message name. The external system will be responsible for registering this new broadcast in the overall Scratch project JSON.
* Example for a broadcast named "Game Over" with a hypothetical ID:
```json
"myBroadcastBlockID": {{
"opcode": "event_broadcast",
"inputs": {{
"BROADCAST_INPUT": [1, "myBroadcastMenuShadowID"]
}},
"next": null,
"parent": "someParentBlockID",
"shadow": false,
"topLevel": false
}},
"myBroadcastMenuShadowID": {{
"opcode": "event_broadcast_menu",
"fields": {{
"BROADCAST": ["Game Over", "my_game_over_message_ID_from_target"]
}},
"shadow": true,
"parent": "myBroadcastBlockID",
"topLevel": false
}}
```
* **Correct Example for Sensing Blocks (`sensing_touchingobject`, `sensing_touchingcolor`, etc.):**
* For blocks like `sensing_touchingobject`, the input is typically `"TOUCHINGOBJECTMENU"`. Its value must be `[1, "ID_OF_SENSING_MENU_SHADOW_BLOCK"]`.
* The corresponding shadow block (e.g., `sensing_touchingobjectmenu`) has a `fields` dictionary with a key matching the input name (e.g., `"TOUCHINGOBJECTMENU"`). The value for this field must be an array `["_mouse_", null]` or `["_edge_", null]` for specific options, or `["SpriteName", "SPRITE_ID_FROM_TARGETS"]` for other sprites.
* Ensure the string value in the `fields` array for these menus matches the exact Scratch internal string for the option (e.g., `"_edge_"` not `"edge"` unless specifically allowed). Refer to `ALL_SCRATCH_BLOCKS_CATALOG` for exact field values.
* Example for touching 'edge':
```json
"myTouchingBlockID": {{
"opcode": "sensing_touchingobject",
"inputs": {{
"TOUCHINGOBJECTMENU": [1, "myTouchingObjectMenuShadowID"]
}},
"next": null,
"parent": "someParentBlockID",
"shadow": false,
"topLevel": false
}},
"myTouchingObjectMenuShadowID": {{
"opcode": "sensing_touchingobjectmenu",
"fields": {{
"TOUCHINGOBJECTMENU": ["_edge_", null]
}},
"shadow": true,
"parent": "myTouchingBlockID",
"topLevel": false
}}
```
* **Correct Example for Clone Blocks (`control_create_clone_of`, `control_when_start_as_clone`, `control_delete_this_clone`):**
* `control_create_clone_of` has an input named `"CLONE_TARGET"`. Its value must be `[1, "ID_OF_CLONE_TARGET_MENU_SHADOW_BLOCK"]`.
* The corresponding shadow block (`control_create_clone_of_menu`) has a `fields` dictionary with a key named `"CLONE_TARGET"`. The value for `"CLONE_TARGET"` must be an array `["_myself_", null]` or `["SpriteName", "SPRITE_ID_FROM_TARGETS"]`.
* `control_when_start_as_clone` is a hat block and thus `topLevel: true`, `parent: null`, and `next: null`.
* `control_delete_this_clone` is a stack block and will have `next` and `parent` connections.
* Example for creating a clone of 'myself':
```json
"myCloneBlockID": {{
"opcode": "control_create_clone_of",
"inputs": {{
"CLONE_TARGET": [1, "myCloneTargetMenuShadowID"]
}},
"next": null,
"parent": "someParentBlockID",
"shadow": false,
"topLevel": false
}},
"myCloneTargetMenuShadowID": {{
"opcode": "control_create_clone_of_menu",
"fields": {{
"CLONE_TARGET": ["_myself_", null]
}},
"shadow": true,
"parent": "myCloneBlockID",
"topLevel": false
}}
```
5. **Define ALL Shadow Blocks Separately (THIS IS ESSENTIAL):** Every time a block's input requires a number, string literal, or a selection from a dropdown/menu, you MUST define a **separate block entry** in the top-level blocks dictionary for that shadow. Each shadow block MUST have `"shadow": true` and `"topLevel": false`.
6. **`fields` for direct values, Variables, and Lists:**
* Use the `fields` dictionary ONLY IF the block directly embeds a dropdown value, a text field, a variable reference, or a list reference without an `inputs` connection.
* The value for a field is typically an array `["Value", null]` (for simple text/dropdowns) or `["Name", "ID"]` (for variables, lists, or broadcast messages).
* **For Variables and Lists:** When referencing an existing variable or list (from `all_variable_names` or `all_list_names`), the `fields` entry MUST be `\"VARIABLE\": [\"VariableName\", \"VariableID\"]` (for a variable) or `\"LIST\": [\"ListName\", \"ListID\"]` (for a list). If the `logic_sequence` implies the use of a variable or list that does not exist, you *must* generate a *new unique ID* for it within your generated blocks, and use its name in the `fields` as specified. The external system (e.g., the `block_builder` or `project_json` integration) will be responsible for registering this new element in the overall Scratch project JSON.
* **Example for using an existing Variable (e.g., `data_setvariableto`):**
```json
"setVariableBlockID": {{
"opcode": "data_setvariableto",
"fields": {{
"VARIABLE": ["myVariable", "myVariableID_from_project_context"]
}},
"inputs": {{
"VALUE": [1, "shadowValueID"]
}},
"next": null,
"parent": "someParentBlockID",
"shadow": false,
"topLevel": false
}},
"shadowValueID": {{
"opcode": "math_number",
"fields": {{
"NUM": ["0", null]
}},
"shadow": true,
"parent": "setVariableBlockID",
"topLevel": false
}}
```
* **Example for referencing a Variable (e.g., `data_variable` reporter block):**
```json
"variableReporterID": {{
"opcode": "data_variable",
"fields": {{
"VARIABLE": ["myVariable", "myVariableID_from_project_context"]
}},
"shadow": false,
"parent": "blockUsingReporterID",
"topLevel": false
}}
```
7. **`topLevel: true` for hat blocks:** Only the starting (hat) block of a script should have `"topLevel": true`.
8. **Ensure Unique Block IDs:** Every block you generate (main blocks and shadow blocks) must have a unique ID within the entire script's block dictionary.
9. **Strictly Use Catalog Opcodes:** You MUST only use `opcode` values that are present in the provided `ALL_SCRATCH_BLOCKS_CATALOG`. Do NOT use unlisted opcodes like `motion_jump`.
10. **Return ONLY the JSON object representing all the blocks for THIS SINGLE SCRIPT.** Do NOT wrap it in a 'blocks' key or the full project JSON. The output should be a dictionary where keys are block IDs and values are block definitions. You need to ensure that the generated blocks fulfill the `logic_sequence` for the `event_opcode`.
"""
            try:
                response = agent.invoke({"messages": [{"role": "user", "content": llm_block_generation_prompt}]})
                raw_response = response["messages"][-1].content
                logger.info(f"Raw response from LLM [OverallBlockBuilderNode - {sprite_name} - {event_opcode}]: {raw_response[:500]}...")
                print(f"Raw response from LLM [OverallBlockBuilderNode - {sprite_name} - {event_opcode}]: {raw_response}")
                try:
                    generated_blocks = extract_json_from_llm_response(raw_response)
                except json.JSONDecodeError as error_json:
                    logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.")
                    # --- JSON Correction Agent ---
                    correction_prompt = (
                        "Your primary goal is to generate JSON that precisely defines Scratch 3.0 blocks. "
                        "The output MUST be a single, valid JSON object, enclosed within a '```json' markdown block.\n"
                        "**STRICT JSON SYNTAX RULES:**\n"
                        "1. **NO EXTRA TEXT, COMMENTS, OR CONVERSATIONAL FILLER.** Only the JSON.\n"
                        "2. All keys MUST be enclosed in double quotes.\n"
                        "3. String values MUST be enclosed in double quotes. Internal double quotes must be escaped (e.g., `\\\"`). Newlines (`\\n`) and carriage returns (`\\r`) in strings must be escaped.\n"
                        "4. No trailing commas.\n"
                        "5. Correct nesting of braces `{}` and brackets `[]`.\n"
                        "6. **CRITICAL: Understand and strictly follow the Scratch block input array format.**\n"
                        " - When an input refers to another top-level block (like a variable block or a number block that is defined separately):\n"
                        " `\"INPUT_NAME\": [1, \"ID_OF_REFERENCED_BLOCK\"]`\n"
                        " - When an input *directly embeds* a literal value (like a number, string, or boolean shadow block) as per the Scratch schema (which is less common for simple literals but necessary for certain types):\n"
                        " `\"INPUT_NAME\": [1, [\"math_number\", \"10\"]]` (for numbers, as a shadow block definition)\n"
                        " `\"INPUT_NAME\": [1, [\"text\", \"hello\"]]` (for strings, as a shadow block definition)\n"
                        " `\"INPUT_NAME\": [1, [\"boolean\", true]]` (for booleans, as a shadow block definition)\n"
                        " **However, for most literal inputs, the standard is to define a separate shadow block and reference its ID.** Refer to the earlier prompt for the preferred method using separate shadow blocks. The nested array format `[1, [\"opcode\", \"value\"]]` should generally be avoided unless specifically indicated by the Scratch schema for certain block types where the shadow block is *always* an implicit part of the input. **For clarity and consistency, prioritize creating separate shadow blocks and referencing their IDs in the input array as `[1, \"ShadowBlockID\"]`.**\n"
                        " - **Do NOT** generate patterns like `some_id[\"value\", null]]` or `\"\"\": [1, some_id[\"\", null]]`.\n"
                        " - Ensure `\"X\"` and `\"Y\"` inputs for `motion_gotoxy` and `motion_glidesecstoxy` correctly reference a block ID (usually a math_number shadow block).\n"
                        "7. The 'logic' field should contain a plain string describing the action, without any embedded JSON or Scratch block syntax (like `\"forever\":\"` which breaks the string).\n\n"
                        "Here is the problematic JSON string that needs correction, including the error details:\n"
                        f"- **Error Details**: {error_json}\n"
                        "```json\n"
                        f"{raw_response}\n"
                        "```\n"
                        "Your corrected, valid JSON response:\n"
                        "```json\n"
                    )
                    correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]})
                    print(f"[JSON CORRECTOR RESPONSE AT OVERALLBLOCKBUILDER ]: {correction_response['messages'][-1].content}")
                    generated_blocks = extract_json_from_llm_response(correction_response["messages"][-1].content)
                if "blocks" in generated_blocks and isinstance(generated_blocks["blocks"], dict):
                    logger.warning(f"LLM returned nested 'blocks' key for {sprite_name}. Unwrapping.")
                    generated_blocks = generated_blocks["blocks"]
                # Update block positions for top-level script
                for block_id, block_data in generated_blocks.items():
                    # Entries that are not block dicts cannot carry coordinates; leave them untouched.
                    if isinstance(block_data, dict) and block_data.get("topLevel"):
                        block_data["x"] = script_x_offset_per_sprite.get(sprite_name, 0)
                        block_data["y"] = script_y_offset[sprite_name]
                        script_y_offset[sprite_name] += 150  # Increment for next script
                current_sprite_target["blocks"].update(generated_blocks)
                state["iteration_count"] = 0
                logger.info(f"Action blocks added for sprite '{sprite_name}', script '{event_opcode}' by OverallBlockBuilderNode.")
            except Exception as e:
                logger.error(f"Error generating blocks for sprite '{sprite_name}', script '{event_opcode}': {e}")
                raise
    state["project_json"] = project_json
    logger.info("Updated project JSON with action nodes.")
    print("Updated project JSON with action nodes:", json.dumps(project_json, indent=2))
    return state
# Helper function to identify the shape of a block; utilized by the block verifier.
# Substrings that hint at a value-reporting block.
_REPORTER_NAME_HINTS = (
    "position", "direction", "size", "volume", "costume", "backdrop", "random",
    "add", "subtract", "multiply", "divide", "length", "item", "of",
)
# Verbs that start the action part of command (stack) opcodes. Without this
# guard the substring hints above classify commands such as
# `control_create_clone_of`, `looks_setsizeto`, `data_addtolist` or
# `looks_switchcostumeto` as reporters.
_COMMAND_VERB_PREFIXES = (
    "set", "change", "switch", "next", "go", "point", "create",
    "delete", "add", "replace", "insert", "show", "hide",
)
_BOOLEAN_OPERATOR_OPCODES = (
    "operator_equals", "operator_gt", "operator_lt",
    "operator_and", "operator_or", "operator_not",
)


def get_block_type(opcode: str) -> str:
    """Determines the general type of a Scratch block based on its opcode.

    Returns "unknown" for an empty/None opcode, otherwise one of "hat",
    "c_block", "boolean", "reporter" or "cap" when a naming heuristic matches.
    If none matches, the "blockType" recorded for the opcode in
    ALL_SCRATCH_BLOCKS_CATALOG is returned, defaulting to "stack".

    Dropdown shadow blocks (opcodes ending in "menu") are tested before the
    boolean heuristic so that e.g. `sensing_touchingobjectmenu` is reported as
    a reporter rather than a boolean.
    """
    if not opcode:
        return "unknown"
    if opcode.startswith("event_when") or opcode == "control_start_as_clone":
        return "hat"
    if opcode.startswith("control_") and ("if" in opcode or "repeat" in opcode or "forever" in opcode):
        return "c_block"
    if opcode.endswith("menu"):  # For dropdown shadow blocks, treat as reporter for type checking
        return "reporter"
    if opcode in _BOOLEAN_OPERATOR_OPCODES or (
        opcode.startswith("sensing_")
        and ("mousedown" in opcode or "keypressed" in opcode or "touching" in opcode)
    ):
        return "boolean"
    category, _, action = opcode.partition("_")
    # Operators never act as commands, so `operator_add` must stay a reporter.
    looks_like_command = category != "operator" and action.startswith(_COMMAND_VERB_PREFIXES)
    if not looks_like_command and any(hint in opcode for hint in _REPORTER_NAME_HINTS):
        return "reporter"
    if "stop_all" in opcode or "delete_this_clone" in opcode or "procedures_definition" in opcode:
        return "cap"
    # Default to stack for most command blocks if not explicitly defined
    return ALL_SCRATCH_BLOCKS_CATALOG.get(opcode, {}).get("blockType", "stack")
def filter_script_blocks(all_blocks: dict, hat_block_id: str) -> dict:
    """
    Collect the blocks reachable from one hat block.

    Walks breadth-first from ``hat_block_id`` through ``next`` links (only for
    blocks typed "stack", "c_block" or "hat" by get_block_type) and through
    block IDs referenced in ``inputs`` (reporters, shadows, substacks).
    IDs missing from ``all_blocks`` and falsy block entries are ignored.
    Returns a dict of block_id -> block data in visiting order.
    """
    reachable = {}
    seen = set()
    pending = [hat_block_id]
    cursor = 0  # index of the next queued ID to process (FIFO order)
    while cursor < len(pending):
        block_id = pending[cursor]
        cursor += 1
        if block_id in seen:
            continue
        seen.add(block_id)
        block = all_blocks.get(block_id)
        if not block:
            continue
        reachable[block_id] = block
        kind = get_block_type(block.get("opcode"))
        # Follow the vertical chain (hat blocks have a next too).
        if kind in ["stack", "c_block", "hat"]:
            successor = block.get("next")
            if successor and successor in all_blocks:
                pending.append(successor)
        # Follow blocks plugged into inputs (reporters, shadow blocks, substacks).
        if "inputs" in block:
            for slot in block["inputs"].values():
                if not (isinstance(slot, list) and len(slot) >= 2):
                    continue
                primary = slot[1]
                if isinstance(primary, str) and primary in all_blocks:
                    pending.append(primary)
                # A third element may name a second connected block (type code 3).
                if len(slot) >= 3 and isinstance(slot[2], str) and slot[2] in all_blocks:
                    pending.append(slot[2])
        # C-blocks: queue substack heads (type code 2); repeats are skipped via `seen`.
        if kind == "c_block":
            for slot in block.get("inputs", {}).values():
                if (
                    isinstance(slot, list)
                    and len(slot) >= 2
                    and slot[0] == 2
                    and isinstance(slot[1], str)
                    and slot[1] in all_blocks
                ):
                    pending.append(slot[1])
    return reachable
def _occupies_input_slot(parent_block: dict, child_id: str) -> bool:
    """Return True if ``child_id`` sits in any input slot of ``parent_block``.

    Both the connected-block position (index 1) and the obscured-shadow
    position (index 2) of each input array are inspected, so a shadow hidden
    behind a plugged-in reporter still counts as linked.
    """
    for input_value in parent_block.get("inputs", {}).values():
        if isinstance(input_value, list) and len(input_value) >= 2 and child_id in input_value[1:3]:
            return True
    return False


def analyze_script_structure(
    script_blocks: dict,
    hat_block_id: str,
    sprite_name: str,
    sprite_variables: dict,
    sprite_lists: dict,
    sprite_broadcasts: dict,
    sprite_custom_blocks: dict  # proccode -> block_id mapping
) -> list:
    """
    Analyze the structure of a single Scratch script and report problems.

    Checks performed, per block: opcode presence and catalog membership,
    parent/next linkage, input array shape and type codes, shadow-block
    constraints, variable/list/broadcast references and custom-block calls.

    Args:
        script_blocks: Block ID -> block data for the blocks of this script.
        hat_block_id: ID of the hat block that starts the script.
        sprite_name: Name of the owning sprite; used only in messages.
        sprite_variables: Known variable names (only membership is tested).
        sprite_lists: Known list names (only membership is tested).
        sprite_broadcasts: Known broadcast names; any container supporting
            ``in`` works.
        sprite_custom_blocks: Known custom-block proccodes.

    Returns:
        list: Human-readable issue strings; empty when nothing was found.
    """
    issues = []

    # 1. Validate the hat block. Nothing else can be checked without it.
    hat_block = script_blocks.get(hat_block_id)
    if not hat_block:
        issues.append(f"Script for sprite '{sprite_name}' (hat ID: {hat_block_id}) has no hat block data.")
        return issues
    if not hat_block.get("topLevel") or hat_block.get("parent") is not None:
        issues.append(f"Hat block '{hat_block_id}' for sprite '{sprite_name}' is not marked as topLevel or has a parent, which is incorrect for a script start.")
    # A hat block legitimately has a 'next' link to the first block of its
    # script, so 'next' on the hat is deliberately not reported.

    # 2. Check every block of the script.
    for block_id, block_data in script_blocks.items():
        opcode = block_data.get("opcode")
        if not opcode:
            issues.append(f"Block '{block_id}' for sprite '{sprite_name}' is missing an opcode.")
            continue
        if opcode not in ALL_SCRATCH_BLOCKS_CATALOG:
            issues.append(f"Block '{block_id}' for sprite '{sprite_name}' has unknown opcode '{opcode}'. Recommendation: Verify against Scratch 3.0 block reference.")

        parent_id = block_data.get("parent")
        next_id = block_data.get("next")
        current_block_type = get_block_type(opcode)

        # --- Parent linkage: the parent must actually point at this block ---
        if parent_id:
            parent_block = script_blocks.get(parent_id)
            if not parent_block:
                issues.append(f"Block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' references non-existent parent '{parent_id}'.")
            else:
                parent_block_type = get_block_type(parent_block.get("opcode"))
                if parent_block_type in ("stack", "hat", "c_block"):
                    # Either the sequential successor or plugged into an input/substack.
                    if parent_block.get("next") != block_id and not _occupies_input_slot(parent_block, block_id):
                        issues.append(f"Block '{block_id}' (opcode: {opcode}) has parent '{parent_id}' but is not a sequential 'next' block nor connected via an input slot. Potential broken linkage.")
                elif parent_block_type in ("reporter", "boolean"):
                    # Reporters have no 'next'; a child can only live in an input.
                    if not _occupies_input_slot(parent_block, block_id):
                        issues.append(f"Block '{block_id}' (opcode: {opcode}) has reporter/boolean parent '{parent_id}' but is not linked via an input. Potential broken linkage.")
                else:
                    issues.append(f"Block '{block_id}' (opcode: {opcode}) has an unexpected parent block type '{parent_block_type}' for parent '{parent_id}'.")

        # --- Next linkage ---
        if next_id:
            next_block = script_blocks.get(next_id)
            if not next_block:
                issues.append(f"Block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' references non-existent next block '{next_id}'.")
            elif next_block.get("parent") != block_id:
                issues.append(f"Block '{block_id}' (opcode: {opcode})'s next block '{next_id}' does not link back to it via 'parent'.")
            if current_block_type == "cap":
                issues.append(f"Cap block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' should not have a 'next' connection as it ends a script.")
            # A shadow block with 'next' is reported once, in the shadow
            # section below, instead of being duplicated here.

        # --- Input validation ---
        if "inputs" in block_data:
            for input_name, input_value in block_data["inputs"].items():
                if not isinstance(input_value, list) or len(input_value) < 2:
                    issues.append(f"Block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' has malformed input '{input_name}': {input_value}.")
                    continue
                type_code = input_value[0]
                value_or_block_id = input_value[1]

                if type_code not in (1, 2, 3):
                    issues.append(f"Block '{block_id}' (opcode: {opcode}) input '{input_name}' has invalid type code: {type_code}. Expected 1, 2, or 3.")

                if isinstance(value_or_block_id, str):
                    # A string in slot 1 is treated as a block ID.
                    connected_block = script_blocks.get(value_or_block_id)
                    if not connected_block:
                        issues.append(f"Block '{block_id}' (opcode: {opcode}) input '{input_name}' references non-existent block ID '{value_or_block_id}'.")
                        continue  # nothing further can be said about a missing block
                    if connected_block.get("parent") != block_id:
                        issues.append(f"Block '{block_id}' (opcode: {opcode}) input '{input_name}' connects to '{value_or_block_id}', but '{value_or_block_id}'s parent is not '{block_id}'.")

                    # The shape checks live inside this branch so they always
                    # see the type of the block connected to *this* input.
                    connected_block_type = get_block_type(connected_block.get("opcode"))
                    connected_is_value = connected_block_type in ("reporter", "boolean")
                    if type_code == 2:  # block plugged in without a shadow
                        if input_name.startswith("SUBSTACK"):
                            # A substack may begin with any stackable shape:
                            # a plain stack block, a nested C-block, or a cap.
                            if connected_block_type not in ("stack", "c_block", "cap"):
                                issues.append(f"Block '{block_id}' (opcode: {opcode}) input '{input_name}' (type 2, SUBSTACK) expects a stack, C, or cap block, but connected block '{value_or_block_id}' is of type '{connected_block_type}'.")
                        elif not connected_is_value:
                            issues.append(f"Block '{block_id}' (opcode: {opcode}) input '{input_name}' (type 2, non-SUBSTACK) expects a reporter/boolean, but connected block '{value_or_block_id}' is of type '{connected_block_type}'.")
                    elif type_code == 1:  # literal or (unobscured) shadow block
                        if not connected_is_value and not connected_block.get("shadow"):
                            issues.append(f"Block '{block_id}' (opcode: {opcode}) input '{input_name}' (type 1) expects a literal or reporter/boolean/shadow, but connected block '{value_or_block_id}' is of type '{connected_block_type}' and not a shadow.")
                    elif type_code == 3:  # plugged-in block obscuring a shadow
                        # In the sb3 encoding slot 1 of a type-3 input is the
                        # plugged-in block; the shadow, if any, is in slot 2.
                        # A shadow in slot 1 is tolerated for leniency.
                        if not connected_is_value and not connected_block.get("shadow"):
                            issues.append(f"Block '{block_id}' (opcode: {opcode}) input '{input_name}' (type 3) expects a reporter/boolean or shadow block, but connected block '{value_or_block_id}' is of type '{connected_block_type}' and not a shadow.")

                # A C-block's SUBSTACK must be type 2 and point at a known block.
                if current_block_type == "c_block" and input_name.startswith("SUBSTACK"):
                    if not (isinstance(value_or_block_id, str) and script_blocks.get(value_or_block_id) and type_code == 2):
                        issues.append(f"C-block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' has an invalid or missing SUBSTACK input configuration.")

        # --- Shadow block specific checks ---
        if block_data.get("shadow"):
            if block_data.get("topLevel"):
                issues.append(f"Shadow block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' is incorrectly marked as topLevel.")
            if not block_data.get("parent"):
                issues.append(f"Shadow block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' is missing a parent.")
            if block_data.get("next"):
                issues.append(f"Shadow block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' should not have a 'next' connection.")

            shadow_fields = block_data.get("fields", {})
            if opcode == "math_number":
                num_field = shadow_fields.get("NUM")
                if not (isinstance(num_field, list) and len(num_field) >= 1 and isinstance(num_field[0], (str, int, float))):
                    issues.append(f"Math_number shadow block '{block_id}' has malformed 'NUM' field: {num_field}. Expected [value, ID_or_null].")
            elif opcode == "event_broadcast_menu":
                broadcast_field = shadow_fields.get("BROADCAST_OPTION")
                if not (isinstance(broadcast_field, list) and len(broadcast_field) >= 1 and isinstance(broadcast_field[0], str) and broadcast_field[0]):
                    issues.append(f"Broadcast menu shadow block '{block_id}' has malformed or empty 'BROADCAST_OPTION' field: {broadcast_field}. Expected ['message name', 'ID_or_null'].")
                # Broadcast fields normally carry the broadcast's ID; a null
                # ID is reported as a hint, not as a hard error.
                if isinstance(broadcast_field, list) and len(broadcast_field) > 1 and broadcast_field[1] is None:
                    issues.append(f"Broadcast menu shadow block '{block_id}' has a 'BROADCAST_OPTION' field with a null default ID: {broadcast_field}. This might indicate a missing pre-defined broadcast ID.")
            elif opcode == "sensing_touchingobjectmenu":
                touching_field = shadow_fields.get("TOUCHINGOBJECTMENU")
                if not (isinstance(touching_field, list) and len(touching_field) >= 1 and isinstance(touching_field[0], str) and touching_field[0]):
                    issues.append(f"Touching object menu shadow block '{block_id}' has malformed or empty 'TOUCHINGOBJECTMENU' field: {touching_field}. Expected ['object name', 'ID_or_null'].")
                # A null ID is the normal form for this menu (for example
                # ["_edge_", null]), so it is intentionally not reported.

        # --- References to variables, lists and broadcasts ---
        if "fields" in block_data:
            fields = block_data["fields"]

            if opcode in ("data_setvariableto", "data_changevariableby", "data_variable"):
                var_name_field = fields.get("VARIABLE")
                if var_name_field and isinstance(var_name_field, list) and len(var_name_field) >= 1 and isinstance(var_name_field[0], str):
                    var_name = var_name_field[0]
                    if var_name not in sprite_variables:
                        issues.append(f"Block '{block_id}' (opcode: {opcode}) references non-existent variable '{var_name}'. Recommendation: Ensure '{var_name}' is defined.")
                else:
                    issues.append(f"Block '{block_id}' (opcode: {opcode}) has malformed 'VARIABLE' field: {var_name_field}.")

            if opcode in ("data_addtolist", "data_deleteoflist", "data_itemoflist", "data_listcontents"):
                list_name_field = fields.get("LIST")
                if list_name_field and isinstance(list_name_field, list) and len(list_name_field) >= 1 and isinstance(list_name_field[0], str):
                    list_name = list_name_field[0]
                    if list_name not in sprite_lists:
                        issues.append(f"Block '{block_id}' (opcode: {opcode}) references non-existent list '{list_name}'. Recommendation: Ensure '{list_name}' is defined.")
                else:
                    issues.append(f"Block '{block_id}' (opcode: {opcode}) has malformed 'LIST' field: {list_name_field}.")

            if opcode == "event_whenbroadcastreceived":
                broadcast_name_field = fields.get("BROADCAST_OPTION")
                if broadcast_name_field and isinstance(broadcast_name_field, list) and len(broadcast_name_field) >= 1 and isinstance(broadcast_name_field[0], str):
                    broadcast_name = broadcast_name_field[0]
                    if broadcast_name not in sprite_broadcasts:
                        issues.append(f"Hat block '{block_id}' (opcode: {opcode}) listens for non-existent broadcast message '{broadcast_name}'. Recommendation: Ensure '{broadcast_name}' is broadcast elsewhere.")
                else:
                    issues.append(f"Hat block '{block_id}' (opcode: {opcode}) has malformed 'BROADCAST_OPTION' field: {broadcast_name_field}.")

        # --- Custom block call validation ---
        if opcode == "procedures_call" and "mutation" in block_data:
            proccode = block_data["mutation"].get("proccode")
            if proccode and proccode not in sprite_custom_blocks:
                issues.append(f"Custom block call '{block_id}' (proccode: {proccode}) references non-existent custom block definition. Recommendation: Define custom block '{proccode}'.")
            if not proccode:
                issues.append(f"Custom block call '{block_id}' is missing 'proccode' in its mutation.")

        # --- Broadcast senders must carry a non-empty message ---
        if opcode in ("event_broadcast", "event_broadcastandwait"):
            broadcast_input = block_data.get("inputs", {}).get("BROADCAST_INPUT")
            if broadcast_input and isinstance(broadcast_input, list) and len(broadcast_input) >= 2:
                payload = broadcast_input[1]
                shadow_block = script_blocks.get(payload) if isinstance(payload, str) else None
                if shadow_block and shadow_block.get("shadow"):
                    if shadow_block.get("opcode") == "event_broadcast_menu":
                        broadcast_field = shadow_block.get("fields", {}).get("BROADCAST_OPTION")
                        if not (broadcast_field and isinstance(broadcast_field, list) and len(broadcast_field) >= 1 and isinstance(broadcast_field[0], str) and broadcast_field[0].strip()):
                            # Blocks are keyed by ID and carry no 'id' field,
                            # so the shadow is named by the ID in the input.
                            issues.append(f"Broadcast block '{block_id}' uses a shadow block '{payload}' that has an empty or malformed broadcast message. Recommendation: Ensure the broadcast message is a non-empty string.")
                elif isinstance(payload, list) and len(payload) >= 2 and isinstance(payload[1], str) and not payload[1].strip():
                    # Inline literal form of the message.
                    issues.append(f"Broadcast block '{block_id}' has an empty literal broadcast message. Recommendation: Provide a non-empty message.")
            else:
                issues.append(f"Broadcast block '{block_id}' has a missing or malformed 'BROADCAST_INPUT'. Recommendation: Ensure a valid message or shadow block is connected.")

    return issues
# Node 12: Verification Node
def block_verification_node(state: dict) -> dict:
    """
    Verify the block structure of every target and request an LLM review.

    Rule-based checks run first (``analyze_script_structure`` per script plus
    orphan / stray top-level detection) and their findings are collected in
    ``improvement_plan``. When any issue is found, the findings are handed to
    the reviewer agent, whose JSON answer is stored as review feedback.

    Reads from ``state``: ``project_json`` and ``iteration_count``.
    Writes to ``state``: ``needs_improvement``, ``improvement_plan``,
    ``block_validation_feedback``, ``review_block_feedback`` and
    ``iteration_count``.

    Args:
        state (dict): The current state dictionary containing project_json, etc.

    Returns:
        dict: The same state dictionary, updated with verification feedback.
    """
    logger.info(f"--- Running BlockVerificationNode (Iteration: {state.get('iteration_count', 0)}) ---")

    # Upper bound on detect -> improve cycles; once reached, the loop is
    # ended by forcing needs_improvement back to False.
    MAX_IMPROVEMENT_ITERATIONS = 3
    current_iteration = state.get("iteration_count", 0)
    project_json = state["project_json"]
    targets = project_json["targets"]

    state["needs_improvement"] = False
    block_validation_feedback_overall = []
    improvement_plan = {"sprite_issues": {}}
    state["improvement_plan"] = improvement_plan  # present in state even if empty

    def _names_to_ids(entries) -> dict:
        # Variables/lists are stored as {id: [name, value, ...]}; invert to
        # {name: id}, skipping entries that are not shaped like that.
        return {
            data[0]: entry_id
            for entry_id, data in (entries or {}).items()
            if isinstance(data, (list, tuple)) and data and isinstance(data[0], str)
        }

    # Variables and lists owned by the Stage are global in Scratch, so every
    # sprite may reference them; they are merged into each sprite's lookup.
    stage_target = next((t for t in targets if t.get("isStage")), None)
    global_variables = _names_to_ids(stage_target.get("variables", {})) if stage_target else {}
    global_lists = _names_to_ids(stage_target.get("lists", {})) if stage_target else {}

    for target in targets:
        sprite_name = target["name"]
        all_blocks_for_sprite = target.get("blocks", {})
        if not all_blocks_for_sprite:
            logger.info(f"Sprite '{sprite_name}' has no blocks. Skipping verification.")
            continue

        # Sprite-local definitions take precedence over Stage globals.
        sprite_variables = {**global_variables, **_names_to_ids(target.get("variables", {}))}
        sprite_lists = {**global_lists, **_names_to_ids(target.get("lists", {}))}

        # Broadcast names are gathered from 'when I receive' hats and from
        # broadcast-menu shadow blocks of this target.
        # NOTE(review): because receivers contribute their own message, the
        # "listens for non-existent broadcast" check can never fire for them;
        # confirm whether only senders should be collected.
        sprite_broadcasts = set()
        # Custom-block proccodes. The mutation is looked up on the definition
        # block as before and additionally on 'procedures_prototype', which is
        # where the sb3 format stores it.
        sprite_custom_blocks = {}
        for block_id, block_data in all_blocks_for_sprite.items():
            block_opcode = block_data.get("opcode")

            is_receiver = block_opcode == "event_whenbroadcastreceived"
            is_sender_menu = block_opcode == "event_broadcast_menu" and block_data.get("shadow")
            if is_receiver or is_sender_menu:
                name_field = block_data.get("fields", {}).get("BROADCAST_OPTION")
                if isinstance(name_field, list) and len(name_field) >= 1 and isinstance(name_field[0], str):
                    sprite_broadcasts.add(name_field[0])

            if block_opcode in ("procedures_definition", "procedures_prototype"):
                mutation = block_data.get("mutation")
                proccode = mutation.get("proccode") if isinstance(mutation, dict) else None
                if proccode:
                    sprite_custom_blocks[proccode] = block_id

        sprite_issues = []
        hat_block_ids = [
            block_id for block_id, block_data in all_blocks_for_sprite.items()
            if block_data.get("topLevel") and get_block_type(block_data.get("opcode")) == "hat"
        ]
        processed_script_blocks = set()
        if not hat_block_ids:
            sprite_issues.append("No top-level hat blocks found for this sprite. Scripts may not run automatically.")

        for hat_id in hat_block_ids:
            logger.info(f"Verifying script starting with hat block '{hat_id}' for sprite '{sprite_name}'.")
            current_script_blocks = filter_script_blocks(all_blocks_for_sprite, hat_id)
            processed_script_blocks.update(current_script_blocks.keys())
            script_issues = analyze_script_structure(
                current_script_blocks,
                hat_id,
                sprite_name,
                sprite_variables,
                sprite_lists,
                sprite_broadcasts,
                sprite_custom_blocks
            )
            if script_issues:
                sprite_issues.append(f"Issues in script starting with '{hat_id}':")
                sprite_issues.extend([f" - {issue}" for issue in script_issues])
            else:
                logger.info(f"Script starting with '{hat_id}' for sprite '{sprite_name}' passed basic verification.")

        # Truly orphaned blocks: not reached from any hat, not top-level,
        # without a parent and not a shadow. Sorted so the preview is stable.
        orphaned_blocks_overall = sorted(
            block_id for block_id, block_data in all_blocks_for_sprite.items()
            if block_id not in processed_script_blocks
            and not block_data.get("topLevel")
            and not block_data.get("parent")
            and not block_data.get("shadow")
        )
        if orphaned_blocks_overall:
            sprite_issues.append(f"Found {len(orphaned_blocks_overall)} truly orphaned blocks not connected to any valid script: {', '.join(orphaned_blocks_overall[:5])}{'...' if len(orphaned_blocks_overall) > 5 else ''}.")

        # Top-level blocks that are not hats never start running on their own.
        for block_id, block_data in all_blocks_for_sprite.items():
            if block_data.get("topLevel") and get_block_type(block_data.get("opcode")) != "hat" and block_id not in processed_script_blocks:
                sprite_issues.append(f"Top-level block '{block_id}' (opcode: {block_data.get('opcode')}) is not a hat block and is not part of any script, so it will not run automatically.")

        if sprite_issues:
            improvement_plan["sprite_issues"][sprite_name] = sprite_issues
            logger.warning(f"Verification found issues for sprite '{sprite_name}'.")
            block_validation_feedback_overall.append(f"Issues for {sprite_name}:\n" + "\n".join([f"- {issue}" for issue in sprite_issues]))
            state["needs_improvement"] = True
            print(f"\n--- Verification Report (Issues Found for {sprite_name}) ---")
            print(json.dumps({sprite_name: sprite_issues}, indent=2))
        else:
            logger.info(f"Sprite '{sprite_name}' passed all verification checks.")

    # Consolidate feedback for the LLM
    state["block_validation_feedback"] = "\n\n".join(block_validation_feedback_overall)
    print(f"[OVERALL IMPROVEMENT PLAN ON ITERATION {current_iteration}]: {improvement_plan}")

    if state["needs_improvement"]:
        llm_reviewer_prompt = f"""You are an expert Scratch project reviewer. Your task is to analyze the provided \
structural issues found in a Scratch project's sprites and suggest improvements or \
further insights. Focus on clarity, accuracy, and actionable advice.
Here are the detected structural issues:
{json.dumps(improvement_plan['sprite_issues'], indent=2)} \n
Here are the block validation feedback:
{json.dumps(state["block_validation_feedback"], indent=2)} \n
Please review these issues and provide a consolidated report with potential causes \
and recommendations for fixing them. If an issue is minor or expected in certain \
scenarios (e.g., hidden blocks for backward compatibility), please note that.
Structure your response as a JSON object with 'review_summary' as the key, \
containing a dictionary where keys are sprite names and values are lists of suggested improvements.
Example:
```json
{{
"review_summary": {{
"Sprite1": [
"Issue: Hat block 'abc' is not topLevel. Recommendation: Ensure all scripts start with a top-level hat block that has no parent.",
"Issue: Block 'xyz' has unknown opcode 'motion_nonexistent'. Recommendation: Verify the opcode against the Scratch 3.0 block reference. This might be a typo or a deprecated block.",
"Issue: Block 'var_block_id' references non-existent variable 'myVariable'. Recommendation: Ensure all variables used in blocks are defined in the sprite's variable list."
],
"Sprite2": [
"Issue: Found 3 orphaned blocks. Recommendation: Reconnect these blocks to existing scripts or remove them if no longer needed."
]
}}
}}
```
"""
        try:
            response = agent.invoke({"messages": [{"role": "user", "content": llm_reviewer_prompt}]})
            raw_review_response = response["messages"][-1].content
            try:
                state["review_block_feedback"] = extract_json_from_llm_response(raw_review_response)
            except json.JSONDecodeError as error_json:
                logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.")
                # Ask the JSON resolver agent to repair the malformed answer.
                correction_prompt = (
                    "Your task is to correct the provided JSON string to ensure it is **syntactically perfect and adheres strictly to JSON rules**.\n"
                    "Carefully review the JSON for any errors, especially focusing on the reported error at:\n"
                    f"- **Error Details**: {error_json}\n\n"
                    "**Strict Instructions for your response:**\n"
                    "1. **ONLY** output the corrected JSON. Do not include any other text, comments, or explanations outside the JSON.\n"
                    "2. Ensure all property names (keys) are enclosed in **double quotes**.\n"
                    "3. Ensure string values are correctly enclosed in **double quotes** and any internal special characters (like newlines `\\n`, tabs `\\t`, backslashes `\\\\`, or double quotes `\\`) are properly **escaped**.\n"
                    "4. Verify that there are **no extra commas**, especially between key-value pairs or after the last element in an object or array.\n"
                    "5. Ensure proper nesting and matching of curly braces `{}` and square brackets `[]`.\n"
                    "6. **Crucially, remove any extraneous characters or duplicate closing braces outside the main JSON object.**\n"
                    "7. The corrected JSON must be a **complete and valid** JSON object.\n\n"
                    "Here is the problematic JSON string to correct:\n"
                    "```json\n"
                    f"{raw_review_response}\n"
                    "```\n"
                    "Corrected JSON:\n"
                )
                correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]})
                print(f"[JSON CORRECTOR RESPONSE AT BLOCKVERFIER ]: {correction_response['messages'][-1].content}")
                state["review_block_feedback"] = extract_json_from_llm_response(correction_response["messages"][-1].content)
            logger.info("Agent review feedback added to the state.")
            print("\n--- Agent Review Feedback ---")
            print(json.dumps(state["review_block_feedback"], indent=2))
        except Exception as e:
            # Best-effort review: any failure (agent error, second JSON parse
            # failure, ...) degrades to a generic feedback entry.
            logger.error(f"Error invoking agent for review in BlockVerificationNode: {e}")
            state["review_block_feedback"] = {"review_summary": {"Overall": [f"Error during LLM review: {e}"]}}
    else:
        logger.info("BlockVerificationNode completed: No issues found in any sprite blocks.")
        print("\n--- Verification Report (No Issues Found) ---")
        state["block_validation_feedback"] = "No issues found in sprite blocks."
        state["review_block_feedback"] = {"review_summary": {"Overall": ["No issues found in sprite blocks. All good!"]}}

    # Manage the iteration counter based on the overall needs_improvement flag.
    if state["needs_improvement"]:
        # Only count a cycle when another improvement round is requested.
        state["iteration_count"] = current_iteration + 1
        if state["iteration_count"] >= MAX_IMPROVEMENT_ITERATIONS:
            logger.warning(f"Max improvement iterations ({MAX_IMPROVEMENT_ITERATIONS}) reached for block verification. Forcing 'needs_improvement' to False.")
            state["needs_improvement"] = False
            state["block_validation_feedback"] += "\n(Note: Max iterations reached for block verification, stopping further improvements.)"

            # The review comes from an LLM, so its shape is not guaranteed:
            # coerce it into {"review_summary": {"Overall": [...]}} before
            # appending instead of assuming dict/list types.
            review_feedback = state.get("review_block_feedback")
            if not isinstance(review_feedback, dict):
                review_feedback = {"review_summary": {"Overall": [str(review_feedback)] if review_feedback else []}}
                state["review_block_feedback"] = review_feedback
            if not isinstance(review_feedback.get("review_summary"), dict):
                review_feedback["review_summary"] = {}
            review_summary = review_feedback["review_summary"]
            overall_notes = review_summary.get("Overall")
            if not isinstance(overall_notes, list):
                overall_notes = [] if overall_notes is None else [overall_notes]
                review_summary["Overall"] = overall_notes
            overall_notes.append("Max iterations reached for block verification, stopping further improvements based on this run.")
    else:
        # Verification passed: reset the counter.
        state["iteration_count"] = 0

    logger.info(f"Block verification completed. Needs Improvement: {state['needs_improvement']}. Feedback: {state['block_validation_feedback'][:100]}...")
    print("===========================================================================")
    print(f"[BLOCK VERIFICATION NODE: (improvement_plan)]:{state.get('improvement_plan')}")
    print(f"[BLOCK VERIFICATION NODE: (review_block_feedback)]:{state.get('review_block_feedback')}")
    return state
# Node 13: Improvement block builder Node
def improvement_block_builder_node(state: GameState):
logger.info("--- Running ImprovementBlockBuilderNode ---")
project_json = state["project_json"]
targets = project_json["targets"]
sprite_map = {target["name"]: target for target in targets if not target["isStage"]}
# Also get the Stage target
stage_target = next((target for target in targets if target["isStage"]), None)
if stage_target:
sprite_map[stage_target["name"]] = stage_target
# Pre-load all block-catalog JSONs once
all_catalogs = [
hat_block_data,
boolean_block_data,
c_block_data,
cap_block_data,
reporter_block_data,
stack_block_data
]
# Renamed from improvement_plan as it's not directly used here for iteration
# improvement_plan = state.get("improvement_plan", {})
block_verification_feedback = state.get("block_validation_feedback", "no feedback")
review_block_feedback = state.get("review_block_feedback", {}) # This node is combination of function base issue finder + LLM issue finder.
action_plan = state.get("action_plan", {})
if not action_plan: # Check if action_plan is empty instead of review_block_feedback
logger.warning("No action plan found in state. Skipping ImprovementBlockBuilderNode.")
return state
script_y_offset = {}
script_x_offset_per_sprite = {name: 0 for name in sprite_map.keys()}
# This is the handler which ensure if somehow json response changed it handle it.[DONOT REMOVE BELOW LOGIC]
if action_plan.get("action_overall_flow", {}) == {}:
plan_data = action_plan.items()
else:
plan_data = action_plan.get("action_overall_flow", {}).items()
for sprite_name, sprite_improvements_data in plan_data:
if sprite_name in sprite_map:
current_sprite_target = sprite_map[sprite_name]
# Ensure 'blocks' key exists for the current sprite
if "blocks" not in current_sprite_target:
current_sprite_target["blocks"] = {}
# Fetch existing blocks for the current sprite
existing_sprite_blocks = current_sprite_target.get("blocks", {})
if sprite_name not in script_y_offset:
script_y_offset[sprite_name] = 0
# Get review feedback specific to this sprite
sprite_review_summary = review_block_feedback.get("review_summary", state.get(sprite_name, [])).get(sprite_name, []) # get review_summary or direct_issues if any
sprite_review_feedback_str = "\n".join([f"- {issue}" for issue in sprite_review_summary]) if sprite_review_summary else "No specific issues reported for this sprite in the review."
for plan_entry in sprite_improvements_data.get("plans", []):
event_opcode = plan_entry["event"] # This is now expected to be an opcode
logic_sequence = plan_entry["logic"] # This is the semicolon-separated string
# Extract the new opcode lists from the plan_entry
motion_opcodes = plan_entry.get("motion", plan_entry.get("motions", [])) # handling of motion and motions by llm without re
control_opcodes = plan_entry.get("control", plan_entry.get("controls", [])) # handling of control and controls by llm without re
operator_opcodes = plan_entry.get("operator", plan_entry.get("operators", [])) # handling of operator and operators by llm without re
sensing_opcodes = plan_entry.get("sensing", [])
looks_opcodes = plan_entry.get("looks", plan_entry.get("look", [])) # handling of look and looks by llm without re
sound_opcodes = plan_entry.get("sounds", plan_entry.get("sound", [])) # handling of sound and sounds by llm without re
events_opcodes = plan_entry.get("events", []) # for stack block
events_opcodes_2_raw = plan_entry.get("event", []) # for hat block [NOTE: both are different]
events_opcodes_2 = [events_opcodes_2_raw] if isinstance(events_opcodes_2_raw, str) else events_opcodes_2_raw
data_opcodes = plan_entry.get("data", [])
needed_opcodes = (
motion_opcodes + control_opcodes + operator_opcodes +
sensing_opcodes + looks_opcodes + sound_opcodes +
events_opcodes + events_opcodes_2 + data_opcodes
)
needed_opcodes = list(set(needed_opcodes))
# 2) build filtered runtime catalog (if you still need it)
filtered_catalog = {
op: ALL_SCRATCH_BLOCKS_CATALOG[op]
for op in needed_opcodes
if op in ALL_SCRATCH_BLOCKS_CATALOG
}
# 3) merge human-written catalog + runtime entry for each opcode
combined_blocks = {}
for op in needed_opcodes:
catalog_def = find_block_in_all(op, all_catalogs) or {}
runtime_def = ALL_SCRATCH_BLOCKS_CATALOG.get(op, {})
# merge: catalog fields first, then runtime overrides/adds
merged = {**catalog_def, **runtime_def}
combined_blocks[op] = merged
print("Combined blocks for this script:", json.dumps(combined_blocks, indent=2))
llm_block_generation_prompt = f"""You are an AI assistant generating Scratch 3.0 block JSON for a single script based on an improvement plan.
The current sprite is '{sprite_name}'.
The specific script to generate blocks for is for the event with opcode '{event_opcode}'.
The sequential logic to implement is:
Logic: {logic_sequence}
**Scratch 3.0 Block Shape Description**
### Hat Blocks Description: {hat_description}
### Boolean Blocks Description: {boolean_description}
### C Blocks Description: {c_description}
### Cap Blocks Description: {cap_description}
### Reporter Blocks Description: {reporter_description}
### Stack Blocks Description: {stack_description}
**Based on the planning, the following Scratch block opcodes are expected to be used to implement this logic. Focus on using these specific opcodes where applicable, and refer to the ALL_SCRATCH_BLOCKS_CATALOG for their full structure and required inputs/fields:**
Here is the comprehensive catalog of required Scratch 3.0 blocks:
```json
{json.dumps(combined_blocks, indent=2)}
```
Here is general feedback and suggestions you should take care of:
suggestion:{block_verification_feedback}
**VERY IMPORTANT: Here is the specific review feedback for this sprite. You MUST address these points in your block generation:**
```
{sprite_review_feedback_str}
```
Current Scratch project JSON for this sprite (for context, its existing blocks if any. Note: If this is a hat block, you should be generating a new script entirely, otherwise, integrate or modify existing blocks as per the action plan and review feedback):
```json
{json.dumps(existing_sprite_blocks, indent=2)}
```
**Instructions for generating the block JSON (EXTREMELY IMPORTANT - FOLLOW THESE EXAMPLES PRECISELY):**
1. **Start with the event block (Hat Block):** This block's `topLevel` should be `true` and `parent` should be `null`. Its `x` and `y` coordinates should be set (e.g., `x: 0, y: 0` or reasonable offsets for multiple scripts).
2. **Generate a sequence of connected blocks:** For each block, generate a **unique block ID** (e.g., 'myBlockID123').
3. **Link blocks correctly:**
* Use the `next` field to point to the ID of the block directly below it in the stack.
* Use the `parent` field to point to the ID of the block directly above it in the stack.
* For the last block in a stack, `next` should be `null`.
4. **Handle `inputs` for parameters and substacks (CRITICAL DETAIL - PAY CLOSE ATTENTION TO EXAMPLES):**
* The value for any key within the `inputs` dictionary MUST always be an **array** of two elements: `[type_code, value_or_block_id]`.
* **STRICTLY FORBIDDEN MISTAKE:** DO NOT put an array like `["num", "value"]` or `["_edge_", null]` directly as the `value_or_block_id`. This is the source of past errors.
* The `type_code` (first element of the array) indicates the nature of the input:
* `1`: For a primitive value or a shadow block ID (e.g., a number, string, boolean, or reference to a shadow block). This is the most common type for direct values.
* `2`: For a block ID where another block is directly plugged into this input (e.g., an operator block connected to an `if` condition, or the first block of a C-block's substack).
* **Correct Example for Numerical Input (e.g., for `motion_movesteps` STEPS, or `motion_gotoxy` X/Y):**
If you need to input the number `10` into a block, you MUST create a separate `math_number` shadow block for it, and then reference its ID.
```json
// Main block using the number
"mainBlockID": {{
"opcode": "motion_movesteps",
"inputs": {{
"STEPS": [1, "shadowNumID"]
}},
// ... other fields
}},
// The separate shadow block for the number '10'
"shadowNumID": {{
"opcode": "math_number",
"fields": {{
"NUM": ["10", null]
}},
"shadow": true,
"parent": "mainBlockID",
"topLevel": false
}}
```
* **Correct Example for Dropdown/Menu Input (e.g., `sensing_touchingobject` with 'edge'):**
If you need to select 'edge' for the `touching ()?` block, you MUST create a separate `sensing_touchingobjectmenu` shadow block and reference its ID.
```json
// Main block using the dropdown selection
"touchingBlockID": {{
"opcode": "sensing_touchingobject",
"inputs": {{
"TOUCHINGOBJECTMENU": [1, "shadowEdgeMenuID"]
}},
// ... other fields
}},
// The separate shadow block for the 'edge' menu option
"shadowEdgeMenuID": {{
"opcode": "sensing_touchingobjectmenu",
"fields": {{
"TOUCHINGOBJECTMENU": ["_edge_", null]
}},
"shadow": true,
"parent": "touchingBlockID",
"topLevel": false
}}
```
* **Correct Example for C-block (e.g., `control_forever`):**
The `control_forever` block MUST have a `SUBSTACK` input pointing to the first block inside its loop. The value for `SUBSTACK` must be an array: `[2, "FIRST_BLOCK_IN_FOREVER_LOOP_ID"]`.
```json
"foreverBlockID": {{
"opcode": "control_forever",
"inputs": {{
"SUBSTACK": [2, "firstBlockInsideForeverID"]
}},
"next": null,
"parent": "blockAboveForeverID",
"shadow": false,
"topLevel": false
}},
"firstBlockInsideForeverID": {{
// ... definition of the first block inside the forever loop
"parent": "foreverBlockID",
"next": "secondBlockInsideForeverID" // if there's another block
}}
```
* **Correct Example for C-block with condition (e.g., `control_if`):**
The `control_if` block MUST have a `CONDITION` input (typically `type_code: 1` referencing a boolean reporter block) and a `SUBSTACK` input (`type_code: 2` referencing the first block inside the if-body).
```json
"ifBlockID": {{
"opcode": "control_if",
"inputs": {{
"CONDITION": [1, "conditionBlockID"],
"SUBSTACK": [2, "firstBlockInsideIfID"]
}},
"next": "blockAfterIfID",
"parent": "blockAboveIfID",
"shadow": false,
"topLevel": false
}},
"conditionBlockID": {{
"opcode": "sensing_touchingobject", // Example condition block
// ... definition for condition block, parent should be "ifBlockID"
}},
"firstBlockInsideIfID": {{
// ... definition of the first block inside the if body
"parent": "ifBlockID",
"next": null // or next block if more
}}
```
* **Correct Example for Sensing Blocks (`sensing_touchingobject`, `sensing_touchingcolor`, etc.):**
For blocks like `sensing_touchingobject`, the input is typically `"TOUCHINGOBJECTMENU"`. Its value must be `[1, "ID_OF_SENSING_MENU_SHADOW_BLOCK"]`.
The corresponding shadow block (e.g., `sensing_touchingobjectmenu`) has a `fields` dictionary with a key matching the input name (e.g., `"TOUCHINGOBJECTMENU"`). The value for this field must be an array `["_mouse_", null]` or `["_edge_", null]` for specific options, or `["SpriteName", "SPRITE_ID_FROM_TARGETS"]` for other sprites.
Ensure the string value in the `fields` array for these menus matches the exact Scratch internal string for the option (e.g., `"_edge_"` not `"edge"` unless specifically allowed). Refer to `ALL_SCRATCH_BLOCKS_CATALOG` for exact field values.
Example for touching 'edge':
```json
"myTouchingBlockID": {{
"opcode": "sensing_touchingobject",
"inputs": {{
"TOUCHINGOBJECTMENU": [1, "myTouchingObjectMenuShadowID"]
}},
"next": null,
"parent": "someParentBlockID",
"shadow": false,
"topLevel": false
}},
"myTouchingObjectMenuShadowID": {{
"opcode": "sensing_touchingobjectmenu",
"fields": {{
"TOUCHINGOBJECTMENU": ["_edge_", null]
}},
"shadow": true,
"parent": "myTouchingBlockID",
"topLevel": false
}}
```
* **Correct Example for Clone Blocks (`control_create_clone_of`, `control_when_start_as_clone`, `control_delete_this_clone`):**
`control_create_clone_of` has an input named `"CLONE_TARGET"`. Its value must be `[1, "ID_OF_CLONE_TARGET_MENU_SHADOW_BLOCK"]`.
The corresponding shadow block (`control_create_clone_of_menu`) has a `fields` dictionary with a key named `"CLONE_TARGET"`. The value for `"CLONE_TARGET"` must be an array `["_myself_", null]` or `["SpriteName", "SPRITE_ID_FROM_TARGETS"]`.
`control_when_start_as_clone` is a hat block and thus `topLevel: true`, `parent: null`, and `next: null`.
`control_delete_this_clone` is a stack block and will have `next` and `parent` connections.
Example for creating a clone of 'myself':
```json
"myCloneBlockID": {{
"opcode": "control_create_clone_of",
"inputs": {{
"CLONE_TARGET": [1, "myCloneTargetMenuShadowID"]
}},
"next": null,
"parent": "someParentBlockID",
"shadow": false,
"topLevel": false
}},
"myCloneTargetMenuShadowID": {{
"opcode": "control_create_clone_of_menu",
"fields": {{
"CLONE_TARGET": ["_myself_", null]
}},
"shadow": true,
"parent": "myCloneBlockID",
"topLevel": false
}}
```
5. **Define ALL Shadow Blocks Separately (THIS IS ESSENTIAL):** Every time a block's input requires a number, string literal, or a selection from a dropdown/menu, you MUST define a **separate block entry** in the top-level blocks dictionary for that shadow. Each shadow block MUST have `"shadow": true` and `"topLevel": false`.
6. **`fields` for direct dropdown values/text:** Use the `fields` dictionary ONLY IF the block directly embeds a dropdown value or text field without an `inputs` connection (e.g., the `KEY_OPTION` field within the `event_whenkeypressed` shadow block itself, or the `variable` field on a `data_setvariableto` block). Example: `"fields": {{"KEY_OPTION": ["space", null]}}`.
7. **`topLevel: true` for hat blocks:** Only the starting (hat) block of a script should have `"topLevel": true`.
8. **Ensure Unique Block IDs:** Every block you generate (main blocks and shadow blocks) must have a unique ID within the entire script's block dictionary.
9. **Strictly Use Catalog Opcodes:** You MUST only use `opcode` values that are present in the provided `ALL_SCRATCH_BLOCKS_CATALOG`. Do NOT use unlisted opcodes like `motion_jump`.
10. **Return ONLY the JSON object representing all the blocks for THIS SINGLE SCRIPT.** Do NOT wrap it in a 'blocks' key or the full project JSON. The output should be a dictionary where keys are block IDs and values are block definitions. You need to ensure that the generated blocks address the provided `sprite_review_feedback_str` and integrate with or replace existing blocks as necessary to fulfill the `logic_sequence` for the `event_opcode`.
11. **Additional Reminders for Review Feedback:**
* Pay close attention to issues like "Hat block ... have a 'next' connection" – hat blocks *must* have `next: null`.
* Address "Blocks ... have reporter/boolean parents but are not linked via an input" by ensuring `inputs` dictionary correctly links to child blocks.
* For "Menu shadow blocks ... have malformed fields," verify the exact structure of the `fields` array for that specific opcode (e.g., `["VALUE", null]` or `["VALUE", "ID"]`).
* For "Block ... references non-existent variable 'lives'," ensure that any variable or list referenced by a block (e.g., `data_variable`, `data_setvariableto`) exists in the sprite's `variables` or `lists` section of `project_json`. If it doesn't, the LLM should generate the necessary variable/list definition at the sprite level (though this current node focuses on blocks, the LLM should ideally be aware of project-level data).
"""
try:
response = agent.invoke({"messages": [{"role": "user", "content": llm_block_generation_prompt}]})
raw_response = response["messages"][-1].content#strip_noise(response["messages"][-1].content)
logger.info(f"Raw response from LLM [ImprovementBlockBuilderNode - {sprite_name} - {event_opcode}]: {raw_response[:500]}...")
try:
generated_blocks = extract_json_from_llm_response(raw_response)
except json.JSONDecodeError as error_json:
logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.")
# Use the JSON resolver agent to fix the response
correction_prompt = (
"Your primary goal is to generate JSON that precisely defines Scratch 3.0 blocks. "
"The output MUST be a single, valid JSON object, enclosed within a '```json' markdown block.\n"
"**STRICT JSON SYNTAX RULES:**\n"
"1. **NO EXTRA TEXT, COMMENTS, OR CONVERSATIONAL FILLER.** Only the JSON.\n"
"2. All keys MUST be enclosed in double quotes.\n"
"3. String values MUST be enclosed in double quotes. Internal double quotes must be escaped (e.g., `\\`). Newlines (`\\n`) and carriage returns (`\\r`) in strings must be escaped.\n"
"4. No trailing commas.\n"
"5. Correct nesting of braces `{}` and brackets `[]`.\n"
"6. **CRITICAL: Understand and strictly follow the Scratch block input array format.**\n"
" - When an input refers to another top-level block (like a variable block or a number block that is defined separately):\n"
" `\"INPUT_NAME\": [1, \"ID_OF_REFERENCED_BLOCK\"]`\n"
" - When an input *directly embeds* a literal value (like a number, string, or boolean shadow block):\n"
" `\"INPUT_NAME\": [1, [\"math_number\", \"10\"]]` (for numbers)\n"
" `\"INPUT_NAME\": [1, [\"text\", \"hello\"]]` (for strings)\n"
" `\"INPUT_NAME\": [1, [\"boolean\", true]]` (for booleans)\n"
" - **Do NOT** generate patterns like `some_id[\"value\", null]]` or `\"\"\": [1, some_id[\"\", null]]`.\n"
" - Ensure `\"/X\"` and `\"Y\"` inputs for `motion_gotoxy` and `motion_glidesecstoxy` correctly reference a block ID or directly embed a number block.\n"
"7. The 'logic' field should contain a plain string describing the action, without any embedded JSON or Scratch block syntax (like `\"forever\":\"` which breaks the string).\n\n"
"Here is the problematic JSON string that needs correction, including the error details:\n"
f"- **Error Details**: {error_json}\n" # Keep the error details for context
"```json\n"
f"{raw_response}\n"
"```\n"
"Your corrected, valid JSON response:\n"
"```json\n"
)
correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]})
print(f"[JSON CORRECTOR RESPONSE AT IMPROVEMENTBLOCKBUILDER ]: {correction_response['messages'][-1].content}")
generated_blocks = extract_json_from_llm_response(correction_response["messages"][-1].content)#strip_noise(correction_response["messages"][-1].content))
if "blocks" in generated_blocks and isinstance(generated_blocks["blocks"], dict):
logger.warning(f"LLM returned nested 'blocks' key for {sprite_name}. Unwrapping.")
generated_blocks = generated_blocks["blocks"]
# Update block positions for top-level script
for block_id, block_data in generated_blocks.items():
if block_data.get("topLevel"):
block_data["x"] = script_x_offset_per_sprite.get(sprite_name, 0)
block_data["y"] = script_y_offset[sprite_name]
script_y_offset[sprite_name] += 150 # Increment for next script
# Merge newly generated blocks with existing ones.
# This is a crucial step for improvement. New blocks will overwrite existing ones with the same ID.
# If the intention is to completely replace an existing script, the LLM should generate the full new script.
# If it's to add/modify, the LLM should be aware of existing IDs to avoid conflicts or use them for parent/next links.
current_sprite_target["blocks"].update(generated_blocks)
logger.info(f"Improvement blocks added/updated for sprite '{sprite_name}', script '{event_opcode}' by ImprovementBlockBuilderNode.")
except Exception as e:
logger.error(f"Error generating blocks for sprite '{sprite_name}', script '{event_opcode}': {e}")
raise
state["project_json"] = project_json # Update the state with the modified project_json
logger.info("Updated project JSON with improvement nodes.")
print("Updated project JSON with improvement nodes:", json.dumps(project_json, indent=2)) # Print for direct visibility
#with open("debug_state.json", "w", encoding="utf-8") as f:
# json.dump(state, f, indent=2, ensure_ascii=False)
return state
# Temporary time delay for handling TPM issue
import time
def delay_for_tpm_node(state: GameState):
    """Sleep for a fixed interval, then hand the state back unchanged.

    Temporary workflow node used to space out the surrounding nodes in
    order to handle TPM issues.

    Args:
        state: The current workflow state; it is not read or modified.

    Returns:
        The same ``state`` object that was passed in.
    """
    pause_seconds = 80  # Adjust the delay as needed
    logger.info("--- Running DelayForTPMNode ---")
    time.sleep(pause_seconds)
    logger.info("Delay completed.")
    return state
# Build the LangGraph workflow
workflow = StateGraph(GameState)

# Every node of the workflow, listed in registration order.
# All "time_delay_*" nodes share delay_for_tpm_node; they are temporary
# nodes to handle TPM issues. Disabled nodes are kept as commented entries.
_workflow_nodes = (
    ("parse_query", parse_query_and_set_initial_positions),
    ("game_description", game_description_node),
    ("declarations_plan", declaration_planner_node),
    ("add_declaration", declaration_builder_node),
    ("time_delay_1", delay_for_tpm_node),
    ("time_delay_2", delay_for_tpm_node),
    ("time_delay_3", delay_for_tpm_node),
    ("time_delay_4", delay_for_tpm_node),
    ("time_delay_5", delay_for_tpm_node),
    ("time_delay_6", delay_for_tpm_node),
    # ("time_delay_7", delay_for_tpm_node),
    ("initial_plan_build", overall_planner_node),  # High-level planning node
    ("logic_alignment", plan_logic_aligner_node),
    ("plan_verifier", plan_verification_node),  # Verifies the high-level plan
    ("refined_planner", refined_planner_node),  # Refines the action plan
    # ("opcode_counter", plan_opcode_counter_node),
    ("relation_builder", block_relationship_node),
    ("block_builder", overall_block_builder_node),  # Builds blocks from a plan
    # ("block_verifier", block_verification_node),  # Verifies the generated blocks
    # ("improved_block_builder", improvement_block_builder_node),  # For specific block-level improvements
)
for _node_name, _node_fn in _workflow_nodes:
    workflow.add_node(_node_name, _node_fn)

# Set the entry point
workflow.set_entry_point("game_description")

# Standard initial flow: a straight chain from the entry point to the plan
# verifier, with a delay node between most stages.
_initial_edges = (
    ("game_description", "parse_query"),
    ("parse_query", "time_delay_1"),
    ("time_delay_1", "declarations_plan"),
    ("declarations_plan", "add_declaration"),
    ("add_declaration", "time_delay_2"),
    ("time_delay_2", "initial_plan_build"),
    ("initial_plan_build", "time_delay_3"),
    ("time_delay_3", "logic_alignment"),
    ("logic_alignment", "time_delay_4"),
    ("time_delay_4", "plan_verifier"),
)
for _src, _dst in _initial_edges:
    workflow.add_edge(_src, _dst)
# Define the conditional logic after plan_verifier (for high-level plan issues)
def decide_next_step_after_plan_verification(state: GameState):
    """Choose the node that runs after ``plan_verifier``.

    Args:
        state: Workflow state; only its ``needs_improvement`` entry is read,
            and a missing entry counts as False.

    Returns:
        "refined_planner" when the plan needs refinement, otherwise
        "relation_builder" so the flow proceeds towards building blocks.
    """
    plan_needs_refinement = state.get("needs_improvement", False)
    return "refined_planner" if plan_needs_refinement else "relation_builder"
# Branch out of plan_verifier: the router's return value is looked up in this
# mapping to find the next node.
_plan_verifier_routes = {
    "refined_planner": "refined_planner",    # Path if plan needs refinement
    "relation_builder": "relation_builder",  # Path if plan is approved
}
workflow.add_conditional_edges(
    "plan_verifier",
    decide_next_step_after_plan_verification,
    _plan_verifier_routes,
)

# --- CRITICAL CHANGE FOR THE PLAN REFINEMENT LOOP ---
# A refined plan goes back (through a delay) to plan_verifier for
# re-verification; an approved plan continues through relation_builder and
# another delay to block_builder, where the workflow ends.
for _src, _dst in (
    ("refined_planner", "time_delay_5"),
    ("time_delay_5", "plan_verifier"),  # Closes the refinement/re-verification loop
    ("relation_builder", "time_delay_6"),
    ("time_delay_6", "block_builder"),
    ("block_builder", END),
):
    workflow.add_edge(_src, _dst)
# Note: The original code had workflow.add_edge("time_delay", "block_builder") here,
# but after refined_planner -> time_delay -> plan_verifier, the decision is made by plan_verifier.
# So, this edge might be redundant or incorrect depending on the desired flow.
# Assuming the intent is for plan_verifier to always decide the next step.
# After blocks are built, they need to be verified
#workflow.add_edge("time_delay_3", "block_builder")
# workflow.add_edge("block_builder", "time_delay_5")
# workflow.add_edge("time_delay_5", "block_verifier")
# # Define the conditional logic after block_verifier (for generated blocks issues)
# def decide_after_block_verification(state: GameState):
# if state.get("needs_improvement", False):
# # If blocks need improvement, go to improved_block_builder.
# # This assumes improved_block_builder handles specific block-level fixes.
# return "improved_block_builder"
# else:
# # If blocks are good, end the workflow
# return "END"
# workflow.add_conditional_edges(
# "block_verifier",
# decide_after_block_verification,
# {
# "improved_block_builder": "improved_block_builder", # Path if blocks need improvement
# "END": END # Path if blocks are good
# }
# )
# # Create the loop: If blocks improved, re-verify them
# #workflow.add_edge("time_delay_4", "improved_block_builder")
# workflow.add_edge("improved_block_builder", "time_delay_6")
# workflow.add_edge("time_delay_6", "block_verifier")
# Compile the workflow graph; the compiled result is kept in the module-level
# name app_graph (used below when drawing the workflow diagram).
app_graph = workflow.compile()
# # Build the LangGraph workflow
# workflow = StateGraph(GameState)
# # Add all nodes to the workflow
# workflow.add_node("parse_query", parse_query_and_set_initial_positions)
# workflow.add_node("game_description", game_description_node)
# workflow.add_node("declarations_plan", declaration_planner_node)
# workflow.add_node("add_declaration", declaration_builder_node)
# workflow.add_node("initial_plan_build", overall_planner_node) # High-level planning node
# workflow.add_node("plan_verifier", plan_verification_node) # Verifies the high-level plan
# workflow.add_node("refined_planner", refined_planner_node) # Refines the action plan
# workflow.add_node("block_builder", overall_block_builder_node) # Builds blocks from a plan
# workflow.add_node("block_verifier", block_verification_node) # Verifies the generated blocks
# workflow.add_node("improved_block_builder", improvement_block_builder_node) # For specific block-level improvements
# # Set the entry point
# workflow.set_entry_point("game_description")
# # Define the standard initial flow
# workflow.add_edge("game_description", "parse_query")
# workflow.add_edge("parse_query", "declarations_plan")
# workflow.add_edge("declarations_plan", "add_declaration")
# workflow.add_edge("add_declaration", "initial_plan_build")
# workflow.add_edge("initial_plan_build", "plan_verifier")
# # Define the conditional logic after plan_verifier (for high-level plan issues)
# def decide_next_step_after_plan_verification(state: GameState):
# if state.get("needs_improvement", False):
# # If the plan needs refinement, go to the refined_planner
# return "refined_planner"
# else:
# # If the plan is good, proceed to building blocks from this plan
# return "block_builder"
# workflow.add_conditional_edges(
# "plan_verifier",
# decide_next_step_after_plan_verification,
# {
# "refined_planner": "refined_planner", # Path if plan needs refinement
# "block_builder": "block_builder" # Path if plan is approved, proceeds to block building
# }
# )
# # --- CRITICAL CHANGE FOR THE PLAN REFINEMENT LOOP ---
# # After refining the plan, it should go back to plan_verifier for re-verification.
# workflow.add_edge("refined_planner", "plan_verifier") # This closes the loop for plan refinement and re-verification.
# # After blocks are built, they need to be verified
# workflow.add_edge("block_builder", "block_verifier")
# # Define the conditional logic after block_verifier (for generated blocks issues)
# def decide_after_block_verification(state: GameState):
# if state.get("needs_improvement", False):
# # If blocks need improvement, go to improved_block_builder.
# # This assumes improved_block_builder handles specific block-level fixes.
# return "improved_block_builder"
# else:
# # If blocks are good, end the workflow
# return "END"
# workflow.add_conditional_edges(
# "block_verifier",
# decide_after_block_verification,
# {
# "improved_block_builder": "improved_block_builder", # Path if blocks need improvement
# "END": END # Path if blocks are good
# }
# )
# # Create the loop: If blocks improved, re-verify them
# workflow.add_edge("improved_block_builder", "block_verifier")
# # Compile the workflow graph
# app_graph = workflow.compile()
# --- Serve the form ---
@app.route("/", methods=["GET"])
def index():
    """Render the ``index4.html`` template for GET requests to ``/``."""
    return render_template("index4.html")
# --- List static assets for the front-end ---
def _list_asset_files(directory, extensions):
    """Return the sorted file names in *directory* ending with *extensions*.

    Args:
        directory: Folder to scan; a missing folder yields an empty list.
        extensions: Tuple of lowercase suffixes (e.g. ``(".svg", ".png")``),
            matched case-insensitively against each name.

    Returns:
        list[str]: Matching names, sorted so the result does not depend on
        the arbitrary order in which ``os.listdir`` reports entries.
    """
    if not os.path.isdir(directory):
        return []
    return sorted(
        entry for entry in os.listdir(directory)
        if entry.lower().endswith(extensions)
    )


@app.route("/list_assets", methods=["GET"])
def list_assets():
    """List the static backdrop, sprite and sound assets for the front-end.

    Scans ``<static>/assets/backdrops``, ``<static>/assets/sprites`` and
    ``<static>/assets/sounds``.

    Returns:
        A JSON response with ``backdrops``, ``sprites`` and ``sounds`` lists
        of file names, or ``({"error": ...}, 500)`` if listing fails.
    """
    assets_root = os.path.join(app.static_folder, "assets")
    image_extensions = (".svg", ".png", ".jpg", ".jpeg")  # Common image formats
    sound_extensions = (".wav", ".mp3", ".aiff")  # Add more formats as needed

    try:
        backdrops = _list_asset_files(os.path.join(assets_root, "backdrops"), image_extensions)
        sprites = _list_asset_files(os.path.join(assets_root, "sprites"), image_extensions)
        sounds = _list_asset_files(os.path.join(assets_root, "sounds"), sound_extensions)
        logger.info("Successfully listed static assets.")
    except Exception as e:
        # Request boundary: report the failure to the client instead of crashing.
        logger.error(f"Error listing static assets: {e}")
        return jsonify({"error": "Failed to list assets"}), 500

    return jsonify(backdrops=backdrops, sprites=sprites, sounds=sounds)
# --- Helper: build costume entries ---
def make_costume_entry(folder, filename, name):
    """Build the Scratch costume dictionary for one image asset.

    Args:
        folder: Sub-folder of ``<static>/assets`` that holds the file. SVGs in
            "backdrops" get a 240x180 rotation centre; other SVGs get 0,0.
        filename: Asset file name; stored verbatim as ``md5ext``. The part
            before the last "." becomes ``assetId`` and the part after it
            ``dataFormat`` (empty when the name has no ".").
        name: Costume name stored in the entry.

    Returns:
        dict: ``name``, ``bitmapResolution``, ``dataFormat``, ``assetId``,
        ``md5ext``, ``rotationCenterX`` and ``rotationCenterY``. For raster
        images the centre is half the image size; if the image cannot be
        read, or the type is unknown, it falls back to 0,0 with a warning.
    """
    # rpartition never raises, so an extension-less name reaches the
    # "unknown asset type" fallback below instead of failing with IndexError.
    stem, dot, extension = filename.rpartition(".")
    if not dot:
        stem, extension = filename, ""

    entry = {
        "name": name,
        "bitmapResolution": 1,
        "dataFormat": extension,
        "assetId": stem,
        "md5ext": filename,
    }

    center_x = center_y = 0
    lowered = filename.lower()
    if lowered.endswith(".svg"):
        # For SVGs, default Scratch values are typically 240x180 for stage, others 0,0
        if folder == "backdrops":
            center_x, center_y = 240, 180
    elif lowered.endswith((".png", ".jpg", ".jpeg")):
        # Only raster images need the file on disk, so build the path here.
        path = os.path.join(app.static_folder, "assets", folder, filename)
        try:
            # The context manager closes the underlying file handle promptly.
            with Image.open(path) as img:
                width, height = img.size
            center_x, center_y = width // 2, height // 2
        except Exception as e:
            logger.warning(f"Could not determine image dimensions for {filename}: {e}. Setting center to 0,0.")
    else:
        logger.warning(f"Unknown asset type for {filename}. Setting center to 0,0.")

    entry["rotationCenterX"] = center_x
    entry["rotationCenterY"] = center_y
    return entry
# --- Helper: build sound entries ---
def make_sound_entry(filename, name):
    """Build the Scratch sound dictionary for one audio asset.

    Args:
        filename: Sound file name; stored verbatim as ``md5ext``. The text
            before the last "." becomes ``assetId`` and the text after it
            ``dataFormat``.
        name: Sound name stored in the entry.

    Returns:
        dict: ``name``, ``dataFormat``, ``rate``, ``sampleCount``,
        ``assetId`` and ``md5ext``.

    Raises:
        IndexError: If ``filename`` contains no ".".
    """
    pieces = filename.rsplit(".", 1)
    stem = pieces[0]

    sound = {"name": name}
    sound["dataFormat"] = pieces[1]
    sound["rate"] = 44100  # Common sample rate for sounds
    # Placeholder: the real value would have to be computed from the audio
    # file (e.g. with a library such as pydub).
    sound["sampleCount"] = 0
    sound["assetId"] = stem
    sound["md5ext"] = filename
    return sound
# --- New function to create .sb3 archive ---
def create_sb3_archive(project_folder, project_id):
    """Zip the project folder and store it as ``<project_id>.sb3``.

    The archive is written inside ``GEN_PROJECT_DIR``, next to the project
    folder rather than inside it.

    Args:
        project_folder: Path (str or path-like) of the directory containing
            ``project.json`` and the assets; its contents form the archive.
        project_id (str): The unique ID for the project, used for naming the
            .sb3 file.

    Returns:
        str: The path to the created .sb3 file, or None if an error occurred.
    """
    base_name = str(GEN_PROJECT_DIR / project_id)
    sb3_path = f"{base_name}.sb3"
    zip_path = None
    try:
        # 1. Zip the folder contents. base_name is the archive path without
        #    extension; root_dir is the directory to archive from.
        zip_path = shutil.make_archive(base_name, "zip", root_dir=project_folder)
        logger.info(f"Project folder zipped to: {zip_path}")

        # 2. Give the archive its .sb3 name. os.replace overwrites an
        #    existing .sb3 on every platform, whereas os.rename fails on
        #    Windows when the destination already exists.
        os.replace(zip_path, sb3_path)
        logger.info(f"Renamed {zip_path} to {sb3_path}")

        # The unzipped project folder is intentionally kept (useful for
        # debugging and for serving project.json / assets directly).
        return sb3_path
    except Exception as e:
        logger.error(f"Error creating SB3 archive for {project_id}: {e}")
        # Remove only the temporary zip; a previously generated .sb3 is left
        # untouched so a failed rebuild never destroys a good archive.
        if zip_path and os.path.exists(zip_path):
            os.remove(zip_path)
        return None
# --- New endpoint to fetch project.json ---
@app.route("/get_project/<project_id>", methods=["GET"])
def get_project(project_id):
    """Send a generated project's ``project.json`` as a file download.

    Args:
        project_id: Project identifier taken from the URL; it names a
            sub-folder of ``GEN_PROJECT_DIR``.

    Returns:
        The file response (downloaded as ``<project_id>.json``), a 404 JSON
        error when the project does not exist, or a 500 JSON error when
        serving fails.
    """
    # project_id is untrusted and becomes the directory being served, so
    # reject anything that is not a plain folder name (e.g. "..").
    if project_id in (".", "..") or Path(project_id).name != project_id:
        logger.warning(f"Project JSON not found for ID: {project_id}")
        return jsonify({"error": "Project not found"}), 404

    # A read-only endpoint must not create folders for unknown IDs, so the
    # project folder is only looked up here, never created.
    project_folder = GEN_PROJECT_DIR / project_id
    project_json_path = os.path.join(project_folder, "project.json")

    try:
        if os.path.exists(project_json_path):
            logger.info(f"Serving project.json for project ID: {project_id}")
            return send_from_directory(project_folder, "project.json", as_attachment=True, download_name=f"{project_id}.json")
        else:
            logger.warning(f"Project JSON not found for ID: {project_id}")
            return jsonify({"error": "Project not found"}), 404
    except Exception as e:
        logger.error(f"Error serving project.json for ID {project_id}: {e}")
        return jsonify({"error": "Failed to retrieve project"}), 500
# --- New endpoint to fetch assets ---
@app.route("/get_asset/<project_id>/<filename>", methods=["GET"])
def get_asset(project_id, filename):
    """Send one asset file that belongs to a generated project.

    Args:
        project_id: Project identifier taken from the URL; it names a
            sub-folder of ``GEN_PROJECT_DIR``.
        filename: Name of the asset file inside that folder.

    Returns:
        The file response, a 404 JSON error when the asset does not exist,
        or a 500 JSON error when serving fails.
    """
    # project_id is untrusted and becomes the directory being served, so
    # reject anything that is not a plain folder name (e.g. "..").
    if project_id in (".", "..") or Path(project_id).name != project_id:
        logger.warning(f"Asset '{filename}' not found for project ID: {project_id}")
        return jsonify({"error": "Asset not found"}), 404

    # A read-only endpoint must not create folders for unknown IDs, so the
    # project folder is only looked up here, never created.
    project_folder = GEN_PROJECT_DIR / project_id
    asset_path = os.path.join(project_folder, filename)

    try:
        if os.path.exists(asset_path):
            logger.info(f"Serving asset '{filename}' for project ID: {project_id}")
            return send_from_directory(project_folder, filename)
        else:
            logger.warning(f"Asset '{filename}' not found for project ID: {project_id}")
            return jsonify({"error": "Asset not found"}), 404
    except Exception as e:
        logger.error(f"Error serving asset '{filename}' for project ID {project_id}: {e}")
        return jsonify({"error": "Failed to retrieve asset"}), 500
# This part is just for demonstration purposes to show app_graph workflow
# In a real application, app_graph would be properly initialized.
# It runs once when the module is loaded; any failure is only logged as a
# warning, so it never prevents the application from starting.
# NOTE(review): png_bytes is not used in the visible code while the file
# write below stays commented out.
# NOTE(review): draw_mermaid_png() may render through a remote service by
# default, which would mean a network call at import time — confirm.
try:
    png_bytes = app_graph.get_graph().draw_mermaid_png()
    #with open("langgraph_workflow.png", "wb") as f:
    # f.write(png_bytes)
except Exception as e:
    logger.warning(f"Could not draw or save LangGraph workflow diagram: {e}. This might be expected if app_graph is a mock.")
# --- New endpoint to download the .sb3 file ---
@app.route("/download_sb3/<project_id>", methods=["GET"])
def download_sb3(project_id):
    """Let users download the generated .sb3 Scratch project file.

    Args:
        project_id: Project identifier taken from the URL; the archive is
            expected at ``GEN_PROJECT_DIR/<project_id>.sb3``.

    Returns:
        The file response (as an attachment named ``<project_id>.sb3``), a
        404 JSON error when the archive does not exist, or a 500 JSON error
        when serving fails.
    """
    sb3_filename = f"{project_id}.sb3"
    sb3_filepath = GEN_PROJECT_DIR / sb3_filename

    try:
        if os.path.exists(sb3_filepath):
            logger.info(f"Serving SB3 file for project ID: {project_id}")
            # Serve from the same absolute directory that was just checked,
            # instead of a separate relative "generated_projects" string.
            return send_from_directory(
                directory=GEN_PROJECT_DIR,
                path=sb3_filename,
                as_attachment=True,  # This makes the browser download the file
                download_name=sb3_filename,  # This sets the filename for the download
            )
        else:
            logger.warning(f"SB3 file not found for ID: {project_id}")
            return jsonify({"error": "Scratch project file not found"}), 404
    except Exception as e:
        logger.error(f"Error serving SB3 file for ID {project_id}: {e}")
        return jsonify({"error": "Failed to retrieve Scratch project file"}), 500
# --- Modified `generate_game` Endpoint ---
@app.route("/generate_game", methods=["POST"])
def generate_game():
payload = request.json
logger.info(f"Received payload: {json.dumps(payload, indent=2)}")
desc = payload.get("description", "")
backdrops = payload.get("backdrops", [])
sprites = payload.get("sprites", [])
backdrop_sounds = payload.get("backdrop_sounds", {})
sprite_sounds = payload.get("sprite_sounds", {})
logger.info(f"Backdrops received: {backdrops}")
logger.info(f"Sprites received: {sprites}")
logger.info(f"Backdrop sounds received: {backdrop_sounds}")
logger.info(f"Sprite sounds received: {sprite_sounds}")
logger.info(f"Starting game generation for description: '{desc}'")
# 1) Initial skeleton generation
project_skeleton = {
"targets": [
{
"isStage": True,
"name":"Stage",
"objName": "Stage",
"variables":{},
"lists":{}, "broadcasts":{},
"blocks":{}, "comments":{},
"currentCostume": len(backdrops)-1 if backdrops else 0,
"costumes": [make_costume_entry("backdrops",b["filename"],b["name"])
for b in backdrops],
"sounds": [], # Initialize an empty list to be populated with all unique backdrop sounds
"volume":100,"layerOrder":0,
"tempo":60,"videoTransparency":50,"videoState":"on",
"textToSpeechLanguage": None
}
],
"monitors": [], "extensions": [], "meta":{
"semver":"3.0.0","vm":"11.1.0",
"agent": request.headers.get("User-Agent","")
}
}
# Populate sounds for the Stage target by consolidating sounds from all backdrops
stage_target_sounds = []
seen_sound_md5ext = set() # To track unique sounds by their md5ext (filename)
for b_data in backdrops:
backdrop_name = b_data["name"]
if backdrop_name in backdrop_sounds:
for s in backdrop_sounds[backdrop_name]:
sound_entry = make_sound_entry(s["filename"], s["name"])
if sound_entry["md5ext"] not in seen_sound_md5ext:
stage_target_sounds.append(sound_entry)
seen_sound_md5ext.add(sound_entry["md5ext"])
project_skeleton["targets"][0]["sounds"] = stage_target_sounds # Assign the collected unique sounds
for idx, s_data in enumerate(sprites, start=1):
costume = make_costume_entry("sprites", s_data["filename"], s_data["name"])
sprite_name = s_data["name"]
# Get sounds for the current sprite
current_sprite_sounds = sprite_sounds.get(sprite_name, [])
sprite_sounds_list = [make_sound_entry(s["filename"], s["name"]) for s in current_sprite_sounds]
project_skeleton["targets"].append({
"isStage": False,
"name": sprite_name,
"objName": sprite_name,
"variables":{}, "lists":{}, "broadcasts":{},
"blocks":{},
"comments":{},
"currentCostume":0,
"costumes":[costume],
"sounds": sprite_sounds_list, # Add sprite sounds
"volume":100,
"layerOrder": idx+1,
"visible":True, "x":0,"y":0,"size":100,"direction":90,
"draggable":False, "rotationStyle":"all around"
})
logger.info("Initial project skeleton created with sounds for stage and sprites.")
project_id = str(uuid.uuid4())
#project_folder = os.path.join("generated_projects", project_id)
#project_folder = send_from_directory(directory="generated_projects", project_id)
project_folder = GEN_PROJECT_DIR / project_id
project_folder.mkdir(parents=True, exist_ok=True)
try:
os.makedirs(project_folder, exist_ok=True)
# Save initial skeleton and copy assets
project_json_path = os.path.join(project_folder, "project.json")
with open(project_json_path, "w") as f:
json.dump(project_skeleton, f, indent=2)
logger.info(f"Initial project skeleton saved to {project_json_path}")
# Copy backdrops
for b in backdrops:
src_path = os.path.join(app.static_folder, "assets", "backdrops", b["filename"])
dst_path = os.path.join(project_folder, b["filename"])
if os.path.exists(src_path):
shutil.copy(src_path, dst_path)
else:
logger.warning(f"Source backdrop asset not found: {src_path}")
# Copy sprites
for s in sprites:
src_path = os.path.join(app.static_folder, "assets", "sprites", s["filename"])
dst_path = os.path.join(project_folder, s["filename"])
if os.path.exists(src_path):
shutil.copy(src_path, dst_path)
else:
logger.warning(f"Source sprite asset not found: {src_path}")
# Copy backdrop sounds
for backdrop_name, sounds_list in backdrop_sounds.items():
for s in sounds_list:
src_path = os.path.join(app.static_folder, "assets", "sounds", s["filename"])
dst_path = os.path.join(project_folder, s["filename"])
if os.path.exists(src_path):
shutil.copy(src_path, dst_path)
else:
logger.warning(f"Source sound asset for backdrop '{backdrop_name}' not found: {src_path}")
# Copy sprite sounds
for sprite_name, sounds_list in sprite_sounds.items():
for s in sounds_list:
src_path = os.path.join(app.static_folder, "assets", "sounds", s["filename"])
dst_path = os.path.join(project_folder, s["filename"])
if os.path.exists(src_path):
shutil.copy(src_path, dst_path)
else:
logger.warning(f"Source sound asset for sprite '{sprite_name}' not found: {src_path}")
logger.info("Assets (including sounds for backdrops and sprites) copied to project folder.")
# Initialize the state for LangGraph as a dictionary matching the TypedDict structure
initial_state_dict = {
"project_json": project_skeleton,
"description": desc,
"project_id": project_id,
"sprite_initial_positions": {},
"action_plan": {},
"improvement_plan": {},
"needs_improvement": False,
"plan_validation_feedback": {},
"iteration_count": 0,
"review_block_feedback": {},
"declaration_plan": {},
"temporary_node": {},
}
final_state_dict = app_graph.invoke(initial_state_dict) # Pass dictionary
final_project_json = final_state_dict['project_json'] # Access as dict
# Save the *final* filled project JSON, overwriting the skeleton
with open(project_json_path, "w") as f:
json.dump(final_project_json, f, indent=2)
logger.info(f"Final project JSON saved to {project_json_path}")
# --- Call the new function to create the .sb3 file ---
sb3_file_path = create_sb3_archive(project_folder, project_id)
if sb3_file_path:
logger.info(f"Successfully created SB3 file: {sb3_file_path}")
# Instead of returning the local path, return a URL to the download endpoint
download_url = f"/download_sb3/{project_id}"
return jsonify({"message": "Game generated successfully", "project_id": project_id, "download_url": download_url})
else:
return jsonify(error="Failed to create SB3 archive"), 500
except Exception as e:
logger.error(f"Error during game generation workflow for project ID {project_id}: {e}", exc_info=True)
return jsonify(error=f"Error generating game: {e}"), 500
if __name__ == "__main__":
    # Script entry point: start Flask's built-in development server.
    logger.info("Starting Flask application...")
    # Make sure the output and asset directories exist before serving.
    # mkdir(exist_ok=True) is idempotent, so repeating the import-time
    # directory creation here is harmless.
    for d in (GEN_PROJECT_DIR, BACKDROP_DIR, SPRITE_DIR, SOUND_DIR):
        d.mkdir(parents=True, exist_ok=True)
    # Debug mode and port used to be hard-coded. They can now be overridden
    # through the environment; the defaults keep the previous behaviour
    # (debug on, port 5000).
    # NOTE(security): debug=True enables the Werkzeug interactive debugger,
    # which lets anyone who can reach the server execute arbitrary Python.
    # Set FLASK_DEBUG=0 whenever the server is reachable by untrusted clients.
    debug_flag = os.environ.get("FLASK_DEBUG", "1").strip().lower()
    debug_enabled = bool(debug_flag) and debug_flag not in {"0", "false", "no"}
    # A non-numeric PORT raises ValueError at startup rather than silently
    # binding to an unexpected port.
    port = int(os.environ.get("PORT", "5000"))
    app.run(debug=debug_enabled, port=port)