from flask import Flask, request, jsonify, render_template, send_from_directory from langgraph.prebuilt import create_react_agent from langchain_groq import ChatGroq #from langgraph.graph import draw from langchain.chat_models import ChatOpenAI from dotenv import load_dotenv from langchain_core.utils.utils import secret_from_env from langchain_openai import ChatOpenAI from pydantic import Field, SecretStr from PIL import Image import os, json, re import shutil import uuid from langgraph.graph import StateGraph, END import logging from typing import Dict, TypedDict, Optional, Any, List from pathlib import Path # --- Configure logging --- logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) app = Flask(__name__) # Configurations BLOCKS_FOLDER = "blocks/" GEN_PROJECT_FOLDER = "generated_projects/" STATIC_FOLDER = "static/" BASE_DIR = Path(__file__).parent BLOCKS_DIR = BASE_DIR / "blocks" STATIC_DIR = BASE_DIR / "static" GEN_PROJECT_DIR = BASE_DIR / "generated_projects" ASSET_DIR = STATIC_DIR / "assets" BACKDROP_DIR = ASSET_DIR / "backdrops" SPRITE_DIR = ASSET_DIR / "sprites" SOUND_DIR = ASSET_DIR / "sounds" # Create them if they're missing: for d in (BLOCKS_DIR, STATIC_DIR, GEN_PROJECT_DIR, ASSET_DIR, BACKDROP_DIR, SPRITE_DIR, SOUND_DIR): d.mkdir(parents=True, exist_ok=True) # --- LLM / Vision model setup --- #os.environ["GROQ_API_KEY"] = os.getenv("GROQ_API_KEY_2", "default_key_or_placeholder") os.environ["GROQ_API_KEY"] = os.getenv("GROQ_API_KEY", "default_key_or_placeholder") os.environ["OPENROUTER_API_KEY"] = os.getenv("OPENROUTER_API_KEY", "default_key_or_placeholder") os.environ["OPENROUTER_BASE_URL"] = os.getenv("OPENROUTER_BASE_URL", "default_key_or_placeholder") groq_key = os.environ["GROQ_API_KEY"] if not groq_key or groq_key == "default_key_or_placeholder": logger.critical("GROQ_API_KEY environment variable is not set or invalid. Please set it to proceed.") raise ValueError("GROQ_API_KEY environment variable is not set or invalid.") #Main LLM for the SCRATCH 3.0 Agent llm = ChatGroq( #model="deepseek-r1-distill-llama-70b", #model="llama-3.3-70b-versatile", #model="gemma2-9b-it", #model="compound-beta-mini", #model="meta-llama/llama-4-maverick-17b-128e-instruct", model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0.0, ) # Json debugger [temporary] llm2 = ChatGroq( model="deepseek-r1-distill-llama-70b", #model="llama-3.3-70b-versatile", #model="gemma2-9b-it", #model="compound-beta-mini", #model="meta-llama/llama-4-maverick-17b-128e-instruct", #model="meta-llama/llama-4-scout-17b-16e-instruct", temperature=0.0, ) class ChatOpenRouter(ChatOpenAI): openai_api_key: Optional[SecretStr] = Field( alias="api_key", default_factory=secret_from_env("OPENROUTER_API_KEY", default=None), ) @property def lc_secrets(self) -> dict[str, str]: return {"openai_api_key": "OPENROUTER_API_KEY"} def __init__(self, openai_api_key: Optional[str] = None, **kwargs): openai_api_key = ( openai_api_key or os.environ.get("OPENROUTER_API_KEY") ) super().__init__( base_url="https://openrouter.ai/api/v1", openai_api_key=openai_api_key, **kwargs ) # llm = ChatOpenRouter( # #model_name="deepseek/deepseek-r1-0528:free", # #model_name="google/gemini-2.0-flash-exp:free", # #model_name="deepseek/deepseek-v3-base:free", # model_name="deepseek/deepseek-r1:free" # ) # Refined SYSTEM_PROMPT with more explicit Scratch JSON rules, especially for variables SYSTEM_PROMPT = """ You are an expert AI assistant named GameScratchAgent, specialized in generating and modifying Scratch-VM 3.x game project JSON. Your core task is to process game descriptions and existing Scratch JSON structures, then produce or update JSON segments accurately. You possess deep knowledge of Scratch 3.0 project schema, informed by comprehensive reference materials. When generating or modifying the `blocks` section, pay extremely close attention to the following: **Scratch Project JSON Schema Rules:** 1. **Target Structure (`project.json`'s `targets` array):** * Each object in the `targets` array represents a Stage or a Sprite. * `isStage`: A boolean indicating if the target is the Stage (`true`) or a Sprite (`false`). * `name`: The name of the Stage (e.g., `"Stage"`) or the Sprite (e.g., `"Cat"`). This property replaces `objName` found in older Scratch versions. * `variables` dictionary: This dictionary maps unique variable IDs to arrays `[variable_name, initial_value, isCloudVariable?]`. * `variable_name`: The user-defined name of the variable. * `initial_value`: The variable's initial value, which can be a number or a string. * `isCloudVariable?`: (Optional) A boolean indicating if it's a cloud variable (`true`) or a local variable (`false` or absent for regular variables). * Example: `"myVarId123": ["score", 0]`, `"cloudVarId456": ["☁ High Score", "54", true]` * `lists` dictionary: This dictionary maps unique list IDs to arrays `[list_name, [item1, item2, ...]]`. * Example: `"myListId789": ["my list", ["apple", "banana"]]` * `broadcasts` dictionary: This dictionary maps unique broadcast IDs to their names. * Example: `"myBroadcastId": "Game Over"` * `blocks` dictionary: This dictionary contains all the blocks belonging to this target. Keys are block IDs, values are block objects. 2. **Block Structure (within a `target`'s `blocks` dictionary):** * Every block object must have the following core properties: * [cite_start]`opcode`: A unique internal identifier for the block's specific functionality (e.g., `"motion_movesteps"`, `"event_whenflagclicked"`)[cite: 31, 18, 439, 452]. * `parent`: The ID of the block directly above it in the script stack (or `null` for a top-level block). * `next`: The ID of the block directly below it in the script stack (or `null` for the end of a stack). * `inputs`: An object defining values or blocks plugged into the block's input slots. Values are **arrays**. * `fields`: An object defining dropdown menu selections or direct internal values within the block. Values are **arrays**. * `shadow`: `true` if it's a shadow block (e.g., a default number input that can be replaced by another block), `false` otherwise. * `topLevel`: `true` if it's a hat block or a standalone block (not connected to a parent), `false` otherwise. 3. **`inputs` Property Details (for blocks plugged into input slots):** * **Direct Block Connection (Reporter/Boolean block plugged in):** * Format: `"": [1, ""]` * Example: `"CONDITION": [1, "someBooleanBlockId"]` (e.g., for an `if` block). * **Literal Value Input (Shadow block with a literal):** * Format: `"": [1, [, ""]]` * `type_code`: A numeric code representing the data type. Common codes include: `4` for number, `7` for string/text, `10` for string/message. * `value_string`: The literal value as a string. * Examples: * Number: `"STEPS": [1, [4, "10"]]` (for `move 10 steps` block). * String/Text: `"MESSAGE": [1, [7, "Hello"]]` (for `say Hello` block). * String/Message (common for text inputs): `"MESSAGE": [1, [10, "Hello!"]]` (for `say Hello! for 2 secs`). * **C-Block Substack (blocks within a loop or conditional):** * Format: `"": [2, ""]` * Common `SUBSTACK_NAME` values are `SUBSTACK` (for `if`, `forever`, `repeat`) and `SUBSTACK2` (for `else` in `if else`). * Example: `"SUBSTACK": [2, "firstBlockInLoopId"]` 4. **`fields` Property Details (for dropdowns or direct internal values):** * Used for dropdown menus, variable names, list names, or other static selections directly within the block. * Format: `"": ["", null]` * Examples: * Dropdown: `"KEY_OPTION": ["space", null]` (for `when space key pressed`). * Variable Name: `"VARIABLE": ["score", null]` (for `set score to 0`). * Direction (specific motion block): `"FORWARD_BACKWARD": ["forward", null]` (for `go forward layers`). 5. **Unique IDs:** * All block IDs, variable IDs, and list IDs must be unique strings (e.g., "myBlock123", "myVarId456", "myListId789"). Do NOT use placeholder strings like "block_id_here". 6. **No Nested `blocks` Dictionary:** * The `blocks` dictionary should only appear once per `target` (sprite/stage). Do NOT nest a `blocks` dictionary inside an individual block definition. Blocks that are part of a substack are linked via the `SUBSTACK` input. 7. **Asset Properties (for Costumes/Sounds):** * `assetId`, `md5ext`, `bitmapResolution`, `rotationCenterX`/`rotationCenterY` should be correctly associated with costume and sound objects within the `costumes` and `sounds` arrays. **General Principles and Important Considerations:** * **Backward Compatibility:** Adhere strictly to existing Scratch 3.0 opcodes and schema to ensure backward compatibility with older projects. [cite_start]Opcodes must remain consistent to prevent previously saved projects from failing to load or behaving unexpectedly[cite: 18, 19, 25, 65]. * **Forgiving Inputs:** Recognize that Scratch is designed to be "forgiving in its interpretation of inputs." [cite_start]The Scratch VM handles potentially "invalid" inputs gracefully (e.g., converting a number to a string if expected, returning default values like zero or empty strings, or performing no action) rather than crashing[cite: 20, 21, 22, 38, 39, 41]. This implies that precise type matching for inputs might be handled internally by Scratch, allowing for some flexibility in how values are provided, but the agent should aim for the most common and logical type. """ SYSTEM_PROMPT_JSON_CORRECTOR =""" You are an assistant that outputs JSON responses strictly following the given schema. If the JSON you produce has any formatting errors, missing required fields, or invalid structure, you must identify the problems and correct them. Always return only valid JSON that fully conforms to the schema below, enclosed in triple backticks (```), without any extra text or explanation. If you receive an invalid or incomplete JSON response, fix it by: - Adding any missing required fields with appropriate values. - Correcting syntax errors such as missing commas, brackets, or quotes. - Ensuring the JSON structure matches the schema exactly. Remember: Your output must be valid JSON only, ready to be parsed without errors. """ # Main agent of the system agent for Scratch 3.0 agent = create_react_agent( model=llm, tools=[], # No specific tools are defined here, but could be added later prompt=SYSTEM_PROMPT ) # debugger and resolver agent for Scratch 3.0 agent_json_resolver = create_react_agent( model=llm, tools=[], # No specific tools are defined here, but could be added later prompt=SYSTEM_PROMPT_JSON_CORRECTOR ) ### LangGraph Workflow Definition # Define a state for your graph using TypedDict class GameState(TypedDict): project_json: dict description: str project_id: str sprite_initial_positions: dict # Add this as well, as it's part of your state action_plan: Optional[Dict] #behavior_plan: Optional[Dict] improvement_plan: Optional[Dict] needs_improvement: bool plan_validation_feedback: Optional[Dict] iteration_count: int # Track the number of iterations for improvements review_block_feedback: Optional[Dict] # Feedback from the agent on the blocks after verification declaration_plan: Optional[Dict] temporary_node: Optional[Dict] # Helper function to update project JSON with sprite positions import copy def update_project_with_sprite_positions(project_json: dict, sprite_positions: dict) -> dict: """ Update the 'x' and 'y' coordinates of sprites in the Scratch project JSON. Args: project_json (dict): Original Scratch project JSON. sprite_positions (dict): Dict mapping sprite names to {'x': int, 'y': int}. Returns: dict: Updated project JSON with new sprite positions. """ updated_project = copy.deepcopy(project_json) for target in updated_project.get("targets", []): if not target.get("isStage", False): sprite_name = target.get("name") if sprite_name in sprite_positions: pos = sprite_positions[sprite_name] if "x" in pos and "y" in pos: target["x"] = pos["x"] target["y"] = pos["y"] return updated_project # Helper function to load the block catalog from a JSON file def _load_block_catalog(block_type: str) -> Dict: """ Loads the Scratch block catalog named '{block_type}_blocks.json' from the /blocks/ folder. Returns {} on any error. """ catalog_path = BLOCKS_DIR / f"{block_type}.json" try: text = catalog_path.read_text() # will raise FileNotFoundError if missing catalog = json.loads(text) # will raise JSONDecodeError if malformed logger.info(f"Successfully loaded block catalog from {catalog_path}") return catalog except FileNotFoundError: logger.error(f"Error: Block catalog file not found at {catalog_path}") except json.JSONDecodeError as e: logger.error(f"Error decoding JSON from {catalog_path}: {e}") except Exception as e: logger.error(f"Unexpected error loading {catalog_path}: {e}") # In all error cases, return an empty dict so the rest of your app can continue return {} # --- Global variable for the block catalog --- ALL_SCRATCH_BLOCKS_CATALOG = {} #BLOCK_CATALOG_PATH = r"blocks\blocks.json" # Define the path to your JSON file #HAT_BLOCKS_PATH = r"blocks\hat_blocks.json" # Path to the hat blocks JSON file #STACK_BLOCKS_PATH = r"blocks\stack_blocks.json" # Path to the stack blocks JSON file #REPORTER_BLOCKS_PATH = r"blocks\reporter_blocks.json" # Path to the reporter blocks JSON file #BOOLEAN_BLOCKS_PATH = r"blocks\boolean_blocks.json" # Path to the boolean blocks JSON file #C_BLOCKS_PATH = r"blocks\c_blocks.json" # Path to the C blocks JSON file #CAP_BLOCKS_PATH = r"blocks\cap_blocks.json" # Path to the cap blocks JSON file BLOCK_CATALOG_PATH = "blocks" # Define the path to your JSON file HAT_BLOCKS_PATH = "hat_blocks" # Path to the hat blocks JSON file STACK_BLOCKS_PATH = "stack_blocks" # Path to the stack blocks JSON file REPORTER_BLOCKS_PATH = "reporter_blocks" # Path to the reporter blocks JSON file BOOLEAN_BLOCKS_PATH = "boolean_blocks" # Path to the boolean blocks JSON file C_BLOCKS_PATH = "c_blocks" # Path to the C blocks JSON file CAP_BLOCKS_PATH = "cap_blocks" # Path to the cap blocks JSON file # Load the block catalogs from their respective JSON files # ================== HAT BLOCK =============================================== hat_block_data = _load_block_catalog(HAT_BLOCKS_PATH) #hat_description = hat_block_data["description"] hat_description = hat_block_data.get("description", "No description available") #hat_opcodes_functionalities = "\n".join([f" - Opcode: {block['op_code']}, functionality: {block['functionality']}" for block in hat_block_data["blocks"]]) hat_opcodes_functionalities = "\n".join([ f" - Opcode: {block.get('op_code', 'N/A')}, functionality: {block.get('functionality', 'N/A')}" for block in hat_block_data.get("blocks", []) ]) if isinstance(hat_block_data.get("blocks"), list) else " No blocks information available." print("Hat blocks loaded successfully.", hat_description) # ================== BOOLEAN BLOCK =============================================== boolean_block_data = _load_block_catalog(BOOLEAN_BLOCKS_PATH) #boolean_description = boolean_block_data["description"] boolean_description = boolean_block_data.get("description", "No description available") #boolean_opcodes_functionalities = "\n".join([f" - Opcode: {block['op_code']}, functionality: {block['functionality']}" for block in boolean_block_data["blocks"]]) boolean_opcodes_functionalities = "\n".join([ f" - Opcode: {block.get('op_code', 'N/A')}, functionality: {block.get('functionality', 'N/A')}" for block in boolean_block_data.get("blocks", []) ]) if isinstance(boolean_block_data.get("blocks"), list) else " No blocks information available." # ================== C BLOCK =============================================== c_block_data = _load_block_catalog(C_BLOCKS_PATH) #c_description = c_block_data["description"] c_description = c_block_data.get("description", "No description available") #c_opcodes_functionalities = "\n".join([f" - Opcode: {block['op_code']}, functionality: {block['functionality']}" for block in c_block_data["blocks"]]) c_opcodes_functionalities = "\n".join([ f" - Opcode: {block.get('op_code', 'N/A')}, functionality: {block.get('functionality', 'N/A')}" for block in c_block_data.get("blocks", []) ]) if isinstance(c_block_data.get("blocks"), list) else " No blocks information available." # ================== CAP BLOCK =============================================== cap_block_data = _load_block_catalog(CAP_BLOCKS_PATH) #cap_description = cap_block_data["description"] cap_description = cap_block_data.get("description", "No description available") #cap_opcodes_functionalities = "\n".join([f" - Opcode: {block['op_code']}, functionality: {block['functionality']}" for block in cap_block_data["blocks"]]) cap_opcodes_functionalities = "\n".join([ f" - Opcode: {block.get('op_code', 'N/A')}, functionality: {block.get('functionality', 'N/A')}" for block in cap_block_data.get("blocks", []) ]) if isinstance(cap_block_data.get("blocks"), list) else " No blocks information available." # ================== REPORTER BLOCK =============================================== reporter_block_data = _load_block_catalog(REPORTER_BLOCKS_PATH) #reporter_description = reporter_block_data["description"] reporter_description = reporter_block_data.get("description", "No description available") #reporter_opcodes_functionalities = "\n".join([f" - Opcode: {block['op_code']}, functionality: {block['functionality']}" for block in reporter_block_data["blocks"]]) reporter_opcodes_functionalities = "\n".join([ f" - Opcode: {block.get('op_code', 'N/A')}, functionality: {block.get('functionality', 'N/A')}" for block in reporter_block_data.get("blocks", []) ]) if isinstance(reporter_block_data.get("blocks"), list) else " No blocks information available." # ================== STACK BLOCK =============================================== stack_block_data = _load_block_catalog(STACK_BLOCKS_PATH) #stack_description = stack_block_data["description"] stack_description = stack_block_data.get("description", "No description available") #stack_opcodes_functionalities = "\n".join([f" - Opcode: {block['op_code']}, functionality: {block['functionality']}" for block in stack_block_data["blocks"]]) stack_opcodes_functionalities = "\n".join([ f" - Opcode: {block.get('op_code', 'N/A')}, functionality: {block.get('functionality', 'N/A')}" for block in stack_block_data.get("blocks", []) ]) if isinstance(stack_block_data.get("blocks"), list) else " No blocks information available." # This makes ALL_SCRATCH_BLOCKS_CATALOG available globally ALL_SCRATCH_BLOCKS_CATALOG = _load_block_catalog(BLOCK_CATALOG_PATH) # Helper function to generate a unique block ID def generate_block_id(): """Generates a short, unique ID for a Scratch block.""" return str(uuid.uuid4())[:10].replace('-', '') # Shorten for readability, ensure uniqueness # Placeholder for your extract_json_from_llm_response function # # Helper function to extract JSON from LLM response def extract_json_from_llm_response(raw_response: str) -> dict: # --- 1) Pull out the JSON code‑block if present --- md = re.search(r"```(?:json)?\s*([\s\S]*?)\s*```", raw_response) json_string = md.group(1).strip() if md else raw_response # --- 2) Trim to the outermost { … } so we drop any prefix/suffix junk --- first, last = json_string.find('{'), json_string.rfind('}') if 0 <= first < last: json_string = json_string[first:last+1] # --- 3) PRE‑CLEANUP: remove stray assistant{…}, rogue assistant keys, fix boolean quotes --- json_string = re.sub(r'\b\w+\s*{', '{', json_string) json_string = re.sub(r'"assistant"\s*:', '', json_string) json_string = re.sub(r'\b(false|true)"', r'\1', json_string) logger.debug("Ran pre‑cleanup for stray tokens and boolean quotes.") # --- 3.1) Fix stray inner quotes at start of name/list values --- # e.g., { "name": " \"recent_scoress\"", ... } → "recent_scoress" json_string = re.sub( r'("name"\s*:\s*")\s*"', r'\1', json_string ) # --- 4) Escape all embedded quotes in any `logic` value up to the next key --- def _esc(m): prefix, body = m.group(1), m.group(2) return prefix + body.replace('"', r'\"') json_string = re.sub( r'("logic"\s*:\s*")([\s\S]+?)(?=",\s*"[A-Za-z_]\w*"\s*:\s*)', _esc, json_string ) logger.debug("Escaped embedded quotes in logic fields.") logger.debug("Quoted unquoted keys.") # --- 6) Remove trailing commas before } or ] --- json_string = re.sub(r',\s*(?=[}\],])', '', json_string) json_string = re.sub(r',\s*,', ',', json_string) logger.debug("Removed trailing commas.") # --- 7) Balance braces: drop extra } at end if needed --- ob, cb = json_string.count('{'), json_string.count('}') if cb > ob: excess = cb - ob json_string = json_string.rstrip()[:-excess] logger.debug(f"Stripped {excess} extra closing brace(s).") # --- 8) Escape literal newlines in *all* string values --- json_string = re.sub( r'"((?:[^"\\]|\\.)*?)"', lambda m: '"' + m.group(1).replace('\n', '\\n').replace('\r', '\\r') + '"', json_string, flags=re.DOTALL ) logger.debug("Escaped newlines in strings.") # --- 9) Final parse attempt --- try: return json.loads(json_string) except json.JSONDecodeError: logger.error("Sanitized JSON still invalid:\n%s", json_string) raise def strip_noise(s: str) -> str: # 1. Remove any <|…|> markers s = re.sub(r"<\|.*?\|>", "", s) # 2. Remove any 'assistant<' plus following non‑whitespace s = re.sub(r"assistant<\S*", "", s) # 3. Strip stray angle‑brackets s = re.sub(r"[<>]", "", s) # 4. Fix malformed empty keys like sensing"": → sensing": s = re.sub(r'(\w+)""', r'\1"', s) # 5. Deduplicate any immediately–repeated JSON keys, e.g. # "control": […], "control": […] → keep only the first s = re.sub( r'("(?P\w+)":\s*(?P\[[^\]]*\]))\s*,\s*"\2":\s*\[[^\]]*\]', lambda m: m.group(1), s ) # 6. Collapse multiple blank lines and extra spaces s = re.sub(r"\n\s*\n+", "\n\n", s) s = re.sub(r" {2,}", " ", s) return s.strip() # Node 1: Detailed description generator. def game_description_node(state: GameState): """ Generates a detailed narrative description of the game based on the initial query. """ logger.info("--- Running GameDescriptionNode ---") sprite_name = {} initial_description = state.get("description", "A simple game.") project_json = state["project_json"] for target in project_json["targets"]: sprite_name[target["name"]] = target["name"] description_prompt = ( f"You are an AI assistant tasked with generating a detailed narrative description for a Scratch 3.0 game.\n" f"The initial high-level description is: '{initial_description}'.\n" f"The current Scratch project JSON is:\n```json\n{json.dumps(state['project_json'], indent=2)}\n```\n" f"Make sure you donot change Sprite and Stage name. Here are all the name: {sprite_name} \n" f"Create a rich, engaging, and detailed description, including potential gameplay elements, objectives, and overall feel with the available resources\n" f"The output should be a plain text description." ) try: response = agent.invoke({"messages": [{"role": "user", "content": description_prompt}]}) detailed_game_description = response["messages"][-1].content state["description"] = detailed_game_description#strip_noise(detailed_game_description) logger.info("Detailed game description generated by GameDescriptionNode.") print(f"Detailed Game Description: {detailed_game_description}") return state except Exception as e: logger.error(f"Error in GameDescriptionNode: {e}") raise # Node 2: Analysis User Query and Initial Positions def parse_query_and_set_initial_positions(state: GameState): logger.info("--- Running ParseQueryNode ---") # the Stage's center is `(0,0)` and height is from `(0,-200)` to `(0,200)` and width is `(-200,0)` to `(200,0)`, [temporaryly used 150 by -150 or 130 to -130] llm_query_prompt = f"""Based on the user's game description: '{state['description']}', \ Stage description: the Stage's center is `(0,0)` and height is from `(0,-130)` to `(0,130)` and width is `(-130,0)` to `(130,0)`, and the current Scratch project JSON below, \ determine the most appropriate initial 'x' and 'y' coordinates for each sprite. \ Return ONLY a JSON object with a single key 'sprite_initial_positions' mapping sprite names to their {{'x': int, 'y': int}} coordinates. The current Scratch project JSON is: ```json {json.dumps(state['project_json'], indent=2)} ``` Example Json output: ```json {{ "sprite_initial_positions": {{ "Sprite1": {{"x": -150, "y": -120}}, "Sprite2": {{"x": 150, "y": -120}} }} }} ``` """ try: response = agent.invoke({"messages": [{"role": "user", "content": llm_query_prompt}]}) raw_response = response["messages"][-1].content#strip_noise(response["messages"][-1].content) print("Raw response from LLM:", raw_response) # json debugging and solving try: updated_data = extract_json_from_llm_response(raw_response) sprite_positions = updated_data.get("sprite_initial_positions", {}) except json.JSONDecodeError as error_json: logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.") # Use the JSON resolver agent to fix the response correction_prompt = ( "Your task is to correct the provided JSON string to ensure it is **syntactically perfect and adheres strictly to JSON rules**.\n" "Carefully review the JSON for any errors, especially focusing on the reported error at:\n" f"- **Error Details**: {error_json}\n\n" "**Strict Instructions for your response:**\n" "1. **ONLY** output the corrected JSON. Do not include any other text, comments, or explanations outside the JSON.\n" "2. Ensure all property names (keys) are enclosed in **double quotes**.\n" "3. Ensure string values are correctly enclosed in **double quotes** and any internal special characters (like newlines `\\n`, tabs `\\t`, backslashes `\\\\`, or double quotes `\\\"`) are properly **escaped**.\n" "4. Verify that there are **no extra commas**, especially between key-value pairs or after the last element in an object or array.\n" "5. Ensure proper nesting and matching of curly braces `{}` and square brackets `[]`.\n" "6. The corrected JSON must be a **complete and valid** JSON object.\n\n" "Here is the problematic JSON string to correct:\n" "```json\n" # Use markdown for clear JSON distinction f"{raw_response}\n" "```\n" "Corrected JSON:\n" # Indicate where the corrected JSON should start ) correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]}) print(f"[JSON CORRECTOR RESPONSE AT PARSER]: {correction_response['messages'][-1].content}") corrected_data = extract_json_from_llm_response(correction_response["messages"][-1].content)#strip_noise(correction_response["messages"][-1].content)) sprite_positions = corrected_data.get("sprite_initial_positions", {}) new_project_json = update_project_with_sprite_positions(state["project_json"], sprite_positions) state["project_json"]= new_project_json print("Updated project JSON with sprite positions:", json.dumps(new_project_json, indent=2)) # [TEMPORARY FOR LOGGING] # Optional: Save raw_response that failed parsing #with open("debug_state.json", "w", encoding="utf-8") as f: # json.dump(state, f, indent=2, ensure_ascii=False) return state #return {"project_json": new_project_json} except Exception as e: logger.error(f"Error in ParseQueryNode: {e}") raise # Node 3: Declaration Planner Node (Updated) def declaration_planner_node(state: GameState): """ Generates a complete declaration plan that defines: - variables and cloud variables - lists - broadcast messages - monitors (usually tied to reporter blocks or variables) """ logger.info("--- Running DeclarationPlannerNode ---") description = state.get("description", "") # project_json = state["project_json"] # Not directly used in planning prompt, but good to have planning_prompt = f"""Game Description: {description} You are given the game design and are tasked to prepare a full declaration plan for Scratch 3.0. Here is the description to have opcode in the monitor which are mostly reporting block and the target is always **Stage** --- Scratch 3.0 Block Reference --- ### Reporter Blocks Description: {reporter_description} Blocks: {reporter_opcodes_functionalities} Your output must be valid JSON and follow this exact structure given as the example: ```json {{ "declaration_plan": {{ "variables": [ {{ "name": "score", "default": 0, "cloud": false }}, {{ "name": "☁ High Score", "default": 0, "cloud": true }}, {{ "name": "speed", "default": 0, "cloud": false }} ], "lists": [ {{ "name": "recent_scores", "default": [] }}, {{ "name": "tools", "default": [] }} ], "broadcasts": [ "Game Over", "Game Start" ], "monitors": [ {{ "target": "Stage", "variable": "score", "visible": true, "opcode_hint": "data_variable" }}, {{ "target": "Stage", "variable": "☁ High Score", "visible": true, "opcode_hint": "data_variable" }}, {{ "target": "Stage", "reporter_name": "costume #", "visible": false, "opcode_hint": "looks_costumenumbername", "param_name": "NUMBER_NAME" }} {{ "target": "Stage", "reporter_name": "costume #", "visible": false, "opcode_hint": "looks_costumenumbername", "param_name": "NUMBER_NAME" }} ] }} }} ``` Guidelines: - Add meaningful variables to represent game mechanics (e.g., score, health, level, etc). - If a variable is shared across devices or saved to the cloud, mark it as `cloud: true`. - Lists should reflect repeated values or game logs like `recent_scores`, `highscore_history` etc. - Broadcasts should represent game state changes (e.g., Game Over, Game Start, Reset Game). - Monitors include on-screen displayed data like variable reports, costume index, or stage backdrop number. - For monitors, include an `opcode_hint` to suggest the Scratch block opcode that will report the value (e.g., `data_variable`, `motion_xposition`, `looks_costumenumbername`). - If a monitor needs a specific parameter (like 'number' or 'name' for costume), add a `param_name` field (e.g., `"param_name": "NUMBER_NAME"`). - Use variable and broadcast names that are clean, human-readable, and contextually relevant. - Use the actual names given in description as the json is case sensitive **do not** assume the names. - Avoid duplicating variable names. Always use the same case-sensitive name. Start generating the `declaration_plan` for this game.""" try: response = agent.invoke({"messages": [{"role": "user", "content": planning_prompt}]}) raw_response = response["messages"][-1].content print("Raw response from LLM [DeclarationPlannerNode]:", raw_response) try: declaration_plan = extract_json_from_llm_response(raw_response) except json.JSONDecodeError as error_json: logger.warning("Malformed JSON detected. Attempting to correct...") correction_prompt = ( "Your task is to correct the provided JSON string to ensure it is **syntactically perfect and adheres strictly to JSON rules**.\n" "Carefully review the JSON for any errors, especially focusing on the reported error at:\n" f"- **Error Details**: {error_json}\n\n" "**Strict Instructions for your response:**\n" "1. **ONLY** output the corrected JSON. Do not include any other text, comments, or explanations outside the JSON.\n" "2. Ensure all property names (keys) are enclosed in **double quotes**.\n" "3. Ensure string values are correctly enclosed in **double quotes** and any internal special characters (like newlines `\\n`, tabs `\\t`, backslashes `\\\\`, or double quotes `\\\"`) are properly **escaped**.\n" "4. Verify that there are **no extra commas**, especially between key-value pairs or after the last element in an object or array.\n" "5. Ensure proper nesting and matching of curly braces `{}` and square brackets `[]`.\n" "6. **Crucially, remove any extraneous characters or duplicate closing braces outside the main JSON object.**\n" "7. The corrected JSON must be a **complete and valid** JSON object.\n\n" "Here is the problematic JSON string to correct:\n" "```json\n" f"{raw_response}\n" "```\n" "Corrected JSON:\n" ) correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]}) declaration_plan = extract_json_from_llm_response(correction_response["messages"][-1].content)#strip_noise(correction_response["messages"][-1].content)) state["declaration_plan"] = declaration_plan logger.info("Declaration plan successfully generated.") # Save debug #with open("debug_declaration_plan.json", "w", encoding="utf-8") as f: # json.dump(declaration_plan, f, indent=2, ensure_ascii=False) return state except Exception as e: logger.error(f"DeclarationPlannerNode failed: {e}") raise # Re-raise to ensure error is propagated # Node 4: Declaration Builder Node def declaration_builder_node(state: GameState): logger.info("--- Running DeclarationBuilderNode ---") declaration_plan = state.get("declaration_plan") if not declaration_plan: logger.warning("No declaration plan found in state. Skipping DeclarationBuilderNode.") return state project_json = state["project_json"] targets = project_json["targets"] # Find the Stage target, which holds global variables, lists, and broadcasts stage_target = next((target for target in targets if target["isStage"]), None) if not stage_target: logger.error("Stage target not found in project_json. Cannot build declarations.") return state # Initialize sections if they don't exist if "variables" not in stage_target: stage_target["variables"] = {} if "lists" not in stage_target: stage_target["lists"] = {} if "broadcasts" not in stage_target: stage_target["broadcasts"] = {} if "monitors" not in project_json: # Monitors are typically at the project root project_json["monitors"] = [] # 1. Build Variables logger.info("Building variables...") for var_data in declaration_plan.get("declaration_plan", {}).get("variables", []): var_name = var_data["name"] var_default = str(var_data["default"]) # Scratch stores values as strings var_cloud = var_data["cloud"] # Generate a unique ID for the variable var_id = f"{var_name}_{str(uuid.uuid4())[:8]}" # Using UUID for uniqueness # Check if variable already exists (e.g., if a previous run created it) # This part requires careful handling of existing IDs, usually by looking them up by name existing_var_id = None for existing_id, existing_var_info in stage_target["variables"].items(): if existing_var_info[0] == var_name: existing_var_id = existing_id break if existing_var_id: # Update existing variable if found stage_target["variables"][existing_var_id][1] = var_default if len(stage_target["variables"][existing_var_id]) > 2: stage_target["variables"][existing_var_id][2] = var_cloud else: if var_cloud: # Only add true for cloud variables if not present stage_target["variables"][existing_var_id].append(True) logger.info(f"Updated variable: {var_name}") else: # Add new variable variable_definition = [var_name, var_default] if var_cloud: variable_definition.append(True) stage_target["variables"][var_id] = variable_definition logger.info(f"Added new variable: {var_name}") # 2. Build Lists logger.info("Building lists...") for list_data in declaration_plan.get("declaration_plan", {}).get("lists", []): list_name = list_data["name"] list_default = list_data["default"] # Generate a unique ID for the list list_id = f"{list_name}_{str(uuid.uuid4())[:8]}" # Check for existing list existing_list_id = None for existing_id, existing_list_info in stage_target["lists"].items(): if existing_list_info[0] == list_name: existing_list_id = existing_id break if existing_list_id: # Update existing list stage_target["lists"][existing_list_id][1] = list_default logger.info(f"Updated list: {list_name}") else: # Add new list stage_target["lists"][list_id] = [list_name, list_default] logger.info(f"Added new list: {list_name}") # 3. Build Broadcasts logger.info("Building broadcasts...") for broadcast_name in declaration_plan.get("declaration_plan", {}).get("broadcasts", []): # Generate a unique ID for the broadcast (Scratch uses IDs for broadcasts internally) broadcast_id = f"{broadcast_name}_{str(uuid.uuid4())[:8]}" # Check if broadcast already exists existing_broadcast_id = None for existing_id, existing_broadcast_name in stage_target["broadcasts"].items(): if existing_broadcast_name == broadcast_name: existing_broadcast_id = existing_id break if existing_broadcast_id: logger.info(f"Broadcast '{broadcast_name}' already exists.") else: stage_target["broadcasts"][broadcast_id] = broadcast_name logger.info(f"Added new broadcast: {broadcast_name}") # 4. Build Monitors logger.info("Building monitors...") monitor_y_offset = 0 # Starting Y position for monitors for monitor_data in declaration_plan.get("declaration_plan", {}).get("monitors", []): monitor_target = monitor_data.get("target") monitor_visible = monitor_data.get("visible", False) opcode_hint = monitor_data.get("opcode_hint") param_name = monitor_data.get("param_name") # For things like costume # or name # Determine the actual target object in project_json actual_target = next((t for t in targets if t["name"] == monitor_target), None) if not actual_target: logger.warning(f"Monitor target '{monitor_target}' not found. Skipping monitor.") continue monitor_block_prompt_data = { "opcode_hint": opcode_hint, "target_name": monitor_target, "visible": monitor_visible, "current_project_json": project_json # Provide full project JSON for context to LLM } # Add variable/reporter specific data if "variable" in monitor_data: monitor_block_prompt_data["variable_name"] = monitor_data["variable"] elif "reporter_name" in monitor_data: monitor_block_prompt_data["reporter_name"] = monitor_data["reporter_name"] if param_name: monitor_block_prompt_data["param_name"] = param_name llm_monitor_block_generation_prompt = ( f"You are an AI assistant tasked with generating the Scratch block JSON for a monitor.\n" f"The monitor details are:\n" f"- Target Sprite/Stage: '{monitor_block_prompt_data['target_name']}'\n" f"- Desired Opcode Hint: '{monitor_block_prompt_data['opcode_hint']}'\n" f"- Visibility: {monitor_block_prompt_data['visible']}\n" ) if "variable_name" in monitor_block_prompt_data: llm_monitor_block_generation_prompt += f"- Variable Name: '{monitor_block_prompt_data['variable_name']}'\n" if "reporter_name" in monitor_block_prompt_data: llm_monitor_block_generation_prompt += f"- Reporter Name: '{monitor_block_prompt_data['reporter_name']}'\n" if "param_name" in monitor_block_prompt_data: llm_monitor_block_generation_prompt += f"- Parameter Name: '{monitor_block_prompt_data['param_name']}'\n" llm_monitor_block_generation_prompt += ( f"\nHere is a catalog of reporter blocks that might be relevant, including their `op_code` and parameters:\n" f"```json\n{json.dumps(reporter_block_data, indent=2)}\n```\n\n" # Provide reporter blocks catalog f"Current Scratch project JSON (for context, especially existing variables/lists):\n" f"```json\n{json.dumps(project_json, indent=2)}\n```\n\n" f"**CRITICAL INSTRUCTIONS FOR GENERATING THE MONITOR BLOCK JSON:**\n" f"1. **Output ONLY the JSON object for the `monitor_block`.** Do NOT include any other text or markdown fences like ```json.\n" f"2. The top-level key in your output should be `monitor_block` and its value should be the block definition.\n" f"3. Generate a **globally unique ID** for the `monitor_block`.\n" f"4. Set the `opcode` field based on the `opcode_hint` provided (e.g., `data_variable`, `motion_xposition`, `looks_costumenumbername`).\n" f"5. The `mode` should typically be `\"default\"` for standard monitors.\n" f"6. For variable monitors (`data_variable` opcode), the `params` field MUST contain `\"VARIABLE\": \"{{variable_name}}\"`.\n" f"7. For reporter blocks that have parameters (like `looks_costumenumbername`), the `params` field should contain the correct parameter name (e.g., `\"NUMBER_NAME\": \"number\"` or `\"NUMBER_NAME\": \"name\"`). Refer to the `reporter_block_data` for parameter names.\n" f"8. Set `spriteName` to the actual target sprite's name, or `null` if it's a Stage variable/reporter.\n" f"9. Set `value` to a reasonable default (e.g., \"0\" for numbers, \"\" for strings, or the current value from project_json for existing variables).\n" f"10. Set `visible` to `true` or `false` as specified in the plan.\n" f"11. Provide sensible `x` and `y` coordinates. For demonstration, increment `y` for each new monitor.\n" f"12. `sliderMin`, `sliderMax`, and `isDiscrete` are typically for variables; set them appropriately if known or use defaults (0, 100, true).\n" f"13. Ensure all keys are double-quoted.\n" ) try: response = agent.invoke({"messages": [{"role": "user", "content": llm_monitor_block_generation_prompt}]}) raw_response = response["messages"][-1].content#strip_noise(response["messages"][-1].content) logger.info(f"Raw response from LLM [DeclarationBuilderNode - Monitor]: {raw_response[:500]}...") try: # Expecting a dict with a "monitor_block" key generated_monitor_json = extract_json_from_llm_response(raw_response) monitor_block = generated_monitor_json.get("monitor_block") if not monitor_block: raise ValueError("LLM response for monitor block did not contain 'monitor_block' key.") except json.JSONDecodeError as error_json: logger.error("Failed to extract JSON for monitor block. Attempting to correct.") correction_prompt = ( "Your task is to correct the provided JSON string to ensure it is **syntactically perfect and adheres strictly to JSON rules**.\n" "It must represent a single Scratch monitor block definition.\n" f"- **Error Details**: {error_json}\n\n" "**Strict Instructions for your response:**\n" "1. **ONLY** output the corrected JSON object. Do not include any other text, comments, or explanations outside the JSON.\n" "2. The top-level key must be `monitor_block`.\n" "3. Ensure all property names (keys) are enclosed in **double quotes**.\n" "4. Ensure string values are correctly enclosed in **double quotes** and any internal special characters (like newlines `\\n`, tabs `\\t`, backslashes `\\\\`, or double quotes `\\\"`) are properly **escaped**.\n" "5. Verify that there are **no extra commas**, especially between key-value pairs or after the last element in an object or array.\n" "6. Ensure proper nesting and matching of curly braces `{}` and square brackets `[]`.\n" "7. **Crucially, remove any extraneous characters or duplicate closing braces outside the main JSON object.**\n" "8. The corrected JSON must be a **complete and valid** JSON object.\n\n" "Here is the problematic JSON string to correct:\n" "```json\n" f"{raw_response}\n" "```\n" "Corrected JSON:\n" ) correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]}) generated_monitor_json = extract_json_from_llm_response(correction_response["messages"][-1].content)#strip_noise(correction_response["messages"][-1].content)) monitor_block = generated_monitor_json.get("monitor_block") if not monitor_block: raise ValueError("JSON corrector still did not produce a valid monitor_block.") # Assign coordinates for display, avoiding overlaps monitor_block["x"] = 5 + (monitor_y_offset % 2) * 320 # Alternate X for better layout monitor_block["y"] = 5 + (monitor_y_offset // 2) * 50 # Increment Y monitor_y_offset += 1 # Increment for next monitor project_json["monitors"].append(monitor_block) logger.info(f"Added monitor for '{monitor_data.get('variable', monitor_data.get('reporter_name'))}' to project.json.") except Exception as e: logger.error(f"Error generating monitor block for {monitor_data}: {e}") # Decide whether to raise or continue based on error tolerance state["project_json"] = project_json logger.info("Declaration builder node finished updating project JSON.") #with open("debug_state.json", "w", encoding="utf-8") as f: # json.dump(state, f, indent=2, ensure_ascii=False) print("Updated project JSON after declarations:", json.dumps(project_json, indent=2)) return state # Node 5: Sprite Action Plan Builder def overall_planner_node(state: GameState): """ Generates a comprehensive action plan for sprites, including detailed Scratch block information. This node acts as an overall planner, leveraging knowledge of all block shapes and categories. """ logger.info("--- Running OverallPlannerNode ---") description = state.get("description", "") project_json = state["project_json"] # MODIFICATION 1: Include 'Stage' in the list of names to plan for. # It's crucial to ensure 'Stage' is always present for its global role. target_names = [t["name"] for t in project_json["targets"]] # MODIFICATION 2: Get sprite positions, providing default for Stage as it doesn't have x,y sprite_positions = {} for target in project_json["targets"]: if not target["isStage"]: sprite_positions[target["name"]] = {"x": target.get("x", 0), "y": target.get("y", 0)} else: sprite_positions[target["name"]] = {"x": "N/A", "y": "N/A"} # Stage doesn't have positional coordinates declaration_plan = state["declaration_plan"] planning_prompt = f"""Generate a detailed action plan for the game's sprites and stage based on the user query and sprite details. **Game Description:** '{description}' **Targets in Game (Sprites and Stage):** {', '.join(target_names)} **Current Target Positions (Sprites have x/y, Stage is N/A):** {json.dumps(sprite_positions)} Here is the overall declaration of variable, broadcast and monitors to look for and utilized as per requirment. **Current Declaration Plan:** {json.dumps(declaration_plan)} --- Scratch 3.0 Block Reference --- This section provides a comprehensive reference of Scratch 3.0 blocks, categorized by shape, including their opcodes and functional descriptions. Use this to accurately identify block types and behavior. ### Hat Blocks Description: {hat_description} Blocks: {hat_opcodes_functionalities} ### Boolean Blocks Description: {boolean_description} Blocks: {boolean_opcodes_functionalities} ### C Blocks Description: {c_description} Blocks: {c_opcodes_functionalities} ### Cap Blocks Description: {cap_description} Blocks: {cap_opcodes_functionalities} ### Reporter Blocks Description: {reporter_description} Blocks: {reporter_opcodes_functionalities} ### Stack Blocks Description: {stack_description} Blocks: {stack_opcodes_functionalities} ----------------------------------- Your task is to define the primary actions and movements for each sprite AND THE STAGE. The output should be a JSON object with a single key 'action_overall_flow'. Each key inside this object should be a sprite or 'Stage' name (e.g., 'Player', 'Enemy', 'Stage'), and its value must include a 'description' and a list of 'plans'. Each plan must include a **single Scratch Hat Block** (e.g., 'event_whenflagclicked') to start scratch project and should contain: 1. **'event'**: the exact `opcode` of the hat block that initiates the logic. 2. **'logic'**: a natural language breakdown of each step taken after the event, formatted as a multi-line string representing pseudo-code. Ensure clarity and granularity—each described action should map closely to a Scratch block or tight sequence. - Use 'forever: ...' or 'repeat(10): ...' to prefix repeating logic suitable taking reference from the C blocks. - Use Scratch-consistent verbs: 'move', 'change', 'wait', 'hide', 'show', 'say', 'glide', etc. - For coordinates or numeric values within the logic string, use them directly (e.g., 'x: 240 y: -100' or 'change y by 10'). The entire 'logic' field itself must be a single string. 3. **Opcode Lists**: include relevant Scratch opcodes grouped under `motion`, `control`, `operator`, `sensing`, `looks`, `sounds`, `events`, and `data`. List only the non-empty categories. Use exact opcodes including shadow/helper blocks (e.g., 'math_number'). 4. Few Example of content of logics inside for a specific plan: - example 1[continues moving objects]: "when green flag clicked go to x: 240 y: -100 // Start off to the right, on the ground set [obstacle speed v] to -5 show forever change x by (obstacle speed) if <(x position) < -240> then // If off-screen left go to x: 240 y: -100 // Reset to right again end end " - example 2[jumping script of an plan]: "when [space v] key pressed if <(y position) = -100> then // Only jump if on the ground repeat (5) change y by (100) // Wave paw wait (0.1) seconds change y by (-100) // Back to normal wait (0.1) seconds end end " 5. Use target names exactly as listed in `Targets in Game`. Do NOT rename or invent new targets. 6. Ensure the plan reflects accurate opcode usage derived strictly from the block reference above. 7. Few shot Example structure for 'action_overall_flow': ```json {{ "action_overall_flow": {{ "Stage": {{ "description": "Background and global game state management, including broadcasts, rewards, and score.", "plans": [ {{ "event": "event_whenflagclicked", "logic": "when green flag clicked\n switch backdrop to [backdrop1 v]\n set [score v] to 0\n show variable [score v]\n broadcast [Game Start v]", "motion": [], "control": [], "operator": [], "sensing": [], "looks": [ "looks_switchbackdropto" ], "sounds": [], "events": [ "event_broadcast" ], "data": [ "data_setvariableto", "data_showvariable" ] }}, {{ "event": "event_whenbroadcastreceived", "logic": "when I receive [Game Over v]\n if <(score) > (High Score)> then\n set [High Score v] to (score)\n end\n switch backdrop to [HighScore v]", "motion": [], "control": [ "control_if" ], "operator": [ "operator_gt" ], "sensing": [], "looks": [ "looks_switchbackdropto" ], "sounds": [], "events": [], "data": [ "data_setvariableto" ] }} ] }}, "Sprite1": {{ "description": "Main character (cat) actions", "plans": [ {{ "event": "event_whenflagclicked", "logic": "when green flag clicked\n go to x: 240 y: -100\n end\n.", "motion": [ "motion_gotoxy" ], "control": [], "operator": [], "sensing": [], "looks": [], "sounds": [], "events": [], "data": [] }}, {{ "event": "event_whenkeypressed", "logic": "when [space v] key pressed\n repeat (10)\n change y by (20)\n wait (0.1) seconds\n change y by (-20)\n end", "motion": [ "motion_changeyby" ], "control": [ "control_repeat", "control_wait" ], "operator": [], "sensing": [], "looks": [], "sounds": [], "events": [], "data": [] }} ] }}, "soccer ball": {{ "description": "Obstacle movement and interaction", "plans": [ {{ "event": "event_whenflagclicked", "logic": "when green flag clicked\n go to x: 240 y: -135\n forever\n glide 2 seconds to x: -240 y: -135\n if <(x position) < -235> then\n set x to 240\n end\n if then\n broadcast [Game Over v]\n stop [all v]\n end\n end", "motion": [ "motion_gotoxy", "motion_glidesecstoxy", "motion_xposition", "motion_setx" ], "control": [ "control_forever", "control_if", "control_stop" ], "operator": [ "operator_lt" ], "sensing": [ "sensing_istouching", "sensing_touchingobjectmenu" ], "looks": [], "sounds": [], "events": [ "event_broadcast" ], "data": [] }} ] }} }} }} ``` 8. Based on the provided context, generate the `action_overall_flow`. - Maintain the **exact JSON structure** shown above. - All `logic` fields must be **clear and granular**. - Only include opcode categories that contain relevant opcodes. - Ensure that each opcode matches its intended Scratch functionality. - If feedback suggests major change, **rethink the entire plan** for the affected sprite(s). - If feedback is minor, make precise, minimal improvements only. """ try: response = agent.invoke({"messages": [{"role": "user", "content": planning_prompt}]}) print("Raw response from LLM [OverallPlannerNode 1]:",response) raw_response = response["messages"][-1].content#strip_noise(response["messages"][-1].content) print("Raw response from LLM [OverallPlannerNode 2]:", raw_response) # Uncomment for debugging # json debugging and solving try: overall_plan = extract_json_from_llm_response(raw_response) except json.JSONDecodeError as error_json: logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.") # Use the JSON resolver agent to fix the response correction_prompt = ( "Your task is to correct the provided JSON string to ensure it is **syntactically perfect and adheres strictly to JSON rules**.\n" "Carefully review the JSON for any errors, especially focusing on the reported error at:\n" f"- **Error Details**: {error_json}\n\n" "**Strict Instructions for your response:**\n" "1. **ONLY** output the corrected JSON. Do not include any other text, comments, or explanations outside the JSON.\n" "2. Ensure all property names (keys) are enclosed in **double quotes**.\n" "3. Ensure string values are correctly enclosed in **double quotes** and any internal special characters (like newlines `\\n`, tabs `\\t`, backslashes `\\\\`, or double quotes `\\`) are properly **escaped**.\n" "4. Verify that there are **no extra commas**, especially between key-value pairs or after the last element in an object or array.\n" "5. Ensure proper nesting and matching of curly braces `{}` and square brackets `[]`.\n" "6. **Crucially, remove any extraneous characters or duplicate closing braces outside the main JSON object.**\n" "7. The corrected JSON must be a **complete and valid** JSON object.\n\n" "Here is the problematic JSON string to correct:\n" "```json\n" f"{raw_response}\n" "```\n" "Corrected JSON:\n" ) correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]}) print(f"[JSON CORRECTOR RESPONSE AT OVERALLPLANNERNODE ]: {correction_response['messages'][-1].content}") overall_plan= extract_json_from_llm_response(correction_response["messages"][-1].content)#strip_noise(correction_response["messages"][-1].content)) state["action_plan"] = overall_plan logger.info("Overall plan generated by OverallPlannerNode.") #with open("debug_state.json", "w", encoding="utf-8") as f: # json.dump(state, f, indent=2, ensure_ascii=False) return state except Exception as e: logger.error(f"Error in OverallPlannerNode: {e}") raise # Node 6: Logic updating if any issue here def plan_logic_aligner_node(state: GameState): logger.info("--- Running plan_logic_aligner_node ---") game_description = state.get("description", "No game description provided.") action_plan = state.get("action_plan", {}) # Ensure action_plan exists and contains "action_overall_flow" # if not action_plan or "action_overall_flow" not in action_plan: # logger.warning("No 'action_overall_flow' found in state['action_plan']. Skipping plan refinement and block relation analysis.") # return state all_catalogs = [ hat_block_data, boolean_block_data, c_block_data, cap_block_data, reporter_block_data, stack_block_data ] refined_overall_flow = {} if action_plan.get("action_overall_flow", {}) == {}: plan_data = action_plan.items() else: plan_data = action_plan.get("action_overall_flow", {}).items() for target_name, target_data in plan_data: refined_target_plans = [] overall_plans_in_sprite = [] # for getting overall plan for not getting deviated for plan in target_data.get("plans", []): original_logic = plan.get("logic", "") overall_plans_in_sprite.append(original_logic) print(f"[overall_plans_in_sprite AT PLAN ALIGNER]: {overall_plans_in_sprite}") for plan in target_data.get("plans", []): original_logic = plan.get("logic", "") event = plan.get("event", "unknown_event") opcodes = { "motion": plan.get("motion", plan.get("motions", [])), "control": plan.get("control", plan.get("controls", [])), "operator": plan.get("operator", plan.get("operators", [])), "sensing": plan.get("sensing", []), "looks": plan.get("looks", plan.get("look", [])), "sounds": plan.get("sounds", plan.get("sound", [])), "events": plan.get("events", []), "data": plan.get("data", []) } motion_opcodes = plan.get("motion", plan.get("motions", [])) # handling of motion and motions by llm without re control_opcodes = plan.get("control", plan.get("controls", [])) # handling of control and controls by llm without re operator_opcodes = plan.get("operator", plan.get("operators", [])) # handling of operator and operators by llm without re sensing_opcodes = plan.get("sensing", []) looks_opcodes = plan.get("looks", plan.get("look", [])) # handling of look and looks by llm without re sound_opcodes = plan.get("sounds", plan.get("sound", [])) # handling of sound and sounds by llm without re events_opcodes = plan.get("events", []) events_opcodes_2_raw = plan.get("event", []) # Get the value as is events_opcodes_2 = [events_opcodes_2_raw] if isinstance(events_opcodes_2_raw, str) else events_opcodes_2_raw data_opcodes = plan.get("data", []) needed_opcodes = ( motion_opcodes + control_opcodes + operator_opcodes + sensing_opcodes + looks_opcodes + sound_opcodes + events_opcodes + events_opcodes_2 + data_opcodes ) needed_opcodes = list(set(needed_opcodes)) # 2) build filtered runtime catalog (if you still need it) filtered_catalog = { op: ALL_SCRATCH_BLOCKS_CATALOG[op] for op in needed_opcodes if op in ALL_SCRATCH_BLOCKS_CATALOG } # 3) merge human-written catalog + runtime entry for each opcode combined_blocks = {} for op in needed_opcodes: catalog_def = find_block_in_all(op, all_catalogs) or {} runtime_def = ALL_SCRATCH_BLOCKS_CATALOG.get(op, {}) # merge: catalog fields first, then runtime overrides/adds merged = {**catalog_def, **runtime_def} combined_blocks[op] = merged print("Combined blocks for this script:", json.dumps(combined_blocks, indent=2)) refinement_prompt = f""" You are an expert in Scratch 3.0 game development, specializing in understanding block relationships (stacked, nested). Review the following plan for '{target_name}' triggered by '{event}'. Game Description: {game_description} --- Scratch 3.0 Block Reference --- ### Hat Blocks Description: {hat_description} Blocks: {hat_opcodes_functionalities} ### Boolean Blocks Description: {boolean_description} Blocks: {boolean_opcodes_functionalities} ### C Blocks Description: {c_description} Blocks: {c_opcodes_functionalities} ### Cap Blocks Description: {cap_description} Blocks: {cap_opcodes_functionalities} ### Reporter Blocks Description: {reporter_description} Blocks: {reporter_opcodes_functionalities} ### Stack Blocks Description: {stack_description} Blocks: {stack_opcodes_functionalities} ----------------------------------- Current Plan Details: - Event (Hat Block Opcode): {event} - Overall plans made on {target_name} are in the list: {overall_plans_in_sprite} - Associated Opcodes by Category: {json.dumps(opcodes, indent=2)} - Reference code and functionality and logics script for to check is script correct: {combined_blocks} - Current Logic Description to Update if required: "{original_logic}" Your task is to: 1. **Refine the 'Logic'**: Make it more precise, accurate, and fully aligned with the game mechanics described in the 'Game Description'. Ensure it uses Scratch-consistent verbs and phrasing. **Do NOT** use double quotes within the 'logic' string itself for values. 2. The logic should build flow which can correctly describe motion or flow for e.g jumping, running, flying and other mechanics. 3. **'logic'**: a natural language breakdown of each step taken after the event, formatted as a multi-line string representing pseudo-code. Ensure clarity and granularity—each described action should map closely to a Scratch block or tight sequence. 4. Few Example of content of logics inside for a specific plan: - example 1[continuos moving objects]: "when green flag clicked go to x: 240 y: -100 // Start off to the right, on the ground set [obstacle speed v] to -5 show forever change x by (obstacle speed) if <(x position) < -240> then // If off-screen left go to x: 240 y: -100 // Reset to right again end end " - example 2[jumping script of an plan]: " when [space v] key pressed if <(y position) = -100> then // Only jump if on the ground repeat (5) change y by (100) // Wave paw wait (0.1) seconds change y by (-100) // Back to normal wait (0.1) seconds end end " - example 3[score decrement on collision]: "when green flag clicked set [score v] to 0 forever if then change [score v] by (-1) wait (0.1) seconds // To prevent rapid score decrement end 5. Donot add any explaination of logic or comments to justify or explain just put the logic content in the json. 6. Output a JSON object and [NOTE: use double quates always ""]: - `refined_logic`: The refined logic string. Output ONLY the JSON object. """ refined_logic = original_logic # Default to original #block_relationships = [] # Default to empty try: # Invoke the main agent for logic refinement and relationship identification response = agent.invoke({"messages": [{"role": "user", "content": refinement_prompt}]}) llm_output_raw = response["messages"][-1].content parsed_llm_output = extract_json_from_llm_response(llm_output_raw) refined_logic = parsed_llm_output.get("refined_logic", original_logic) #block_relationships = parsed_llm_output.get("block_relationships", []) logger.info(f"Successfully processed plan for {target_name} - {event}.") except json.JSONDecodeError as error_json: logger.error(f"JSON Decode Error for {target_name} - {event}: {error_json}. Attempting correction.") # If JSON parsing fails, use the json resolver agent correction_prompt = ( "Your task is to correct the provided JSON string to ensure it is **syntactically perfect and adheres strictly to JSON rules**.\n" "It must be a JSON object with `refined_logic` (string) and `block_relationships` (array of objects).\n" f"- **Error Details**: {error_json}\n\n" "**Strict Instructions for your response:**\n" "1. **ONLY** output the corrected JSON. Do not include any other text or explanations.\n" "2. Ensure all keys and string values are enclosed in **double quotes**. Escape internal quotes (`\\`).\n" "3. No trailing commas. Correct nesting.\n\n" "Here is the problematic JSON string to correct:\n" f"```json\n{llm_output_raw}\n```\n" "Corrected JSON:\n" ) try: correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]}) corrected_output = extract_json_from_llm_response(correction_response["messages"][-1].content) refined_logic = corrected_output.get("refined_logic", original_logic) #block_relationships = corrected_output.get("block_relationships", []) logger.info(f"Successfully corrected JSON output for {target_name} - {event}.") except Exception as e_corr: logger.error(f"Failed to correct JSON output for {target_name} - {event} even after retry: {e_corr}") # Final fallback if correction fails refined_logic = original_logic #block_relationships = [] except Exception as e: logger.error(f"General error invoking LLM for {target_name} - {event}: {e}") # Fallback to original logic and empty relationships on any other error refined_logic = original_logic #block_relationships = [] plan["logic"] = refined_logic #plan["block_relationships"] = block_relationships refined_target_plans.append(plan) refined_overall_flow[target_name] = {"description": target_data.get("description"), "plans": refined_target_plans} # Update the original action_plan in the state with the refined version state["action_plan"]["action_overall_flow"] = refined_overall_flow print(f"[OVREALL REFINED LOGIC]: {refined_overall_flow}") logger.info("Plan refinement and block relation analysis completed for all plans.") return state # Node 7: Sprite Plan Verification Node def plan_verification_node(state: GameState): """ Validates the generated action plan/blocks, identifies missing logic, provides feedback, and determines if further improvements are needed. Also manages the iteration count for the improvement loop. """ logger.info(f"--- Running VerificationNode (Iteration: {state.get('iteration_count', 0)}) ---") MAX_IMPROVEMENT_ITERATIONS = 1 # Set a sensible limit to prevent infinite loops current_iteration = state.get("iteration_count", 0) project_json = state["project_json"] action_plan = state.get("action_plan", {}) print(f"[action_plan before verification] on ({current_iteration}): {json.dumps(action_plan, indent=2)}") # --- Optimized Logic: Check iteration count BEFORE LLM invocation --- if current_iteration >= MAX_IMPROVEMENT_ITERATIONS: logger.warning(f"Max improvement iterations ({MAX_IMPROVEMENT_ITERATIONS}) reached. Skipping LLM call and forcing 'needs_improvement' to False.") state["needs_improvement"] = False state["plan_validation_feedback"] = "Max iterations reached, stopping further improvements." state["iteration_count"] = 0 # Reset iteration count as improvement loop is stopping print(f"[updated action_plan after verification] on ({current_iteration}): {json.dumps(state.get('action_plan', {}), indent=2)}") #with open("debug_state.json", "w", encoding="utf-8") as f: # json.dump(state, f, indent=2, ensure_ascii=False) return state # --- End of Optimized Logic --- # Corrected validation_prompt (ensure all variables are passed or mocked for demonstration) # For a real scenario, make sure these are populated from the state or other sources hat_description = "Blocks that start a script when an event happens." hat_opcodes_functionalities = "event_whenflagclicked (when green flag clicked), event_whenkeypressed (when key pressed), event_whenthisspriteclicked (when this sprite clicked)" boolean_description = "Blocks that report true or false." boolean_opcodes_functionalities = "operator_and, operator_or, operator_not, sensing_touchingobject, sensing_keypressed" c_description = "Blocks that create a control flow, like loops or conditionals." c_opcodes_functionalities = "control_if, control_if_else, control_repeat, control_forever" cap_description = "Blocks that stop a script." cap_opcodes_functionalities = "control_stop" reporter_description = "Blocks that report a value (numbers, strings)." reporter_opcodes_functionalities = "motion_xposition, motion_yposition, sensing_askandwait, data_variable" stack_description = "Blocks that perform actions sequentially." stack_opcodes_functionalities = "motion_movesteps, looks_sayforsecs, sound_playuntildone, control_wait" validation_prompt = f"""You are an AI validator for Scratch project plans and generated blocks. \ Your task is to review the current state of the game's action plan and block structure. \ Critically analyze if there are any missing logic, structural inconsistencies, or unclear intentions. \ Provide **precise** and **constructive** feedback for improvement. **Game Description:** {state.get('description', '')} **Current Action Plan (High-Level Logic):** ```json {json.dumps(action_plan, indent=2)} ``` **Current Project JSON (Generated Blocks):** ```json {json.dumps(project_json, indent=2)} ``` **Previous Feedback (if any):** {state.get('plan_validation_feedback', 'None')} --- Scratch 3.0 Block Reference --- This section provides a comprehensive reference of Scratch 3.0 blocks, categorized by shape, including their opcodes and functional descriptions. Use this to accurately identify block types and behavior. ### Hat Blocks Description: {hat_description} Blocks: {hat_opcodes_functionalities} ### Boolean Blocks Description: {boolean_description} Blocks: {boolean_opcodes_functionalities} ### C Blocks Description: {c_description} Blocks: {c_opcodes_functionalities} ### Cap Blocks Description: {cap_description} Blocks: {cap_opcodes_functionalities} ### Reporter Blocks Description: {reporter_description} Blocks: {reporter_opcodes_functionalities} ### Stack Blocks Description: {stack_description} Blocks: {stack_opcodes_functionalities} **Check** if a any opcode related to logic inside the plans is missing or invalid opcode used, if any update it where required. Based on the above, return a response strictly in the following JSON format: ```json {{ "feedback": "Detailed comments on any missing logic, inconsistencies, or unclear intent. Be concise but specific. If everything is perfect, state that explicitly.", "needs_improvement": true, "suggested_description_updates": "Concise revision of the game description if needed. Use an empty string if no change is required." }} ``` **Important:** - The `needs_improvement` field must be strictly `true` or `false` (boolean). Do **not** include any other text or explanation inside the JSON. - Be strict in evaluation. If **any** part of the plan or block logic appears incomplete, ambiguous, or incorrect, set `needs_improvement` to `true`.""" try: response = agent.invoke({"messages": [{"role": "user", "content": validation_prompt}]}) raw_response = response["messages"][-1].content #strip_noise(response["messages"][-1].content) logger.info(f"Raw response from LLM [VerificationNode]: {raw_response[:500]}...") # json debugging and solving try: validation_result = extract_json_from_llm_response(raw_response) except json.JSONDecodeError as error_json: logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.") # Use the JSON resolver agent to fix the response correction_prompt = ( "Your task is to correct the provided JSON string to ensure it is **syntactically perfect and adheres strictly to JSON rules**.\n" "Carefully review the JSON for any errors, especially focusing on the reported error at:\n" f"- **Error Details**: {error_json}\n\n" "**Strict Instructions for your response:**\n" "1. **ONLY** output the corrected JSON. Do not include any other text, comments, or explanations outside the JSON.\n" "2. Ensure all property names (keys) are enclosed in **double quotes**.\n" "3. Ensure string values are correctly enclosed in **double quotes** and any internal special characters (like newlines `\\n`, tabs `\\t`, backslashes `\\\\`, or double quotes `\\`) are properly **escaped**.\n" "4. Verify that there are **no extra commas**, especially between key-value pairs or after the last element in an object or array.\n" "5. Ensure proper nesting and matching of curly braces `{}` and square brackets `[]`.\n" "6. **Crucially, remove any extraneous characters or duplicate closing braces outside the main JSON object.**\n" # Added instruction "7. The corrected JSON must be a **complete and valid** JSON object.\n\n" "Here is the problematic JSON string to correct:\n" "```json\n" f"{raw_response}\n" "```\n" "Corrected JSON:\n" ) correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]}) print(f"[JSON CORRECTOR RESPONSE AT PLANVERIFICATIONNODE ]: {correction_response['messages'][-1].content}") validation_result = extract_json_from_llm_response(correction_response["messages"][-1].content) #strip_noise(correction_response["messages"][-1].content)) # Update state with feedback and improvement flag state["plan_validation_feedback"] = validation_result.get("feedback", "No specific feedback provided.") state["needs_improvement"] = validation_result.get("needs_improvement", False) suggested_description_updates = validation_result.get("suggested_description_updates", "") if suggested_description_updates: # You might want to append or intelligently merge this with the existing detailed_game_description # For simplicity, let's just append for now or update a specific field #current_description = state.get("detailed_game_description", state.get("description", "")) current_description = state.get("description", "") # Corrected to get from 'description' key state["description"] = f"{current_description}\n\nSuggested Update: {suggested_description_updates}" logger.info("Updated detailed game description based on validation feedback.") # Manage iteration count if state["needs_improvement"]: state["iteration_count"] = current_iteration + 1 # The check for MAX_IMPROVEMENT_ITERATIONS is now done at the beginning of the function else: state["iteration_count"] = 0 # Reset if no more improvement needed logger.info(f"Verification completed. Needs Improvement: {state['needs_improvement']}. Feedback: {state['plan_validation_feedback'][:100]}...") print(f"[updated action_plan after verification] on ({current_iteration}): {json.dumps(state.get('action_plan', {}), indent=2)}") #with open("debug_state.json", "w", encoding="utf-8") as f: # json.dump(state, f, indent=2, ensure_ascii=False) return state except Exception as e: logger.error(f"Error in VerificationNode: {e}") state["needs_improvement"] = False # Force end loop on error state["plan_validation_feedback"] = f"Validation error: {e}" raise # Node 8: Refine planner node def refined_planner_node(state: GameState): """ Refines the action plan based on validation feedback and game description. """ logger.info("--- Running RefinedPlannerNode ---") #detailed_game_description = state.get("detailed_game_description", state.get("description", "A game.")) detailed_game_description = state.get("description","") current_action_plan = state.get("action_plan", {}) print(f"[current_action_plan before refinement] on ({state.get('iteration_count', 0)}): {json.dumps(current_action_plan, indent=2)}") plan_validation_feedback = state.get("plan_validation_feedback", "No specific feedback provided. Assume general refinement is needed.") project_json = state["project_json"] target_names = [t["name"] for t in project_json["targets"]] # MODIFICATION 2: Get sprite positions, providing default for Stage as it doesn't have x,y sprite_positions = {} for target in project_json["targets"]: if not target["isStage"]: sprite_positions[target["name"]] = {"x": target.get("x", 0), "y": target.get("y", 0)} else: sprite_positions[target["name"]] = {"x": "N/A", "y": "N/A"} # Stage doesn't have positional coordinates declaration_plan = state["declaration_plan"] refinement_prompt = f"""Refine the existing action plan for the game's sprites based on the detailed game description and the validation feedback provided. **Game Description:** '{detailed_game_description}' **Targets in Game (Sprites and Stage):** {', '.join(target_names)} **Current Target Positions (Sprites have x/y, Stage is N/A):** {json.dumps(sprite_positions)} Here is the overall declaration of variable, broadcast and monitors to look for and utilized as per requirment. **Current Declaration Plan:** {json.dumps(declaration_plan)} **Validation Feedback:** '{plan_validation_feedback}' --- Scratch 3.0 Block Reference --- ### Hat Blocks Description: {hat_description} Blocks: {hat_opcodes_functionalities} ### Boolean Blocks Description: {boolean_description} Blocks: {boolean_opcodes_functionalities} ### C Blocks Description: {c_description} Blocks: {c_opcodes_functionalities} ### Cap Blocks Description: {cap_description} Blocks: {cap_opcodes_functionalities} ### Reporter Blocks Description: {reporter_description} Blocks: {reporter_opcodes_functionalities} ### Stack Blocks Description: {stack_description} Blocks: {stack_opcodes_functionalities} ----------------------------------- * **Your task is to align to description, refine and correct the JSON object 'action_overall_flow'.** Use sprite names exactly as provided in `sprite_names` (e.g., 'Sprite1', 'soccer ball'); and also the stage, do **NOT** rename them. 1. **'event'**: the exact `opcode` of the hat block that initiates the logic. 2. **'logic'**: a natural language breakdown of each step taken after the event, formatted as a multi-line string representing pseudo-code. Ensure clarity and granularity—each described action should map closely to a Scratch block or tight sequence. - Use 'forever: ...' or 'repeat(10): ...' to prefix repeating logic suitable taking reference from the C blocks. - Use Scratch-consistent verbs: 'move', 'change', 'wait', 'hide', 'show', 'say', 'glide', etc. - For coordinates or numeric values within the logic string, use them directly (e.g., 'x: 240 y: -100' or 'change y by 10'). The entire 'logic' field itself must be a single string. 3. **Opcode Lists**: include relevant Scratch opcodes grouped under `motion`, `control`, `operator`, `sensing`, `looks`, `sounds`, `events`, and `data`. List only the non-empty categories. Use exact opcodes including shadow/helper blocks (e.g., 'math_number'). 4. Few Example of content of logics inside for a specific plan: - example 1[continues moving objects]: "when green flag clicked go to x: 240 y: -100 // Start off to the right, on the ground set [obstacle speed v] to -5 show forever change x by (obstacle speed) if <(x position) < -240> then // If off-screen left go to x: 240 y: -100 // Reset to right again end end " - example 2[jumping script of an plan]: "when [space v] key pressed if <(y position) = -100> then // Only jump if on the ground repeat (5) change y by (100) // Wave paw wait (0.1) seconds change y by (-100) // Back to normal wait (0.1) seconds end end " 5. Use target names exactly as listed in `Targets in Game`. Do NOT rename or invent new targets. 6. Ensure the plan reflects accurate opcode usage derived strictly from the block reference above. 7. Few shot Example structure for 'action_overall_flow': ```json {{ "action_overall_flow": {{ "Stage": {{ "description": "Background and global game state management, including broadcasts, rewards, and score.", "plans": [ {{ "event": "event_whenflagclicked", "logic": "when green flag clicked\n switch backdrop to [backdrop1 v]\n set [score v] to 0\n show variable [score v]\n broadcast [Game Start v]", "motion": [], "control": [], "operator": [], "sensing": [], "looks": [ "looks_switchbackdropto" ], "sounds": [], "events": [ "event_broadcast" ], "data": [ "data_setvariableto", "data_showvariable" ] }}, {{ "event": "event_whenbroadcastreceived", "logic": "when I receive [Game Over v]\n if <(score) > (High Score)> then\n set [High Score v] to (score)\n end\n switch backdrop to [HighScore v]", "motion": [], "control": [ "control_if" ], "operator": [ "operator_gt" ], "sensing": [], "looks": [ "looks_switchbackdropto" ], "sounds": [], "events": [], "data": [ "data_setvariableto" ] }} ] }}, "Sprite1": {{ "description": "Main character (cat) actions", "plans": [ {{ "event": "event_whenflagclicked", "logic": "when green flag clicked\n go to x: 240 y: -100\n end\n.", "motion": [ "motion_gotoxy" ], "control": [], "operator": [], "sensing": [], "looks": [], "sounds": [], "events": [], "data": [] }}, {{ "event": "event_whenkeypressed", "logic": "when [space v] key pressed\n repeat (10)\n change y by (20)\n wait (0.1) seconds\n change y by (-20)\n end", "motion": [ "motion_changeyby" ], "control": [ "control_repeat", "control_wait" ], "operator": [], "sensing": [], "looks": [], "sounds": [], "events": [], "data": [] }} ] }}, "soccer ball": {{ "description": "Obstacle movement and interaction", "plans": [ {{ "event": "event_whenflagclicked", "logic": "when green flag clicked\n go to x: 240 y: -135\n forever\n glide 2 seconds to x: -240 y: -135\n if <(x position) < -235> then\n set x to 240\n end\n if then\n broadcast [Game Over v]\n stop [all v]\n end\n end", "motion": [ "motion_gotoxy", "motion_glidesecstoxy", "motion_xposition", "motion_setx" ], "control": [ "control_forever", "control_if", "control_stop" ], "operator": [ "operator_lt" ], "sensing": [ "sensing_istouching", "sensing_touchingobjectmenu" ], "looks": [], "sounds": [], "events": [ "event_broadcast" ], "data": [] }} ] }} }} }} ``` 8. Use the validation feedback to address errors, fill in missing logic, or enhance clarity. example of few possible improvements: 1.event_whenflagclicked is used to control sprite but its used for actual start scratch project and reset scratch. 2. looping like forever used where we should use iterative. 3. missing of for variable we used in the block - Maintain the **exact JSON structure** shown above. - All `logic` fields must be **clear and granular**. - Only include opcode categories that contain relevant opcodes. - Ensure that each opcode matches its intended Scratch functionality. - If feedback suggests major change, **rethink the entire plan** for the affected sprite(s). - If feedback is minor, make precise, minimal improvements only. """ try: response = agent.invoke({"messages": [{"role": "user", "content": refinement_prompt}]}) raw_response = response["messages"][-1].content#strip_noise(response["messages"][-1].content) logger.info(f"Raw response from LLM [RefinedPlannerNode]: {raw_response[:500]}...") # json debugging and solving try: refined_plan = extract_json_from_llm_response(raw_response) except json.JSONDecodeError as error_json: logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.") # Use the JSON resolver agent to fix the response correction_prompt = ( "Your task is to correct the provided JSON string to ensure it is **syntactically perfect and adheres strictly to JSON rules**.\n" "Carefully review the JSON for any errors, especially focusing on the reported error at:\n" f"- **Error Details**: {error_json}\n\n" "**Strict Instructions for your response:**\n" "1. **ONLY** output the corrected JSON. Do not include any other text, comments, or explanations outside the JSON.\n" "2. Ensure all property names (keys) are enclosed in **double quotes**.\n" "3. Ensure string values are correctly enclosed in **double quotes** and any internal special characters (like newlines `\\n`, tabs `\\t`, backslashes `\\\\`, or double quotes `\\`) are properly **escaped**.\n" "4. IN `logic` field make sure content enclosed in **double quotes** should not have invalid **double quotes**, **eliminate** all quotes inside the content if any. " "4. Verify that there are **no extra commas**, especially between key-value pairs or after the last element in an object or array.\n" "5. Ensure proper nesting and matching of curly braces `{}` and square brackets `[]`.\n" "6. **Crucially, remove any extraneous characters or duplicate closing braces outside the main JSON object.**\n" # Added instruction "7. The corrected JSON must be a **complete and valid** JSON object.\n\n" "Here is the problematic JSON string to correct:\n" "```json\n" f"{raw_response}\n" "```\n" "Corrected JSON:\n" ) correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]}) print(f"[JSON CORRECTOR RESPONSE AT REFINEPLANNER ]: {correction_response['messages'][-1].content}") refined_plan = extract_json_from_llm_response(correction_response["messages"][-1].content)#strip_noise(correction_response["messages"][-1].content)) logger.info("Refined plan corrected by JSON resolver agent.") if refined_plan: #state["action_plan"] = refined_plan.get("action_overall_flow", {}) # Update to the key 'action_overall_flow' [error] state["action_plan"] = refined_plan.get("action_overall_flow", {}) # Update the main the prompt includes updated only logger.info("Action plan refined by RefinedPlannerNode.") else: logger.warning("RefinedPlannerNode did not return a valid 'action_overall_flow' structure. Keeping previous plan.") print("[Refined Action Plan]:", json.dumps(state["action_plan"], indent=2)) #print("[current state after refinement]:", json.dumps(state, indent=2)) #with open("debug_state.json", "w", encoding="utf-8") as f: # json.dump(state, f, indent=2, ensure_ascii=False) return state except Exception as e: logger.error(f"Error in RefinedPlannerNode: {e}") raise # Node 9:plan with exact count of the opcode used per logic def plan_opcode_counter_node(state: Dict[str, Any]) -> Dict[str, Any]: """ For each plan in state["action_plan"]["action_overall_flow"], calls the LLM agent to analyze the `logic` string and return a list of {opcode, count} for each category. """ logger.info("=== Running OPCODE COUTER LOGIC with LLM counts ===") game_description = state.get("description", "No game description provided.") sprite_name = {} project_json = state["project_json"] for target in project_json["targets"]: sprite_name[target["name"]] = target["name"] action_flow = state.get("action_plan", state.get("action_plan", {}).get("action_overall_flow", {})) # if not action_flow: # logger.warning("No action_overall_flow found; skipping.") # return state all_catalogs = [ hat_block_data, boolean_block_data, c_block_data, cap_block_data, reporter_block_data, stack_block_data ] if action_flow.get("action_overall_flow", {}) == {}: plan_data = action_flow.items() else: plan_data = action_flow.get("action_overall_flow", {}).items() refined_flow: Dict[str, Any] = {} for sprite, sprite_data in plan_data: refined_plans = [] for plan in sprite_data.get("plans", []): logic = plan.get("logic", "") event = plan.get("event", "") # Pre‑extract the candidate opcodes per category opcodes = { "motion": plan.get("motion", []), "control": plan.get("control", []), "operator": plan.get("operator", []), "sensing": plan.get("sensing", []), "looks": plan.get("looks", []), "sounds": plan.get("sounds", []), "events": plan.get("events", []) + ([event] if isinstance(event, str) else []), "data": plan.get("data", []), } # Build a prompt to the LLM, providing the logic and asking for opcode counts refinement_prompt = f""" You are a Scratch 3.0 expert with deep knowledge of block types, nesting and stack relationships. Your job: read the plan logic below and decide exactly which blocks (and how many of each) are required to implement it. Review the following plan for '{sprite}' triggered by '{event}'. Game Description: {game_description} --- Scratch 3.0 Block Reference --- ### Hat Blocks Description: {hat_description} Blocks: {hat_opcodes_functionalities} ### Boolean Blocks Description: {boolean_description} Blocks: {boolean_opcodes_functionalities} ### C Blocks Description: {c_description} Blocks: {c_opcodes_functionalities} ### Cap Blocks Description: {cap_description} Blocks: {cap_opcodes_functionalities} ### Reporter Blocks Description: {reporter_description} Blocks: {reporter_opcodes_functionalities} ### Stack Blocks Description: {stack_description} Blocks: {stack_opcodes_functionalities} ----------------------------------- Current Plan Details: - Event (Hat Block Opcode): {event} - Associated Opcodes by Category: {json.dumps(opcodes, indent=2)} ── Game Context ── Sprite: "{sprite_name}" Description: {game_description} ── Current Plan ── Event (hat block): {event} Logic (pseudo‑Scratch): {logic} Plan : {plan} ── Opcode Candidates ── Motion: {opcodes["motion"]} Control: {opcodes["control"]} Operator: {opcodes["operator"]} Sensing: {opcodes["sensing"]} Looks: {opcodes["looks"]} Sounds: {opcodes["sounds"]} Events: {opcodes["events"]} Data: {opcodes["data"]} ── Your Task ── 1. Analyze the “Logic” steps and choose exactly which opcodes from each category are needed. 2. For each category, return a JSON list of objects: {{ "opcode": "", "count": }} 3. Return a top‑level JSON object with exactly these eight keys: "motion", "control", "operator", "sensing", "looks", "sounds", "events", "data". 4. Use only double quotes and ensure valid JSON. Example output: ```json {{ "event": "event_whenflagclicked", "logic": "when green flag clicked\n switch backdrop to [backdrop1 v]\n set [score v] to 0\n set [speed v] to 0\n show variable [score v]\n show variable [speed v]\n broadcast [Game Start v]", "motion": [{{"opcode":"motion_gotoxy","count":1}}], "control": [{{"opcode":"control_forever","count":1}},{{"opcode":"control_if","count":1}}], "operator": [], "sensing": [], "looks": [{{"opcode":"looks_switchbackdropto","count":1}}], "sounds": [], "events": [{{"opcode":"event_broadcast","count":1}}], "data": [{{"opcode":"data_setvariableto","count":2}},{{"opcode":"data_showvariable","count":2}}], }}, {{ "event": [{{"opcode":"event_whenflagclicked","count":1}}], "logic": "when green flag clicked\n go to x: 240 y: -135\n set [score v] to +1\n set [speed v] to +1\n show variable [score v]\n show variable [speed v]\n forever\n glide 2 seconds to x: -240 y: -135\n if <(x position) < -235> then\n set x to 240\n end\n if then\n broadcast [Game Over v]\n stop [all v]\n end\n end", "motion": [ {{"opcode":"motion_gotoxy","count":1}}, {{"opcode":"motion_glidesecstoxy","count":1}}, {{"opcode":"motion_xposition","count":1}}, {{"opcode":"motion_setx","count":1}}, ], "control": [ {{"opcode":"control_forever","count":1}}, {{"opcode":"control_if","count":1}}, {{"opcode":"control_stop","count":1}}, ], "operator": [ {{"opcode":"operator_lt","count":1}}, ], "sensing": [ {{"opcode":"sensing_istouching","count":1}}, {{"opcode":"sensing_touchingobjectmenu","count":1}}, ], "looks": [], "sounds": [], "events": [ {{"opcode":"event_broadcast","count":1}}, ], "data": [ {{"opcode":"data_setvariableto","count":2}}, {{"opcode":"data_showvariable","count":2}} ], }} ``` """ try: # Invoke the main agent for logic refinement and relationship identification response = agent.invoke({"messages": [{"role": "user", "content": refinement_prompt}]}) llm_output = response["messages"][-1].content #print(f"[RAW RESPONSE OPCODE COUTER LOGIC]: {response}") llm_json = extract_json_from_llm_response(llm_output) logger.info(f"Successfully analyse the opcode requirement for {sprite} - {event}.") except json.JSONDecodeError as error_json: logger.error(f"JSON Decode Error for {sprite} - {event}: {error_json}. Attempting correction.") # If JSON parsing fails, use the json resolver agent correction_prompt = ( "Your task is to correct the provided JSON string to ensure it is **syntactically perfect and adheres strictly to JSON rules**.\n" "It must be a JSON object with `refined_logic` (string) and `block_relationships` (array of objects).\n" f"- **Error Details**: {error_json}\n\n" "**Strict Instructions for your response:**\n" "1. **ONLY** output the corrected JSON. Do not include any other text or explanations.\n" "2. Ensure all keys and string values are enclosed in **double quotes**. Escape internal quotes (`\\`).\n" "3. No trailing commas. Correct nesting.\n\n" "Here is the problematic JSON string to correct:\n" f"```json\n{llm_output}\n```\n" "Corrected JSON:\n" ) try: correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]}) llm_json = extract_json_from_llm_response(correction_response["messages"][-1].content) logger.info(f"Successfully corrected JSON output for {sprite} - {event}.") except Exception as e_corr: logger.error(f"Failed to correct JSON output for {sprite} - {event} even after retry: {e_corr}") return state # Replace each category in the plan with the LLM’s output (or empty list fallback) plan["motion"] = llm_json.get("motion", llm_json.get("motions", [])) # handling of motion and motions by llm without re plan["control"] = llm_json.get("control", llm_json.get("controls", [])) # handling of control and controls by llm without re plan["operator"] = llm_json.get("operator", llm_json.get("operators", [])) # handling of operator and operators by llm without re plan["sensing"] = llm_json.get("sensing", []) plan["looks"] = llm_json.get("looks", llm_json.get("look", [])) # handling of look and looks by llm without re plan["sounds"] = llm_json.get("sounds", llm_json.get("sound", [])) # handling of sound and sounds by llm without re plan["events"] = llm_json.get("events", []) plan["data"] = llm_json.get("data", []) refined_plans.append(plan) refined_flow[sprite] = { "description": sprite_data.get("description", ""), "plans": refined_plans } state["temporary_node"] = refined_flow print(f"[OPCODE COUTER LOGIC]: {refined_flow}") #state["action_plan"]["action_overall_flow"] = refined_flow logger.info("=== OPCODE COUTER LOGIC completed ===") return state # Node 10: to refine plans and identify block relationships def block_relationship_node(state: GameState): logger.info("--- Running PlanBlockRelationNode ---") game_description = state.get("description", "No game description provided.") action_plan = state.get("action_plan", {}) # # Ensure action_plan exists and contains "action_overall_flow" # if not action_plan or "action_overall_flow" not in action_plan: # logger.warning("No 'action_overall_flow' found in state['action_plan']. Skipping plan refinement and block relation analysis.") # return state refined_overall_flow = {} if action_plan.get("action_overall_flow", {}) == {}: plan_data = action_plan.items() else: plan_data = action_plan.get("action_overall_flow", {}).items() for target_name, target_data in plan_data: refined_target_plans = [] for plan in target_data.get("plans", []): original_logic = plan.get("logic", "") event = plan.get("event", "unknown_event") opcodes = { "motion": plan.get("motion", []), "control": plan.get("control", []), "operator": plan.get("operator", []), "sensing": plan.get("sensing", []), "looks": plan.get("looks", []), "sounds": plan.get("sounds", []), "events": plan.get("events", []), "data": plan.get("data", []) } refinement_prompt = f""" You are an expert in Scratch 3.0 game development, specializing in understanding block relationships (stacked, nested). Review the following plan for '{target_name}' triggered by '{event}'. Game Description: {game_description} --- Scratch 3.0 Block Reference --- ### Hat Blocks Description: {hat_description} ### Boolean Blocks Description: {boolean_description} ### C Blocks Description: {c_description} ### Cap Blocks Description: {cap_description} ### Reporter Blocks Description: {reporter_description} ### Stack Blocks Description: {stack_description} ----------------------------------- Current Plan Details: - Event (Hat Block Opcode): {event} - Current Logic Description: "{original_logic}" - Associated Opcodes by Category: {json.dumps(opcodes, indent=2)} Your task is to: 1. **Identify Block Relationships**: Based on the 'Refined Logic' and 'Associated Opcodes', describe the relationships between the Scratch blocks implied by this plan. Identify if blocks are 'stacked' (one after another in a script) or 'nested' (one block plugged into another's input or within a C-block). For nested relationships, specify the 'parent' and 'child' or 'children_opcodes'. 2. Donot add any explaination of logic or relation or comments to justify or explain, just put the logic content in the json. Output a JSON object and [NOTE: use double quates always ""]: - `block_relationships`: An array of dictionaries, each describing a relationship. - `refined_logic`: A concise, refined description of the plan's logic, focusing on the sequence and nesting of blocks. Examples for `block_relationships`: - **Example 1: Simple Stacked Blocks** - Scenario: A sprite moves 10 steps, then says "Hello!" for 2 seconds. - Opcodes: ["motion_movesteps", "looks_sayforsecs"] - `refined_logic`: "The sprite moves forward and then says 'Hello!' for a duration." - `block_relationships`: [ {{ "type": "stacked", "blocks": ["motion_movesteps", "looks_sayforsecs"] }} ] - **Example 2: Nested C-block (Forever Loop) with Stacked Blocks inside** - Scenario: A sprite continuously moves 10 steps and then bounces if on edge. - Opcodes: ["control_forever", "motion_movesteps", "motion_ifonedgebounce"] - `refined_logic`: "The sprite forever moves and bounces off the edge if it touches." - `block_relationships`: [ {{ "type": "nested_c_block_body", "parent": "control_forever", "child_opcodes": ["motion_movesteps", "motion_ifonedgebounce"] }} ] - **Example 3: Nested Conditional (If-Then) with Boolean Input and Stacked Blocks** - Scenario: If the sprite is touching the mouse-pointer, it says "You found me!" - Opcodes: ["control_if", "sensing_touchingobject", "looks_say"] - `refined_logic`: "If the sprite touches the mouse pointer, it says a specific phrase." - `block_relationships`: [ {{ "type": "nested_input", "parent_opcode": "control_if", "input_slot_name": "CONDITION", "child_opcode": "sensing_touchingobject" }}, {{ "type": "nested_c_block_body", "parent": "control_if", "child_opcodes": ["looks_say"] }} ] - **Example 4: Nested Arithmetic Operation (Reporter Block) as Input** - Scenario: A sprite moves steps equal to its x-position plus 50. - Opcodes: ["motion_movesteps", "operator_add", "motion_xposition"] - `refined_logic`: "The sprite moves a number of steps calculated by adding 50 to its current x-position." - `block_relationships`: [ {{ "type": "nested_input", "parent_opcode": "motion_movesteps", "input_slot_name": "STEPS", "child_opcode": "operator_add" }}, {{ "type": "nested_input", "parent_opcode": "operator_add", "input_slot_name": "NUM1", "child_opcode": "motion_xposition" }}, {{ "type": "nested_literal_input", "parent_opcode": "operator_add", "input_slot_name": "NUM2", "value": "50" }} ] - **Example 5: Nested If-Else Block with Boolean Input** - Scenario: If score is greater than 10, say "High Score!", else say "Keep Playing!". - Opcodes: ["control_if_else", "operator_gt", "data_variable", "looks_say"] - `refined_logic`: "Checks if the score is high; provides different messages based on the condition." - `block_relationships`: [ {{ "type": "nested_input", "parent_opcode": "control_if_else", "input_slot_name": "CONDITION", "child_opcode": "operator_gt" }}, {{ "type": "nested_input", "parent_opcode": "operator_gt", "input_slot_name": "OPERAND1", "child_opcode": "data_variable" }}, {{ "type": "nested_literal_input", "parent_opcode": "operator_gt", "input_slot_name": "OPERAND2", "value": "10" }}, {{ "type": "nested_c_block_body", "parent": "control_if_else", "child_opcodes": ["looks_say"] }}, {{ "type": "nested_c_block_body", "parent": "control_if_else", "child_opcodes": ["looks_say"] }} ] Ensure the `block_relationships` array is comprehensive for this plan based on the detailed logic and opcodes. Output ONLY the JSON object. """ refined_logic = original_logic # Default to original block_relationships = [] # Default to empty try: # Invoke the main agent for logic refinement and relationship identification response = agent.invoke({"messages": [{"role": "user", "content": refinement_prompt}]}) llm_output_raw = response["messages"][-1].content parsed_llm_output = extract_json_from_llm_response(llm_output_raw) refined_logic = parsed_llm_output.get("refined_logic", original_logic) block_relationships = parsed_llm_output.get("block_relationships", []) logger.info(f"Successfully processed plan for {target_name} - {event}.") except json.JSONDecodeError as error_json: logger.error(f"JSON Decode Error for {target_name} - {event}: {error_json}. Attempting correction.") # If JSON parsing fails, use the json resolver agent correction_prompt = ( "Your task is to correct the provided JSON string to ensure it is **syntactically perfect and adheres strictly to JSON rules**.\n" "It must be a JSON object with `refined_logic` (string) and `block_relationships` (array of objects).\n" f"- **Error Details**: {error_json}\n\n" "**Strict Instructions for your response:**\n" "1. **ONLY** output the corrected JSON. Do not include any other text or explanations.\n" "2. Ensure all keys and string values are enclosed in **double quotes**. Escape internal quotes (`\\`).\n" "3. No trailing commas. Correct nesting.\n\n" "Here is the problematic JSON string to correct:\n" f"```json\n{llm_output_raw}\n```\n" "Corrected JSON:\n" ) try: correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]}) corrected_output = extract_json_from_llm_response(correction_response["messages"][-1].content) refined_logic = corrected_output.get("refined_logic", original_logic) block_relationships = corrected_output.get("block_relationships", []) logger.info(f"Successfully corrected JSON output for {target_name} - {event}.") except Exception as e_corr: logger.error(f"Failed to correct JSON output for {target_name} - {event} even after retry: {e_corr}") # Final fallback if correction fails refined_logic = original_logic block_relationships = [] except Exception as e: logger.error(f"General error invoking LLM for {target_name} - {event}: {e}") # Fallback to original logic and empty relationships on any other error refined_logic = original_logic block_relationships = [] plan["logic"] = refined_logic plan["block_relationships"] = block_relationships refined_target_plans.append(plan) refined_overall_flow[target_name] = {"description": target_data.get("description"), "plans": refined_target_plans} # Update the original action_plan in the state with the refined version state["action_plan"]["action_overall_flow"] = refined_overall_flow print(f"[OVREALL REFINED RELATIONSHIP]: {refined_overall_flow}") logger.info("Plan refinement and block relation analysis completed for all plans.") return state # Helper function to get a block by its opcode from a single catalog def get_block_by_opcode(catalog_data: dict, opcode: str) -> dict | None: """ Search a single catalog (with keys "description" and "blocks": List[dict]) for a block whose 'op_code' matches the given opcode. Returns the block dict or None if not found. """ for block in catalog_data["blocks"]: if block.get("op_code") == opcode: return block return None # Helper function to find a block in all catalogs by opcode def find_block_in_all(opcode: str, all_catalogs: list[dict]) -> dict | None: """ Search across multiple catalogs for a given opcode. Returns the first matching block dict or None. """ for catalog in all_catalogs: blk = get_block_by_opcode(catalog, opcode) if blk is not None: return blk return None # Node 11: Overall Block Builder Node def overall_block_builder_node(state: dict): logger.info("--- Running OverallBlockBuilderNode ---") project_json = state["project_json"] targets = project_json["targets"] monitors = project_json["monitors"] # --- Sprite and Stage Target Mapping --- sprite_map = {target["name"]: target for target in targets if not target["isStage"]} stage_target = next((target for target in targets if target["isStage"]), None) if stage_target: sprite_map[stage_target["name"]] = stage_target # --- Pre-load all block-catalog JSONs once --- all_catalogs = [ hat_block_data, boolean_block_data, c_block_data, cap_block_data, reporter_block_data, stack_block_data ] action_plan = state.get("action_plan", {}) print("[Overall Action Plan received at the block generator]:", json.dumps(action_plan, indent=2)) if not action_plan: logger.warning("No action plan found in state. Skipping OverallBlockBuilderNode.") return state # Initialize offsets for script placement on the Scratch canvas script_y_offset = {} script_x_offset_per_sprite = {name: 0 for name in sprite_map.keys()} # This handles potential variations in the action_plan structure. if action_plan.get("action_overall_flow", {}) == {}: plan_data = action_plan.items() else: plan_data = action_plan.get("action_overall_flow", {}).items() # --- Extract global project context for LLM --- all_sprite_names = list(sprite_map.keys()) all_variable_names = {} all_list_names = {} all_broadcast_messages = {} for target in targets: for var_id, var_info in target.get("variables", {}).items(): all_variable_names[var_info[0]] = var_id # Store name -> ID mapping (e.g., "myVariable": "myVarId123") for list_id, list_info in target.get("lists", {}).items(): all_list_names[list_info[0]] = list_id # Store name -> ID mapping for broadcast_id, broadcast_name in target.get("broadcasts", {}).items(): all_broadcast_messages[broadcast_name] = broadcast_id # Store name -> ID mapping # --- Process each sprite's action plan --- for sprite_name, sprite_actions_data in plan_data: if sprite_name in sprite_map: current_sprite_target = sprite_map[sprite_name] if "blocks" not in current_sprite_target: current_sprite_target["blocks"] = {} if sprite_name not in script_y_offset: script_y_offset[sprite_name] = 0 for plan_entry in sprite_actions_data.get("plans", []): event_opcode = plan_entry["event"] logic_sequence = plan_entry["logic"] block_relationship = plan_entry["block_relationships"] # Gather all opcodes expected to be used in this script needed_opcodes = [] needed_opcodes.extend(plan_entry.get("motion", plan_entry.get("motions", []))) # handling of motion and motions by llm without re needed_opcodes.extend(plan_entry.get("control", plan_entry.get("controls", []))) # handling of control and controls by llm without re needed_opcodes.extend(plan_entry.get("operator", plan_entry.get("operators", []))) # handling of operator and operators by llm without re needed_opcodes.extend(plan_entry.get("sensing", [])) needed_opcodes.extend(plan_entry.get("looks", plan_entry.get("look", []))) # handling of look and looks by llm without re needed_opcodes.extend(plan_entry.get("sounds", plan_entry.get("sound", []))) # handling of sound and sounds by llm without re needed_opcodes.extend(plan_entry.get("events", [])) # there is no need of that here needed_opcodes.extend(plan_entry.get("event", [])) # hat_block also required needed_opcodes.extend(plan_entry.get("data", [])) needed_opcodes = list(set(needed_opcodes)) # Remove duplicates # Merge human-written catalog and runtime catalog for comprehensive block definitions combined_blocks = {} for op in needed_opcodes: catalog_def = find_block_in_all(op, all_catalogs) or {} runtime_def = ALL_SCRATCH_BLOCKS_CATALOG.get(op, {}) merged = {**catalog_def, **runtime_def} combined_blocks[op] = merged print(f"[Combined blocks for this script for event {event_opcode}]: {json.dumps(combined_blocks, indent=2)}") #with open("debug_combine_blocks.json", "w", encoding="utf-8") as f: # json.dump(combined_blocks, f, indent=2, ensure_ascii=False) # --- LLM Block Generation Prompt (Self-Contained and Explicit) --- # All necessary instructions and context are directly included in the prompt. llm_block_generation_prompt = f"""You are an AI assistant generating a **single complete Scratch 3.0 script** in JSON format. The current sprite is '{sprite_name}'. This script must start with the event block (Hat Block) for opcode '{event_opcode}'. The sequential logic and the block relationship(e.g. stacking and nested logic) to implement for this script is: Logic: {logic_sequence} block relationships: {block_relationship} **Project Context:** Here is a summary of the current Scratch project's elements. You MUST reference these when generating blocks that interact with sprites, variables, lists, or broadcasts to use it in block where you requird. Do NOT invent new names if an existing one is suitable. - **All Sprite Names:** {json.dumps(all_sprite_names)} - **Existing Variables (Name: ID):** {json.dumps(all_variable_names)} **USE**: You can use this variable in blocks e.g. to update score & highscore, power ups, etc. - **Existing Lists (Name: ID):** {json.dumps(all_list_names)} **USE**: You can use this List variable in blocks e.g. to show inventory, tools, etc. - **Existing Broadcast Messages (Name: ID):** {json.dumps(all_broadcast_messages)} **USE** : you can use to broadcast messages in blocks e.g. game over, game start, next level, etc. **Required Block Opcodes & Catalog:** Based on the planning, the following specific Scratch block opcodes are expected to be used. You MUST use these opcodes where applicable. Here is the comprehensive catalog for these blocks, including their structure and required inputs/fields: ```json {json.dumps(combined_blocks, indent=2)} ```` Current Scratch project JSON for this sprite (provided for context; you are generating a NEW, complete script): ```json {json.dumps(current_sprite_target, indent=2)} ``` **CRITICAL INSTRUCTIONS FOR GENERATING THE BLOCK JSON (READ CAREFULLY AND FOLLOW PRECISELY):** 1. **Unique Block IDs:** Generate a **globally unique ID** for EVERY block (main and shadow blocks) within the entire JSON output for this script. Example: 'myBlockID123'. These IDs must be unique within the *entire* generated JSON for this script. 2. **Script Initiation (Hat Block - VERY IMPORTANT):** * The **first block** of the script (the Hat block, opcode '{event_opcode}') MUST have `"topLevel": true` and `"parent": null`. * ONLY this Hat block should have `"topLevel": true`. All other blocks in the script MUST have `"topLevel": false`. * Set its `x` and `y` coordinates (e.g., `x: 0, y: 0` or similar for clear placement). 3. **Strict Block Chaining (`next` and `parent` - ABSOLUTELY CRITICAL):** * The `next` field is used ONLY for connecting **Stack Blocks** (blocks that perform an action and can be stacked vertically). * **NEVER** set the `next` field of a block to point to a **Reporter Block** (blocks that report a value, like `operator_gt`, `data_variable`, `math_number`). Reporter blocks (used as nested always) are placed *inside* the `inputs` of other blocks. * Use the `next` field to point to the ID of the Stack Block DIRECTLY BELOW it in the stack. If a block has a `next`, its `next` block's `parent` MUST point back to the current block's ID. * Use the `parent` field to point to the ID of the Stack Block DIRECTLY ABOVE it in the stack (or the C-block that contains it for substacks). If a block has a `parent`, then that `parent` block's `next` MUST point to the current block's ID (unless it's the first block in a substack, in which case the C-block's `SUBSTACK` input points to it). * The **last stack block** in a linear stack MUST have `"next": null`. * Blocks plugged into inputs (e.g., a reporter block into a number input) or substacks (e.g., inside a `control_forever` block) DO NOT use `next` to connect to their *parent* block; their connection is solely via the `inputs` field of their parent. They *do* use `next` for subsequent blocks *within their own stack*. 4. **`inputs` Field Structure (ABSOLUTELY CRITICAL - ADHERE TO THIS RIGIDLY):** * The value for ANY key within the `inputs` dictionary MUST be an **array of EXACTLY two elements**: `[type_code, value_or_block_id]`. * **`type_code` Guidance:** * `1`: Use when connecting a block (e.g., a reporter block, boolean block, or stack block if the input expects it) to an input slot. This is the most common `type_code` when referencing a separate block ID. * `2`: Use specifically for C-shaped substacks (e.g., the `SUBSTACK` input of `control_forever`, `control_if`, `control_repeat`). The second element is the ID of the first block within that substack. * **NEVER embed direct primitive values or nested arrays** inside the second element of `inputs`. For example, **FORBID** `["STEPS": [1, ["math_number","10"]]]`. This is incorrect. * **ALWAYS** generate a separate shadow block with its own unique ID for every literal value (numbers, strings, booleans) or dropdown/menu selection that is an input. Reference that shadow block ID in the parent `inputs`. * **Correct Example for Numerical Input (e.g., for `motion_movesteps` STEPS, or `motion_gotoxy` X/Y):** * If you need to input the number `10` into a block, you MUST create a separate `math_number` shadow block for it, and then reference its ID. ```json // Main block using the number "mainBlockID": {{ "opcode": "motion_movesteps", "inputs": {{ "STEPS": [1, "shadowNumID"] }}, "next": "nextBlockID", "parent": "parentBlockID", "shadow": false, "topLevel": false }}, // The separate shadow block for the number '10' "shadowNumID": {{ "opcode": "math_number", "fields": {{ "NUM": ["10", null] }}, "shadow": true, "parent": "mainBlockID", "topLevel": false }} ``` * **Correct Example for Dropdown/Menu Input (e.g., `sensing_touchingobject` with 'edge'):** * If you need to select 'edge' for the `touching ()?` block, you MUST create a separate `sensing_touchingobjectmenu` shadow block and reference its ID. ```json // Main block using the dropdown selection "touchingBlockID": {{ "opcode": "sensing_touchingobject", "inputs": {{ "TOUCHINGOBJECTMENU": [1, "shadowEdgeMenuID"] }}, "next": null, "parent": "someParentBlockID", "shadow": false, "topLevel": false }}, // The separate shadow block for the 'edge' menu option "shadowEdgeMenuID": {{ "opcode": "sensing_touchingobjectmenu", "fields": {{ "TOUCHINGOBJECTMENU": ["_edge_", null] }}, "shadow": true, "parent": "touchingBlockID", "topLevel": false }} ``` * **Correct Example for C-block (e.g., `control_forever`):** * The `control_forever` block MUST have a `SUBSTACK` input pointing to the first block inside its loop. The value for `SUBSTACK` must be an array: `[2, "FIRST_BLOCK_IN_FOREVER_LOOP_ID"]`. ```json "foreverBlockID": {{ "opcode": "control_forever", "inputs": {{ "SUBSTACK": [2, "firstBlockInsideForeverID"] }}, "next": null, "parent": "blockAboveForeverID", "shadow": false, "topLevel": false }}, "firstBlockInsideForeverID": {{ // ... definition of the first block inside the forever loop "parent": "foreverBlockID", "next": "secondBlockInsideForeverID" // if there's another block }} ``` * **Correct Example for C-block with condition (e.g., `control_if`):** * The `control_if` block MUST have a `CONDITION` input (typically `type_code: 1` referencing a boolean reporter block) and a `SUBSTACK` input (`type_code: 2` referencing the first block inside the if-body). ```json "ifBlockID": {{ "opcode": "control_if", "inputs": {{ "CONDITION": [1, "conditionBlockID"], "SUBSTACK": [2, "firstBlockInsideIfID"] }}, "next": "blockAfterIfID", "parent": "blockAboveIfID", "shadow": false, "topLevel": false }}, "conditionBlockID": {{ "opcode": "sensing_touchingobject", // Example condition block // ... definition for condition block, parent should be "ifBlockID" "shadow": false, "topLevel": false }}, "firstBlockInsideIfID": {{ // ... definition of the first block inside the if body "parent": "ifBlockID", "next": null // or next block if more }} ``` * **Correct Example for Broadcast Blocks (`event_broadcast`, `event_broadcast_and_wait`):** * The main broadcast block (e.g., `event_broadcast`) has an input named `"BROADCAST_INPUT"`. Its value must be `[1, "ID_OF_BROADCAST_MENU_SHADOW_BLOCK"]`. * The corresponding shadow block (`event_broadcast_menu`) for selecting the message has a `fields` dictionary with a key named `"BROADCAST"`. The value for `"BROADCAST"` must be an array `["Message Name", "MESSAGE_ID_FROM_TARGET_BROADCASTS"]`. * If the broadcast message does not exist in `all_broadcast_messages`, generate a *new unique ID* for it within your generated blocks and use the desired message name. The external system will be responsible for registering this new broadcast in the overall Scratch project JSON. * Example for a broadcast named "Game Over" with a hypothetical ID: ```json "myBroadcastBlockID": {{ "opcode": "event_broadcast", "inputs": {{ "BROADCAST_INPUT": [1, "myBroadcastMenuShadowID"] }}, "next": null, "parent": "someParentBlockID", "shadow": false, "topLevel": false }}, "myBroadcastMenuShadowID": {{ "opcode": "event_broadcast_menu", "fields": {{ "BROADCAST": ["Game Over", "my_game_over_message_ID_from_target"] }}, "shadow": true, "parent": "myBroadcastBlockID", "topLevel": false }} ``` * **Correct Example for Sensing Blocks (`sensing_touchingobject`, `sensing_touchingcolor`, etc.):** * For blocks like `sensing_touchingobject`, the input is typically `"TOUCHINGOBJECTMENU"`. Its value must be `[1, "ID_OF_SENSING_MENU_SHADOW_BLOCK"]`. * The corresponding shadow block (e.g., `sensing_touchingobjectmenu`) has a `fields` dictionary with a key matching the input name (e.g., `"TOUCHINGOBJECTMENU"`). The value for this field must be an array `["_mouse_", null]` or `["_edge_", null]` for specific options, or `["SpriteName", "SPRITE_ID_FROM_TARGETS"]` for other sprites. * Ensure the string value in the `fields` array for these menus matches the exact Scratch internal string for the option (e.g., `"_edge_"` not `"edge"` unless specifically allowed). Refer to `ALL_SCRATCH_BLOCKS_CATALOG` for exact field values. * Example for touching 'edge': ```json "myTouchingBlockID": {{ "opcode": "sensing_touchingobject", "inputs": {{ "TOUCHINGOBJECTMENU": [1, "myTouchingObjectMenuShadowID"] }}, "next": null, "parent": "someParentBlockID", "shadow": false, "topLevel": false }}, "myTouchingObjectMenuShadowID": {{ "opcode": "sensing_touchingobjectmenu", "fields": {{ "TOUCHINGOBJECTMENU": ["_edge_", null] }}, "shadow": true, "parent": "myTouchingBlockID", "topLevel": false }} ``` * **Correct Example for Clone Blocks (`control_create_clone_of`, `control_when_start_as_clone`, `control_delete_this_clone`):** * `control_create_clone_of` has an input named `"CLONE_TARGET"`. Its value must be `[1, "ID_OF_CLONE_TARGET_MENU_SHADOW_BLOCK"]`. * The corresponding shadow block (`control_create_clone_of_menu`) has a `fields` dictionary with a key named `"CLONE_TARGET"`. The value for `"CLONE_TARGET"` must be an array `["_myself_", null]` or `["SpriteName", "SPRITE_ID_FROM_TARGETS"]`. * `control_when_start_as_clone` is a hat block and thus `topLevel: true`, `parent: null`, and `next: null`. * `control_delete_this_clone` is a stack block and will have `next` and `parent` connections. * Example for creating a clone of 'myself': ```json "myCloneBlockID": {{ "opcode": "control_create_clone_of", "inputs": {{ "CLONE_TARGET": [1, "myCloneTargetMenuShadowID"] }}, "next": null, "parent": "someParentBlockID", "shadow": false, "topLevel": false }}, "myCloneTargetMenuShadowID": {{ "opcode": "control_create_clone_of_menu", "fields": {{ "CLONE_TARGET": ["_myself_", null] }}, "shadow": true, "parent": "myCloneBlockID", "topLevel": false }} ``` 5. **Define ALL Shadow Blocks Separately (THIS IS ESSENTIAL):** Every time a block's input requires a number, string literal, or a selection from a dropdown/menu, you MUST define a **separate block entry** in the top-level blocks dictionary for that shadow. Each shadow block MUST have `"shadow": true` and `"topLevel": false`. 6. **`fields` for direct values, Variables, and Lists:** * Use the `fields` dictionary ONLY IF the block directly embeds a dropdown value, a text field, a variable reference, or a list reference without an `inputs` connection. * The value for a field is typically an array `["Value", null]` (for simple text/dropdowns) or `["Name", "ID"]` (for variables, lists, or broadcast messages). * **For Variables and Lists:** When referencing an existing variable or list (from `all_variable_names` or `all_list_names`), the `fields` entry MUST be `\"VARIABLE\": [\"VariableName\", \"VariableID\"]` (for a variable) or `\"LIST\": [\"ListName\", \"ListID\"]` (for a list). If the `logic_sequence` implies the use of a variable or list that does not exist, you *must* generate a *new unique ID* for it within your generated blocks, and use its name in the `fields` as specified. The external system (e.g., the `block_builder` or `project_json` integration) will be responsible for registering this new element in the overall Scratch project JSON. * **Example for using an existing Variable (e.g., `data_setvariableto`):** ```json "setVariableBlockID": {{ "opcode": "data_setvariableto", "fields": {{ "VARIABLE": ["myVariable", "myVariableID_from_project_context"] }}, "inputs": {{ "VALUE": [1, "shadowValueID"] }}, "next": null, "parent": "someParentBlockID", "shadow": false, "topLevel": false }}, "shadowValueID": {{ "opcode": "math_number", "fields": {{ "NUM": ["0", null] }}, "shadow": true, "parent": "setVariableBlockID", "topLevel": false }} ``` * **Example for referencing a Variable (e.g., `data_variable` reporter block):** ```json "variableReporterID": {{ "opcode": "data_variable", "fields": {{ "VARIABLE": ["myVariable", "myVariableID_from_project_context"] }}, "shadow": false, "parent": "blockUsingReporterID", "topLevel": false }} ``` 7. **`topLevel: true` for hat blocks:** Only the starting (hat) block of a script should have `"topLevel": true`. 8. **Ensure Unique Block IDs:** Every block you generate (main blocks and shadow blocks) must have a unique ID within the entire script's block dictionary. 9. **Strictly Use Catalog Opcodes:** You MUST only use `opcode` values that are present in the provided `ALL_SCRATCH_BLOCKS_CATALOG`. Do NOT use unlisted opcodes like `motion_jump`. 10. **Return ONLY the JSON object representing all the blocks for THIS SINGLE SCRIPT.** Do NOT wrap it in a 'blocks' key or the full project JSON. The output should be a dictionary where keys are block IDs and values are block definitions. You need to ensure that the generated blocks fulfill the `logic_sequence` for the `event_opcode`. """ try: response = agent.invoke({"messages": [{"role": "user", "content": llm_block_generation_prompt}]}) raw_response = response["messages"][-1].content logger.info(f"Raw response from LLM [OverallBlockBuilderNode - {sprite_name} - {event_opcode}]: {raw_response[:500]}...") print(f"Raw response from LLM [OverallBlockBuilderNode - {sprite_name} - {event_opcode}]: {raw_response}") try: generated_blocks = extract_json_from_llm_response(raw_response) except json.JSONDecodeError as error_json: logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.") # --- JSON Correction Agent --- correction_prompt = ( "Your primary goal is to generate JSON that precisely defines Scratch 3.0 blocks. " "The output MUST be a single, valid JSON object, enclosed within a '```json' markdown block.\n" "**STRICT JSON SYNTAX RULES:**\n" "1. **NO EXTRA TEXT, COMMENTS, OR CONVERSATIONAL FILLER.** Only the JSON.\n" "2. All keys MUST be enclosed in double quotes.\n" "3. String values MUST be enclosed in double quotes. Internal double quotes must be escaped (e.g., `\\\"`). Newlines (`\\n`) and carriage returns (`\\r`) in strings must be escaped.\n" "4. No trailing commas.\n" "5. Correct nesting of braces `{}` and brackets `[]`.\n" "6. **CRITICAL: Understand and strictly follow the Scratch block input array format.**\n" " - When an input refers to another top-level block (like a variable block or a number block that is defined separately):\n" " `\"INPUT_NAME\": [1, \"ID_OF_REFERENCED_BLOCK\"]`\n" " - When an input *directly embeds* a literal value (like a number, string, or boolean shadow block) as per the Scratch schema (which is less common for simple literals but necessary for certain types):\n" " `\"INPUT_NAME\": [1, [\"math_number\", \"10\"]]` (for numbers, as a shadow block definition)\n" " `\"INPUT_NAME\": [1, [\"text\", \"hello\"]]` (for strings, as a shadow block definition)\n" " `\"INPUT_NAME\": [1, [\"boolean\", true]]` (for booleans, as a shadow block definition)\n" " **However, for most literal inputs, the standard is to define a separate shadow block and reference its ID.** Refer to the earlier prompt for the preferred method using separate shadow blocks. The nested array format `[1, [\"opcode\", \"value\"]]` should generally be avoided unless specifically indicated by the Scratch schema for certain block types where the shadow block is *always* an implicit part of the input. **For clarity and consistency, prioritize creating separate shadow blocks and referencing their IDs in the input array as `[1, \"ShadowBlockID\"]`.**\n" " - **Do NOT** generate patterns like `some_id[\"value\", null]]` or `\"\"\": [1, some_id[\"\", null]]`.\n" " - Ensure `\"X\"` and `\"Y\"` inputs for `motion_gotoxy` and `motion_glidesecstoxy` correctly reference a block ID (usually a math_number shadow block).\n" "7. The 'logic' field should contain a plain string describing the action, without any embedded JSON or Scratch block syntax (like `\"forever\":\"` which breaks the string).\n\n" "Here is the problematic JSON string that needs correction, including the error details:\n" f"- **Error Details**: {error_json}\n" "```json\n" f"{raw_response}\n" "```\n" "Your corrected, valid JSON response:\n" "```json\n" ) correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]}) print(f"[JSON CORRECTOR RESPONSE AT OVERALLBLOCKBUILDER ]: {correction_response['messages'][-1].content}") generated_blocks = extract_json_from_llm_response(correction_response["messages"][-1].content) if "blocks" in generated_blocks and isinstance(generated_blocks["blocks"], dict): logger.warning(f"LLM returned nested 'blocks' key for {sprite_name}. Unwrapping.") generated_blocks = generated_blocks["blocks"] # Update block positions for top-level script for block_id, block_data in generated_blocks.items(): if block_data.get("topLevel"): block_data["x"] = script_x_offset_per_sprite.get(sprite_name, 0) block_data["y"] = script_y_offset[sprite_name] script_y_offset[sprite_name] += 150 # Increment for next script current_sprite_target["blocks"].update(generated_blocks) state["iteration_count"] = 0 logger.info(f"Action blocks added for sprite '{sprite_name}', script '{event_opcode}' by OverallBlockBuilderNode.") except Exception as e: logger.error(f"Error generating blocks for sprite '{sprite_name}', script '{event_opcode}': {e}") raise state["project_json"] = project_json logger.info("Updated project JSON with action nodes.") #with open("debug_state.json", "w", encoding="utf-8") as f: # json.dump(state, f, indent=2, ensure_ascii=False) print("Updated project JSON with action nodes:", json.dumps(project_json, indent=2)) return state #helper function to identify the shape of block utilized by the block verifier def get_block_type(opcode: str) -> str: """Determines the general type of a Scratch block based on its opcode.""" if not opcode: return "unknown" if opcode.startswith("event_when") or opcode == "control_start_as_clone": return "hat" elif opcode.startswith("control_") and ("if" in opcode or "repeat" in opcode or "forever" in opcode): return "c_block" elif opcode in ["operator_equals", "operator_gt", "operator_lt", "operator_and", "operator_or", "operator_not"] or \ (opcode.startswith("sensing_") and ("mousedown" in opcode or "keypressed" in opcode or "touching" in opcode)): return "boolean" elif opcode.endswith("menu"): # For dropdown shadow blocks, treat as reporter for type checking return "reporter" elif any(s in opcode for s in ["position", "direction", "size", "volume", "costume", "backdrop", "random", "add", "subtract", "multiply", "divide", "length", "item", "of"]): # A more comprehensive check for reporters return "reporter" elif "stop_all" in opcode or "delete_this_clone" in opcode or "procedures_definition" in opcode: return "cap" # Default to stack for most command blocks if not explicitly defined return ALL_SCRATCH_BLOCKS_CATALOG.get(opcode, {}).get("blockType", "stack") def filter_script_blocks(all_blocks: dict, hat_block_id: str) -> dict: """ Filters and returns only the blocks that are part of a specific script starting from the given hat_block_id, including connected reporters/shadows. """ script_blocks = {} q = [hat_block_id] visited = set() while q: current_block_id = q.pop(0) if current_block_id in visited: continue visited.add(current_block_id) block_data = all_blocks.get(current_block_id) if not block_data: continue script_blocks[current_block_id] = block_data # Add next block in sequence (only for stack/c-blocks) if get_block_type(block_data.get("opcode")) in ["stack", "c_block", "hat"]: # Hat blocks have next too! next_id = block_data.get("next") if next_id and next_id in all_blocks: q.append(next_id) # Add blocks connected via inputs (e.g., reporters, shadow blocks, substacks) if "inputs" in block_data: for input_name, input_value in block_data["inputs"].items(): if isinstance(input_value, list) and len(input_value) >= 2: value_or_block_id = input_value[1] if isinstance(value_or_block_id, str) and value_or_block_id in all_blocks: q.append(value_or_block_id) # For type code 3 (reporter with default value), the third element might be a connected block if len(input_value) >= 3 and isinstance(input_value[2], str) and input_value[2] in all_blocks: q.append(input_value[2]) # For C-blocks, add blocks in substacks (if present) if get_block_type(block_data.get("opcode")) == "c_block": for input_key, input_val in block_data.get("inputs", {}).items(): # Check for substack inputs (type code 2, typically starts with SUBSTACK) if isinstance(input_val, list) and len(input_val) >= 2 and input_val[0] == 2 and isinstance(input_val[1], str) and input_val[1] in all_blocks: q.append(input_val[1]) return script_blocks def analyze_script_structure( script_blocks: dict, hat_block_id: str, sprite_name: str, sprite_variables: dict, sprite_lists: dict, sprite_broadcasts: dict, sprite_custom_blocks: dict # proccode -> block_id mapping ) -> list: """ Analyzes the structure of a single Scratch script for common errors. Returns a list of issue strings. """ issues = [] # 1. Validate the hat block hat_block = script_blocks.get(hat_block_id) if not hat_block: issues.append(f"Script for sprite '{sprite_name}' (hat ID: {hat_block_id}) has no hat block data.") return issues # Cannot proceed without a hat block if not hat_block.get("topLevel") or hat_block.get("parent") is not None: issues.append(f"Hat block '{hat_block_id}' for sprite '{sprite_name}' is not marked as topLevel or has a parent, which is incorrect for a script start.") # Hat blocks can and do have a 'next' connection to the first block in their script. # The previous check was a false positive. Removed the explicit "should not have next" check for hat blocks. # 2. Check all blocks within the script for block_id, block_data in script_blocks.items(): opcode = block_data.get("opcode") if not opcode: issues.append(f"Block '{block_id}' for sprite '{sprite_name}' is missing an opcode.") continue # Check if opcode exists in the catalog if opcode not in ALL_SCRATCH_BLOCKS_CATALOG: issues.append(f"Block '{block_id}' for sprite '{sprite_name}' has unknown opcode '{opcode}'. Recommendation: Verify against Scratch 3.0 block reference.") # Parent-Child and Next-Previous Linkage parent_id = block_data.get("parent") next_id = block_data.get("next") current_block_type = get_block_type(opcode) if parent_id: parent_block = script_blocks.get(parent_id) if not parent_block: issues.append(f"Block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' references non-existent parent '{parent_id}'.") else: # Validate the parent-child connection type based on block types parent_block_type = get_block_type(parent_block.get("opcode")) if parent_block_type in ["stack", "hat", "c_block"]: # If parent is a stack/hat/C-block, this block should be its 'next' or an input/substack if parent_block.get("next") == block_id: # Correct sequential connection pass else: found_as_input = False for input_name, input_value in parent_block.get("inputs", {}).items(): if isinstance(input_value, list) and len(input_value) >= 2 and input_value[1] == block_id: found_as_input = True break if not found_as_input: issues.append(f"Block '{block_id}' (opcode: {opcode}) has parent '{parent_id}' but is not a sequential 'next' block nor connected via an input slot. Potential broken linkage.") elif parent_block_type in ["reporter", "boolean"]: # If parent is a reporter/boolean, this block must be its child (e.g., a shadow block or another reporter) # and connected via an input. found_as_input = False for input_name, input_value in parent_block.get("inputs", {}).items(): if isinstance(input_value, list) and len(input_value) >= 2 and input_value[1] == block_id: found_as_input = True break if not found_as_input: issues.append(f"Block '{block_id}' (opcode: {opcode}) has reporter/boolean parent '{parent_id}' but is not linked via an input. Potential broken linkage.") else: issues.append(f"Block '{block_id}' (opcode: {opcode}) has an unexpected parent block type '{parent_block_type}' for parent '{parent_id}'.") # Check next linkage: only cap blocks should not have a next. Shadow blocks also shouldn't have next. if next_id: next_block = script_blocks.get(next_id) if not next_block: issues.append(f"Block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' references non-existent next block '{next_id}'.") elif next_block.get("parent") != block_id: issues.append(f"Block '{block_id}' (opcode: {opcode})'s next block '{next_id}' does not link back to it via 'parent'.") # Cap blocks should not have 'next' if current_block_type == "cap": issues.append(f"Cap block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' should not have a 'next' connection as it ends a script.") # Shadow blocks should not have 'next' if block_data.get("shadow"): issues.append(f"Shadow block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' should not have a 'next' connection.") # Input validation if "inputs" in block_data: for input_name, input_value in block_data["inputs"].items(): if not isinstance(input_value, list) or len(input_value) < 2: issues.append(f"Block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' has malformed input '{input_name}': {input_value}.") continue type_code = input_value[0] value_or_block_id = input_value[1] # Check type code validity if type_code not in [1, 2, 3]: issues.append(f"Block '{block_id}' (opcode: {opcode}) input '{input_name}' has invalid type code: {type_code}. Expected 1, 2, or 3.") if isinstance(value_or_block_id, str): # It's a block ID, check if it exists and its parent link is correct connected_block = script_blocks.get(value_or_block_id) if not connected_block: issues.append(f"Block '{block_id}' (opcode: {opcode}) input '{input_name}' references non-existent block ID '{value_or_block_id}'.") continue # Add this line to skip further processing if block is not found elif connected_block.get("parent") != block_id: issues.append(f"Block '{block_id}' (opcode: {opcode}) input '{input_name}' connects to '{value_or_block_id}', but '{value_or_block_id}'s parent is not '{block_id}'.") connected_block_type = get_block_type(connected_block.get("opcode")) # Refined type code consistency checks if type_code == 2: # Block ID (substack or reporter/boolean plugged in, without shadow) if input_name.startswith("SUBSTACK"): if connected_block_type != "stack": issues.append(f"Block '{block_id}' (opcode: {opcode}) input '{input_name}' (type 2, SUBSTACK) expects a stack block, but connected block '{value_or_block_id}' is of type '{connected_block_type}'.") elif connected_block_type not in ["reporter", "boolean"]: issues.append(f"Block '{block_id}' (opcode: {opcode}) input '{input_name}' (type 2, non-SUBSTACK) expects a reporter/boolean, but connected block '{value_or_block_id}' is of type '{connected_block_type}'.") elif type_code == 1: # Literal value or shadow block ID (possibly with an attached block) if isinstance(value_or_block_id, str) and connected_block_type not in ["reporter", "boolean"] and not connected_block.get("shadow"): issues.append(f"Block '{block_id}' (opcode: {opcode}) input '{input_name}' (type 1) expects a literal or reporter/boolean/shadow, but connected block '{value_or_block_id}' is of type '{connected_block_type}' and not a shadow.") elif type_code == 3: # Shadow block ID with default value, possibly with attached block if isinstance(value_or_block_id, str) and not connected_block.get("shadow"): issues.append(f"Block '{block_id}' (opcode: {opcode}) input '{input_name}' (type 3) expects a shadow block, but connected block '{value_or_block_id}' is not a shadow.") # Specific checks for C-blocks' SUBSTACK if get_block_type(opcode) == "c_block" and input_name.startswith("SUBSTACK"): # For a C-block, SUBSTACK input must have type code 2 and link to a valid block ID if not (isinstance(value_or_block_id, str) and script_blocks.get(value_or_block_id) and type_code == 2): issues.append(f"C-block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' has an invalid or missing SUBSTACK input configuration.") # Shadow block specific checks if block_data.get("shadow"): if block_data.get("topLevel"): issues.append(f"Shadow block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' is incorrectly marked as topLevel.") if not block_data.get("parent"): issues.append(f"Shadow block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' is missing a parent.") if block_data.get("next"): # Shadow blocks should never have 'next' issues.append(f"Shadow block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' should not have a 'next' connection.") # Enhanced field validation for specific shadow block opcodes if opcode == "math_number": num_field = block_data.get("fields", {}).get("NUM") if not (isinstance(num_field, list) and len(num_field) >= 1 and isinstance(num_field[0], (str, int, float))): issues.append(f"Math_number shadow block '{block_id}' has malformed 'NUM' field: {num_field}. Expected [value, ID_or_null].") elif opcode == "event_broadcast_menu": broadcast_field = block_data.get("fields", {}).get("BROADCAST_OPTION") # Standard field name for broadcast menu if not (isinstance(broadcast_field, list) and len(broadcast_field) >= 1 and isinstance(broadcast_field[0], str) and broadcast_field[0]): # Ensure broadcast name is a non-empty string issues.append(f"Broadcast menu shadow block '{block_id}' has malformed or empty 'BROADCAST_OPTION' field: {broadcast_field}. Expected ['message name', 'ID_or_null'].") if broadcast_field and len(broadcast_field) > 1 and broadcast_field[1] is None: # This indicates a 'default option' is null, which is often a problem if it's meant to be a pre-defined ID. # Flag this as a potential issue, indicating a missing default option. issues.append(f"Broadcast menu shadow block '{block_id}' has a 'BROADCAST_OPTION' field with a null default ID: {broadcast_field}. This might indicate a missing pre-defined broadcast ID.") elif opcode == "sensing_touchingobjectmenu": touching_field = block_data.get("fields", {}).get("TOUCHINGOBJECTMENU") if not (isinstance(touching_field, list) and len(touching_field) >= 1 and isinstance(touching_field[0], str) and touching_field[0]): issues.append(f"Touching object menu shadow block '{block_id}' has malformed or empty 'TOUCHINGOBJECTMENU' field: {touching_field}. Expected ['object name', 'ID_or_null'].") if touching_field and len(touching_field) > 1 and touching_field[1] is None: issues.append(f"Touching object menu shadow block '{block_id}' has a 'TOUCHINGOBJECTMENU' field with a null default ID: {touching_field}. This might indicate a missing pre-defined object ID for 'edge', 'mouse-pointer', or a sprite.") # Add more specific checks for other menu-type shadow blocks if needed # Validate references to variables, lists, and broadcasts if "fields" in block_data: # Variable references (data_setvariableto, data_changevariableby, data_variable) if opcode in ["data_setvariableto", "data_changevariableby", "data_variable"]: var_name_field = block_data["fields"].get("VARIABLE") if var_name_field and isinstance(var_name_field, list) and len(var_name_field) >= 1 and isinstance(var_name_field[0], str): var_name = var_name_field[0] if var_name not in sprite_variables: issues.append(f"Block '{block_id}' (opcode: {opcode}) references non-existent variable '{var_name}'. Recommendation: Ensure '{var_name}' is defined.") else: issues.append(f"Block '{block_id}' (opcode: {opcode}) has malformed 'VARIABLE' field: {var_name_field}.") # List references (data_addtolist, data_deleteoflist, data_itemoflist, data_listcontents) if opcode in ["data_addtolist", "data_deleteoflist", "data_itemoflist", "data_listcontents"]: list_name_field = block_data["fields"].get("LIST") if list_name_field and isinstance(list_name_field, list) and len(list_name_field) >= 1 and isinstance(list_name_field[0], str): list_name = list_name_field[0] if list_name not in sprite_lists: issues.append(f"Block '{block_id}' (opcode: {opcode}) references non-existent list '{list_name}'. Recommendation: Ensure '{list_name}' is defined.") else: issues.append(f"Block '{block_id}' (opcode: {opcode}) has malformed 'LIST' field: {list_name_field}.") # Broadcast references in 'when I receive' hat blocks if opcode == "event_whenbroadcastreceived": broadcast_name_field = block_data["fields"].get("BROADCAST_OPTION") if broadcast_name_field and isinstance(broadcast_name_field, list) and len(broadcast_name_field) >= 1 and isinstance(broadcast_name_field[0], str): broadcast_name = broadcast_name_field[0] if broadcast_name not in sprite_broadcasts: issues.append(f"Hat block '{block_id}' (opcode: {opcode}) listens for non-existent broadcast message '{broadcast_name}'. Recommendation: Ensure '{broadcast_name}' is broadcast elsewhere.") else: issues.append(f"Hat block '{block_id}' (opcode: {opcode}) has malformed 'BROADCAST_OPTION' field: {broadcast_name_field}.") # Custom block call validation if opcode == "procedures_call" and "mutation" in block_data: proccode = block_data["mutation"].get("proccode") if proccode and proccode not in sprite_custom_blocks: issues.append(f"Custom block call '{block_id}' (proccode: {proccode}) references non-existent custom block definition. Recommendation: Define custom block '{proccode}'.") if not proccode: issues.append(f"Custom block call '{block_id}' is missing 'proccode' in its mutation.") # Check for event_broadcast blocks having empty message (via malformed shadow) if opcode == "event_broadcast" or opcode == "event_broadcastandwait": broadcast_input = block_data.get("inputs", {}).get("BROADCAST_INPUT") if broadcast_input and isinstance(broadcast_input, list) and len(broadcast_input) >= 2: # If connected to a shadow block if isinstance(broadcast_input[1], str) and script_blocks.get(broadcast_input[1]) and script_blocks[broadcast_input[1]].get("shadow"): shadow_block = script_blocks[broadcast_input[1]] if shadow_block.get("opcode") == "event_broadcast_menu": broadcast_field = shadow_block.get("fields", {}).get("BROADCAST_OPTION") if not (broadcast_field and isinstance(broadcast_field, list) and len(broadcast_field) >= 1 and isinstance(broadcast_field[0], str) and broadcast_field[0].strip()): issues.append(f"Broadcast block '{block_id}' uses a shadow block '{shadow_block.get('id', 'unknown')}' that has an empty or malformed broadcast message. Recommendation: Ensure the broadcast message is a non-empty string.") # If connected to a literal string (less common for broadcast but possible) elif isinstance(broadcast_input[1], list) and len(broadcast_input[1]) >= 2 and isinstance(broadcast_input[1][1], str) and not broadcast_input[1][1].strip(): issues.append(f"Broadcast block '{block_id}' has an empty literal broadcast message. Recommendation: Provide a non-empty message.") else: issues.append(f"Broadcast block '{block_id}' has a missing or malformed 'BROADCAST_INPUT'. Recommendation: Ensure a valid message or shadow block is connected.") return issues # Node 12: Verification Node def block_verification_node(state: dict) -> dict: """ The block verifier check for any improvements needed through logical if-else and then adds them to the improvement_plan. After the improvement plan, the LLM reviewer node also checks for other errors or issues, if any, and finally provides the review as feedback. Args: state (dict): The current state dictionary containing project_json, etc. Returns: dict: The updated state dictionary with verification feedback. """ logger.info(f"--- Running BlockVerificationNode (Iteration: {state.get('iteration_count', 0)}) ---") # Increased MAX_IMPROVEMENT_ITERATIONS to allow for more verification and improvement cycles. # Set to 5 to allow multiple detection-improvement loops. MAX_IMPROVEMENT_ITERATIONS = 3 current_iteration = state.get("iteration_count", 0) project_json = state["project_json"] targets = project_json["targets"] # Initialize needs_improvement for the current run state["needs_improvement"] = False block_validation_feedback_overall = [] improvement_plan = {"sprite_issues": {}} state["improvement_plan"] = improvement_plan # Ensure it's in state even if empty for target in targets: sprite_name = target["name"] all_blocks_for_sprite = target.get("blocks", {}) # Correctly extract variables (name -> ID mapping) sprite_variables = {var_data[0]: var_id for var_id, var_data in target.get("variables", {}).items()} # Correctly extract lists (name -> ID mapping) sprite_lists = {list_data[0]: list_id for list_id, list_data in target.get("lists", {}).items()} # Broadcasts are typically collected from the 'event_whenbroadcastreceived' blocks # and potentially from the 'event_broadcast_menu' shadow blocks. sprite_broadcasts = set() for block_id, block_data in all_blocks_for_sprite.items(): if block_data.get("opcode") == "event_whenbroadcastreceived" and "BROADCAST_OPTION" in block_data.get("fields", {}): broadcast_name_field = block_data["fields"]["BROADCAST_OPTION"] if isinstance(broadcast_name_field, list) and len(broadcast_name_field) >= 1 and isinstance(broadcast_name_field[0], str): sprite_broadcasts.add(broadcast_name_field[0]) elif block_data.get("opcode") == "event_broadcast_menu" and "BROADCAST_OPTION" in block_data.get("fields", {}) and block_data.get("shadow"): broadcast_name_field = block_data["fields"]["BROADCAST_OPTION"] if isinstance(broadcast_name_field, list) and len(broadcast_name_field) >= 1 and isinstance(broadcast_name_field[0], str): sprite_broadcasts.add(broadcast_name_field[0]) # Extract custom block definitions (proccodes) sprite_custom_blocks = {} for block_id, block_data in all_blocks_for_sprite.items(): if block_data.get("opcode") == "procedures_definition" and "mutation" in block_data: proccode = block_data["mutation"].get("proccode") if proccode: sprite_custom_blocks[proccode] = block_id if not all_blocks_for_sprite: logger.info(f"Sprite '{sprite_name}' has no blocks. Skipping verification.") continue sprite_issues = [] hat_block_ids = [ block_id for block_id, block_data in all_blocks_for_sprite.items() if block_data.get("topLevel") and get_block_type(block_data.get("opcode")) == "hat" ] processed_script_blocks = set() if not hat_block_ids: sprite_issues.append("No top-level hat blocks found for this sprite. Scripts may not run automatically.") for hat_id in hat_block_ids: logger.info(f"Verifying script starting with hat block '{hat_id}' for sprite '{sprite_name}'.") current_script_blocks = filter_script_blocks(all_blocks_for_sprite, hat_id) processed_script_blocks.update(current_script_blocks.keys()) script_issues = analyze_script_structure( current_script_blocks, hat_id, sprite_name, sprite_variables, sprite_lists, sprite_broadcasts, sprite_custom_blocks ) if script_issues: sprite_issues.append(f"Issues in script starting with '{hat_id}':") sprite_issues.extend([f" - {issue}" for issue in script_issues]) else: logger.info(f"Script starting with '{hat_id}' for sprite '{sprite_name}' passed basic verification.") # Identify truly orphaned blocks (not top-level, not part of any script, no parent, not shadow) orphaned_blocks_overall = { block_id for block_id in all_blocks_for_sprite.keys() if block_id not in processed_script_blocks and not all_blocks_for_sprite[block_id].get("topLevel") and not all_blocks_for_sprite[block_id].get("parent") and not all_blocks_for_sprite[block_id].get("shadow") } if orphaned_blocks_overall: sprite_issues.append(f"Found {len(orphaned_blocks_overall)} truly orphaned blocks not connected to any valid script: {', '.join(list(orphaned_blocks_overall)[:5])}{'...' if len(orphaned_blocks_overall) > 5 else ''}.") # Also check for top-level blocks that are not hat blocks and are not processed for block_id in all_blocks_for_sprite.keys(): block_data = all_blocks_for_sprite[block_id] if block_data.get("topLevel") and get_block_type(block_data.get("opcode")) != "hat" and block_id not in processed_script_blocks: sprite_issues.append(f"Top-level block '{block_id}' (opcode: {block_data.get('opcode')}) is not a hat block and is not part of any script, so it will not run automatically.") if sprite_issues: improvement_plan["sprite_issues"][sprite_name] = sprite_issues logger.warning(f"Verification found issues for sprite '{sprite_name}'.") block_validation_feedback_overall.append(f"Issues for {sprite_name}:\n" + "\n".join([f"- {issue}" for issue in sprite_issues])) state["needs_improvement"] = True print(f"\n--- Verification Report (Issues Found for {sprite_name}) ---") print(json.dumps({sprite_name: sprite_issues}, indent=2)) else: logger.info(f"Sprite '{sprite_name}' passed all verification checks.") # Consolidate feedback for the LLM state["block_validation_feedback"] = "\n\n".join(block_validation_feedback_overall) print(f"[OVERALL IMPROVEMENT PLAN ON ITERATION {current_iteration}]: {improvement_plan}") if state["needs_improvement"]: llm_reviewer_prompt = f"""You are an expert Scratch project reviewer. Your task is to analyze the provided \ structural issues found in a Scratch project's sprites and suggest improvements or \ further insights. Focus on clarity, accuracy, and actionable advice. Here are the detected structural issues: {json.dumps(improvement_plan['sprite_issues'], indent=2)} \n Here are the block validation feedback: {json.dumps(state["block_validation_feedback"], indent=2)} \n Please review these issues and provide a consolidated report with potential causes \ and recommendations for fixing them. If an issue is minor or expected in certain \ scenarios (e.g., hidden blocks for backward compatibility), please note that. Structure your response as a JSON object with 'review_summary' as the key, \ containing a dictionary where keys are sprite names and values are lists of suggested improvements. Example: ```json {{ "review_summary": {{ "Sprite1": [ "Issue: Hat block 'abc' is not topLevel. Recommendation: Ensure all scripts start with a top-level hat block that has no parent.", "Issue: Block 'xyz' has unknown opcode 'motion_nonexistent'. Recommendation: Verify the opcode against the Scratch 3.0 block reference. This might be a typo or a deprecated block.", "Issue: Block 'var_block_id' references non-existent variable 'myVariable'. Recommendation: Ensure all variables used in blocks are defined in the sprite's variable list." ], "Sprite2": [ "Issue: Found 3 orphaned blocks. Recommendation: Reconnect these blocks to existing scripts or remove them if no longer needed." ] }} }} ``` """ try: response = agent.invoke({"messages": [{"role": "user", "content": llm_reviewer_prompt}]}) raw_review_response = response["messages"][-1].content try: state["review_block_feedback"] = extract_json_from_llm_response(raw_review_response) except json.JSONDecodeError as error_json: logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.") # Use the JSON resolver agent to fix the response correction_prompt = ( "Your task is to correct the provided JSON string to ensure it is **syntactically perfect and adheres strictly to JSON rules**.\n" "Carefully review the JSON for any errors, especially focusing on the reported error at:\n" f"- **Error Details**: {error_json}\n\n" "**Strict Instructions for your response:**\n" "1. **ONLY** output the corrected JSON. Do not include any other text, comments, or explanations outside the JSON.\n" "2. Ensure all property names (keys) are enclosed in **double quotes**.\n" "3. Ensure string values are correctly enclosed in **double quotes** and any internal special characters (like newlines `\\n`, tabs `\\t`, backslashes `\\\\`, or double quotes `\\`) are properly **escaped**.\n" "4. Verify that there are **no extra commas**, especially between key-value pairs or after the last element in an object or array.\n" "5. Ensure proper nesting and matching of curly braces `{}` and square brackets `[]`.\n" "6. **Crucially, remove any extraneous characters or duplicate closing braces outside the main JSON object.**\n" "7. The corrected JSON must be a **complete and valid** JSON object.\n\n" "Here is the problematic JSON string to correct:\n" "```json\n" f"{raw_review_response}\n" "```\n" "Corrected JSON:\n" ) correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]}) print(f"[JSON CORRECTOR RESPONSE AT BLOCKVERFIER ]: {correction_response['messages'][-1].content}") state["review_block_feedback"] = extract_json_from_llm_response(correction_response["messages"][-1].content) logger.info("Agent review feedback added to the state.") print("\n--- Agent Review Feedback ---") print(json.dumps(state["review_block_feedback"], indent=2)) except Exception as e: logger.error(f"Error invoking agent for review in BlockVerificationNode: {e}") # Fallback for LLM error: populate with a general error message state["review_block_feedback"] = {"review_summary": {"Overall": [f"Error during LLM review: {e}"]}} else: logger.info("BlockVerificationNode completed: No issues found in any sprite blocks.") print("\n--- Verification Report (No Issues Found) ---") state["block_validation_feedback"] = "No issues found in sprite blocks." state["review_block_feedback"] = {"review_summary": {"Overall": ["No issues found in sprite blocks. All good!"]}} # Manage iteration count based on overall needs_improvement flag if state["needs_improvement"]: # Increment iteration count *only if* improvements are needed for the next cycle state["iteration_count"] = current_iteration + 1 if state["iteration_count"] >= MAX_IMPROVEMENT_ITERATIONS: logger.warning(f"Max improvement iterations ({MAX_IMPROVEMENT_ITERATIONS}) reached for block verification. Forcing 'needs_improvement' to False.") state["needs_improvement"] = False state["block_validation_feedback"] += "\n(Note: Max iterations reached for block verification, stopping further improvements.)" # Ensure 'Overall' key exists before attempting to append if "review_summary" not in state["review_block_feedback"]: state["review_block_feedback"]["review_summary"] = {} if "Overall" not in state["review_block_feedback"]["review_summary"]: state["review_block_feedback"]["review_summary"]["Overall"] = [] state["review_block_feedback"]["review_summary"]["Overall"].append("Max iterations reached for block verification, stopping further improvements based on this run.") else: # Reset if no more improvement needed for blocks, indicating successful verification state["iteration_count"] = 0 logger.info(f"Block verification completed. Needs Improvement: {state['needs_improvement']}. Feedback: {state['block_validation_feedback'][:100]}...") print("===========================================================================") print(f"[BLOCK VERIFICATION NODE: (improvement_plan)]:{state.get('improvement_plan')}") print(f"[BLOCK VERIFICATION NODE: (review_block_feedback)]:{state.get('review_block_feedback')}") #with open("debug_state.json", "w", encoding="utf-8") as f: # json.dump(state, f, indent=2, ensure_ascii=False) return state # Node 13: Improvised block builder Node def improvement_block_builder_node(state: GameState): logger.info("--- Running ImprovementBlockBuilderNode ---") project_json = state["project_json"] targets = project_json["targets"] sprite_map = {target["name"]: target for target in targets if not target["isStage"]} # Also get the Stage target stage_target = next((target for target in targets if target["isStage"]), None) if stage_target: sprite_map[stage_target["name"]] = stage_target # Pre-load all block-catalog JSONs once all_catalogs = [ hat_block_data, boolean_block_data, c_block_data, cap_block_data, reporter_block_data, stack_block_data ] # Renamed from improvement_plan as it's not directly used here for iteration # improvement_plan = state.get("improvement_plan", {}) block_verification_feedback = state.get("block_validation_feedback", "no feedback") review_block_feedback = state.get("review_block_feedback", {}) # This node is combination of function base issue finder + LLM issue finder. action_plan = state.get("action_plan", {}) if not action_plan: # Check if action_plan is empty instead of review_block_feedback logger.warning("No action plan found in state. Skipping ImprovementBlockBuilderNode.") return state script_y_offset = {} script_x_offset_per_sprite = {name: 0 for name in sprite_map.keys()} # This is the handler which ensure if somehow json response changed it handle it.[DONOT REMOVE BELOW LOGIC] if action_plan.get("action_overall_flow", {}) == {}: plan_data = action_plan.items() else: plan_data = action_plan.get("action_overall_flow", {}).items() for sprite_name, sprite_improvements_data in plan_data: if sprite_name in sprite_map: current_sprite_target = sprite_map[sprite_name] # Ensure 'blocks' key exists for the current sprite if "blocks" not in current_sprite_target: current_sprite_target["blocks"] = {} # Fetch existing blocks for the current sprite existing_sprite_blocks = current_sprite_target.get("blocks", {}) if sprite_name not in script_y_offset: script_y_offset[sprite_name] = 0 # Get review feedback specific to this sprite sprite_review_summary = review_block_feedback.get("review_summary", state.get(sprite_name, [])).get(sprite_name, []) # get review_summary or direct_issues if any sprite_review_feedback_str = "\n".join([f"- {issue}" for issue in sprite_review_summary]) if sprite_review_summary else "No specific issues reported for this sprite in the review." for plan_entry in sprite_improvements_data.get("plans", []): event_opcode = plan_entry["event"] # This is now expected to be an opcode logic_sequence = plan_entry["logic"] # This is the semicolon-separated string # Extract the new opcode lists from the plan_entry motion_opcodes = plan_entry.get("motion", plan_entry.get("motions", [])) # handling of motion and motions by llm without re control_opcodes = plan_entry.get("control", plan_entry.get("controls", [])) # handling of control and controls by llm without re operator_opcodes = plan_entry.get("operator", plan_entry.get("operators", [])) # handling of operator and operators by llm without re sensing_opcodes = plan_entry.get("sensing", []) looks_opcodes = plan_entry.get("looks", plan_entry.get("look", [])) # handling of look and looks by llm without re sound_opcodes = plan_entry.get("sounds", plan_entry.get("sound", [])) # handling of sound and sounds by llm without re events_opcodes = plan_entry.get("events", []) # for stack block events_opcodes_2_raw = plan_entry.get("event", []) # for hat block [NOTE: both are different] events_opcodes_2 = [events_opcodes_2_raw] if isinstance(events_opcodes_2_raw, str) else events_opcodes_2_raw data_opcodes = plan_entry.get("data", []) needed_opcodes = ( motion_opcodes + control_opcodes + operator_opcodes + sensing_opcodes + looks_opcodes + sound_opcodes + events_opcodes + events_opcodes_2 + data_opcodes ) needed_opcodes = list(set(needed_opcodes)) # 2) build filtered runtime catalog (if you still need it) filtered_catalog = { op: ALL_SCRATCH_BLOCKS_CATALOG[op] for op in needed_opcodes if op in ALL_SCRATCH_BLOCKS_CATALOG } # 3) merge human-written catalog + runtime entry for each opcode combined_blocks = {} for op in needed_opcodes: catalog_def = find_block_in_all(op, all_catalogs) or {} runtime_def = ALL_SCRATCH_BLOCKS_CATALOG.get(op, {}) # merge: catalog fields first, then runtime overrides/adds merged = {**catalog_def, **runtime_def} combined_blocks[op] = merged print("Combined blocks for this script:", json.dumps(combined_blocks, indent=2)) llm_block_generation_prompt = f"""You are an AI assistant generating Scratch 3.0 block JSON for a single script based on an improvement plan. The current sprite is '{sprite_name}'. The specific script to generate blocks for is for the event with opcode '{event_opcode}'. The sequential logic to implement is: Logic: {logic_sequence} **Scratch 3.0 Block Shape Description** ### Hat Blocks Description: {hat_description} ### Boolean Blocks Description: {boolean_description} ### C Blocks Description: {c_description} ### Cap Blocks Description: {cap_description} ### Reporter Blocks Description: {reporter_description} ### Stack Blocks Description: {stack_description} **Based on the planning, the following Scratch block opcodes are expected to be used to implement this logic. Focus on using these specific opcodes where applicable, and refer to the ALL_SCRATCH_BLOCKS_CATALOG for their full structure and required inputs/fields:** Here is the comprehensive catalog of required Scratch 3.0 blocks: ```json {json.dumps(combined_blocks, indent=2)} ``` Here is general feedback and suggestions you should take care of: suggestion:{block_verification_feedback} **VERY IMPORTANT: Here is the specific review feedback for this sprite. You MUST address these points in your block generation:** ``` {sprite_review_feedback_str} ``` Current Scratch project JSON for this sprite (for context, its existing blocks if any. Note: If this is a hat block, you should be generating a new script entirely, otherwise, integrate or modify existing blocks as per the action plan and review feedback): ```json {json.dumps(existing_sprite_blocks, indent=2)} ``` **Instructions for generating the block JSON (EXTREMELY IMPORTANT - FOLLOW THESE EXAMPLES PRECISELY):** 1. **Start with the event block (Hat Block):** This block's `topLevel` should be `true` and `parent` should be `null`. Its `x` and `y` coordinates should be set (e.g., `x: 0, y: 0` or reasonable offsets for multiple scripts). 2. **Generate a sequence of connected blocks:** For each block, generate a **unique block ID** (e.g., 'myBlockID123'). 3. **Link blocks correctly:** * Use the `next` field to point to the ID of the block directly below it in the stack. * Use the `parent` field to point to the ID of the block directly above it in the stack. * For the last block in a stack, `next` should be `null`. 4. **Handle `inputs` for parameters and substacks (CRITICAL DETAIL - PAY CLOSE ATTENTION TO EXAMPLES):** * The value for any key within the `inputs` dictionary MUST always be an **array** of two elements: `[type_code, value_or_block_id]`. * **STRICTLY FORBIDDEN MISTAKE:** DO NOT put an array like `["num", "value"]` or `["_edge_", null]` directly as the `value_or_block_id`. This is the source of past errors. * The `type_code` (first element of the array) indicates the nature of the input: * `1`: For a primitive value or a shadow block ID (e.g., a number, string, boolean, or reference to a shadow block). This is the most common type for direct values. * `2`: For a block ID where another block is directly plugged into this input (e.g., an operator block connected to an `if` condition, or the first block of a C-block's substack). * **Correct Example for Numerical Input (e.g., for `motion_movesteps` STEPS, or `motion_gotoxy` X/Y):** If you need to input the number `10` into a block, you MUST create a separate `math_number` shadow block for it, and then reference its ID. ```json // Main block using the number "mainBlockID": {{ "opcode": "motion_movesteps", "inputs": {{ "STEPS": [1, "shadowNumID"] }}, // ... other fields }}, // The separate shadow block for the number '10' "shadowNumID": {{ "opcode": "math_number", "fields": {{ "NUM": ["10", null] }}, "shadow": true, "parent": "mainBlockID", "topLevel": false }} ``` * **Correct Example for Dropdown/Menu Input (e.g., `sensing_touchingobject` with 'edge'):** If you need to select 'edge' for the `touching ()?` block, you MUST create a separate `sensing_touchingobjectmenu` shadow block and reference its ID. ```json // Main block using the dropdown selection "touchingBlockID": {{ "opcode": "sensing_touchingobject", "inputs": {{ "TOUCHINGOBJECTMENU": [1, "shadowEdgeMenuID"] }}, // ... other fields }}, // The separate shadow block for the 'edge' menu option "shadowEdgeMenuID": {{ "opcode": "sensing_touchingobjectmenu", "fields": {{ "TOUCHINGOBJECTMENU": ["_edge_", null] }}, "shadow": true, "parent": "touchingBlockID", "topLevel": false }} ``` * **Correct Example for C-block (e.g., `control_forever`):** The `control_forever` block MUST have a `SUBSTACK` input pointing to the first block inside its loop. The value for `SUBSTACK` must be an array: `[2, "FIRST_BLOCK_IN_FOREVER_LOOP_ID"]`. ```json "foreverBlockID": {{ "opcode": "control_forever", "inputs": {{ "SUBSTACK": [2, "firstBlockInsideForeverID"] }}, "next": null, "parent": "blockAboveForeverID", "shadow": false, "topLevel": false }}, "firstBlockInsideForeverID": {{ // ... definition of the first block inside the forever loop "parent": "foreverBlockID", "next": "secondBlockInsideForeverID" // if there's another block }} ``` * **Correct Example for C-block with condition (e.g., `control_if`):** The `control_if` block MUST have a `CONDITION` input (typically `type_code: 1` referencing a boolean reporter block) and a `SUBSTACK` input (`type_code: 2` referencing the first block inside the if-body). ```json "ifBlockID": {{ "opcode": "control_if", "inputs": {{ "CONDITION": [1, "conditionBlockID"], "SUBSTACK": [2, "firstBlockInsideIfID"] }}, "next": "blockAfterIfID", "parent": "blockAboveIfID", "shadow": false, "topLevel": false }}, "conditionBlockID": {{ "opcode": "sensing_touchingobject", // Example condition block // ... definition for condition block, parent should be "ifBlockID" }}, "firstBlockInsideIfID": {{ // ... definition of the first block inside the if body "parent": "ifBlockID", "next": null // or next block if more }} ``` * **Correct Example for Sensing Blocks (`sensing_touchingobject`, `sensing_touchingcolor`, etc.):** For blocks like `sensing_touchingobject`, the input is typically `"TOUCHINGOBJECTMENU"`. Its value must be `[1, "ID_OF_SENSING_MENU_SHADOW_BLOCK"]`. The corresponding shadow block (e.g., `sensing_touchingobjectmenu`) has a `fields` dictionary with a key matching the input name (e.g., `"TOUCHINGOBJECTMENU"`). The value for this field must be an array `["_mouse_", null]` or `["_edge_", null]` for specific options, or `["SpriteName", "SPRITE_ID_FROM_TARGETS"]` for other sprites. Ensure the string value in the `fields` array for these menus matches the exact Scratch internal string for the option (e.g., `"_edge_"` not `"edge"` unless specifically allowed). Refer to `ALL_SCRATCH_BLOCKS_CATALOG` for exact field values. Example for touching 'edge': ```json "myTouchingBlockID": {{ "opcode": "sensing_touchingobject", "inputs": {{ "TOUCHINGOBJECTMENU": [1, "myTouchingObjectMenuShadowID"] }}, "next": null, "parent": "someParentBlockID", "shadow": false, "topLevel": false }}, "myTouchingObjectMenuShadowID": {{ "opcode": "sensing_touchingobjectmenu", "fields": {{ "TOUCHINGOBJECTMENU": ["_edge_", null] }}, "shadow": true, "parent": "myTouchingBlockID", "topLevel": false }} ``` * **Correct Example for Clone Blocks (`control_create_clone_of`, `control_when_start_as_clone`, `control_delete_this_clone`):** `control_create_clone_of` has an input named `"CLONE_TARGET"`. Its value must be `[1, "ID_OF_CLONE_TARGET_MENU_SHADOW_BLOCK"]`. The corresponding shadow block (`control_create_clone_of_menu`) has a `fields` dictionary with a key named `"CLONE_TARGET"`. The value for `"CLONE_TARGET"` must be an array `["_myself_", null]` or `["SpriteName", "SPRITE_ID_FROM_TARGETS"]`. `control_when_start_as_clone` is a hat block and thus `topLevel: true`, `parent: null`, and `next: null`. `control_delete_this_clone` is a stack block and will have `next` and `parent` connections. Example for creating a clone of 'myself': ```json "myCloneBlockID": {{ "opcode": "control_create_clone_of", "inputs": {{ "CLONE_TARGET": [1, "myCloneTargetMenuShadowID"] }}, "next": null, "parent": "someParentBlockID", "shadow": false, "topLevel": false }}, "myCloneTargetMenuShadowID": {{ "opcode": "control_create_clone_of_menu", "fields": {{ "CLONE_TARGET": ["_myself_", null] }}, "shadow": true, "parent": "myCloneBlockID", "topLevel": false }} ``` 5. **Define ALL Shadow Blocks Separately (THIS IS ESSENTIAL):** Every time a block's input requires a number, string literal, or a selection from a dropdown/menu, you MUST define a **separate block entry** in the top-level blocks dictionary for that shadow. Each shadow block MUST have `"shadow": true` and `"topLevel": false`. 6. **`fields` for direct dropdown values/text:** Use the `fields` dictionary ONLY IF the block directly embeds a dropdown value or text field without an `inputs` connection (e.g., the `KEY_OPTION` field within the `event_whenkeypressed` shadow block itself, or the `variable` field on a `data_setvariableto` block). Example: `"fields": {{"KEY_OPTION": ["space", null]}}`. 7. **`topLevel: true` for hat blocks:** Only the starting (hat) block of a script should have `"topLevel": true`. 8. **Ensure Unique Block IDs:** Every block you generate (main blocks and shadow blocks) must have a unique ID within the entire script's block dictionary. 9. **Strictly Use Catalog Opcodes:** You MUST only use `opcode` values that are present in the provided `ALL_SCRATCH_BLOCKS_CATALOG`. Do NOT use unlisted opcodes like `motion_jump`. 10. **Return ONLY the JSON object representing all the blocks for THIS SINGLE SCRIPT.** Do NOT wrap it in a 'blocks' key or the full project JSON. The output should be a dictionary where keys are block IDs and values are block definitions. You need to ensure that the generated blocks address the provided `sprite_review_feedback_str` and integrate with or replace existing blocks as necessary to fulfill the `logic_sequence` for the `event_opcode`. 11. **Additional Reminders for Review Feedback:** * Pay close attention to issues like "Hat block ... have a 'next' connection" – hat blocks *must* have `next: null`. * Address "Blocks ... have reporter/boolean parents but are not linked via an input" by ensuring `inputs` dictionary correctly links to child blocks. * For "Menu shadow blocks ... have malformed fields," verify the exact structure of the `fields` array for that specific opcode (e.g., `["VALUE", null]` or `["VALUE", "ID"]`). * For "Block ... references non-existent variable 'lives'," ensure that any variable or list referenced by a block (e.g., `data_variable`, `data_setvariableto`) exists in the sprite's `variables` or `lists` section of `project_json`. If it doesn't, the LLM should generate the necessary variable/list definition at the sprite level (though this current node focuses on blocks, the LLM should ideally be aware of project-level data). """ try: response = agent.invoke({"messages": [{"role": "user", "content": llm_block_generation_prompt}]}) raw_response = response["messages"][-1].content#strip_noise(response["messages"][-1].content) logger.info(f"Raw response from LLM [ImprovementBlockBuilderNode - {sprite_name} - {event_opcode}]: {raw_response[:500]}...") try: generated_blocks = extract_json_from_llm_response(raw_response) except json.JSONDecodeError as error_json: logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.") # Use the JSON resolver agent to fix the response correction_prompt = ( "Your primary goal is to generate JSON that precisely defines Scratch 3.0 blocks. " "The output MUST be a single, valid JSON object, enclosed within a '```json' markdown block.\n" "**STRICT JSON SYNTAX RULES:**\n" "1. **NO EXTRA TEXT, COMMENTS, OR CONVERSATIONAL FILLER.** Only the JSON.\n" "2. All keys MUST be enclosed in double quotes.\n" "3. String values MUST be enclosed in double quotes. Internal double quotes must be escaped (e.g., `\\`). Newlines (`\\n`) and carriage returns (`\\r`) in strings must be escaped.\n" "4. No trailing commas.\n" "5. Correct nesting of braces `{}` and brackets `[]`.\n" "6. **CRITICAL: Understand and strictly follow the Scratch block input array format.**\n" " - When an input refers to another top-level block (like a variable block or a number block that is defined separately):\n" " `\"INPUT_NAME\": [1, \"ID_OF_REFERENCED_BLOCK\"]`\n" " - When an input *directly embeds* a literal value (like a number, string, or boolean shadow block):\n" " `\"INPUT_NAME\": [1, [\"math_number\", \"10\"]]` (for numbers)\n" " `\"INPUT_NAME\": [1, [\"text\", \"hello\"]]` (for strings)\n" " `\"INPUT_NAME\": [1, [\"boolean\", true]]` (for booleans)\n" " - **Do NOT** generate patterns like `some_id[\"value\", null]]` or `\"\"\": [1, some_id[\"\", null]]`.\n" " - Ensure `\"/X\"` and `\"Y\"` inputs for `motion_gotoxy` and `motion_glidesecstoxy` correctly reference a block ID or directly embed a number block.\n" "7. The 'logic' field should contain a plain string describing the action, without any embedded JSON or Scratch block syntax (like `\"forever\":\"` which breaks the string).\n\n" "Here is the problematic JSON string that needs correction, including the error details:\n" f"- **Error Details**: {error_json}\n" # Keep the error details for context "```json\n" f"{raw_response}\n" "```\n" "Your corrected, valid JSON response:\n" "```json\n" ) correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]}) print(f"[JSON CORRECTOR RESPONSE AT IMPROVEMENTBLOCKBUILDER ]: {correction_response['messages'][-1].content}") generated_blocks = extract_json_from_llm_response(correction_response["messages"][-1].content)#strip_noise(correction_response["messages"][-1].content)) if "blocks" in generated_blocks and isinstance(generated_blocks["blocks"], dict): logger.warning(f"LLM returned nested 'blocks' key for {sprite_name}. Unwrapping.") generated_blocks = generated_blocks["blocks"] # Update block positions for top-level script for block_id, block_data in generated_blocks.items(): if block_data.get("topLevel"): block_data["x"] = script_x_offset_per_sprite.get(sprite_name, 0) block_data["y"] = script_y_offset[sprite_name] script_y_offset[sprite_name] += 150 # Increment for next script # Merge newly generated blocks with existing ones. # This is a crucial step for improvement. New blocks will overwrite existing ones with the same ID. # If the intention is to completely replace an existing script, the LLM should generate the full new script. # If it's to add/modify, the LLM should be aware of existing IDs to avoid conflicts or use them for parent/next links. current_sprite_target["blocks"].update(generated_blocks) logger.info(f"Improvement blocks added/updated for sprite '{sprite_name}', script '{event_opcode}' by ImprovementBlockBuilderNode.") except Exception as e: logger.error(f"Error generating blocks for sprite '{sprite_name}', script '{event_opcode}': {e}") raise state["project_json"] = project_json # Update the state with the modified project_json logger.info("Updated project JSON with improvement nodes.") print("Updated project JSON with improvement nodes:", json.dumps(project_json, indent=2)) # Print for direct visibility #with open("debug_state.json", "w", encoding="utf-8") as f: # json.dump(state, f, indent=2, ensure_ascii=False) return state #temporarry time delay for handling TPM issue import time def delay_for_tpm_node(state: GameState): logger.info("--- Running DelayForTPMNode ---") time.sleep(80) # Adjust the delay as needed logger.info("Delay completed.") return state # Build the LangGraph workflow workflow = StateGraph(GameState) # Add all nodes to the workflow workflow.add_node("parse_query", parse_query_and_set_initial_positions) workflow.add_node("game_description", game_description_node) workflow.add_node("declarations_plan", declaration_planner_node) workflow.add_node("add_declaration", declaration_builder_node) workflow.add_node("time_delay_1", delay_for_tpm_node) # this is a temporary node to handle TPM issues workflow.add_node("time_delay_2", delay_for_tpm_node) # this is a temporary node to handle TPM issues workflow.add_node("time_delay_3", delay_for_tpm_node) # this is a temporary node to handle TPM issues workflow.add_node("time_delay_4", delay_for_tpm_node) # this is a temporary node to handle TPM issues workflow.add_node("time_delay_5", delay_for_tpm_node) # this is a temporary node to handle TPM issues workflow.add_node("time_delay_6", delay_for_tpm_node) # this is a temporary node to handle TPM issues #workflow.add_node("time_delay_7", delay_for_tpm_node) # this is a temporary node to handle TPM issues workflow.add_node("initial_plan_build", overall_planner_node) # High-level planning node workflow.add_node("logic_alignment", plan_logic_aligner_node) workflow.add_node("plan_verifier", plan_verification_node) workflow.add_node("refined_planner", refined_planner_node) # Refines the action plan #workflow.add_node("opcode_counter", plan_opcode_counter_node) workflow.add_node("relation_builder", block_relationship_node) # Verifies the high-level plan workflow.add_node("block_builder", overall_block_builder_node) # Builds blocks from a plan # workflow.add_node("block_verifier", block_verification_node) # Verifies the generated blocks # workflow.add_node("improved_block_builder", improvement_block_builder_node) # For specific block-level improvements # Set the entry point workflow.set_entry_point("game_description") # Define the standard initial flow workflow.add_edge("game_description", "parse_query") workflow.add_edge("parse_query", "time_delay_1") workflow.add_edge("time_delay_1", "declarations_plan") workflow.add_edge("declarations_plan", "add_declaration") workflow.add_edge("add_declaration", "time_delay_2") workflow.add_edge("time_delay_2", "initial_plan_build") workflow.add_edge("initial_plan_build", "time_delay_3") workflow.add_edge("time_delay_3", "logic_alignment") workflow.add_edge("logic_alignment", "time_delay_4") workflow.add_edge("time_delay_4", "plan_verifier") # Define the conditional logic after plan_verifier (for high-level plan issues) def decide_next_step_after_plan_verification(state: GameState): if state.get("needs_improvement", False): # If the plan needs refinement, go to the refined_planner return "refined_planner" else: # If the plan is good, proceed to building blocks from this plan return "relation_builder"#"block_builder" workflow.add_conditional_edges( "plan_verifier", decide_next_step_after_plan_verification, { "refined_planner": "refined_planner", # Path if plan needs refinement "relation_builder": "relation_builder"#"block_builder": "block_builder" # Path if plan is approved, proceeds to block building } ) # --- CRITICAL CHANGE FOR THE PLAN REFINEMENT LOOP --- # After refining the plan, it should go back to plan_verifier for re-verification. workflow.add_edge("refined_planner", "time_delay_5") workflow.add_edge("time_delay_5", "plan_verifier") # This closes the loop for plan refinement and re-verification. workflow.add_edge("relation_builder", "time_delay_6") workflow.add_edge("time_delay_6", "block_builder") workflow.add_edge("block_builder", END) # Note: The original code had workflow.add_edge("time_delay", "block_builder") here, # but after refined_planner -> time_delay -> plan_verifier, the decision is made by plan_verifier. # So, this edge might be redundant or incorrect depending on the desired flow. # Assuming the intent is for plan_verifier to always decide the next step. # After blocks are built, they need to be verified #workflow.add_edge("time_delay_3", "block_builder") # workflow.add_edge("block_builder", "time_delay_5") # workflow.add_edge("time_delay_5", "block_verifier") # # Define the conditional logic after block_verifier (for generated blocks issues) # def decide_after_block_verification(state: GameState): # if state.get("needs_improvement", False): # # If blocks need improvement, go to improved_block_builder. # # This assumes improved_block_builder handles specific block-level fixes. # return "improved_block_builder" # else: # # If blocks are good, end the workflow # return "END" # workflow.add_conditional_edges( # "block_verifier", # decide_after_block_verification, # { # "improved_block_builder": "improved_block_builder", # Path if blocks need improvement # "END": END # Path if blocks are good # } # ) # # Create the loop: If blocks improved, re-verify them # #workflow.add_edge("time_delay_4", "improved_block_builder") # workflow.add_edge("improved_block_builder", "time_delay_6") # workflow.add_edge("time_delay_6", "block_verifier") # Compile the workflow graph app_graph = workflow.compile() # # Build the LangGraph workflow # workflow = StateGraph(GameState) # # Add all nodes to the workflow # workflow.add_node("parse_query", parse_query_and_set_initial_positions) # workflow.add_node("game_description", game_description_node) # workflow.add_node("declarations_plan", declaration_planner_node) # workflow.add_node("add_declaration", declaration_builder_node) # workflow.add_node("initial_plan_build", overall_planner_node) # High-level planning node # workflow.add_node("plan_verifier", plan_verification_node) # Verifies the high-level plan # workflow.add_node("refined_planner", refined_planner_node) # Refines the action plan # workflow.add_node("block_builder", overall_block_builder_node) # Builds blocks from a plan # workflow.add_node("block_verifier", block_verification_node) # Verifies the generated blocks # workflow.add_node("improved_block_builder", improvement_block_builder_node) # For specific block-level improvements # # Set the entry point # workflow.set_entry_point("game_description") # # Define the standard initial flow # workflow.add_edge("game_description", "parse_query") # workflow.add_edge("parse_query", "declarations_plan") # workflow.add_edge("declarations_plan", "add_declaration") # workflow.add_edge("add_declaration", "initial_plan_build") # workflow.add_edge("initial_plan_build", "plan_verifier") # # Define the conditional logic after plan_verifier (for high-level plan issues) # def decide_next_step_after_plan_verification(state: GameState): # if state.get("needs_improvement", False): # # If the plan needs refinement, go to the refined_planner # return "refined_planner" # else: # # If the plan is good, proceed to building blocks from this plan # return "block_builder" # workflow.add_conditional_edges( # "plan_verifier", # decide_next_step_after_plan_verification, # { # "refined_planner": "refined_planner", # Path if plan needs refinement # "block_builder": "block_builder" # Path if plan is approved, proceeds to block building # } # ) # # --- CRITICAL CHANGE FOR THE PLAN REFINEMENT LOOP --- # # After refining the plan, it should go back to plan_verifier for re-verification. # workflow.add_edge("refined_planner", "plan_verifier") # This closes the loop for plan refinement and re-verification. # # After blocks are built, they need to be verified # workflow.add_edge("block_builder", "block_verifier") # # Define the conditional logic after block_verifier (for generated blocks issues) # def decide_after_block_verification(state: GameState): # if state.get("needs_improvement", False): # # If blocks need improvement, go to improved_block_builder. # # This assumes improved_block_builder handles specific block-level fixes. # return "improved_block_builder" # else: # # If blocks are good, end the workflow # return "END" # workflow.add_conditional_edges( # "block_verifier", # decide_after_block_verification, # { # "improved_block_builder": "improved_block_builder", # Path if blocks need improvement # "END": END # Path if blocks are good # } # ) # # Create the loop: If blocks improved, re-verify them # workflow.add_edge("improved_block_builder", "block_verifier") # # Compile the workflow graph # app_graph = workflow.compile() # --- Serve the form --- @app.route("/", methods=["GET"]) def index(): return render_template("index4.html") # --- List static assets for the front-end --- @app.route("/list_assets", methods=["GET"]) def list_assets(): bdir = os.path.join(app.static_folder, "assets", "backdrops") sdir = os.path.join(app.static_folder, "assets", "sprites") sound_dir = os.path.join(app.static_folder, "assets", "sounds") # New sound directory backdrops = [] sprites = [] sounds = [] # List to store sound files try: if os.path.isdir(bdir): backdrops = [f for f in os.listdir(bdir) if f.lower().endswith((".svg", ".png", ".jpg", ".jpeg"))] # Include common image formats if os.path.isdir(sdir): sprites = [f for f in os.listdir(sdir) if f.lower().endswith((".svg", ".png", ".jpg", ".jpeg"))] # Include common image formats if os.path.isdir(sound_dir): # Check and list sound files sounds = [f for f in os.listdir(sound_dir) if f.lower().endswith((".wav", ".mp3", ".aiff"))] # Add more formats as needed logger.info("Successfully listed static assets.") except Exception as e: logger.error(f"Error listing static assets: {e}") return jsonify({"error": "Failed to list assets"}), 500 return jsonify(backdrops=backdrops, sprites=sprites, sounds=sounds) # Return sounds as well # --- Helper: build costume entries --- def make_costume_entry(folder, filename, name): asset_id = filename.rsplit(".", 1)[0] entry = { "name": name, "bitmapResolution": 1, "dataFormat": filename.rsplit(".", 1)[1], "assetId": asset_id, "md5ext": filename, } path = os.path.join(app.static_folder, "assets", folder, filename) # Check for image file extensions if filename.lower().endswith((".svg", ".png", ".jpg", ".jpeg")): if filename.lower().endswith(".svg"): # SVG files # For SVGs, default Scratch values are typically 240x180 for stage, others 0,0 if folder == "backdrops": entry["rotationCenterX"] = 240 entry["rotationCenterY"] = 180 else: entry["rotationCenterX"] = 0 entry["rotationCenterY"] = 0 else: # Raster image files try: img = Image.open(path) w, h = img.size entry["rotationCenterX"] = w // 2 entry["rotationCenterY"] = h // 2 except Exception as e: logger.warning(f"Could not determine image dimensions for {filename}: {e}. Setting center to 0,0.") entry["rotationCenterX"] = 0 entry["rotationCenterY"] = 0 else: logger.warning(f"Unknown asset type for {filename}. Setting center to 0,0.") entry["rotationCenterX"] = 0 entry["rotationCenterY"] = 0 # Default if not an image return entry # --- Helper: build sound entries --- def make_sound_entry(filename, name): asset_id = filename.rsplit(".", 1)[0] entry = { "name": name, "dataFormat": filename.rsplit(".", 1)[1], "rate": 44100, # Common sample rate for sounds "sampleCount": 0, # This would typically be calculated from the audio file "assetId": asset_id, "md5ext": filename, } # For a real implementation, you might want to use a library like pydub to get # the sampleCount and duration if needed for more complex sound handling. return entry # --- New function to create .sb3 archive --- def create_sb3_archive(project_folder, project_id): """ Zips the project folder and renames it to an .sb3 file. Args: project_folder (str): The path to the directory containing the project.json and assets. project_id (str): The unique ID for the project, used for naming the .sb3 file. Returns: str: The path to the created .sb3 file, or None if an error occurred. """ #output_filename = os.path.join("generated_projects", project_id) output_filename = GEN_PROJECT_DIR / project_id zip_path = None sb3_path = None try: # 1. Zip the folder, which will create a .zip file # shutil.make_archive creates a .zip, .tar, etc. archive. # The base_name is the path and name of the archive to create (without extension). # The format is 'zip', 'tar', 'gztar', 'bztar', or 'xztar'. # The root_dir is the directory to start archiving from. # The base_dir is the directory whose contents are archived. zip_path = shutil.make_archive(output_filename, 'zip', root_dir=project_folder) logger.info(f"Project folder zipped to: {zip_path}") # 2. Rename the .zip file to .sb3 sb3_path = f"{output_filename}.sb3" os.rename(zip_path, sb3_path) logger.info(f"Renamed {zip_path} to {sb3_path}") # Optionally, remove the original project folder after creating the .sb3 # Be cautious with this, as you might want to keep the unzipped contents for debugging or other purposes. # shutil.rmtree(project_folder) # logger.info(f"Removed original project folder: {project_folder}") return sb3_path except Exception as e: logger.error(f"Error creating SB3 archive for {project_id}: {e}") # Clean up any partial files if an error occurs if zip_path and os.path.exists(zip_path): os.remove(zip_path) if sb3_path and os.path.exists(sb3_path): os.remove(sb3_path) return None # --- New endpoint to fetch project.json --- @app.route("/get_project/", methods=["GET"]) def get_project(project_id): #project_folder = os.path.join("generated_projects", project_id) project_folder = GEN_PROJECT_DIR / project_id project_folder.mkdir(parents=True, exist_ok=True) #project_folder = send_from_directory(directory="generated_projects", project_id) project_json_path = os.path.join(project_folder, "project.json") try: if os.path.exists(project_json_path): logger.info(f"Serving project.json for project ID: {project_id}") return send_from_directory(project_folder, "project.json", as_attachment=True, download_name=f"{project_id}.json") else: logger.warning(f"Project JSON not found for ID: {project_id}") return jsonify({"error": "Project not found"}), 404 except Exception as e: logger.error(f"Error serving project.json for ID {project_id}: {e}") return jsonify({"error": "Failed to retrieve project"}), 500 # --- New endpoint to fetch assets --- @app.route("/get_asset//", methods=["GET"]) def get_asset(project_id, filename): #project_folder = os.path.join("generated_projects", project_id) project_folder = GEN_PROJECT_DIR / project_id project_folder.mkdir(parents=True, exist_ok=True) #project_folder = send_from_directory(directory="generated_projects", project_id) asset_path = os.path.join(project_folder, filename) try: if os.path.exists(asset_path): logger.info(f"Serving asset '{filename}' for project ID: {project_id}") return send_from_directory(project_folder, filename) else: logger.warning(f"Asset '{filename}' not found for project ID: {project_id}") return jsonify({"error": "Asset not found"}), 404 except Exception as e: logger.error(f"Error serving asset '{filename}' for project ID {project_id}: {e}") return jsonify({"error": "Failed to retrieve asset"}), 500 # This part is just for demonstration purposes to show app_graph workflow # In a real application, app_graph would be properly initialized. try: png_bytes = app_graph.get_graph().draw_mermaid_png() #with open("langgraph_workflow.png", "wb") as f: # f.write(png_bytes) except Exception as e: logger.warning(f"Could not draw or save LangGraph workflow diagram: {e}. This might be expected if app_graph is a mock.") # --- New endpoint to download the .sb3 file --- @app.route("/download_sb3/", methods=["GET"]) def download_sb3(project_id): """ Allows users to download the generated .sb3 Scratch project file. """ sb3_filename = f"{project_id}.sb3" #sb3_filepath = os.path.join("generated_projects", sb3_filename) sb3_filepath = GEN_PROJECT_DIR / sb3_filename #sb3_filepath = send_from_directory(directory="generated_projects", sb3_filename) try: if os.path.exists(sb3_filepath): logger.info(f"Serving SB3 file for project ID: {project_id}") # send_from_directory serves the file and handles content-disposition for download return send_from_directory( directory="generated_projects", path=sb3_filename, as_attachment=True, # This makes the browser download the file download_name=sb3_filename # This sets the filename for the download ) else: logger.warning(f"SB3 file not found for ID: {project_id}") return jsonify({"error": "Scratch project file not found"}), 404 except Exception as e: logger.error(f"Error serving SB3 file for ID {project_id}: {e}") return jsonify({"error": "Failed to retrieve Scratch project file"}), 500 # --- Modified `generate_game` Endpoint --- @app.route("/generate_game", methods=["POST"]) def generate_game(): payload = request.json logger.info(f"Received payload: {json.dumps(payload, indent=2)}") desc = payload.get("description", "") backdrops = payload.get("backdrops", []) sprites = payload.get("sprites", []) backdrop_sounds = payload.get("backdrop_sounds", {}) sprite_sounds = payload.get("sprite_sounds", {}) logger.info(f"Backdrops received: {backdrops}") logger.info(f"Sprites received: {sprites}") logger.info(f"Backdrop sounds received: {backdrop_sounds}") logger.info(f"Sprite sounds received: {sprite_sounds}") logger.info(f"Starting game generation for description: '{desc}'") # 1) Initial skeleton generation project_skeleton = { "targets": [ { "isStage": True, "name":"Stage", "objName": "Stage", "variables":{}, "lists":{}, "broadcasts":{}, "blocks":{}, "comments":{}, "currentCostume": len(backdrops)-1 if backdrops else 0, "costumes": [make_costume_entry("backdrops",b["filename"],b["name"]) for b in backdrops], "sounds": [], # Initialize an empty list to be populated with all unique backdrop sounds "volume":100,"layerOrder":0, "tempo":60,"videoTransparency":50,"videoState":"on", "textToSpeechLanguage": None } ], "monitors": [], "extensions": [], "meta":{ "semver":"3.0.0","vm":"11.1.0", "agent": request.headers.get("User-Agent","") } } # Populate sounds for the Stage target by consolidating sounds from all backdrops stage_target_sounds = [] seen_sound_md5ext = set() # To track unique sounds by their md5ext (filename) for b_data in backdrops: backdrop_name = b_data["name"] if backdrop_name in backdrop_sounds: for s in backdrop_sounds[backdrop_name]: sound_entry = make_sound_entry(s["filename"], s["name"]) if sound_entry["md5ext"] not in seen_sound_md5ext: stage_target_sounds.append(sound_entry) seen_sound_md5ext.add(sound_entry["md5ext"]) project_skeleton["targets"][0]["sounds"] = stage_target_sounds # Assign the collected unique sounds for idx, s_data in enumerate(sprites, start=1): costume = make_costume_entry("sprites", s_data["filename"], s_data["name"]) sprite_name = s_data["name"] # Get sounds for the current sprite current_sprite_sounds = sprite_sounds.get(sprite_name, []) sprite_sounds_list = [make_sound_entry(s["filename"], s["name"]) for s in current_sprite_sounds] project_skeleton["targets"].append({ "isStage": False, "name": sprite_name, "objName": sprite_name, "variables":{}, "lists":{}, "broadcasts":{}, "blocks":{}, "comments":{}, "currentCostume":0, "costumes":[costume], "sounds": sprite_sounds_list, # Add sprite sounds "volume":100, "layerOrder": idx+1, "visible":True, "x":0,"y":0,"size":100,"direction":90, "draggable":False, "rotationStyle":"all around" }) logger.info("Initial project skeleton created with sounds for stage and sprites.") project_id = str(uuid.uuid4()) #project_folder = os.path.join("generated_projects", project_id) #project_folder = send_from_directory(directory="generated_projects", project_id) project_folder = GEN_PROJECT_DIR / project_id project_folder.mkdir(parents=True, exist_ok=True) try: os.makedirs(project_folder, exist_ok=True) # Save initial skeleton and copy assets project_json_path = os.path.join(project_folder, "project.json") with open(project_json_path, "w") as f: json.dump(project_skeleton, f, indent=2) logger.info(f"Initial project skeleton saved to {project_json_path}") # Copy backdrops for b in backdrops: src_path = os.path.join(app.static_folder, "assets", "backdrops", b["filename"]) dst_path = os.path.join(project_folder, b["filename"]) if os.path.exists(src_path): shutil.copy(src_path, dst_path) else: logger.warning(f"Source backdrop asset not found: {src_path}") # Copy sprites for s in sprites: src_path = os.path.join(app.static_folder, "assets", "sprites", s["filename"]) dst_path = os.path.join(project_folder, s["filename"]) if os.path.exists(src_path): shutil.copy(src_path, dst_path) else: logger.warning(f"Source sprite asset not found: {src_path}") # Copy backdrop sounds for backdrop_name, sounds_list in backdrop_sounds.items(): for s in sounds_list: src_path = os.path.join(app.static_folder, "assets", "sounds", s["filename"]) dst_path = os.path.join(project_folder, s["filename"]) if os.path.exists(src_path): shutil.copy(src_path, dst_path) else: logger.warning(f"Source sound asset for backdrop '{backdrop_name}' not found: {src_path}") # Copy sprite sounds for sprite_name, sounds_list in sprite_sounds.items(): for s in sounds_list: src_path = os.path.join(app.static_folder, "assets", "sounds", s["filename"]) dst_path = os.path.join(project_folder, s["filename"]) if os.path.exists(src_path): shutil.copy(src_path, dst_path) else: logger.warning(f"Source sound asset for sprite '{sprite_name}' not found: {src_path}") logger.info("Assets (including sounds for backdrops and sprites) copied to project folder.") # Initialize the state for LangGraph as a dictionary matching the TypedDict structure initial_state_dict = { "project_json": project_skeleton, "description": desc, "project_id": project_id, "sprite_initial_positions": {}, "action_plan": {}, "improvement_plan": {}, "needs_improvement": False, "plan_validation_feedback": {}, "iteration_count": 0, "review_block_feedback": {}, "declaration_plan": {}, "temporary_node": {}, } final_state_dict = app_graph.invoke(initial_state_dict) # Pass dictionary final_project_json = final_state_dict['project_json'] # Access as dict # Save the *final* filled project JSON, overwriting the skeleton with open(project_json_path, "w") as f: json.dump(final_project_json, f, indent=2) logger.info(f"Final project JSON saved to {project_json_path}") # --- Call the new function to create the .sb3 file --- sb3_file_path = create_sb3_archive(project_folder, project_id) if sb3_file_path: logger.info(f"Successfully created SB3 file: {sb3_file_path}") # Instead of returning the local path, return a URL to the download endpoint download_url = f"/download_sb3/{project_id}" return jsonify({"message": "Game generated successfully", "project_id": project_id, "download_url": download_url}) else: return jsonify(error="Failed to create SB3 archive"), 500 except Exception as e: logger.error(f"Error during game generation workflow for project ID {project_id}: {e}", exc_info=True) return jsonify(error=f"Error generating game: {e}"), 500 if __name__=="__main__": logger.info("Starting Flask application...") # Create the 'generated_projects' and 'static/assets' directories if they don't exist #os.makedirs("generated_projects", exist_ok=True) #os.makedirs("static/assets/backdrops", exist_ok=True) #os.makedirs("static/assets/sprites", exist_ok=True) #os.makedirs("static/assets/sounds", exist_ok=True) for d in (GEN_PROJECT_DIR, BACKDROP_DIR, SPRITE_DIR, SOUND_DIR): d.mkdir(parents=True, exist_ok=True) app.run(debug=True,port=5000)