Spaces:
No application file
No application file
from flask import Flask, request, jsonify, render_template, send_from_directory | |
from langgraph.prebuilt import create_react_agent | |
from langchain_groq import ChatGroq | |
#from langgraph.graph import draw | |
from langchain.chat_models import ChatOpenAI | |
from dotenv import load_dotenv | |
from langchain_core.utils.utils import secret_from_env | |
from langchain_openai import ChatOpenAI | |
from pydantic import Field, SecretStr | |
from PIL import Image | |
import os, json, re | |
import shutil | |
import uuid | |
from langgraph.graph import StateGraph, END | |
import logging | |
from typing import Dict, TypedDict, Optional, Any, List | |
# --- Configure logging ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

app = Flask(__name__)

# --- LLM / Vision model setup ---
# Sentinel stored in the environment when a variable is missing, so that later
# os.environ[...] lookups never raise KeyError.
_ENV_PLACEHOLDER = "default_key_or_placeholder"

# Prefer GROQ_API_KEY_2, but fall back to an already exported GROQ_API_KEY
# rather than overwriting a valid key with the placeholder (which would make
# the check below abort start-up even though a usable key was configured).
os.environ["GROQ_API_KEY"] = (
    os.getenv("GROQ_API_KEY_2") or os.getenv("GROQ_API_KEY") or _ENV_PLACEHOLDER
)
# Keep existing values; only fill in the placeholder when the variable is unset.
os.environ.setdefault("OPENROUTER_API_KEY", _ENV_PLACEHOLDER)
os.environ.setdefault("OPENROUTER_BASE_URL", _ENV_PLACEHOLDER)

groq_key = os.environ["GROQ_API_KEY"]
if not groq_key or groq_key == _ENV_PLACEHOLDER:
    # Fail fast: the Groq-backed models below cannot be used without a key.
    logger.critical("GROQ_API_KEY environment variable is not set or invalid. Please set it to proceed.")
    raise ValueError("GROQ_API_KEY environment variable is not set or invalid.")
# Main LLM for the SCRATCH 3.0 Agent.
# The commented-out `model=` lines are alternative Groq model ids kept for
# quick switching; only the uncommented one is passed to ChatGroq.
llm = ChatGroq(
    #model="deepseek-r1-distill-llama-70b",
    #model="llama-3.3-70b-versatile",
    #model="gemma2-9b-it",
    #model="compound-beta-mini",
    #model="meta-llama/llama-4-maverick-17b-128e-instruct",
    model="meta-llama/llama-4-scout-17b-16e-instruct",
    temperature=0.0,
)
# Json debugger [temporary]
# NOTE(review): in the part of the file reviewed here, `agent_json_resolver`
# is built with `llm`, not `llm2` - confirm whether `llm2` is still needed.
llm2 = ChatGroq(
    model="deepseek-r1-distill-llama-70b",
    #model="llama-3.3-70b-versatile",
    #model="gemma2-9b-it",
    #model="compound-beta-mini",
    #model="meta-llama/llama-4-maverick-17b-128e-instruct",
    #model="meta-llama/llama-4-scout-17b-16e-instruct",
    temperature=0.0,
)
class ChatOpenRouter(ChatOpenAI):
    """`ChatOpenAI` subclass that targets the OpenRouter endpoint.

    The API key is taken from the ``openai_api_key`` argument when given,
    otherwise from the ``OPENROUTER_API_KEY`` environment variable.
    """

    openai_api_key: Optional[SecretStr] = Field(
        alias="api_key",
        default_factory=secret_from_env("OPENROUTER_API_KEY", default=None),
    )

    @property
    def lc_secrets(self) -> dict[str, str]:
        """Map the secret constructor argument to its environment variable.

        LangChain's ``Serializable`` declares ``lc_secrets`` as a property;
        without the decorator attribute access yields a bound method instead
        of the mapping.
        """
        return {"openai_api_key": "OPENROUTER_API_KEY"}

    def __init__(self,
                 openai_api_key: Optional[str] = None,
                 **kwargs):
        """Create the client.

        Args:
            openai_api_key: Explicit OpenRouter API key; falls back to the
                ``OPENROUTER_API_KEY`` environment variable when falsy.
            **kwargs: Forwarded unchanged to ``ChatOpenAI``.
        """
        openai_api_key = (
            openai_api_key or os.environ.get("OPENROUTER_API_KEY")
        )
        # setdefault keeps the OpenRouter URL as the default while letting a
        # caller pass its own base_url without triggering
        # "got multiple values for keyword argument 'base_url'".
        kwargs.setdefault("base_url", "https://openrouter.ai/api/v1")
        super().__init__(
            openai_api_key=openai_api_key,
            **kwargs
        )
# llm = ChatOpenRouter( | |
# #model_name="deepseek/deepseek-r1-0528:free", | |
# #model_name="google/gemini-2.0-flash-exp:free", | |
# #model_name="deepseek/deepseek-v3-base:free", | |
# model_name="deepseek/deepseek-r1:free" | |
# ) | |
# Refined SYSTEM_PROMPT with more explicit Scratch JSON rules, especially for variables.
# Review fixes applied to the prompt text:
#   * removed stray "[cite_start]" / "[cite: ...]" citation markers that were
#     being sent to the model verbatim;
#   * corrected the primitive type codes: in the sb3 format 7 is an integer
#     input and 10 is a string, so string literals must use 10.
SYSTEM_PROMPT = """
You are an expert AI assistant named GameScratchAgent, specialized in generating and modifying Scratch-VM 3.x game project JSON.
Your core task is to process game descriptions and existing Scratch JSON structures, then produce or update JSON segments accurately.
You possess deep knowledge of Scratch 3.0 project schema, informed by comprehensive reference materials. When generating or modifying the `blocks` section, pay extremely close attention to the following:
**Scratch Project JSON Schema Rules:**
1. **Target Structure (`project.json`'s `targets` array):**
* Each object in the `targets` array represents a Stage or a Sprite.
* `isStage`: A boolean indicating if the target is the Stage (`true`) or a Sprite (`false`).
* `name`: The name of the Stage (e.g., `"Stage"`) or the Sprite (e.g., `"Cat"`). This property replaces `objName` found in older Scratch versions.
* `variables` dictionary: This dictionary maps unique variable IDs to arrays `[variable_name, initial_value, isCloudVariable?]`.
* `variable_name`: The user-defined name of the variable.
* `initial_value`: The variable's initial value, which can be a number or a string.
* `isCloudVariable?`: (Optional) A boolean indicating if it's a cloud variable (`true`) or a local variable (`false` or absent for regular variables).
* Example: `"myVarId123": ["score", 0]`, `"cloudVarId456": ["☁ High Score", "54", true]`
* `lists` dictionary: This dictionary maps unique list IDs to arrays `[list_name, [item1, item2, ...]]`.
* Example: `"myListId789": ["my list", ["apple", "banana"]]`
* `broadcasts` dictionary: This dictionary maps unique broadcast IDs to their names.
* Example: `"myBroadcastId": "Game Over"`
* `blocks` dictionary: This dictionary contains all the blocks belonging to this target. Keys are block IDs, values are block objects.
2. **Block Structure (within a `target`'s `blocks` dictionary):**
* Every block object must have the following core properties:
* `opcode`: A unique internal identifier for the block's specific functionality (e.g., `"motion_movesteps"`, `"event_whenflagclicked"`).
* `parent`: The ID of the block directly above it in the script stack (or `null` for a top-level block).
* `next`: The ID of the block directly below it in the script stack (or `null` for the end of a stack).
* `inputs`: An object defining values or blocks plugged into the block's input slots. Values are **arrays**.
* `fields`: An object defining dropdown menu selections or direct internal values within the block. Values are **arrays**.
* `shadow`: `true` if it's a shadow block (e.g., a default number input that can be replaced by another block), `false` otherwise.
* `topLevel`: `true` if it's a hat block or a standalone block (not connected to a parent), `false` otherwise.
3. **`inputs` Property Details (for blocks plugged into input slots):**
* **Direct Block Connection (Reporter/Boolean block plugged in):**
* Format: `"<INPUT_NAME>": [1, "<blockId_of_plugged_block>"]`
* Example: `"CONDITION": [1, "someBooleanBlockId"]` (e.g., for an `if` block).
* **Literal Value Input (Shadow block with a literal):**
* Format: `"<INPUT_NAME>": [1, [<type_code>, "<value_string>"]]`
* `type_code`: A numeric code representing the data type. Common codes include: `4` for number, `7` for integer, `10` for string/text.
* `value_string`: The literal value as a string.
* Examples:
* Number: `"STEPS": [1, [4, "10"]]` (for `move 10 steps` block).
* Integer: `"NUM": [1, [7, "1"]]` (for `go forward 1 layers` block).
* String/Text: `"MESSAGE": [1, [10, "Hello!"]]` (for `say Hello! for 2 secs`).
* **C-Block Substack (blocks within a loop or conditional):**
* Format: `"<SUBSTACK_NAME>": [2, "<blockId_of_first_block_in_substack>"]`
* Common `SUBSTACK_NAME` values are `SUBSTACK` (for `if`, `forever`, `repeat`) and `SUBSTACK2` (for `else` in `if else`).
* Example: `"SUBSTACK": [2, "firstBlockInLoopId"]`
4. **`fields` Property Details (for dropdowns or direct internal values):**
* Used for dropdown menus, variable names, list names, or other static selections directly within the block.
* Format: `"<FIELD_NAME>": ["<selected_value>", null]`
* Examples:
* Dropdown: `"KEY_OPTION": ["space", null]` (for `when space key pressed`).
* Variable Name: `"VARIABLE": ["score", null]` (for `set score to 0`).
* Direction (specific motion block): `"FORWARD_BACKWARD": ["forward", null]` (for `go forward layers`).
5. **Unique IDs:**
* All block IDs, variable IDs, and list IDs must be unique strings (e.g., "myBlock123", "myVarId456", "myListId789"). Do NOT use placeholder strings like "block_id_here".
6. **No Nested `blocks` Dictionary:**
* The `blocks` dictionary should only appear once per `target` (sprite/stage). Do NOT nest a `blocks` dictionary inside an individual block definition. Blocks that are part of a substack are linked via the `SUBSTACK` input.
7. **Asset Properties (for Costumes/Sounds):**
* `assetId`, `md5ext`, `bitmapResolution`, `rotationCenterX`/`rotationCenterY` should be correctly associated with costume and sound objects within the `costumes` and `sounds` arrays.
**General Principles and Important Considerations:**
* **Backward Compatibility:** Adhere strictly to existing Scratch 3.0 opcodes and schema to ensure backward compatibility with older projects. Opcodes must remain consistent to prevent previously saved projects from failing to load or behaving unexpectedly.
* **Forgiving Inputs:** Recognize that Scratch is designed to be "forgiving in its interpretation of inputs." The Scratch VM handles potentially "invalid" inputs gracefully (e.g., converting a number to a string if expected, returning default values like zero or empty strings, or performing no action) rather than crashing. This implies that precise type matching for inputs might be handled internally by Scratch, allowing for some flexibility in how values are provided, but the agent should aim for the most common and logical type.
"""
# System prompt given to the JSON-repair agent (`agent_json_resolver`).
# NOTE(review): the text refers to "the given schema" / "the schema below",
# but no schema is embedded in this string - presumably it is supplied in the
# user message at call time; confirm against the callers.
SYSTEM_PROMPT_JSON_CORRECTOR ="""
You are an assistant that outputs JSON responses strictly following the given schema.
If the JSON you produce has any formatting errors, missing required fields, or invalid structure, you must identify the problems and correct them.
Always return only valid JSON that fully conforms to the schema below, enclosed in triple backticks (```), without any extra text or explanation.
If you receive an invalid or incomplete JSON response, fix it by:
- Adding any missing required fields with appropriate values.
- Correcting syntax errors such as missing commas, brackets, or quotes.
- Ensuring the JSON structure matches the schema exactly.
Remember: Your output must be valid JSON only, ready to be parsed without errors.
"""
# Main agent of the system agent for Scratch 3.0.
# Built with create_react_agent on top of `llm`, using SYSTEM_PROMPT as its prompt.
agent = create_react_agent(
    model=llm,
    tools=[], # No specific tools are defined here, but could be added later
    prompt=SYSTEM_PROMPT
)
# Debugger and resolver agent for Scratch 3.0.
# Same construction as `agent` but prompted with SYSTEM_PROMPT_JSON_CORRECTOR.
# NOTE(review): this uses `llm`, although `llm2` is the model labelled
# "Json debugger" where the models are created - confirm which one is intended.
agent_json_resolver = create_react_agent(
    model=llm,
    tools=[], # No specific tools are defined here, but could be added later
    prompt=SYSTEM_PROMPT_JSON_CORRECTOR
)
### LangGraph Workflow Definition | |
# Define a state for your graph using TypedDict | |
class GameState(TypedDict):
    """State dictionary handed from node to node in the LangGraph workflow.

    `game_description_node` reads `project_json` and `description` and
    overwrites `description` with the generated text. The payload shapes of
    the Optional[Dict] plan/feedback fields are not visible here - confirm
    them against the nodes that write them.
    """
    project_json: dict  # Scratch project JSON (the nodes read its "targets" list)
    description: str  # game description; replaced by the detailed narrative
    project_id: str
    sprite_initial_positions: dict # Add this as well, as it's part of your state
    action_plan: Optional[Dict]
    #behavior_plan: Optional[Dict]
    improvement_plan: Optional[Dict]
    needs_improvement: bool
    plan_validation_feedback: Optional[Dict]
    iteration_count: int # Track the number of iterations for improvements
    review_block_feedback: Optional[Dict] # Feedback from the agent on the blocks after verification
    declaration_plan: Optional[Dict]
# Helper function to update project JSON with sprite positions | |
import copy | |
def update_project_with_sprite_positions(project_json: dict, sprite_positions: dict) -> dict:
    """Return a copy of the project with sprite 'x'/'y' coordinates replaced.

    The input project is never mutated; a deep copy is updated and returned.
    The Stage target is skipped, and a sprite is only touched when its entry
    provides both 'x' and 'y'.

    Args:
        project_json (dict): Original Scratch project JSON.
        sprite_positions (dict): Dict mapping sprite names to
            {'x': int, 'y': int}. The value usually originates from parsed
            LLM output, so ``None``, non-dict values and non-dict entries are
            tolerated and simply ignored.

    Returns:
        dict: Updated project JSON with new sprite positions.
    """
    updated_project = copy.deepcopy(project_json)

    # Nothing usable to apply (e.g. the model returned null or a list).
    if not isinstance(sprite_positions, dict):
        return updated_project

    for target in updated_project.get("targets", []):
        if target.get("isStage", False):
            continue
        pos = sprite_positions.get(target.get("name"))
        # Require a mapping carrying both coordinates; partial or malformed
        # entries leave the sprite where it was.
        if isinstance(pos, dict) and "x" in pos and "y" in pos:
            target["x"] = pos["x"]
            target["y"] = pos["y"]
    return updated_project
# Helper function to load the block catalog from a JSON file | |
def _load_block_catalog(file_path: str) -> Dict: | |
"""Loads the Scratch block catalog from a specified JSON file.""" | |
try: | |
with open(file_path, 'r') as f: | |
catalog = json.load(f) | |
logger.info(f"Successfully loaded block catalog from {file_path}") | |
return catalog | |
except FileNotFoundError: | |
logger.error(f"Error: Block catalog file not found at {file_path}") | |
# Return an empty dict or raise an error, depending on desired behavior | |
return {} | |
except json.JSONDecodeError as e: | |
logger.error(f"Error decoding JSON from {file_path}: {e}") | |
return {} | |
except Exception as e: | |
logger.error(f"An unexpected error occurred while loading {file_path}: {e}") | |
return {} | |
# --- Global variable for the block catalog ---
ALL_SCRATCH_BLOCKS_CATALOG = {}

# Paths are built with os.path.join so they resolve on every platform; the
# previous hard-coded backslashes only worked on Windows.
_BLOCKS_DIR = "blocks"
BLOCK_CATALOG_PATH = os.path.join(_BLOCKS_DIR, "blocks.json")  # Define the path to your JSON file
HAT_BLOCKS_PATH = os.path.join(_BLOCKS_DIR, "hat_blocks.json")  # Path to the hat blocks JSON file
STACK_BLOCKS_PATH = os.path.join(_BLOCKS_DIR, "stack_blocks.json")  # Path to the stack blocks JSON file
REPORTER_BLOCKS_PATH = os.path.join(_BLOCKS_DIR, "reporter_blocks.json")  # Path to the reporter blocks JSON file
BOOLEAN_BLOCKS_PATH = os.path.join(_BLOCKS_DIR, "boolean_blocks.json")  # Path to the boolean blocks JSON file
C_BLOCKS_PATH = os.path.join(_BLOCKS_DIR, "c_blocks.json")  # Path to the C blocks JSON file
CAP_BLOCKS_PATH = os.path.join(_BLOCKS_DIR, "cap_blocks.json")  # Path to the cap blocks JSON file


def _summarize_block_catalog(block_data: Dict) -> tuple:
    """Return ``(description, opcode_listing)`` for one loaded catalog.

    The loader returns ``{}`` on failure, so missing "description"/"blocks"
    keys yield an empty description and an empty listing instead of a
    KeyError at import time.
    """
    description = block_data.get("description", "")
    listing = "\n".join(
        f" - Opcode: {block['op_code']}, functionality: {block['functionality']}"
        for block in block_data.get("blocks", [])
    )
    return description, listing


# Load the block catalogs from their respective JSON files
hat_block_data = _load_block_catalog(HAT_BLOCKS_PATH)
hat_description, hat_opcodes_functionalities = _summarize_block_catalog(hat_block_data)
print("Hat blocks loaded successfully.", hat_description)

boolean_block_data = _load_block_catalog(BOOLEAN_BLOCKS_PATH)
boolean_description, boolean_opcodes_functionalities = _summarize_block_catalog(boolean_block_data)

c_block_data = _load_block_catalog(C_BLOCKS_PATH)
c_description, c_opcodes_functionalities = _summarize_block_catalog(c_block_data)

cap_block_data = _load_block_catalog(CAP_BLOCKS_PATH)
cap_description, cap_opcodes_functionalities = _summarize_block_catalog(cap_block_data)

reporter_block_data = _load_block_catalog(REPORTER_BLOCKS_PATH)
reporter_description, reporter_opcodes_functionalities = _summarize_block_catalog(reporter_block_data)

stack_block_data = _load_block_catalog(STACK_BLOCKS_PATH)
stack_description, stack_opcodes_functionalities = _summarize_block_catalog(stack_block_data)

# This makes ALL_SCRATCH_BLOCKS_CATALOG available globally
ALL_SCRATCH_BLOCKS_CATALOG = _load_block_catalog(BLOCK_CATALOG_PATH)
# Helper function to generate a unique block ID | |
def generate_block_id():
    """Return a short random identifier for a Scratch block.

    The id is the first nine hexadecimal digits of a freshly generated
    UUID4. Because the UUID is truncated, ids are random but not guaranteed
    to be unique.
    """
    # Nine hex digits: what remains of the first ten characters of the
    # textual UUID once its dash is dropped.
    return uuid.uuid4().hex[:9]
# Placeholder for your extract_json_from_llm_response function | |
# # Helper function to extract JSON from LLM response | |
def extract_json_from_llm_response(raw_response: str) -> dict:
    """Extract and parse the JSON payload contained in an LLM response.

    The payload is taken from the first Markdown code fence when one is
    present, otherwise from the raw text, and trimmed to the outermost
    ``{ ... }``. If that candidate is already valid JSON it is parsed and
    returned untouched. Only when strict parsing fails are the regex repair
    heuristics applied, because they also rewrite text inside string values
    and would otherwise corrupt well-formed responses.

    Args:
        raw_response: Text returned by the model.

    Returns:
        The parsed JSON value (normally a dict).

    Raises:
        json.JSONDecodeError: If the text is still not valid JSON after all
            repair steps.
    """
    # --- 1) Pull out the JSON code-block if present ---
    md = re.search(r"```(?:json)?\s*([\s\S]*?)\s*```", raw_response)
    json_string = md.group(1).strip() if md else raw_response

    # --- 2) Trim to the outermost { ... } so we drop any prefix/suffix junk ---
    first, last = json_string.find('{'), json_string.rfind('}')
    if 0 <= first < last:
        json_string = json_string[first:last+1]

    # --- 2.1) Fast path: never "repair" JSON that is already valid ---
    try:
        return json.loads(json_string)
    except json.JSONDecodeError:
        logger.debug("Strict parse failed; applying repair heuristics.")

    # --- 3) PRE-CLEANUP: remove stray assistant{...}, rogue assistant keys, fix boolean quotes ---
    json_string = re.sub(r'\b\w+\s*{', '{', json_string)
    json_string = re.sub(r'"assistant"\s*:', '', json_string)
    json_string = re.sub(r'\b(false|true)"', r'\1', json_string)
    logger.debug("Ran pre‑cleanup for stray tokens and boolean quotes.")

    # --- 3.1) Fix stray inner quotes at start of name/list values ---
    # e.g., { "name": " \"recent_scoress\"", ... } -> "recent_scoress"
    json_string = re.sub(
        r'("name"\s*:\s*")\s*"',
        r'\1',
        json_string
    )

    # --- 4) Escape all embedded quotes in any `logic` value up to the next key ---
    def _esc(m):
        prefix, body = m.group(1), m.group(2)
        return prefix + body.replace('"', r'\"')

    json_string = re.sub(
        r'("logic"\s*:\s*")([\s\S]+?)(?=",\s*"[A-Za-z_]\w*"\s*:\s*)',
        _esc,
        json_string
    )
    logger.debug("Escaped embedded quotes in logic fields.")

    # --- 5) Remove trailing commas before } or ] and collapse doubled commas ---
    json_string = re.sub(r',\s*(?=[}\],])', '', json_string)
    json_string = re.sub(r',\s*,', ',', json_string)
    logger.debug("Removed trailing commas.")

    # --- 6) Balance braces: drop extra } at end if needed ---
    ob, cb = json_string.count('{'), json_string.count('}')
    if cb > ob:
        excess = cb - ob
        stripped = 0
        json_string = json_string.rstrip()
        # Only remove characters that really are closing braces; a blind
        # slice could cut off unrelated trailing characters.
        while stripped < excess and json_string.endswith('}'):
            json_string = json_string[:-1].rstrip()
            stripped += 1
        logger.debug(f"Stripped {stripped} extra closing brace(s).")

    # --- 7) Escape literal newlines in *all* string values ---
    json_string = re.sub(
        r'"((?:[^"\\]|\\.)*?)"',
        lambda m: '"' + m.group(1).replace('\n', '\\n').replace('\r', '\\r') + '"',
        json_string,
        flags=re.DOTALL
    )
    logger.debug("Escaped newlines in strings.")

    # --- 8) Final parse attempt ---
    try:
        return json.loads(json_string)
    except json.JSONDecodeError:
        logger.error("Sanitized JSON still invalid:\n%s", json_string)
        raise
def strip_noise(s: str) -> str:
    """Remove chat-template residue and simple JSON glitches from model text.

    The substitutions below are applied in order, each to the result of the
    previous one, and the final text is stripped of surrounding whitespace.
    """
    ordered_rewrites = (
        # 1. Remove any <|...|> markers
        (r"<\|.*?\|>", ""),
        # 2. Remove any 'assistant<' plus following non-whitespace
        (r"assistant<\S*", ""),
        # 3. Strip stray angle-brackets
        (r"[<>]", ""),
        # 4. Fix malformed empty keys like sensing"": -> sensing":
        (r'(\w+)""', r'\1"'),
        # 5. Deduplicate an immediately repeated JSON key with a list value,
        #    e.g. "control": [...], "control": [...] -> keep only the first
        (
            r'("(?P<key>\w+)":\s*(?P<first>\[[^\]]*\]))\s*,\s*"\2":\s*\[[^\]]*\]',
            lambda found: found.group(1),
        ),
        # 6. Collapse multiple blank lines and extra spaces
        (r"\n\s*\n+", "\n\n"),
        (r" {2,}", " "),
    )

    cleaned = s
    for pattern, replacement in ordered_rewrites:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.strip()
# def extract_json_from_llm_response(raw_response: str) -> dict: | |
# """ | |
# Extracts a JSON object from an LLM response string, robustly handling | |
# various common LLM output formats and parsing errors, including extra data. | |
# """ | |
# json_string_to_parse = raw_response | |
# # --- Step 1: Try to extract JSON from markdown code block (most common and cleanest) --- | |
# markdown_match = re.search(r"```(?:json)?\s*([\s\S]*?)\s*```", raw_response) | |
# if markdown_match: | |
# json_string_to_parse = markdown_match.group(1).strip() | |
# logger.debug("Extracted potential JSON from markdown block.") | |
# else: | |
# logger.debug("No markdown JSON block found. Attempting to parse raw response directly.") | |
# # --- Step 2: Aggressively clean the string to isolate a single JSON object --- | |
# # Find the first '{' and the last '}' that might form a complete JSON object. | |
# first_brace = json_string_to_parse.find('{') | |
# last_brace = json_string_to_parse.rfind('}') | |
# if first_brace != -1 and last_brace != -1 and last_brace > first_brace: | |
# json_string_to_parse = json_string_to_parse[first_brace : last_brace + 1] | |
# logger.debug(f"Trimmed string to outermost JSON braces. Length: {len(json_string_to_parse)}") | |
# else: | |
# logger.warning("Could not find balanced outermost braces. Proceeding with raw string.") | |
# # IMPORTANT: REMOVED/COMMENTED OUT THIS LINE! | |
# # 2.1) Fix unquoted keys: This is a common LLM error. | |
# # json_string_to_parse = re.sub(r'([{\s,])(\w+)\s*:', r'\1"\2":', json_string_to_parse) | |
# # logger.debug("Fixed unquoted keys.") | |
# # The LLM's raw output for keys is good, and this regex was over-applying. | |
# # 2.2) Remove problematic extra commas like ",," or ",}" or ",]" | |
# json_string_to_parse = re.sub(r',\s*(?=[}\],])', '', json_string_to_parse) # Remove comma before } or ] | |
# json_string_to_parse = re.sub(r',\s*,', ',', json_string_to_parse) # Replace double commas with single | |
# logger.debug("Removed problematic extra commas.") | |
# # 2.3) Escape newlines/carriage returns in string values | |
# # This regex attempts to find string values and replace internal newlines. | |
# # It specifically targets values within double quotes that might contain newlines. | |
# # It also handles existing escaped quotes to prevent malformation: | |
# json_string_to_parse = re.sub(r'"((?:[^"\\]|\\.)*)"', lambda m: '"' + m.group(1).replace('\n', '\\n').replace('\r', '\\r') + '"', json_string_to_parse, flags=re.DOTALL) | |
# logger.debug("Escaped newlines in string values.") | |
# # --- Step 3: Attempt to parse the (sanitized) JSON string --- | |
# try: | |
# parsed_json = json.loads(json_string_to_parse) | |
# logger.info("Successfully parsed JSON from LLM response.") | |
# return parsed_json | |
# except json.JSONDecodeError as original_error: | |
# logger.error(f"Failed to parse JSON even after sanitization. Error: {original_error!r}") | |
# logger.error(f"Problematic JSON string (full length: {len(json_string_to_parse)}):\n{json_string_to_parse}") | |
# raise original_error | |
# def extract_json_from_llm_response(raw_response: str) -> dict: | |
# """ | |
# Extracts a JSON object from an LLM response string, robustly handling | |
# various common LLM output formats and parsing errors. | |
# """ | |
# json_string_to_parse = raw_response | |
# # --- Step 1: Try to extract JSON from markdown code block (most common and cleanest) --- | |
# # This pattern is more robust: it matches "```json" or "```" (for general code blocks) | |
# # and captures everything until the next "```" | |
# markdown_match = re.search(r"```(?:json)?\s*([\s\S]*?)\s*```", raw_response) | |
# if markdown_match: | |
# json_string_to_parse = markdown_match.group(1).strip() | |
# logger.debug("Extracted potential JSON from markdown block.") | |
# else: | |
# logger.debug("No markdown JSON block found. Attempting to parse raw response.") | |
# # --- Step 2: Pre-process for common LLM JSON generation errors --- | |
# # 2.1) Escape newlines/carriage returns in string values | |
# # This is a general fix for all string values, not just 'feedback' | |
# # It looks for "key": "value" patterns and replaces newlines within 'value' | |
# # This might be tricky to get right for all cases without a full parser. | |
# # A safer bet is to hope the LLM formats strings correctly, or to rely on direct JSON parsing. | |
# # For now, let's keep your feedback-specific one if you find LLM adds newlines there, | |
# # but the primary error is structural. | |
# # Re-apply feedback escaping if you still suspect newlines in specific fields | |
# feedback_pattern = r'("feedback"\s*:\s*")(.+?)("(?:\s*,|\s*\})?)' # Adjusted regex to handle end of object/array | |
# def _escape_feedback(m): | |
# prefix, val, suffix = m.groups() | |
# safe_val = val.replace("\r", "\\r").replace("\n", "\\n") | |
# return prefix + safe_val + suffix | |
# json_string_to_parse = re.sub(feedback_pattern, _escape_feedback, json_string_to_parse, flags=re.DOTALL) | |
# logger.debug("Applied feedback newline escaping.") | |
# # --- Step 3: Attempt to parse the (sanitized) JSON string --- | |
# try: | |
# parsed_json = json.loads(json_string_to_parse) | |
# logger.info("Successfully parsed JSON from LLM response.") | |
# return parsed_json | |
# except json.JSONDecodeError as original_error: | |
# # If parsing fails, log the problematic string and the error for debugging | |
# logger.error(f"Failed to parse JSON. Error: {original_error!r}") | |
# logger.error(f"Problematic JSON string (start):\n{json_string_to_parse[:1000]}...") # Log first 1000 chars | |
# logger.error(f"Problematic JSON string (full length: {len(json_string_to_parse)}):\n{json_string_to_parse}") # Log full for deep dive | |
# # Attempt a fallback for common structural issues (e.g., extra trailing commas) | |
# # This is speculative and may not always work, but worth a try before failing. | |
# try: | |
# # Remove trailing commas from objects and arrays | |
# # This regex is simplified and might not catch all cases, but handles common ones | |
# # For example, {"a": 1,} -> {"a": 1} | |
# # Or [1, 2,] -> [1, 2] | |
# sanitized_for_trailing_commas = re.sub(r',\s*([}\]])', r'\1', json_string_to_parse) | |
# logger.warning("Attempting to parse after removing potential trailing commas.") | |
# return json.loads(sanitized_for_trailing_commas) | |
# except json.JSONDecodeError as e_fallback: | |
# logger.error(f"Fallback parsing also failed. Error: {e_fallback!r}") | |
# # The original error is more informative for 'Expecting ,' delimiter, so raise that. | |
# raise original_error # Re-raise the original, more specific JSONDecodeError | |
# def extract_json_from_llm_response(raw_response: str) -> dict: | |
# """ | |
# Extracts a JSON object from an LLM response string, robustly handling | |
# various common LLM output formats and parsing errors. | |
# """ | |
# json_string_to_parse = raw_response | |
# # --- Step 1: Try to extract JSON from markdown code block (most common and cleanest) --- | |
# markdown_match = re.search(r"```(?:json)?\s*([\s\S]*?)\s*```", raw_response) | |
# if markdown_match: | |
# json_string_to_parse = markdown_match.group(1).strip() | |
# logger.debug("Extracted potential JSON from markdown block.") | |
# else: | |
# logger.debug("No markdown JSON block found. Attempting to parse raw response directly.") | |
# # --- Step 2: Pre-process for common LLM JSON generation errors --- | |
# # 2.1) Remove common LLM conversational filler outside of JSON. | |
# # This assumes the JSON is the primary content, not embedded within prose. | |
# # If the JSON is always in a markdown block, this might be less necessary. | |
# # This is a heuristic and might need adjustment based on LLM's typical output. | |
# json_string_to_parse = re.sub(r"^(.*?){", "{", json_string_to_parse, 1, flags=re.DOTALL) | |
# json_string_to_parse = re.sub(r"}(.*?)$", "}", json_string_to_parse, 1, flags=re.DOTALL) | |
# logger.debug("Removed potential pre/post JSON conversational filler.") | |
# # 2.2) Fix unquoted keys: This is a common LLM error. | |
# # It finds words followed by a colon and ensures they are quoted. | |
# # Example: {key: "value"} -> {"key": "value"} | |
# json_string_to_parse = re.sub(r'([{\s,])(\w+)\s*:', r'\1"\2":', json_string_to_parse) | |
# logger.debug("Fixed unquoted keys.") | |
# # 2.3) Remove problematic extra commas like ",," or ",}" or ",]" | |
# # This is more robust than just trailing commas at the end of objects/arrays. | |
# # It removes any comma that is immediately followed by another comma, | |
# # or by a closing brace/bracket, without a key-value pair in between. | |
# json_string_to_parse = re.sub(r',\s*(?=[}\],])', '', json_string_to_parse) # Remove comma before } or ] | |
# json_string_to_parse = re.sub(r',\s*,', ',', json_string_to_parse) # Replace double commas with single | |
# logger.debug("Removed problematic extra commas.") | |
# # 2.4) Escape newlines/carriage returns in string values | |
# # This is a general fix for all string values. | |
# # This regex attempts to find string values and replace internal newlines. | |
# # A more robust solution might require a custom JSON parser, but this is a common heuristic. | |
# # It specifically targets values within double quotes that might contain newlines. | |
# json_string_to_parse = re.sub(r'"([^"\\]*(?:\\.[^"\\]*)*)"', lambda m: '"' + m.group(1).replace('\n', '\\n').replace('\r', '\\r') + '"', json_string_to_parse, flags=re.DOTALL) | |
# logger.debug("Escaped newlines in string values.") | |
# # --- Step 3: Attempt to parse the (sanitized) JSON string --- | |
# try: | |
# parsed_json = json.loads(json_string_to_parse) | |
# logger.info("Successfully parsed JSON from LLM response.") | |
# return parsed_json | |
# except json.JSONDecodeError as original_error: | |
# logger.error(f"Failed to parse JSON even after sanitization. Error: {original_error!r}") | |
# logger.error(f"Problematic JSON string (full length: {len(json_string_to_parse)}):\n{json_string_to_parse}") | |
# raise original_error | |
# Node 1: Detailed description generator. | |
def game_description_node(state: GameState):
    """Generate a detailed narrative description of the game.

    Builds a prompt from the initial description and the current project
    JSON, sends it to the main agent and stores the reply in
    ``state["description"]``.

    Args:
        state: Workflow state; ``project_json`` is required, ``description``
            defaults to "A simple game." when absent.

    Returns:
        The same state object with ``description`` replaced by the generated
        text.

    Raises:
        Exception: Any error raised while invoking the agent is logged with
            its traceback and re-raised.
    """
    logger.info("--- Running GameDescriptionNode ---")
    initial_description = state.get("description", "A simple game.")
    project_json = state["project_json"]

    # Names the model must not rename. Missing "targets" or nameless targets
    # are tolerated (as in update_project_with_sprite_positions) instead of
    # aborting the node with a KeyError.
    sprite_name = {}
    for target in project_json.get("targets", []):
        name = target.get("name")
        if name is not None:
            sprite_name[name] = name

    description_prompt = (
        f"You are an AI assistant tasked with generating a detailed narrative description for a Scratch 3.0 game.\n"
        f"The initial high-level description is: '{initial_description}'.\n"
        f"The current Scratch project JSON is:\n```json\n{json.dumps(project_json, indent=2)}\n```\n"
        f"Make sure you donot change Sprite and Stage name. Here are all the name: {sprite_name} \n"
        f"Create a rich, engaging, and detailed description, including potential gameplay elements, objectives, and overall feel with the available resources\n"
        f"The output should be a plain text description."
    )
    try:
        response = agent.invoke({"messages": [{"role": "user", "content": description_prompt}]})
        detailed_game_description = response["messages"][-1].content
        state["description"] = detailed_game_description#strip_noise(detailed_game_description)
        logger.info("Detailed game description generated by GameDescriptionNode.")
        print(f"Detailed Game Description: {detailed_game_description}")
        return state
    except Exception:
        # logger.exception records the traceback before the error propagates.
        logger.exception("Error in GameDescriptionNode")
        raise
# Node 2: Analysis User Query and Initial Positions | |
def parse_query_and_set_initial_positions(state: GameState):
    """
    Ask the LLM for initial sprite coordinates and write them into the project JSON.

    The prompt embeds ``state['description']`` and the full
    ``state['project_json']``. The reply is parsed with
    ``extract_json_from_llm_response``; if that raises ``json.JSONDecodeError``
    the raw reply is sent once to ``agent_json_resolver`` for repair and the
    repaired text is parsed instead. The ``sprite_initial_positions`` mapping
    (empty dict when the key is absent) is applied through
    ``update_project_with_sprite_positions`` and the result replaces
    ``state['project_json']``.

    Side effects: prints the raw reply and the updated project JSON, and dumps
    the whole state to ``debug_state.json`` in the working directory.

    Returns the same ``state`` object. Any exception (including a second JSON
    parse failure) is logged and re-raised.
    """
    logger.info("--- Running ParseQueryNode ---")
    # NOTE(review): Scratch's stage is documented as 480x360 (x in [-240, 240],
    # y in [-180, 180]); the +/-200 bounds quoted below disagree with that and
    # with the example's x=240. Prompt text left unchanged -- confirm intent.
    llm_query_prompt = f"""Based on the user's game description: '{state['description']}', \
stage description: the Stage's center is `(0,0)` and height is from `(0,-200)` to `(0,200)` and width is `(-200,0)` to `(200,0)`,
and the current Scratch project JSON below, \
determine the most appropriate initial 'x' and 'y' coordinates for each sprite. \
Return ONLY a JSON object with a single key 'sprite_initial_positions' mapping sprite names to their {{'x': int, 'y': int}} coordinates.
The current Scratch project JSON is:
```json
{json.dumps(state['project_json'], indent=2)}
```
Example Json output:
```json
{{
"sprite_initial_positions": {{
"Sprite1": {{"x": -160, "y": -110}},
"Sprite2": {{"x": 240, "y": -135}}
}}
}}
```
"""
    try:
        response = agent.invoke({"messages": [{"role": "user", "content": llm_query_prompt}]})
        raw_response = response["messages"][-1].content
        print("Raw response from LLM:", raw_response)
        try:
            updated_data = extract_json_from_llm_response(raw_response)
            sprite_positions = updated_data.get("sprite_initial_positions", {})
        except json.JSONDecodeError as error_json:
            logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.")
            # One repair attempt through the dedicated JSON-fixing agent.
            correction_prompt = (
                "Your task is to correct the provided JSON string to ensure it is **syntactically perfect and adheres strictly to JSON rules**.\n"
                "Carefully review the JSON for any errors, especially focusing on the reported error at:\n"
                f"- **Error Details**: {error_json}\n\n"
                "**Strict Instructions for your response:**\n"
                "1. **ONLY** output the corrected JSON. Do not include any other text, comments, or explanations outside the JSON.\n"
                "2. Ensure all property names (keys) are enclosed in **double quotes**.\n"
                "3. Ensure string values are correctly enclosed in **double quotes** and any internal special characters (like newlines `\\n`, tabs `\\t`, backslashes `\\\\`, or double quotes `\\\"`) are properly **escaped**.\n"
                "4. Verify that there are **no extra commas**, especially between key-value pairs or after the last element in an object or array.\n"
                "5. Ensure proper nesting and matching of curly braces `{}` and square brackets `[]`.\n"
                "6. The corrected JSON must be a **complete and valid** JSON object.\n\n"
                "Here is the problematic JSON string to correct:\n"
                "```json\n"  # Use markdown for clear JSON distinction
                f"{raw_response}\n"
                "```\n"
                "Corrected JSON:\n"  # Indicate where the corrected JSON should start
            )
            correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]})
            # Bind the reply text first: reusing double quotes inside a
            # double-quoted f-string is a SyntaxError before Python 3.12.
            corrected_text = correction_response["messages"][-1].content
            print(f"[JSON CORRECTOR RESPONSE AT PARSER]: {corrected_text}")
            corrected_data = extract_json_from_llm_response(corrected_text)
            sprite_positions = corrected_data.get("sprite_initial_positions", {})
        new_project_json = update_project_with_sprite_positions(state["project_json"], sprite_positions)
        state["project_json"] = new_project_json
        print("Updated project JSON with sprite positions:", json.dumps(new_project_json, indent=2))
        # [TEMPORARY FOR LOGGING] snapshot of the whole state for debugging.
        with open("debug_state.json", "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2, ensure_ascii=False)
        return state
    except Exception as e:
        logger.error(f"Error in ParseQueryNode: {e}")
        raise
# Node 2: Declaration Planner Node (Updated) | |
def declaration_planner_node(state: GameState):
    """
    Ask the LLM for a declaration plan and store it in ``state["declaration_plan"]``.

    The requested plan covers:
    - variables and cloud variables
    - lists
    - broadcast messages
    - monitors (usually tied to reporter blocks or variables)

    A reply that fails JSON extraction is sent once to ``agent_json_resolver``
    for repair. The parsed plan is also written to
    ``debug_declaration_plan.json``. Returns the same ``state`` object; any
    exception is logged and re-raised.
    """
    logger.info("--- Running DeclarationPlannerNode ---")
    game_text = state.get("description", "")
    planning_prompt = f"""Game Description:
{game_text}
You are given the game design and are tasked to prepare a full declaration plan for Scratch 3.0.
Here is the description to have opcode in the monitor which are mostly reporting block
--- Scratch 3.0 Block Reference ---
### Reporter Blocks
Description: {reporter_description}
Blocks:
{reporter_opcodes_functionalities}
Your output must be valid JSON and follow this exact structure:
```json
{{
"declaration_plan": {{
"variables": [
{{ "name": "score", "default": 0, "cloud": false }},
{{ "name": "☁ High Score", "default": 0, "cloud": true }},
{{ "name": "speed", "default": 0, "cloud": false }}
],
"lists": [
{{ "name": "recent_scores", "default": [] }}
],
"broadcasts": [
"Game Over",
"Game Start"
],
"monitors": [
{{ "target": "Stage", "variable": "score", "visible": true, "opcode_hint": "data_variable" }},
{{ "target": "Stage", "variable": "☁ High Score", "visible": true, "opcode_hint": "data_variable" }},
{{ "target": "Sprite1", "reporter_name": "costume #", "visible": false, "opcode_hint": "looks_costumenumbername", "param_name": "NUMBER_NAME" }}
]
}}
}}
```
Guidelines:
- Add meaningful variables to represent game mechanics (e.g., score, health, level, etc).
- If a variable is shared across devices or saved to the cloud, mark it as `cloud: true`.
- Lists should reflect repeated values or game logs like `recent_scores`, `highscore_history` etc.
- Broadcasts should represent game state changes (e.g., Game Over, Game Start, Reset Game).
- Monitors include on-screen displayed data like variable reports, costume index, or stage backdrop number.
- For monitors, include an `opcode_hint` to suggest the Scratch block opcode that will report the value (e.g., `data_variable`, `motion_xposition`, `looks_costumenumbername`).
- If a monitor needs a specific parameter (like 'number' or 'name' for costume), add a `param_name` field (e.g., `"param_name": "NUMBER_NAME"`).
- Use variable and broadcast names that are clean, human-readable, and contextually relevant.
- Avoid duplicating variable names. Always use the same case-sensitive name.
Start generating the `declaration_plan` for this game."""
    try:
        llm_reply = agent.invoke({"messages": [{"role": "user", "content": planning_prompt}]})
        raw_response = llm_reply["messages"][-1].content
        print("Raw response from LLM [DeclarationPlannerNode]:", raw_response)
        try:
            declaration_plan = extract_json_from_llm_response(raw_response)
        except json.JSONDecodeError as error_json:
            logger.warning("Malformed JSON detected. Attempting to correct...")
            # Single repair pass: hand the broken text and the parser error to
            # the JSON-fixing agent and parse whatever it returns.
            repair_pieces = [
                "Your task is to correct the provided JSON string to ensure it is **syntactically perfect and adheres strictly to JSON rules**.\n",
                "Carefully review the JSON for any errors, especially focusing on the reported error at:\n",
                f"- **Error Details**: {error_json}\n\n",
                "**Strict Instructions for your response:**\n",
                "1. **ONLY** output the corrected JSON. Do not include any other text, comments, or explanations outside the JSON.\n",
                "2. Ensure all property names (keys) are enclosed in **double quotes**.\n",
                "3. Ensure string values are correctly enclosed in **double quotes** and any internal special characters (like newlines `\\n`, tabs `\\t`, backslashes `\\\\`, or double quotes `\\\"`) are properly **escaped**.\n",
                "4. Verify that there are **no extra commas**, especially between key-value pairs or after the last element in an object or array.\n",
                "5. Ensure proper nesting and matching of curly braces `{}` and square brackets `[]`.\n",
                "6. **Crucially, remove any extraneous characters or duplicate closing braces outside the main JSON object.**\n",
                "7. The corrected JSON must be a **complete and valid** JSON object.\n\n",
                "Here is the problematic JSON string to correct:\n",
                "```json\n",
                f"{raw_response}\n",
                "```\n",
                "Corrected JSON:\n",
            ]
            repair_request = {"messages": [{"role": "user", "content": "".join(repair_pieces)}]}
            repaired_reply = agent_json_resolver.invoke(repair_request)
            declaration_plan = extract_json_from_llm_response(repaired_reply["messages"][-1].content)
        state["declaration_plan"] = declaration_plan
        logger.info("Declaration plan successfully generated.")
        # Keep a copy of the parsed plan on disk for debugging.
        with open("debug_declaration_plan.json", "w", encoding="utf-8") as debug_file:
            json.dump(declaration_plan, debug_file, indent=2, ensure_ascii=False)
        return state
    except Exception as exc:
        logger.error(f"DeclarationPlannerNode failed: {exc}")
        raise  # Propagate so the failure is not silently swallowed
# New Node: Declaration Builder Node | |
def _build_monitor_block_prompt(monitor_data, project_json):
    """Compose the prompt asking the LLM for one monitor's ``monitor_block`` JSON."""
    prompt = (
        f"You are an AI assistant tasked with generating the Scratch block JSON for a monitor.\n"
        f"The monitor details are:\n"
        f"- Target Sprite/Stage: '{monitor_data.get('target')}'\n"
        f"- Desired Opcode Hint: '{monitor_data.get('opcode_hint')}'\n"
        f"- Visibility: {monitor_data.get('visible', False)}\n"
    )
    # A monitor is described either by a variable or by a reporter name;
    # the variable wins when both are present.
    if "variable" in monitor_data:
        prompt += f"- Variable Name: '{monitor_data['variable']}'\n"
    elif "reporter_name" in monitor_data:
        prompt += f"- Reporter Name: '{monitor_data['reporter_name']}'\n"
    param_name = monitor_data.get("param_name")  # For things like costume # or name
    if param_name:
        prompt += f"- Parameter Name: '{param_name}'\n"
    prompt += (
        f"\nHere is a catalog of reporter blocks that might be relevant, including their `op_code` and parameters:\n"
        f"```json\n{json.dumps(reporter_block_data, indent=2)}\n```\n\n"  # Provide reporter blocks catalog
        f"Current Scratch project JSON (for context, especially existing variables/lists):\n"
        f"```json\n{json.dumps(project_json, indent=2)}\n```\n\n"
        f"**CRITICAL INSTRUCTIONS FOR GENERATING THE MONITOR BLOCK JSON:**\n"
        f"1. **Output ONLY the JSON object for the `monitor_block`.** Do NOT include any other text or markdown fences like ```json.\n"
        f"2. The top-level key in your output should be `monitor_block` and its value should be the block definition.\n"
        f"3. Generate a **globally unique ID** for the `monitor_block`.\n"
        f"4. Set the `opcode` field based on the `opcode_hint` provided (e.g., `data_variable`, `motion_xposition`, `looks_costumenumbername`).\n"
        f"5. The `mode` should typically be `\"default\"` for standard monitors.\n"
        f"6. For variable monitors (`data_variable` opcode), the `params` field MUST contain `\"VARIABLE\": \"{{variable_name}}\"`.\n"
        f"7. For reporter blocks that have parameters (like `looks_costumenumbername`), the `params` field should contain the correct parameter name (e.g., `\"NUMBER_NAME\": \"number\"` or `\"NUMBER_NAME\": \"name\"`). Refer to the `reporter_block_data` for parameter names.\n"
        f"8. Set `spriteName` to the actual target sprite's name, or `null` if it's a Stage variable/reporter.\n"
        f"9. Set `value` to a reasonable default (e.g., \"0\" for numbers, \"\" for strings, or the current value from project_json for existing variables).\n"
        f"10. Set `visible` to `true` or `false` as specified in the plan.\n"
        f"11. Provide sensible `x` and `y` coordinates. For demonstration, increment `y` for each new monitor.\n"
        f"12. `sliderMin`, `sliderMax`, and `isDiscrete` are typically for variables; set them appropriately if known or use defaults (0, 100, true).\n"
        f"13. Ensure all keys are double-quoted.\n"
    )
    return prompt


def _build_monitor_correction_prompt(raw_response, error_json):
    """Compose the prompt asking the JSON-fixing agent to repair a monitor block reply."""
    return (
        "Your task is to correct the provided JSON string to ensure it is **syntactically perfect and adheres strictly to JSON rules**.\n"
        "It must represent a single Scratch monitor block definition.\n"
        f"- **Error Details**: {error_json}\n\n"
        "**Strict Instructions for your response:**\n"
        "1. **ONLY** output the corrected JSON object. Do not include any other text, comments, or explanations outside the JSON.\n"
        "2. The top-level key must be `monitor_block`.\n"
        "3. Ensure all property names (keys) are enclosed in **double quotes**.\n"
        "4. Ensure string values are correctly enclosed in **double quotes** and any internal special characters (like newlines `\\n`, tabs `\\t`, backslashes `\\\\`, or double quotes `\\\"`) are properly **escaped**.\n"
        "5. Verify that there are **no extra commas**, especially between key-value pairs or after the last element in an object or array.\n"
        "6. Ensure proper nesting and matching of curly braces `{}` and square brackets `[]`.\n"
        "7. **Crucially, remove any extraneous characters or duplicate closing braces outside the main JSON object.**\n"
        "8. The corrected JSON must be a **complete and valid** JSON object.\n\n"
        "Here is the problematic JSON string to correct:\n"
        "```json\n"
        f"{raw_response}\n"
        "```\n"
        "Corrected JSON:\n"
    )


def declaration_builder_node(state: GameState):
    """
    Materialise ``state["declaration_plan"]`` inside ``state["project_json"]``.

    Variables, lists and broadcasts from the plan are added to (or updated on)
    the Stage target; entries are matched to existing ones by name. For every
    planned monitor whose target exists, ``agent`` is asked to produce a
    ``monitor_block`` (with one repair attempt through ``agent_json_resolver``
    on a JSON parse failure); the block gets layout coordinates and is appended
    to ``project_json["monitors"]``.

    Plan entries come from an LLM, so optional fields fall back to defaults
    (``default`` -> 0 / ``[]``, ``cloud`` -> False) and entries that are not
    objects or lack a ``name`` are skipped with a warning instead of aborting
    the node. A failure while generating a single monitor is logged and that
    monitor is skipped.

    Side effects: mutates ``project_json`` in place, writes the state to
    ``debug_state.json`` and prints the updated project JSON.

    Returns the same ``state`` object. It is returned unchanged when no plan or
    no Stage target is present.
    """
    logger.info("--- Running DeclarationBuilderNode ---")
    declaration_plan = state.get("declaration_plan")
    if not declaration_plan:
        logger.warning("No declaration plan found in state. Skipping DeclarationBuilderNode.")
        return state

    project_json = state["project_json"]
    targets = project_json["targets"]
    # The Stage target holds the declared variables, lists and broadcasts.
    stage_target = next((target for target in targets if target["isStage"]), None)
    if not stage_target:
        logger.error("Stage target not found in project_json. Cannot build declarations.")
        return state

    # Create the sections on first use.
    stage_variables = stage_target.setdefault("variables", {})
    stage_lists = stage_target.setdefault("lists", {})
    stage_broadcasts = stage_target.setdefault("broadcasts", {})
    project_monitors = project_json.setdefault("monitors", [])  # Monitors live at the project root

    plan = declaration_plan.get("declaration_plan") or {}

    # 1. Build Variables
    logger.info("Building variables...")
    for var_data in plan.get("variables") or []:
        if not isinstance(var_data, dict) or "name" not in var_data:
            logger.warning(f"Skipping malformed variable declaration: {var_data!r}")
            continue
        var_name = var_data["name"]
        var_default = str(var_data.get("default", 0))  # Values are stored stringified
        var_cloud = var_data.get("cloud", False)
        existing_var_id = next(
            (var_id for var_id, info in stage_variables.items() if info[0] == var_name),
            None,
        )
        if existing_var_id:
            existing = stage_variables[existing_var_id]
            existing[1] = var_default
            if len(existing) > 2:
                existing[2] = var_cloud
            elif var_cloud:  # Only add the flag for cloud variables if not present
                existing.append(True)
            logger.info(f"Updated variable: {var_name}")
        else:
            variable_definition = [var_name, var_default]
            if var_cloud:
                variable_definition.append(True)
            # Name plus a UUID fragment keeps IDs unique.
            stage_variables[f"{var_name}_{str(uuid.uuid4())[:8]}"] = variable_definition
            logger.info(f"Added new variable: {var_name}")

    # 2. Build Lists
    logger.info("Building lists...")
    for list_data in plan.get("lists") or []:
        if not isinstance(list_data, dict) or "name" not in list_data:
            logger.warning(f"Skipping malformed list declaration: {list_data!r}")
            continue
        list_name = list_data["name"]
        list_default = list_data.get("default", [])
        existing_list_id = next(
            (list_id for list_id, info in stage_lists.items() if info[0] == list_name),
            None,
        )
        if existing_list_id:
            stage_lists[existing_list_id][1] = list_default
            logger.info(f"Updated list: {list_name}")
        else:
            stage_lists[f"{list_name}_{str(uuid.uuid4())[:8]}"] = [list_name, list_default]
            logger.info(f"Added new list: {list_name}")

    # 3. Build Broadcasts
    logger.info("Building broadcasts...")
    for broadcast_name in plan.get("broadcasts") or []:
        already_declared = any(
            existing_name == broadcast_name for existing_name in stage_broadcasts.values()
        )
        if already_declared:
            logger.info(f"Broadcast '{broadcast_name}' already exists.")
        else:
            stage_broadcasts[f"{broadcast_name}_{str(uuid.uuid4())[:8]}"] = broadcast_name
            logger.info(f"Added new broadcast: {broadcast_name}")

    # 4. Build Monitors
    logger.info("Building monitors...")
    monitor_slot = 0  # Index of the next layout slot; only advanced on success
    for monitor_data in plan.get("monitors") or []:
        if not isinstance(monitor_data, dict):
            logger.warning(f"Skipping malformed monitor declaration: {monitor_data!r}")
            continue
        monitor_target = monitor_data.get("target")
        if not any(t["name"] == monitor_target for t in targets):
            logger.warning(f"Monitor target '{monitor_target}' not found. Skipping monitor.")
            continue
        llm_monitor_block_generation_prompt = _build_monitor_block_prompt(monitor_data, project_json)
        try:
            response = agent.invoke({"messages": [{"role": "user", "content": llm_monitor_block_generation_prompt}]})
            raw_response = response["messages"][-1].content
            logger.info(f"Raw response from LLM [DeclarationBuilderNode - Monitor]: {raw_response[:500]}...")
            try:
                # Expecting a dict with a "monitor_block" key
                generated_monitor_json = extract_json_from_llm_response(raw_response)
                monitor_block = generated_monitor_json.get("monitor_block")
                if not monitor_block:
                    raise ValueError("LLM response for monitor block did not contain 'monitor_block' key.")
            except json.JSONDecodeError as error_json:
                logger.error("Failed to extract JSON for monitor block. Attempting to correct.")
                correction_prompt = _build_monitor_correction_prompt(raw_response, error_json)
                correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]})
                generated_monitor_json = extract_json_from_llm_response(correction_response["messages"][-1].content)
                monitor_block = generated_monitor_json.get("monitor_block")
                if not monitor_block:
                    raise ValueError("JSON corrector still did not produce a valid monitor_block.")
            # Two-column grid so monitors do not overlap: alternate X, step Y every second monitor.
            monitor_block["x"] = 5 + (monitor_slot % 2) * 320
            monitor_block["y"] = 5 + (monitor_slot // 2) * 50
            monitor_slot += 1
            project_monitors.append(monitor_block)
            logger.info(f"Added monitor for '{monitor_data.get('variable', monitor_data.get('reporter_name'))}' to project.json.")
        except Exception as e:
            # Best effort: one bad monitor must not abort the remaining ones.
            logger.error(f"Error generating monitor block for {monitor_data}: {e}")

    state["project_json"] = project_json
    logger.info("Declaration builder node finished updating project JSON.")
    with open("debug_state.json", "w", encoding="utf-8") as f:
        json.dump(state, f, indent=2, ensure_ascii=False)
    print("Updated project JSON after declarations:", json.dumps(project_json, indent=2))
    return state
# Node 3: Sprite Action Plan Builder | |
def overall_planner_node(state: GameState):
    """
    Generates a comprehensive action plan for sprites, including detailed Scratch block information.
    This node acts as an overall planner, leveraging knowledge of all block shapes and categories.

    The prompt sent to ``agent`` contains the game description, every target
    name (Stage included), the current sprite positions, the declaration plan
    and the block reference constants. The reply is parsed with
    ``extract_json_from_llm_response``; on ``json.JSONDecodeError`` the raw
    reply is sent once to ``agent_json_resolver`` for repair. The parsed plan
    is stored in ``state["action_plan"]`` and the state is dumped to
    ``debug_state.json``.

    A missing ``declaration_plan`` is treated as an empty plan, matching how
    the declaration builder tolerates its absence.

    Returns the same ``state`` object. Exceptions raised from the LLM call
    onwards are logged and re-raised.
    """
    logger.info("--- Running OverallPlannerNode ---")
    description = state.get("description", "")
    project_json = state["project_json"]
    # Plan for every target, the Stage included, because of its global role.
    target_names = [t["name"] for t in project_json["targets"]]
    # Sprites report their x/y; the Stage has no position, so it is marked N/A.
    sprite_positions = {}
    for target in project_json["targets"]:
        if not target["isStage"]:
            sprite_positions[target["name"]] = {"x": target.get("x", 0), "y": target.get("y", 0)}
        else:
            sprite_positions[target["name"]] = {"x": "N/A", "y": "N/A"}
    declaration_plan = state.get("declaration_plan", {})
    planning_prompt = f"""Generate a detailed action plan for the game's sprites and stage based on the user query and sprite details.
**Game Description:** '{description}'
**Targets in Game (Sprites and Stage):** {', '.join(target_names)}
**Current Target Positions (Sprites have x/y, Stage is N/A):** {json.dumps(sprite_positions)}
Here is the overall declaration of variable, broadcast and monitors to look for and utilized as per requirment.
**Current Declaration Plan:** {json.dumps(declaration_plan)}
--- Scratch 3.0 Block Reference ---
This section provides a comprehensive reference of Scratch 3.0 blocks, categorized by shape, including their opcodes and functional descriptions. Use this to accurately identify block types and behavior.
### Hat Blocks
Description: {hat_description}
Blocks:
{hat_opcodes_functionalities}
### Boolean Blocks
Description: {boolean_description}
Blocks:
{boolean_opcodes_functionalities}
### C Blocks
Description: {c_description}
Blocks:
{c_opcodes_functionalities}
### Cap Blocks
Description: {cap_description}
Blocks:
{cap_opcodes_functionalities}
### Reporter Blocks
Description: {reporter_description}
Blocks:
{reporter_opcodes_functionalities}
### Stack Blocks
Description: {stack_description}
Blocks:
{stack_opcodes_functionalities}
-----------------------------------
Your task is to define the primary actions and movements for each sprite AND THE STAGE.
The output should be a JSON object with a single key 'action_overall_flow'. Each key inside this object should be a sprite or 'Stage' name (e.g., 'Player', 'Enemy', 'Stage'), and its value must include a 'description' and a list of 'plans'.
Each plan must include a **single Scratch Hat Block** (e.g., 'event_whenflagclicked') to start scratch project and should contain:
1. **'event'**: the exact `opcode` of the hat block that initiates the logic.
2. **'logic'**: a natural language breakdown of each step taken after the event. Separate each step with a semicolon ';'. Ensure clarity and granularity—each described action should map closely to a Scratch block or tight sequence.
- For example: for a jump: 'change y by N; wait M seconds; change y by -N'.
- Use 'forever: ...' or 'repeat(10): ...' to prefix repeating logic suitable taking reference from the C blocks.
- Use Scratch-consistent verbs: 'move', 'change', 'wait', 'hide', 'show', 'say', 'glide', etc.
- **CRITICALLY IMPORTANT:** Do NOT use double quotes within the **'logic'** string itself for values like coordinates (e.g., write 'x:0 y:-130' instead of "x":0 "y":-130 or write 'forever' instead of "forever"). The 'logic' field must be a plain string (e.g. "logic":"'forever': jump; 'x':130)" <- this is a valid string content).
3. **Opcode Lists**: include relevant Scratch opcodes grouped under `motion`, `control`, `operator`, `sensing`, `looks`, `sounds`, `events`, and `data`. List only the non-empty categories. Use exact opcodes including shadow/helper blocks (e.g., 'math_number').
Use target names exactly as listed in `Targets in Game`. Do NOT rename or invent new targets.
Ensure the plan reflects accurate opcode usage derived strictly from the block reference above.
Example structure for 'action_overall_flow':
```json
{{
"action_overall_flow": {{
"Sprite1": {{
"description": "Main character (cat) actions",
"plans": [
{{
"event": "event_whenflagclicked",
"logic": "go to initial position at starting point.",
"motion": ["motion_gotoxy"],
"control": [],
"operator": [],
"sensing": [],
"looks": [],
"sounds": [],
"events": [],
"data": []
}},
{{
"event": "event_whenkeypressed",
"logic": "repeat(10): change y by 10; wait 0.1 seconds; change y by -10;",
"motion": ["motion_changeyby"],
"control": ["control_repeat", "control_wait"],
"operator": [],
"sensing": [],
"looks": [],
"sounds": [],
"events": [],
"data": []
}}
]
}},
"soccer ball": {{
"description": "Obstacle movement and interaction",
"plans": [
{{
"event": "event_whenflagclicked",
"logic": "go to x:240 y:-135; forever: glide 2 seconds to x:-240 y:-135; if x position < -235, then set x to 240; if touching Sprite1, then hide;",
"motion": ["motion_gotoxy", "motion_glidesecstoxy", "motion_xposition", "motion_setx"],
"control": ["control_forever", "control_if"],
"operator": ["operator_lt"],
"sensing": ["sensing_istouching", "sensing_touchingobjectmenu"],
"looks": ["looks_hide"],
"sounds": [],
"events": [],
"data": []
}}
]
}},
"Stage": {{
"description": "Background and global game state management, including broadcasts, rewards, and score.",
"plans": [
{{
"event": "event_whenflagclicked",
"logic": "switch backdrop to backdrop1; set score to 0; show variable score; broadcast Game Start;",
"motion": [],
"control": [],
"operator": [],
"sensing": [],
"looks": ["looks_switchbackdropto"],
"sounds": [],
"events": ["event_broadcast"],
"data": ["data_setvariableto", "data_showvariable"]
}},
{{
"event": "event_whenbroadcastreceived",
"logic": "if score > High Score, then set High Score to score; switch backdrop to HighScore; stop all;",
"motion": [],
"control": ["control_if", "control_stop"],
"operator": ["operator_gt"],
"sensing": [],
"looks": ["looks_switchbackdropto"],
"sounds": [],
"events": [],
"data": ["data_setvariableto"]
}}
]
}}
}}
}}
```
Based on the provided context, generate the `action_overall_flow`.
- Maintain the **exact JSON structure** shown above.
- All `logic` fields must be **clear and granular**.
- Only include opcode categories that contain relevant opcodes.
- Ensure that each opcode matches its intended Scratch functionality.
- If feedback suggests major change, **rethink the entire plan** for the affected sprite(s).
- If feedback is minor, make precise, minimal improvements only.
"""
    try:
        response = agent.invoke({"messages": [{"role": "user", "content": planning_prompt}]})
        print("Raw response from LLM [OverallPlannerNode 1]:", response)
        raw_response = response["messages"][-1].content
        print("Raw response from LLM [OverallPlannerNode 2]:", raw_response)
        try:
            overall_plan = extract_json_from_llm_response(raw_response)
        except json.JSONDecodeError as error_json:
            logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.")
            # One repair attempt through the dedicated JSON-fixing agent.
            correction_prompt = (
                "Your task is to correct the provided JSON string to ensure it is **syntactically perfect and adheres strictly to JSON rules**.\n"
                "Carefully review the JSON for any errors, especially focusing on the reported error at:\n"
                f"- **Error Details**: {error_json}\n\n"
                "**Strict Instructions for your response:**\n"
                "1. **ONLY** output the corrected JSON. Do not include any other text, comments, or explanations outside the JSON.\n"
                "2. Ensure all property names (keys) are enclosed in **double quotes**.\n"
                # The double-quote example below is `\"`, as in the other
                # correction prompts; the original showed only a backslash here.
                "3. Ensure string values are correctly enclosed in **double quotes** and any internal special characters (like newlines `\\n`, tabs `\\t`, backslashes `\\\\`, or double quotes `\\\"`) are properly **escaped**.\n"
                "4. Verify that there are **no extra commas**, especially between key-value pairs or after the last element in an object or array.\n"
                "5. Ensure proper nesting and matching of curly braces `{}` and square brackets `[]`.\n"
                "6. **Crucially, remove any extraneous characters or duplicate closing braces outside the main JSON object.**\n"
                "7. The corrected JSON must be a **complete and valid** JSON object.\n\n"
                "Here is the problematic JSON string to correct:\n"
                "```json\n"
                f"{raw_response}\n"
                "```\n"
                "Corrected JSON:\n"
            )
            correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]})
            # Bind the reply text first: reusing double quotes inside a
            # double-quoted f-string is a SyntaxError before Python 3.12.
            corrected_text = correction_response["messages"][-1].content
            print(f"[JSON CORRECTOR RESPONSE AT OVERALLPLANNERNODE ]: {corrected_text}")
            overall_plan = extract_json_from_llm_response(corrected_text)
        state["action_plan"] = overall_plan
        logger.info("Overall plan generated by OverallPlannerNode.")
        with open("debug_state.json", "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2, ensure_ascii=False)
        return state
    except Exception as e:
        logger.error(f"Error in OverallPlannerNode: {e}")
        raise
# Node 4: Sprite Plan Verification Node | |
def plan_verification_node(state: GameState):
    """
    Validate the generated action plan/blocks and record the validator's verdict.

    The LLM agent is asked to review the current action plan and project JSON.
    Its JSON answer is parsed (with one repair attempt through the JSON
    resolver agent if parsing fails) and written back into ``state``.

    Reads from ``state``:
        ``project_json`` (required), ``action_plan``, ``description``,
        ``plan_validation_feedback``, ``iteration_count``.

    Writes to ``state``:
        ``plan_validation_feedback``, ``needs_improvement``, ``iteration_count``
        and, when the validator suggests one, an extended ``description``.

    Side effects:
        Prints debug output and dumps the whole state to ``debug_state.json``.

    Returns:
        The same ``state`` object, updated in place.

    Raises:
        Any exception raised while invoking the agents or parsing their output
        is re-raised after ``needs_improvement`` is forced to ``False`` and the
        error text is stored in ``plan_validation_feedback``.
    """
    logger.info(f"--- Running VerificationNode (Iteration: {state.get('iteration_count', 0)}) ---")

    # NOTE(review): with a limit of 1 and the `>=` comparison further down, the
    # very first pass that reports needs_improvement=True is immediately forced
    # back to False. Confirm this is intended before raising the limit.
    MAX_IMPROVEMENT_ITERATIONS = 1  # Set a sensible limit to prevent infinite loops
    current_iteration = state.get("iteration_count", 0)
    project_json = state["project_json"]
    action_plan = state.get("action_plan", {})
    print(f"[action_plan before verification] on ({current_iteration}): {json.dumps(action_plan, indent=2)}")

    validation_prompt = f"""You are an AI validator for Scratch project plans and generated blocks. \
Your task is to review the current state of the game's action plan and block structure. \
Critically analyze if there are any missing logic, structural inconsistencies, or unclear intentions. \
Provide **precise** and **constructive** feedback for improvement.
**Game Description:**
{state.get('description', '')}
**Current Action Plan (High-Level Logic):**
```json
{json.dumps(action_plan, indent=2)}
```
**Current Project JSON (Generated Blocks):**
```json
{json.dumps(project_json, indent=2)}
```
**Previous Feedback (if any):**
{state.get('plan_validation_feedback', 'None')}
Based on the above, return a response strictly in the following JSON format:
```json
{{
"feedback": "Detailed comments on any missing logic, inconsistencies, or unclear intent. Be concise but specific. If everything is perfect, state that explicitly.",
"needs_improvement": true,
"suggested_description_updates": "Concise revision of the game description if needed. Use an empty string if no change is required."
}}
```
**Important:**
- The `needs_improvement` field must be strictly `true` or `false` (boolean). Do **not** include any other text or explanation inside the JSON.
- Be strict in evaluation. If **any** part of the plan or block logic appears incomplete, ambiguous, or incorrect, set `needs_improvement` to `true`."""

    try:
        response = agent.invoke({"messages": [{"role": "user", "content": validation_prompt}]})
        raw_response = response["messages"][-1].content
        logger.info(f"Raw response from LLM [VerificationNode]: {raw_response[:500]}...")

        # First try to parse the answer as-is; on failure ask the resolver agent to repair it once.
        try:
            validation_result = extract_json_from_llm_response(raw_response)
        except json.JSONDecodeError as error_json:
            logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.")
            correction_prompt = (
                "Your task is to correct the provided JSON string to ensure it is **syntactically perfect and adheres strictly to JSON rules**.\n"
                "Carefully review the JSON for any errors, especially focusing on the reported error at:\n"
                f"- **Error Details**: {error_json}\n\n"
                "**Strict Instructions for your response:**\n"
                "1. **ONLY** output the corrected JSON. Do not include any other text, comments, or explanations outside the JSON.\n"
                "2. Ensure all property names (keys) are enclosed in **double quotes**.\n"
                "3. Ensure string values are correctly enclosed in **double quotes** and any internal special characters (like newlines `\\n`, tabs `\\t`, backslashes `\\\\`, or double quotes `\\`) are properly **escaped**.\n"
                "4. Verify that there are **no extra commas**, especially between key-value pairs or after the last element in an object or array.\n"
                "5. Ensure proper nesting and matching of curly braces `{}` and square brackets `[]`.\n"
                "6. **Crucially, remove any extraneous characters or duplicate closing braces outside the main JSON object.**\n"
                "7. The corrected JSON must be a **complete and valid** JSON object.\n\n"
                "Here is the problematic JSON string to correct:\n"
                "```json\n"
                f"{raw_response}\n"
                "```\n"
                "Corrected JSON:\n"
            )
            correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]})
            # Pull the text out first: nesting double quotes inside an f-string only parses on Python 3.12+.
            corrected_text = correction_response["messages"][-1].content
            print(f"[JSON CORRECTOR RESPONSE AT PLANVERIFICATIONNODE ]: {corrected_text}")
            validation_result = extract_json_from_llm_response(corrected_text)

        # Update state with feedback and improvement flag
        state["plan_validation_feedback"] = validation_result.get("feedback", "No specific feedback provided.")
        needs_improvement = validation_result.get("needs_improvement", False)
        if isinstance(needs_improvement, str):
            # The model occasionally quotes the boolean; "false" must not count as truthy.
            needs_improvement = needs_improvement.strip().lower() == "true"
        state["needs_improvement"] = bool(needs_improvement)

        suggested_description_updates = validation_result.get("suggested_description_updates", "")
        if suggested_description_updates:
            # Append the suggestion to the existing description. The lookup must use the
            # "description" key itself; using the description text as the key always
            # produced None and discarded the original description.
            current_description = state.get("description", "")
            state["description"] = f"{current_description}\n\nSuggested Update: {suggested_description_updates}"
            logger.info("Updated detailed game description based on validation feedback.")

        # Manage iteration count
        if state["needs_improvement"]:
            state["iteration_count"] = current_iteration + 1
            if state["iteration_count"] >= MAX_IMPROVEMENT_ITERATIONS:
                logger.warning(f"Max improvement iterations ({MAX_IMPROVEMENT_ITERATIONS}) reached. Forcing 'needs_improvement' to False.")
                state["needs_improvement"] = False
                state["plan_validation_feedback"] += "\n(Note: Max iterations reached, stopping further improvements.)"
        else:
            state["iteration_count"] = 0  # Reset if no more improvement needed

        logger.info(f"Verification completed. Needs Improvement: {state['needs_improvement']}. Feedback: {state['plan_validation_feedback'][:100]}...")
        action_plan_dump = json.dumps(state.get("action_plan", {}), indent=2)
        print(f"[updated action_plan after verification] on ({current_iteration}): {action_plan_dump}")
        with open("debug_state.json", "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2, ensure_ascii=False)
        return state
    except Exception as e:
        logger.error(f"Error in VerificationNode: {e}")
        state["needs_improvement"] = False  # Force end loop on error
        state["plan_validation_feedback"] = f"Validation error: {e}"
        raise
# Node 5: Refined Planner Node | |
def refined_planner_node(state: GameState):
    """
    Refine the action plan based on validation feedback and the game description.

    Builds a refinement prompt from the description, target names/positions,
    declaration plan, validation feedback and the block reference tables, sends
    it to the LLM agent and parses the JSON answer (with one repair attempt
    through the JSON resolver agent if parsing fails).

    Reads from ``state``:
        ``project_json`` and ``declaration_plan`` (both required),
        ``description``, ``action_plan``, ``plan_validation_feedback``,
        ``iteration_count``.

    Writes to ``state``:
        ``action_plan`` is replaced by the contents of the answer's
        ``action_overall_flow`` key. If that key is missing, empty or not a
        dict, the previous plan is kept untouched.

    Side effects:
        Prints debug output and dumps the whole state to ``debug_state.json``.

    Returns:
        The same ``state`` object, updated in place.

    Raises:
        Any exception from the agents or the JSON extraction is logged and re-raised.
    """
    logger.info("--- Running RefinedPlannerNode ---")
    detailed_game_description = state.get("description", "")
    current_action_plan = state.get("action_plan", {})
    print(f"[current_action_plan before refinement] on ({state.get('iteration_count', 0)}): {json.dumps(current_action_plan, indent=2)}")
    plan_validation_feedback = state.get("plan_validation_feedback", "No specific feedback provided. Assume general refinement is needed.")
    project_json = state["project_json"]
    target_names = [t["name"] for t in project_json["targets"]]

    # Sprites report their x/y position; the Stage has no coordinates, so it gets "N/A".
    sprite_positions = {}
    for target in project_json["targets"]:
        if not target["isStage"]:
            sprite_positions[target["name"]] = {"x": target.get("x", 0), "y": target.get("y", 0)}
        else:
            sprite_positions[target["name"]] = {"x": "N/A", "y": "N/A"}

    declaration_plan = state["declaration_plan"]

    refinement_prompt = f"""Refine the existing action plan for the game's sprites based on the detailed game description and the validation feedback provided.
**Game Description:** '{detailed_game_description}'
**Targets in Game (Sprites and Stage):** {', '.join(target_names)}
**Current Target Positions (Sprites have x/y, Stage is N/A):** {json.dumps(sprite_positions)}
Here is the overall declaration of variable, broadcast and monitors to look for and utilized as per requirment.
**Current Declaration Plan:** {json.dumps(declaration_plan)}
**Validation Feedback:**
'{plan_validation_feedback}'
--- Scratch 3.0 Block Reference ---
### Hat Blocks
Description: {hat_description}
Blocks:
{hat_opcodes_functionalities}
### Boolean Blocks
Description: {boolean_description}
Blocks:
{boolean_opcodes_functionalities}
### C Blocks
Description: {c_description}
Blocks:
{c_opcodes_functionalities}
### Cap Blocks
Description: {cap_description}
Blocks:
{cap_opcodes_functionalities}
### Reporter Blocks
Description: {reporter_description}
Blocks:
{reporter_opcodes_functionalities}
### Stack Blocks
Description: {stack_description}
Blocks:
{stack_opcodes_functionalities}
-----------------------------------
Your task is to align to description, refine and correct the JSON object 'action_overall_flow'.
Use sprite names exactly as provided in `sprite_names` (e.g., 'Sprite1', 'soccer ball'); and also the stage, do **NOT** rename them.
Follow this exact format for the output (example):
Ensure the plan reflects accurate opcode usage derived strictly from the block reference above.
Example structure for 'action_overall_flow':
```json
{{
"action_overall_flow": {{
"Sprite1": {{
"description": "Main character (cat) actions",
"plans": [
{{
"event": "event_whenflagclicked",
"logic": "go to initial position at starting point.",
"motion": ["motion_gotoxy"],
"control": [],
"operator": [],
"sensing": [],
"looks": [],
"sounds": [],
"events": [],
"data": []
}},
{{
"event": "event_whenkeypressed",
"logic": "repeat(10): change y by 10; wait 0.1 seconds; change y by -10;",
"motion": ["motion_changeyby"],
"control": ["control_repeat", "control_wait"],
"operator": [],
"sensing": [],
"looks": [],
"sounds": [],
"events": [],
"data": []
}}
]
}},
"soccer ball": {{
"description": "Obstacle movement and interaction",
"plans": [
{{
"event": "event_whenflagclicked",
"logic": "go to x:240 y:-135; forever: glide 2 seconds to x:-240 y:-135; if x position < -235, then set x to 240; if touching Sprite1, then hide;",
"motion": ["motion_gotoxy", "motion_glidesecstoxy", "motion_xposition", "motion_setx"],
"control": ["control_forever", "control_if"],
"operator": ["operator_lt"],
"sensing": ["sensing_istouching", "sensing_touchingobjectmenu"],
"looks": ["looks_hide"],
"sounds": [],
"events": [],
"data": []
}}
]
}},
"Stage": {{
"description": "Background and global game state management, including broadcasts, rewards, and score.",
"plans": [
{{
"event": "event_whenflagclicked",
"logic": "switch backdrop to backdrop1; set score to 0; show variable score; broadcast Game Start;",
"motion": [],
"control": [],
"operator": [],
"sensing": [],
"looks": ["looks_switchbackdropto"],
"sounds": [],
"events": ["event_broadcast"],
"data": ["data_setvariableto", "data_showvariable"]
}},
{{
"event": "event_whenbroadcastreceived",
"logic": "if score > High Score, then set High Score to score; switch backdrop to HighScore; stop all;",
"motion": [],
"control": ["control_if", "control_stop"],
"operator": ["operator_gt"],
"sensing": [],
"looks": ["looks_switchbackdropto"],
"sounds": [],
"events": [],
"data": ["data_setvariableto"]
}}
]
}}
}}
}}
```
Use the validation feedback to address errors, fill in missing logic, or enhance clarity.
example of few possible improvements: 1.event_whenflagclicked is used to control sprite but its used for actual start scratch project and reset scratch. 2. looping like forever used where we should use iterative. 3. missing of for variable we used in the block
- Maintain the **exact JSON structure** shown above.
- All `logic` fields must be **clear and granular**.
- Only include opcode categories that contain relevant opcodes.
- Ensure that each opcode matches its intended Scratch functionality.
- If feedback suggests major change, **rethink the entire plan** for the affected sprite(s).
- If feedback is minor, make precise, minimal improvements only."""

    try:
        response = agent.invoke({"messages": [{"role": "user", "content": refinement_prompt}]})
        raw_response = response["messages"][-1].content
        logger.info(f"Raw response from LLM [RefinedPlannerNode]: {raw_response[:500]}...")

        # First try to parse the answer as-is; on failure ask the resolver agent to repair it once.
        try:
            refined_plan = extract_json_from_llm_response(raw_response)
        except json.JSONDecodeError as error_json:
            logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.")
            # Instructions are numbered 1-8; each one ends with a newline so that the
            # `logic`-field rule does not run into the trailing-comma rule.
            correction_prompt = (
                "Your task is to correct the provided JSON string to ensure it is **syntactically perfect and adheres strictly to JSON rules**.\n"
                "Carefully review the JSON for any errors, especially focusing on the reported error at:\n"
                f"- **Error Details**: {error_json}\n\n"
                "**Strict Instructions for your response:**\n"
                "1. **ONLY** output the corrected JSON. Do not include any other text, comments, or explanations outside the JSON.\n"
                "2. Ensure all property names (keys) are enclosed in **double quotes**.\n"
                "3. Ensure string values are correctly enclosed in **double quotes** and any internal special characters (like newlines `\\n`, tabs `\\t`, backslashes `\\\\`, or double quotes `\\`) are properly **escaped**.\n"
                "4. IN `logic` field make sure content enclosed in **double quotes** should not have invalid **double quotes**, **eliminate** all quotes inside the content if any.\n"
                "5. Verify that there are **no extra commas**, especially between key-value pairs or after the last element in an object or array.\n"
                "6. Ensure proper nesting and matching of curly braces `{}` and square brackets `[]`.\n"
                "7. **Crucially, remove any extraneous characters or duplicate closing braces outside the main JSON object.**\n"
                "8. The corrected JSON must be a **complete and valid** JSON object.\n\n"
                "Here is the problematic JSON string to correct:\n"
                "```json\n"
                f"{raw_response}\n"
                "```\n"
                "Corrected JSON:\n"
            )
            correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]})
            # Pull the text out first: nesting double quotes inside an f-string only parses on Python 3.12+.
            corrected_text = correction_response["messages"][-1].content
            print(f"[JSON CORRECTOR RESPONSE AT REFINEPLANNER ]: {corrected_text}")
            refined_plan = extract_json_from_llm_response(corrected_text)
            logger.info("Refined plan corrected by JSON resolver agent.")

        # Only replace the stored plan when the answer really carries a usable
        # 'action_overall_flow'; otherwise the previous plan would be wiped out with {}.
        refined_flow = refined_plan.get("action_overall_flow") if isinstance(refined_plan, dict) else None
        if isinstance(refined_flow, dict) and refined_flow:
            state["action_plan"] = refined_flow
            logger.info("Action plan refined by RefinedPlannerNode.")
        else:
            logger.warning("RefinedPlannerNode did not return a valid 'action_overall_flow' structure. Keeping previous plan.")

        print("[Refined Action Plan]:", json.dumps(state.get("action_plan", {}), indent=2))
        print("[current state after refinement]:", json.dumps(state, indent=2))
        with open("debug_state.json", "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2, ensure_ascii=False)
        return state
    except Exception as e:
        logger.error(f"Error in RefinedPlannerNode: {e}")
        raise
# Helper function to get a block by its opcode from a single catalog | |
def get_block_by_opcode(catalog_data: dict, opcode: str) -> dict | None: | |
""" | |
Search a single catalog (with keys "description" and "blocks": List[dict]) | |
for a block whose 'op_code' matches the given opcode. | |
Returns the block dict or None if not found. | |
""" | |
for block in catalog_data["blocks"]: | |
if block.get("op_code") == opcode: | |
return block | |
return None | |
# Helper function to find a block in all catalogs by opcode | |
def find_block_in_all(opcode: str, all_catalogs: list[dict]) -> dict | None:
    """
    Look up an opcode across several catalogs, in the order given.

    Each catalog is queried through get_block_by_opcode; the first non-None
    result wins. None is returned when no catalog contains the opcode.
    """
    # Lazy generator: catalogs after the first hit are never searched.
    lookups = (get_block_by_opcode(catalog, opcode) for catalog in all_catalogs)
    return next((found for found in lookups if found is not None), None)
# Helper function to get a block by its opcode from a single catalog | |
def get_block_by_opcode(catalog_data: dict, opcode: str) -> dict | None: | |
""" | |
Search a single catalog (with keys "description" and "blocks": List[dict]) | |
for a block whose 'op_code' matches the given opcode. | |
Returns the block dict or None if not found. | |
""" | |
for block in catalog_data["blocks"]: | |
if block.get("op_code") == opcode: | |
return block | |
return None | |
# Helper function to find a block in all catalogs by opcode | |
def find_block_in_all(opcode: str, all_catalogs: list[dict]) -> dict | None:
    """
    Search the given catalogs one after another for ``opcode`` and return the
    first block dict found via get_block_by_opcode, or None if none matches.

    NOTE: an identical helper with the same name is defined earlier in this
    file; being the later definition, this one is the binding in effect.
    """
    for catalog in all_catalogs:
        if (found := get_block_by_opcode(catalog, opcode)) is not None:
            return found
    return None
# Node 6: Overall Block Builder Node
def overall_block_builder_node(state: dict):
    """
    Generate Scratch block JSON for every script in the action plan.

    For each sprite/stage named in the plan and each of its ``plans`` entries,
    the LLM agent is asked for one complete script. The answer is parsed (with
    one repair attempt through the JSON resolver agent if parsing fails), its
    top-level block is positioned on the canvas, and the blocks are merged into
    the matching target's ``blocks`` dict inside ``state["project_json"]``.

    Reads from ``state``:
        ``project_json`` (required; must contain "targets" and "monitors") and
        ``action_plan`` (either the sprite mapping itself or a dict wrapping it
        under "action_overall_flow").

    Writes to ``state``:
        ``project_json`` (targets are mutated in place) and ``iteration_count``
        (reset to 0 after every generated script).

    Side effects:
        Prints debug output and, unless the plan is empty, dumps the whole
        state to ``debug_state.json``.

    Returns:
        The same ``state`` object. It is returned unchanged when there is no
        action plan.

    Raises:
        Any exception raised while generating a script is logged and re-raised.
    """
    logger.info("--- Running OverallBlockBuilderNode ---")
    project_json = state["project_json"]
    targets = project_json["targets"]
    monitors = project_json["monitors"]

    # --- Sprite and Stage Target Mapping ---
    sprite_map = {target["name"]: target for target in targets if not target["isStage"]}
    stage_target = next((target for target in targets if target["isStage"]), None)
    if stage_target:
        sprite_map[stage_target["name"]] = stage_target

    # --- Block catalogs searched (in this order) for opcode definitions ---
    all_catalogs = [
        hat_block_data,
        boolean_block_data,
        c_block_data,
        cap_block_data,
        reporter_block_data,
        stack_block_data,
    ]

    action_plan = state.get("action_plan", {})
    print("[Overall Action Plan received at the block generator]:", json.dumps(action_plan, indent=2))
    if not action_plan:
        logger.warning("No action plan found in state. Skipping OverallBlockBuilderNode.")
        return state

    # Offsets for script placement on the Scratch canvas
    script_y_offset = {}
    script_x_offset_per_sprite = {name: 0 for name in sprite_map}

    # The plan may be stored wrapped in 'action_overall_flow' or as the bare sprite mapping.
    wrapped_flow = action_plan.get("action_overall_flow", {})
    plan_data = action_plan.items() if wrapped_flow == {} else wrapped_flow.items()

    # --- Extract global project context for LLM (name -> ID mappings) ---
    all_sprite_names = list(sprite_map.keys())
    all_variable_names = {}
    all_list_names = {}
    all_broadcast_messages = {}
    for target in targets:
        for var_id, var_info in target.get("variables", {}).items():
            all_variable_names[var_info[0]] = var_id
        for list_id, list_info in target.get("lists", {}).items():
            all_list_names[list_info[0]] = list_id
        for broadcast_id, broadcast_name in target.get("broadcasts", {}).items():
            all_broadcast_messages[broadcast_name] = broadcast_id

    # Plan keys that carry opcode lists. The planner prompts emit "sounds"; the
    # singular "sound" is still accepted so plans using that spelling keep working.
    opcode_categories = (
        "motion", "control", "operator", "sensing", "looks",
        "sounds", "sound", "events", "data",
    )

    # --- Process each sprite's action plan ---
    for sprite_name, sprite_actions_data in plan_data:
        if sprite_name not in sprite_map:
            logger.warning(f"Action plan references unknown target '{sprite_name}'. Skipping.")
            continue

        current_sprite_target = sprite_map[sprite_name]
        if "blocks" not in current_sprite_target:
            current_sprite_target["blocks"] = {}
        if sprite_name not in script_y_offset:
            script_y_offset[sprite_name] = 0

        for plan_entry in sprite_actions_data.get("plans", []):
            event_opcode = plan_entry["event"]
            logic_sequence = plan_entry["logic"]

            # Gather all opcodes expected to be used in this script.
            needed_opcodes = []
            for category in opcode_categories:
                needed_opcodes.extend(plan_entry.get(category) or [])
            # De-duplicate while keeping first-seen order so the prompt is deterministic.
            needed_opcodes = list(dict.fromkeys(needed_opcodes))

            # Merge human-written catalog and runtime catalog; runtime entries win on key clashes.
            combined_blocks = {}
            for op in needed_opcodes:
                catalog_def = find_block_in_all(op, all_catalogs) or {}
                runtime_def = ALL_SCRATCH_BLOCKS_CATALOG.get(op, {})
                combined_blocks[op] = {**catalog_def, **runtime_def}
            print("[Combined blocks for this script]:", json.dumps(combined_blocks, indent=2))

            # --- LLM Block Generation Prompt (Self-Contained and Explicit) ---
            llm_block_generation_prompt = f"""You are an AI assistant generating a **single complete Scratch 3.0 script** in JSON format.
The current sprite is '{sprite_name}'.
This script must start with the event block (Hat Block) for opcode '{event_opcode}'.
The sequential logic to implement for this script is:
Logic: {logic_sequence}
**Project Context:**
Here is a summary of the current Scratch project's elements. You MUST reference these when generating blocks that interact with sprites, variables, lists, or broadcasts to use it in block where you requird. Do NOT invent new names if an existing one is suitable.
- **All Sprite Names:** {json.dumps(all_sprite_names)}
- **Existing Variables (Name: ID):** {json.dumps(all_variable_names)}
- **Existing Lists (Name: ID):** {json.dumps(all_list_names)}
- **Existing Broadcast Messages (Name: ID):** {json.dumps(all_broadcast_messages)}
Here is the **monitors** available as reporting blocks which are displayed on the screen (e.g. scores, highscore, etc):
- **monitor:** {json.dumps(monitors)}
**Required Block Opcodes & Catalog:**
Based on the planning, the following specific Scratch block opcodes are expected to be used. You MUST use these opcodes where applicable. Here is the comprehensive catalog for these blocks, including their structure and required inputs/fields:
```json
{json.dumps(combined_blocks, indent=2)}
```
Current Scratch project JSON for this sprite (provided for context; you are generating a NEW, complete script):
```json
{json.dumps(current_sprite_target, indent=2)}
```
**CRITICAL INSTRUCTIONS FOR GENERATING THE BLOCK JSON (READ CAREFULLY AND FOLLOW PRECISELY):**
1. **Unique Block IDs:** Generate a **globally unique ID** for EVERY block (main and shadow blocks) within the entire JSON output for this script. Example: 'myBlockID123'. These IDs must be unique within the *entire* generated JSON for this script.
2. **Script Initiation (Hat Block - VERY IMPORTANT):**
* The **first block** of the script (the Hat block, opcode '{event_opcode}') MUST have `"topLevel": true` and `"parent": null`.
* ONLY this Hat block should have `"topLevel": true`. All other blocks in the script MUST have `"topLevel": false`.
* Set its `x` and `y` coordinates (e.g., `x: 0, y: 0` or similar for clear placement).
3. **Strict Block Chaining (`next` and `parent`):**
* Use the `next` field to point to the ID of the block DIRECTLY BELOW it in the stack. If a block has a `next`, its `next` block's `parent` MUST point back to the current block's ID.
* Use the `parent` field to point to the ID of the block DIRECTLY ABOVE it in the stack. If a block has a `parent`, then that `parent` block's `next` MUST point to the current block's ID.
* The **last block** in a linear stack MUST have `"next": null`.
* Blocks plugged into inputs or substacks DO NOT use `next` to connect; their connection is solely via the `inputs` field of their parent.
4. **`inputs` Field Structure (ABSOLUTELY CRITICAL - ADHERE TO THIS RIGIDLY):**
* The value for ANY key within the `inputs` dictionary MUST be an **array of EXACTLY two elements**: `[type_code, value_or_block_id]`.
* `type_code` is usually `1` for connected blocks or `2` for input blocks (e.g., boolean or reporter). However, for simplicity and typical block connections, you can generally use `1` when referencing a separate block ID.
* **NEVER embed direct primitive values or nested arrays** inside the second element of `inputs`. For example, **FORBID** `["STEPS": [1, ["math_number","10"]]]`. This is incorrect.
* **ALWAYS** generate a separate shadow block with its own unique ID for every literal value (numbers, strings, booleans) or dropdown/menu selection that is an input. Reference that shadow block ID in the parent `inputs`. Example:
**Correct Example for Numerical Input (e.g., for `motion_movesteps` STEPS, or `motion_gotoxy` X/Y):**
If you need to input the number `10` into a block, you MUST create a separate `math_number` shadow block for it, and then reference its ID.
```json
// Main block using the number
"mainBlockID": {{
"opcode": "motion_movesteps",
"inputs": {{
"STEPS": [1, "shadowNumID"]
}},
// ... other fields
}},
// The separate shadow block for the number '10'
"shadowNumID": {{
"opcode": "math_number",
"fields": {{
"NUM": ["10", null]
}},
"shadow": true,
"parent": "mainBlockID",
"topLevel": false
}}
```
* **Correct Example for Dropdown/Menu Input (e.g., `sensing_touchingobject` with 'edge'):**
If you need to select 'edge' for the `touching ()?` block, you MUST create a separate `sensing_touchingobjectmenu` shadow block and reference its ID.
```json
// Main block using the dropdown selection
"touchingBlockID": {{
"opcode": "sensing_touchingobject",
"inputs": {{
"TOUCHINGOBJECTMENU": [1, "shadowEdgeMenuID"]
}},
// ... other fields
}},
// The separate shadow block for the 'edge' menu option
"shadowEdgeMenuID": {{
"opcode": "sensing_touchingobjectmenu",
"fields": {{
"TOUCHINGOBJECTMENU": ["_edge_", null]
}},
"shadow": true,
"parent": "touchingBlockID",
"topLevel": false
}}
```
* **Correct Example for C-block (e.g., `control_forever`):**
The `control_forever` block MUST have a `SUBSTACK` input pointing to the first block inside its loop. The value for `SUBSTACK` must be an array: `[2, "FIRST_BLOCK_IN_FOREVER_LOOP_ID"]`.
```json
"foreverBlockID": {{
"opcode": "control_forever",
"inputs": {{
"SUBSTACK": [2, "firstBlockInsideForeverID"]
}},
"next": null,
"parent": "blockAboveForeverID",
"shadow": false,
"topLevel": false
}},
"firstBlockInsideForeverID": {{
// ... definition of the first block inside the forever loop
"parent": "foreverBlockID",
"next": "secondBlockInsideForeverID" // if there's another block
}}
```
* **Correct Example for C-block with condition (e.g., `control_if`):**
The `control_if` block MUST have a `CONDITION` input (typically `type_code: 1` referencing a boolean reporter block) and a `SUBSTACK` input (`type_code: 2` referencing the first block inside the if-body).
```json
"ifBlockID": {{
"opcode": "control_if",
"inputs": {{
"CONDITION": [1, "conditionBlockID"],
"SUBSTACK": [2, "firstBlockInsideIfID"]
}},
"next": "blockAfterIfID",
"parent": "blockAboveIfID",
"shadow": false,
"topLevel": false
}},
"conditionBlockID": {{
"opcode": "sensing_touchingobject", // Example condition block
// ... definition for condition block, parent should be "ifBlockID"
}},
"firstBlockInsideIfID": {{
// ... definition of the first block inside the if body
"parent": "ifBlockID",
"next": null // or next block if more
}}
```
5. **Define ALL Shadow Blocks Separately (THIS IS ESSENTIAL):** Every time a block's input requires a number, string literal, or a selection from a dropdown/menu, you MUST define a **separate block entry** in the top-level blocks dictionary for that shadow. Each shadow block MUST have `"shadow": true` and `"topLevel": false`.
6. **`fields` for direct dropdown values/text:** Use the `fields` dictionary ONLY IF the block directly embeds a dropdown value or text field without an `inputs` connection (e.g., the `KEY_OPTION` field within the `event_whenkeypressed` shadow block itself, or the `variable` field on a `data_setvariableto` block). Example: `"fields": {{"KEY_OPTION": ["space", null]}}`.
7. **`topLevel: true` for hat blocks:** Only the starting (hat) block of a script should have `"topLevel": true`.
8. **Ensure Unique Block IDs:** Every block you generate (main blocks and shadow blocks) must have a unique ID within the entire script's block dictionary.
9. **Strictly Use Catalog Opcodes:** You MUST only use `opcode` values that are present in the provided `ALL_SCRATCH_BLOCKS_CATALOG`. Do NOT use unlisted opcodes like `motion_jump`.
10. **Return ONLY the JSON object representing all the blocks for THIS SINGLE SCRIPT.** Do NOT wrap it in a 'blocks' key or the full project JSON.
"""
            try:
                response = agent.invoke({"messages": [{"role": "user", "content": llm_block_generation_prompt}]})
                raw_response = response["messages"][-1].content
                logger.info(f"Raw response from LLM [OverallBlockBuilderNode - {sprite_name} - {event_opcode}]: {raw_response[:500]}...")
                print(f"Raw response from LLM [OverallBlockBuilderNode - {sprite_name} - {event_opcode}]: {raw_response}")

                # First try to parse the answer as-is; on failure ask the resolver agent to repair it once.
                try:
                    generated_blocks = extract_json_from_llm_response(raw_response)
                except json.JSONDecodeError as error_json:
                    logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.")
                    # --- JSON Correction Agent ---
                    correction_prompt = (
                        "Your primary goal is to generate JSON that precisely defines Scratch 3.0 blocks. "
                        "The output MUST be a single, valid JSON object, enclosed within a '```json' markdown block.\n"
                        "**STRICT JSON SYNTAX RULES:**\n"
                        "1. **NO EXTRA TEXT, COMMENTS, OR CONVERSATIONAL FILLER.** Only the JSON.\n"
                        "2. All keys MUST be enclosed in double quotes.\n"
                        "3. String values MUST be enclosed in double quotes. Internal double quotes must be escaped (e.g., `\\\"`). Newlines (`\\n`) and carriage returns (`\\r`) in strings must be escaped.\n"
                        "4. No trailing commas.\n"
                        "5. Correct nesting of braces `{}` and brackets `[]`.\n"
                        "6. **CRITICAL: Understand and strictly follow the Scratch block input array format.**\n"
                        " - When an input refers to another top-level block (like a variable block or a number block that is defined separately):\n"
                        " `\"INPUT_NAME\": [1, \"ID_OF_REFERENCED_BLOCK\"]`\n"
                        " - When an input *directly embeds* a literal value (like a number, string, or boolean shadow block) as per the Scratch schema (which is less common for simple literals but necessary for certain types):\n"
                        " `\"INPUT_NAME\": [1, [\"math_number\", \"10\"]]` (for numbers, as a shadow block definition)\n"
                        " `\"INPUT_NAME\": [1, [\"text\", \"hello\"]]` (for strings, as a shadow block definition)\n"
                        " `\"INPUT_NAME\": [1, [\"boolean\", true]]` (for booleans, as a shadow block definition)\n"
                        " **However, for most literal inputs, the standard is to define a separate shadow block and reference its ID.** Refer to the earlier prompt for the preferred method using separate shadow blocks. The nested array format `[1, [\"opcode\", \"value\"]]` should generally be avoided unless specifically indicated by the Scratch schema for certain block types where the shadow block is *always* an implicit part of the input. **For clarity and consistency, prioritize creating separate shadow blocks and referencing their IDs in the input array as `[1, \"ShadowBlockID\"]`.**\n"
                        " - **Do NOT** generate patterns like `some_id[\"value\", null]]` or `\"\"\": [1, some_id[\"\", null]]`.\n"
                        " - Ensure `\"X\"` and `\"Y\"` inputs for `motion_gotoxy` and `motion_glidesecstoxy` correctly reference a block ID (usually a math_number shadow block).\n"
                        "7. The 'logic' field should contain a plain string describing the action, without any embedded JSON or Scratch block syntax (like `\"forever\":\"` which breaks the string).\n\n"
                        "Here is the problematic JSON string that needs correction, including the error details:\n"
                        f"- **Error Details**: {error_json}\n"
                        "```json\n"
                        f"{raw_response}\n"
                        "```\n"
                        "Your corrected, valid JSON response:\n"
                        "```json\n"
                    )
                    correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]})
                    # Pull the text out first: nesting double quotes inside an f-string only parses on Python 3.12+.
                    corrected_text = correction_response["messages"][-1].content
                    print(f"[JSON CORRECTOR RESPONSE AT OVERALLBLOCKBUILDER ]: {corrected_text}")
                    generated_blocks = extract_json_from_llm_response(corrected_text)

                # The model sometimes wraps the script in a 'blocks' key despite the instructions.
                if "blocks" in generated_blocks and isinstance(generated_blocks["blocks"], dict):
                    logger.warning(f"LLM returned nested 'blocks' key for {sprite_name}. Unwrapping.")
                    generated_blocks = generated_blocks["blocks"]

                # Update block positions for top-level script
                for block_id, block_data in generated_blocks.items():
                    if block_data.get("topLevel"):
                        block_data["x"] = script_x_offset_per_sprite.get(sprite_name, 0)
                        block_data["y"] = script_y_offset[sprite_name]
                        script_y_offset[sprite_name] += 150  # Increment for next script

                current_sprite_target["blocks"].update(generated_blocks)
                state["iteration_count"] = 0
                logger.info(f"Action blocks added for sprite '{sprite_name}', script '{event_opcode}' by OverallBlockBuilderNode.")
            except Exception as e:
                logger.error(f"Error generating blocks for sprite '{sprite_name}', script '{event_opcode}': {e}")
                raise

    state["project_json"] = project_json
    logger.info("Updated project JSON with action nodes.")
    with open("debug_state.json", "w", encoding="utf-8") as f:
        json.dump(state, f, indent=2, ensure_ascii=False)
    print("Updated project JSON with action nodes:", json.dumps(project_json, indent=2))
    return state
# Helper used by the block verifier to identify the shape (catalog block type) of a block.
def get_block_type(opcode: str) -> str:
    """Look up the shape category of a Scratch block by its opcode.

    The entry registered under *opcode* in ``ALL_SCRATCH_BLOCKS_CATALOG`` is
    consulted and its ``"blockType"`` value is returned. The string
    ``"unknown"`` is returned when the opcode has no catalog entry or the
    entry carries no ``"blockType"`` key.
    """
    catalog_entry = ALL_SCRATCH_BLOCKS_CATALOG.get(opcode, {})
    return catalog_entry.get("blockType", "unknown")
def filter_script_blocks(all_blocks: dict, hat_block_id: str) -> dict:
    """
    Collect the blocks that belong to the script rooted at ``hat_block_id``.

    The block graph is walked breadth-first starting from the hat block:

    * ``next`` is followed for blocks whose catalog type is ``stack``,
      ``c_block`` or ``hat``;
    * for every list-valued input, string IDs found at index 1 (the plugged-in
      block, shadow or substack) and index 2 (the third element of a type-3
      input) are followed.

    IDs that are not keys of ``all_blocks`` are ignored, as are blocks whose
    data is empty.

    Args:
        all_blocks: Mapping of block ID -> block data for one sprite.
        hat_block_id: ID of the block the script starts from.

    Returns:
        dict: block ID -> block data for every reachable block, in
        breadth-first discovery order.
    """
    # Local import keeps this function self-contained; deque gives O(1)
    # dequeues where list.pop(0) shifts the whole list on every step.
    from collections import deque

    script_blocks = {}
    visited = set()
    pending = deque([hat_block_id])

    while pending:
        current_block_id = pending.popleft()
        if current_block_id in visited:
            continue
        visited.add(current_block_id)

        block_data = all_blocks.get(current_block_id)
        if not block_data:
            continue
        script_blocks[current_block_id] = block_data

        # Only blocks that sit in a vertical stack carry a meaningful 'next'
        # (hat blocks have one too: the first block of their script).
        if get_block_type(block_data.get("opcode")) in ("stack", "c_block", "hat"):
            next_id = block_data.get("next")
            if next_id and next_id in all_blocks:
                pending.append(next_id)

        # Blocks connected via inputs: reporters, shadow blocks and substacks.
        # SUBSTACK inputs are ordinary inputs whose index 1 is a block ID, so
        # this single pass also covers the bodies of C-blocks.
        inputs = block_data.get("inputs")
        if not isinstance(inputs, dict):
            # Model-generated blocks may carry null/garbage here; treat it as
            # "no inputs" instead of failing the whole traversal.
            continue
        for input_value in inputs.values():
            if not isinstance(input_value, list) or len(input_value) < 2:
                continue
            for candidate_id in input_value[1:3]:
                if isinstance(candidate_id, str) and candidate_id in all_blocks:
                    pending.append(candidate_id)

    return script_blocks
def _inputs_of(block: dict) -> dict:
    """Return the block's ``inputs`` mapping, or ``{}`` when it is absent or not a dict."""
    inputs = block.get("inputs")
    return inputs if isinstance(inputs, dict) else {}


def _is_wired_into(parent_block: dict, block_id: str) -> bool:
    """Tell whether any input slot of ``parent_block`` holds ``block_id`` (index 1 or 2)."""
    return any(
        isinstance(slot, list) and len(slot) >= 2 and block_id in slot[1:3]
        for slot in _inputs_of(parent_block).values()
    )


def _parent_link_issues(script_blocks: dict, block_id: str, block_data: dict, opcode: str, sprite_name: str) -> list:
    """Check that the block's declared parent exists and actually points back at it."""
    parent_id = block_data.get("parent")
    if not parent_id:
        return []

    parent_block = script_blocks.get(parent_id)
    if not parent_block:
        return [f"Block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' references non-existent parent '{parent_id}'."]

    parent_block_type = get_block_type(parent_block.get("opcode"))
    if parent_block_type in ("stack", "hat", "c_block"):
        # A child of a stack/hat/C-block is either its 'next' or plugged into an input/substack.
        if parent_block.get("next") != block_id and not _is_wired_into(parent_block, block_id):
            return [f"Block '{block_id}' (opcode: {opcode}) has parent '{parent_id}' but is not a sequential 'next' block nor connected via an input slot. Potential broken linkage."]
        return []
    if parent_block_type in ("reporter", "boolean"):
        # A child of a reporter/boolean can only be plugged into one of its inputs.
        if not _is_wired_into(parent_block, block_id):
            return [f"Block '{block_id}' (opcode: {opcode}) has reporter/boolean parent '{parent_id}' but is not linked via an input. Potential broken linkage."]
        return []
    return [f"Block '{block_id}' (opcode: {opcode}) has an unexpected parent block type '{parent_block_type}' for parent '{parent_id}'."]


def _next_link_issues(script_blocks: dict, block_id: str, block_data: dict, opcode: str, block_type: str, sprite_name: str) -> list:
    """Check the block's 'next' link: target exists, links back, and is allowed for this block type."""
    next_id = block_data.get("next")
    if not next_id:
        return []

    found = []
    next_block = script_blocks.get(next_id)
    if not next_block:
        found.append(f"Block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' references non-existent next block '{next_id}'.")
    elif next_block.get("parent") != block_id:
        found.append(f"Block '{block_id}' (opcode: {opcode})'s next block '{next_id}' does not link back to it via 'parent'.")
    if block_type == "cap":
        found.append(f"Cap block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' should not have a 'next' connection as it ends a script.")
    # The "shadow block has a next" rule is reported once, with the other shadow rules.
    return found


def _input_issues(script_blocks: dict, block_id: str, block_data: dict, opcode: str, block_type: str, sprite_name: str) -> list:
    """Validate the shape, type code and connected block of every input slot."""
    found = []
    for input_name, input_value in _inputs_of(block_data).items():
        if not isinstance(input_value, list) or len(input_value) < 2:
            found.append(f"Block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' has malformed input '{input_name}': {input_value}.")
            continue

        type_code = input_value[0]
        value_or_block_id = input_value[1]

        if type_code not in [1, 2, 3]:
            found.append(f"Block '{block_id}' (opcode: {opcode}) input '{input_name}' has invalid type code: {type_code}. Expected 1, 2, or 3.")

        if isinstance(value_or_block_id, str):
            # A string at index 1 is a block ID: it must exist and name this block as parent.
            connected_block = script_blocks.get(value_or_block_id)
            if not connected_block:
                # Nothing further can be checked about a block that is not there.
                found.append(f"Block '{block_id}' (opcode: {opcode}) input '{input_name}' references non-existent block ID '{value_or_block_id}'.")
            else:
                if connected_block.get("parent") != block_id:
                    found.append(f"Block '{block_id}' (opcode: {opcode}) input '{input_name}' connects to '{value_or_block_id}', but '{value_or_block_id}'s parent is not '{block_id}'.")

                connected_block_type = get_block_type(connected_block.get("opcode"))
                if type_code == 2:  # Block ID (substack or reporter/boolean plugged in, without shadow)
                    if input_name.startswith("SUBSTACK"):
                        # A substack may begin with any block that can sit in a stack:
                        # a plain stack block, a nested C-block, or a cap block.
                        if connected_block_type not in ("stack", "c_block", "cap"):
                            found.append(f"Block '{block_id}' (opcode: {opcode}) input '{input_name}' (type 2, SUBSTACK) expects a stack block, but connected block '{value_or_block_id}' is of type '{connected_block_type}'.")
                    elif connected_block_type not in ["reporter", "boolean"]:
                        found.append(f"Block '{block_id}' (opcode: {opcode}) input '{input_name}' (type 2, non-SUBSTACK) expects a reporter/boolean, but connected block '{value_or_block_id}' is of type '{connected_block_type}'.")
                elif type_code == 1:  # Literal value or shadow block ID
                    if connected_block_type not in ["reporter", "boolean"] and not connected_block.get("shadow"):
                        found.append(f"Block '{block_id}' (opcode: {opcode}) input '{input_name}' (type 1) expects a literal or reporter/boolean/shadow, but connected block '{value_or_block_id}' is of type '{connected_block_type}' and not a shadow.")
                elif type_code == 3:
                    # NOTE(review): in the Scratch 3 file format a type-3 input keeps the
                    # obscuring block at index 1 and the shadow at index 2 -- confirm this
                    # rule matches what the block generator is instructed to emit.
                    if not connected_block.get("shadow"):
                        found.append(f"Block '{block_id}' (opcode: {opcode}) input '{input_name}' (type 3) expects a shadow block, but connected block '{value_or_block_id}' is not a shadow.")

        # For a C-block, a SUBSTACK input must have type code 2 and link to a known block ID.
        if block_type == "c_block" and input_name.startswith("SUBSTACK"):
            if not (isinstance(value_or_block_id, str) and script_blocks.get(value_or_block_id) and type_code == 2):
                found.append(f"C-block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' has an invalid or missing SUBSTACK input configuration.")
    return found


def _shadow_issues(block_id: str, block_data: dict, opcode: str, sprite_name: str) -> list:
    """Apply the rules specific to shadow blocks, including per-opcode field checks."""
    if not block_data.get("shadow"):
        return []

    found = []
    if block_data.get("topLevel"):
        found.append(f"Shadow block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' is incorrectly marked as topLevel.")
    if not block_data.get("parent"):
        found.append(f"Shadow block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' is missing a parent.")
    if block_data.get("next"):  # Shadow blocks should never have 'next'
        found.append(f"Shadow block '{block_id}' (opcode: {opcode}) for sprite '{sprite_name}' should not have a 'next' connection.")

    fields = block_data.get("fields", {})
    if opcode == "math_number":
        num_field = fields.get("NUM")
        if not (isinstance(num_field, list) and len(num_field) >= 1 and isinstance(num_field[0], (str, int, float))):
            found.append(f"Math_number shadow block '{block_id}' has malformed 'NUM' field: {num_field}. Expected [value, ID_or_null].")
    elif opcode == "event_broadcast_menu":
        broadcast_field = fields.get("BROADCAST_OPTION")
        # The broadcast name must be a non-empty string.
        if not (isinstance(broadcast_field, list) and len(broadcast_field) >= 1 and isinstance(broadcast_field[0], str) and broadcast_field[0]):
            found.append(f"Broadcast menu shadow block '{block_id}' has malformed or empty 'BROADCAST_OPTION' field: {broadcast_field}. Expected ['message name', 'ID_or_null'].")
        # A null second element is flagged as a possibly missing pre-defined broadcast ID.
        if broadcast_field and len(broadcast_field) > 1 and broadcast_field[1] is None:
            found.append(f"Broadcast menu shadow block '{block_id}' has a 'BROADCAST_OPTION' field with a null default ID: {broadcast_field}. This might indicate a missing pre-defined broadcast ID.")
    elif opcode == "sensing_touchingobjectmenu":
        touching_field = fields.get("TOUCHINGOBJECTMENU")
        if not (isinstance(touching_field, list) and len(touching_field) >= 1 and isinstance(touching_field[0], str) and touching_field[0]):
            found.append(f"Touching object menu shadow block '{block_id}' has malformed or empty 'TOUCHINGOBJECTMENU' field: {touching_field}. Expected ['object name', 'ID_or_null'].")
        if touching_field and len(touching_field) > 1 and touching_field[1] is None:
            found.append(f"Touching object menu shadow block '{block_id}' has a 'TOUCHINGOBJECTMENU' field with a null default ID: {touching_field}. This might indicate a missing pre-defined object ID for 'edge', 'mouse-pointer', or a sprite.")
    # Add more specific checks for other menu-type shadow blocks if needed.
    return found


def _field_reference_issues(block_id: str, block_data: dict, opcode: str, sprite_variables, sprite_lists, sprite_broadcasts) -> list:
    """Check that variable, list and broadcast names used in ``fields`` are known."""
    if "fields" not in block_data:
        return []

    found = []
    fields = block_data["fields"]

    # Variable references
    if opcode in ["data_setvariableto", "data_changevariableby", "data_variable"]:
        var_name_field = fields.get("VARIABLE")
        if var_name_field and isinstance(var_name_field, list) and len(var_name_field) >= 1 and isinstance(var_name_field[0], str):
            var_name = var_name_field[0]
            if var_name not in sprite_variables:
                found.append(f"Block '{block_id}' (opcode: {opcode}) references non-existent variable '{var_name}'. Recommendation: Ensure '{var_name}' is defined.")
        else:
            found.append(f"Block '{block_id}' (opcode: {opcode}) has malformed 'VARIABLE' field: {var_name_field}.")

    # List references
    if opcode in ["data_addtolist", "data_deleteoflist", "data_itemoflist", "data_listcontents"]:
        list_name_field = fields.get("LIST")
        if list_name_field and isinstance(list_name_field, list) and len(list_name_field) >= 1 and isinstance(list_name_field[0], str):
            list_name = list_name_field[0]
            if list_name not in sprite_lists:
                found.append(f"Block '{block_id}' (opcode: {opcode}) references non-existent list '{list_name}'. Recommendation: Ensure '{list_name}' is defined.")
        else:
            found.append(f"Block '{block_id}' (opcode: {opcode}) has malformed 'LIST' field: {list_name_field}.")

    # Broadcast references in 'when I receive' hat blocks
    if opcode == "event_whenbroadcastreceived":
        broadcast_name_field = fields.get("BROADCAST_OPTION")
        if broadcast_name_field and isinstance(broadcast_name_field, list) and len(broadcast_name_field) >= 1 and isinstance(broadcast_name_field[0], str):
            broadcast_name = broadcast_name_field[0]
            if broadcast_name not in sprite_broadcasts:
                found.append(f"Hat block '{block_id}' (opcode: {opcode}) listens for non-existent broadcast message '{broadcast_name}'. Recommendation: Ensure '{broadcast_name}' is broadcast elsewhere.")
        else:
            found.append(f"Hat block '{block_id}' (opcode: {opcode}) has malformed 'BROADCAST_OPTION' field: {broadcast_name_field}.")
    return found


def _procedure_call_issues(block_id: str, block_data: dict, opcode: str, sprite_custom_blocks) -> list:
    """Check that a custom-block call names a proccode that has a definition."""
    if opcode != "procedures_call" or "mutation" not in block_data:
        return []

    proccode = block_data["mutation"].get("proccode")
    if not proccode:
        return [f"Custom block call '{block_id}' is missing 'proccode' in its mutation."]
    if proccode not in sprite_custom_blocks:
        return [f"Custom block call '{block_id}' (proccode: {proccode}) references non-existent custom block definition. Recommendation: Define custom block '{proccode}'."]
    return []


def _broadcast_input_issues(script_blocks: dict, block_id: str, block_data: dict, opcode: str) -> list:
    """Check that a broadcast block carries a non-empty message (shadow menu or literal)."""
    if opcode not in ("event_broadcast", "event_broadcastandwait"):
        return []

    broadcast_input = _inputs_of(block_data).get("BROADCAST_INPUT")
    if not (broadcast_input and isinstance(broadcast_input, list) and len(broadcast_input) >= 2):
        return [f"Broadcast block '{block_id}' has a missing or malformed 'BROADCAST_INPUT'. Recommendation: Ensure a valid message or shadow block is connected."]

    message_source = broadcast_input[1]
    if isinstance(message_source, str) and script_blocks.get(message_source) and script_blocks[message_source].get("shadow"):
        # Connected to a shadow block: the message lives in its BROADCAST_OPTION field.
        shadow_block = script_blocks[message_source]
        if shadow_block.get("opcode") == "event_broadcast_menu":
            broadcast_field = shadow_block.get("fields", {}).get("BROADCAST_OPTION")
            if not (broadcast_field and isinstance(broadcast_field, list) and len(broadcast_field) >= 1 and isinstance(broadcast_field[0], str) and broadcast_field[0].strip()):
                # Block data does not store its own ID; the ID is the key it was looked up by.
                return [f"Broadcast block '{block_id}' uses a shadow block '{message_source}' that has an empty or malformed broadcast message. Recommendation: Ensure the broadcast message is a non-empty string."]
    elif isinstance(message_source, list) and len(message_source) >= 2 and isinstance(message_source[1], str) and not message_source[1].strip():
        # Connected to a literal string (less common for broadcast but possible).
        return [f"Broadcast block '{block_id}' has an empty literal broadcast message. Recommendation: Provide a non-empty message."]
    return []


def analyze_script_structure(
    script_blocks: dict,
    hat_block_id: str,
    sprite_name: str,
    sprite_variables: dict,
    sprite_lists: dict,
    sprite_broadcasts: dict,
    sprite_custom_blocks: dict  # proccode -> block_id mapping
) -> list:
    """
    Analyzes the structure of a single Scratch script for common errors.

    The hat block is validated first (it must exist, be ``topLevel`` and have
    no parent). Every block of the script is then checked, in this order, for:
    a known opcode, a consistent parent link, a consistent ``next`` link,
    well-formed inputs, shadow-block rules, variable/list/broadcast references
    in ``fields``, custom-block calls, and non-empty broadcast messages.

    Args:
        script_blocks: Block ID -> block data for the blocks of one script.
        hat_block_id: ID of the script's hat block.
        sprite_name: Sprite name, used only in the issue messages.
        sprite_variables: Known variable names; only membership is tested.
        sprite_lists: Known list names; only membership is tested.
        sprite_broadcasts: Known broadcast names; only membership is tested,
            so any container supporting ``in`` works.
        sprite_custom_blocks: Known custom-block proccodes; only membership
            is tested.

    Returns:
        list: Human-readable issue strings; empty when nothing was found.
        When the hat block itself is missing, only that single issue is
        returned.
    """
    issues = []

    # 1. Validate the hat block
    hat_block = script_blocks.get(hat_block_id)
    if not hat_block:
        issues.append(f"Script for sprite '{sprite_name}' (hat ID: {hat_block_id}) has no hat block data.")
        return issues  # Cannot proceed without a hat block
    if not hat_block.get("topLevel") or hat_block.get("parent") is not None:
        issues.append(f"Hat block '{hat_block_id}' for sprite '{sprite_name}' is not marked as topLevel or has a parent, which is incorrect for a script start.")
    # Hat blocks legitimately have a 'next' (the first block of their script),
    # so there is deliberately no "hat must not have next" rule.

    # 2. Check all blocks within the script
    for block_id, block_data in script_blocks.items():
        opcode = block_data.get("opcode")
        if not opcode:
            issues.append(f"Block '{block_id}' for sprite '{sprite_name}' is missing an opcode.")
            continue
        if opcode not in ALL_SCRATCH_BLOCKS_CATALOG:
            issues.append(f"Block '{block_id}' for sprite '{sprite_name}' has unknown opcode '{opcode}'. Recommendation: Verify against Scratch 3.0 block reference.")

        current_block_type = get_block_type(opcode)
        issues.extend(_parent_link_issues(script_blocks, block_id, block_data, opcode, sprite_name))
        issues.extend(_next_link_issues(script_blocks, block_id, block_data, opcode, current_block_type, sprite_name))
        issues.extend(_input_issues(script_blocks, block_id, block_data, opcode, current_block_type, sprite_name))
        issues.extend(_shadow_issues(block_id, block_data, opcode, sprite_name))
        issues.extend(_field_reference_issues(block_id, block_data, opcode, sprite_variables, sprite_lists, sprite_broadcasts))
        issues.extend(_procedure_call_issues(block_id, block_data, opcode, sprite_custom_blocks))
        issues.extend(_broadcast_input_issues(script_blocks, block_id, block_data, opcode))

    return issues
# Node 5: Verification Node | |
def _with_overall_note(review_feedback, note: str) -> dict:
    """Return ``review_feedback`` with ``note`` appended to ``review_summary['Overall']``.

    The review is model-generated JSON, so any level of the expected
    ``{"review_summary": {"Overall": [...]}}`` shape may be missing or have the
    wrong type. Unexpected values are kept (wrapped where needed) rather than
    dropped; a well-formed review is updated in place.
    """
    if not isinstance(review_feedback, dict):
        review_feedback = {"review_summary": {"Overall": [review_feedback]}}

    summary = review_feedback.get("review_summary")
    if not isinstance(summary, dict):
        summary = {"Overall": summary} if summary else {}
        review_feedback["review_summary"] = summary

    overall = summary.get("Overall")
    if overall is None:
        overall = []
    elif not isinstance(overall, list):
        overall = [overall]
    summary["Overall"] = overall
    overall.append(note)
    return review_feedback


def block_verification_node(state: dict) -> dict:
    """
    Verify the generated blocks of every target and collect feedback.

    Each target's scripts are first checked with rule-based logic
    (``filter_script_blocks`` + ``analyze_script_structure``, plus checks for
    orphaned and non-hat top-level blocks); the findings go into the
    improvement plan. When any issue is found, the LLM reviewer is asked to
    comment on the findings and its JSON answer is stored as review feedback.
    If that answer cannot be parsed, the JSON-resolver agent is asked to
    repair it; any other failure is replaced by a fallback error message.

    The node writes these keys of ``state``: ``needs_improvement``,
    ``improvement_plan``, ``block_validation_feedback``,
    ``review_block_feedback`` and ``iteration_count``. It also dumps the state
    to ``debug_state.json`` in the current working directory (best effort).

    Args:
        state (dict): The current state dictionary containing project_json, etc.
    Returns:
        dict: The updated state dictionary with verification feedback.
    """
    logger.info(f"--- Running BlockVerificationNode (Iteration: {state.get('iteration_count', 0)}) ---")
    MAX_IMPROVEMENT_ITERATIONS = 2  # Set a sensible limit to prevent infinite loops
    current_iteration = state.get("iteration_count", 0)
    project_json = state["project_json"]
    targets = project_json["targets"]

    # Initialize needs_improvement for the current run
    state["needs_improvement"] = False
    block_validation_feedback_overall = []
    improvement_plan = {"sprite_issues": {}}
    state["improvement_plan"] = improvement_plan  # Ensure it's in state even if empty

    # Variables and lists stored on the Stage are global in Scratch: every
    # sprite may reference them, so they must count as defined for all targets.
    stage_variables = {}
    stage_lists = {}
    for target in targets:
        if target.get("isStage"):
            stage_variables.update({var_data[0]: var_id for var_id, var_data in target.get("variables", {}).items()})
            stage_lists.update({list_data[0]: list_id for list_id, list_data in target.get("lists", {}).items()})

    for target in targets:
        sprite_name = target["name"]
        all_blocks_for_sprite = target.get("blocks", {})
        if not all_blocks_for_sprite:
            logger.info(f"Sprite '{sprite_name}' has no blocks. Skipping verification.")
            continue

        # name -> ID mappings; the sprite's own entries take precedence over Stage globals.
        sprite_variables = {
            **stage_variables,
            **{var_data[0]: var_id for var_id, var_data in target.get("variables", {}).items()},
        }
        sprite_lists = {
            **stage_lists,
            **{list_data[0]: list_id for list_id, list_data in target.get("lists", {}).items()},
        }

        # Broadcast names are collected from this target's 'when I receive' hats
        # and from its 'event_broadcast_menu' shadow blocks.
        sprite_broadcasts = set()
        for block_data in all_blocks_for_sprite.values():
            block_opcode = block_data.get("opcode")
            is_receiver_hat = block_opcode == "event_whenbroadcastreceived"
            is_menu_shadow = block_opcode == "event_broadcast_menu" and block_data.get("shadow")
            if not (is_receiver_hat or is_menu_shadow):
                continue
            broadcast_name_field = block_data.get("fields", {}).get("BROADCAST_OPTION")
            if isinstance(broadcast_name_field, list) and len(broadcast_name_field) >= 1 and isinstance(broadcast_name_field[0], str):
                sprite_broadcasts.add(broadcast_name_field[0])

        # Custom block definitions (proccode -> block ID). The sb3 format keeps the
        # proccode on the 'procedures_prototype' block; a mutation placed directly
        # on 'procedures_definition' is accepted as well.
        sprite_custom_blocks = {}
        for block_id, block_data in all_blocks_for_sprite.items():
            if block_data.get("opcode") in ("procedures_definition", "procedures_prototype") and "mutation" in block_data:
                proccode = block_data["mutation"].get("proccode")
                if proccode:
                    sprite_custom_blocks[proccode] = block_id

        sprite_issues = []
        hat_block_ids = [
            block_id for block_id, block_data in all_blocks_for_sprite.items()
            if block_data.get("topLevel") and get_block_type(block_data.get("opcode")) == "hat"
        ]
        processed_script_blocks = set()
        if not hat_block_ids:
            sprite_issues.append("No top-level hat blocks found for this sprite. Scripts may not run automatically.")

        for hat_id in hat_block_ids:
            logger.info(f"Verifying script starting with hat block '{hat_id}' for sprite '{sprite_name}'.")
            current_script_blocks = filter_script_blocks(all_blocks_for_sprite, hat_id)
            processed_script_blocks.update(current_script_blocks.keys())
            script_issues = analyze_script_structure(
                current_script_blocks,
                hat_id,
                sprite_name,
                sprite_variables,
                sprite_lists,
                sprite_broadcasts,
                sprite_custom_blocks
            )
            if script_issues:
                sprite_issues.append(f"Issues in script starting with '{hat_id}':")
                sprite_issues.extend([f" - {issue}" for issue in script_issues])
            else:
                logger.info(f"Script starting with '{hat_id}' for sprite '{sprite_name}' passed basic verification.")

        # Truly orphaned blocks: not top-level, not part of any script, no parent, not shadow.
        # A list (not a set) keeps the project's block order, so the report is reproducible.
        orphaned_blocks_overall = [
            block_id for block_id, block_data in all_blocks_for_sprite.items()
            if block_id not in processed_script_blocks
            and not block_data.get("topLevel")
            and not block_data.get("parent")
            and not block_data.get("shadow")
        ]
        if orphaned_blocks_overall:
            sprite_issues.append(f"Found {len(orphaned_blocks_overall)} truly orphaned blocks not connected to any valid script: {', '.join(orphaned_blocks_overall[:5])}{'...' if len(orphaned_blocks_overall) > 5 else ''}.")

        # Top-level blocks that are not hat blocks and were not reached from any hat.
        for block_id, block_data in all_blocks_for_sprite.items():
            if block_data.get("topLevel") and get_block_type(block_data.get("opcode")) != "hat" and block_id not in processed_script_blocks:
                sprite_issues.append(f"Top-level block '{block_id}' (opcode: {block_data.get('opcode')}) is not a hat block and is not part of any script, so it will not run automatically.")

        if sprite_issues:
            improvement_plan["sprite_issues"][sprite_name] = sprite_issues
            logger.warning(f"Verification found issues for sprite '{sprite_name}'.")
            block_validation_feedback_overall.append(f"Issues for {sprite_name}:\n" + "\n".join([f"- {issue}" for issue in sprite_issues]))
            state["needs_improvement"] = True
            print(f"\n--- Verification Report (Issues Found for {sprite_name}) ---")
            print(json.dumps({sprite_name: sprite_issues}, indent=2))
        else:
            logger.info(f"Sprite '{sprite_name}' passed all verification checks.")

    # Consolidate feedback for the LLM
    state["block_validation_feedback"] = "\n\n".join(block_validation_feedback_overall)
    print(f"[OVERALL IMPROVEMENT PLAN ON ITERATION {current_iteration}]: {improvement_plan}")

    if state["needs_improvement"]:
        llm_reviewer_prompt = f"""You are an expert Scratch project reviewer. Your task is to analyze the provided \
structural issues found in a Scratch project's sprites and suggest improvements or \
further insights. Focus on clarity, accuracy, and actionable advice.
Here are the detected structural issues:
{json.dumps(improvement_plan['sprite_issues'], indent=2)}
Here are the block validation feedback:
{json.dumps(state['block_validation_feedback'], indent=2)}
Please review these issues and provide a consolidated report with potential causes \
and recommendations for fixing them. If an issue is minor or expected in certain \
scenarios (e.g., hidden blocks for backward compatibility), please note that.
Structure your response as a JSON object with 'review_summary' as the key, \
containing a dictionary where keys are sprite names and values are lists of suggested improvements.
Example:
```json
{{
"review_summary": {{
"Sprite1": [
"Issue: Hat block 'abc' is not topLevel. Recommendation: Ensure all scripts start with a top-level hat block that has no parent.",
"Issue: Block 'xyz' has unknown opcode 'motion_nonexistent'. Recommendation: Verify the opcode against the Scratch 3.0 block reference. This might be a typo or a deprecated block.",
"Issue: Block 'var_block_id' references non-existent variable 'myVariable'. Recommendation: Ensure all variables used in blocks are defined in the sprite's variable list."
],
"Sprite2": [
"Issue: Found 3 orphaned blocks. Recommendation: Reconnect these blocks to existing scripts or remove them if no longer needed."
]
}}
}}
```
"""
        try:
            response = agent.invoke({"messages": [{"role": "user", "content": llm_reviewer_prompt}]})
            raw_review_response = response["messages"][-1].content
            try:
                state["review_block_feedback"] = extract_json_from_llm_response(raw_review_response)
            except json.JSONDecodeError as error_json:
                logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.")
                # Use the JSON resolver agent to fix the response
                correction_prompt = (
                    "Your task is to correct the provided JSON string to ensure it is **syntactically perfect and adheres strictly to JSON rules**.\n"
                    "Carefully review the JSON for any errors, especially focusing on the reported error at:\n"
                    f"- **Error Details**: {error_json}\n\n"
                    "**Strict Instructions for your response:**\n"
                    "1. **ONLY** output the corrected JSON. Do not include any other text, comments, or explanations outside the JSON.\n"
                    "2. Ensure all property names (keys) are enclosed in **double quotes**.\n"
                    "3. Ensure string values are correctly enclosed in **double quotes** and any internal special characters (like newlines `\\n`, tabs `\\t`, backslashes `\\\\`, or double quotes `\\\"`) are properly **escaped**.\n"
                    "4. Verify that there are **no extra commas**, especially between key-value pairs or after the last element in an object or array.\n"
                    "5. Ensure proper nesting and matching of curly braces `{}` and square brackets `[]`.\n"
                    "6. **Crucially, remove any extraneous characters or duplicate closing braces outside the main JSON object.**\n"
                    "7. The corrected JSON must be a **complete and valid** JSON object.\n\n"
                    "Here is the problematic JSON string to correct:\n"
                    "```json\n"
                    f"{raw_review_response}\n"
                    "```\n"
                    "Corrected JSON:\n"
                )
                correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]})
                corrected_text = correction_response["messages"][-1].content
                print(f"[JSON CORRECTOR RESPONSE AT BLOCKVERFIER ]: {corrected_text}")
                # A second parse failure propagates to the outer handler below.
                state["review_block_feedback"] = extract_json_from_llm_response(corrected_text)
            logger.info("Agent review feedback added to the state.")
            print("\n--- Agent Review Feedback ---")
            print(json.dumps(state["review_block_feedback"], indent=2))
        except Exception as e:
            logger.error(f"Error invoking agent for review in BlockVerificationNode: {e}")
            # Fallback for LLM error: populate with a general error message
            state["review_block_feedback"] = {"review_summary": {"Overall": [f"Error during LLM review: {e}"]}}
    else:
        logger.info("BlockVerificationNode completed: No issues found in any sprite blocks.")
        print("\n--- Verification Report (No Issues Found) ---")
        state["block_validation_feedback"] = "No issues found in sprite blocks."
        state["review_block_feedback"] = {"review_summary": {"Overall": ["No issues found in sprite blocks. All good!"]}}

    # Manage iteration count based on overall needs_improvement flag
    if state["needs_improvement"]:
        state["iteration_count"] = current_iteration + 1
        if state["iteration_count"] >= MAX_IMPROVEMENT_ITERATIONS:
            logger.warning(f"Max improvement iterations ({MAX_IMPROVEMENT_ITERATIONS}) reached for block verification. Forcing 'needs_improvement' to False.")
            state["needs_improvement"] = False
            state["block_validation_feedback"] += "\n(Note: Max iterations reached for block verification, stopping further improvements.)"
            # The review is model-generated, so its shape is normalized before appending.
            state["review_block_feedback"] = _with_overall_note(
                state["review_block_feedback"],
                "Max iterations reached for block verification, stopping further improvements based on this run.",
            )
    else:
        state["iteration_count"] = 0  # Reset if no more improvement needed for blocks

    logger.info(f"Block verification completed. Needs Improvement: {state['needs_improvement']}. Feedback: {state['block_validation_feedback'][:100]}...")
    print("===========================================================================")
    print(f"[BLOCK VERIFICATION NODE: (improvement_plan)]:{state.get('improvement_plan')}")
    print(f"[BLOCK VERIFICATION NODE: (review_block_feedback)]:{state.get('review_block_feedback')}")

    # The dump is a debugging aid only; failing to write it must not discard
    # the verification result that was just computed.
    try:
        with open("debug_state.json", "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2, ensure_ascii=False)
    except (OSError, TypeError, ValueError) as dump_error:
        logger.warning(f"Could not write debug_state.json: {dump_error}")
    return state
def _build_improvement_block_prompt(
    sprite_name,
    event_opcode,
    logic_sequence,
    combined_blocks,
    block_verification_feedback,
    sprite_review_feedback_str,
    existing_sprite_blocks,
):
    """Return the LLM prompt requesting the block JSON for one improved script.

    Args:
        sprite_name: Name of the target (sprite or Stage) the script belongs to.
        event_opcode: Opcode of the hat/event block that starts the script.
        logic_sequence: Plan "logic" string describing the script's behaviour.
        combined_blocks: Mapping opcode -> merged block definition, embedded
            in the prompt as the catalog of blocks the LLM may use.
        block_verification_feedback: General feedback text from verification.
        sprite_review_feedback_str: Bullet list of issues for this sprite.
        existing_sprite_blocks: The sprite's current ``blocks`` dict (context).
    """
    # NOTE: doubled braces ({{ }}) are literal braces in the f-string; single
    # braces are interpolations. The prompt text is runtime behaviour.
    return f"""You are an AI assistant generating Scratch 3.0 block JSON for a single script based on an improvement plan.
The current sprite is '{sprite_name}'.
The specific script to generate blocks for is for the event with opcode '{event_opcode}'.
The sequential logic to implement is:
Logic: {logic_sequence}
**Based on the planning, the following Scratch block opcodes are expected to be used to implement this logic. Focus on using these specific opcodes where applicable, and refer to the ALL_SCRATCH_BLOCKS_CATALOG for their full structure and required inputs/fields:**
Here is the comprehensive catalog of required Scratch 3.0 blocks:
```json
{json.dumps(combined_blocks, indent=2)}
```
Here is general feedback and suggestions you should take care of:
suggestion:{block_verification_feedback}
**VERY IMPORTANT: Here is the specific review feedback for this sprite. You MUST address these points in your block generation:**
```
{sprite_review_feedback_str}
```
Current Scratch project JSON for this sprite (for context, its existing blocks if any. Note: If this is a hat block, you should be generating a new script entirely, otherwise, integrate or modify existing blocks as per the action plan and review feedback):
```json
{json.dumps(existing_sprite_blocks, indent=2)}
```
**Instructions for generating the block JSON (EXTREMELY IMPORTANT - FOLLOW THESE EXAMPLES PRECISELY):**
1. **Start with the event block (Hat Block):** This block's `topLevel` should be `true` and `parent` should be `null`. Its `x` and `y` coordinates should be set (e.g., `x: 0, y: 0` or reasonable offsets for multiple scripts).
2. **Generate a sequence of connected blocks:** For each block, generate a **unique block ID** (e.g., 'myBlockID123').
3. **Link blocks correctly:**
* Use the `next` field to point to the ID of the block directly below it in the stack.
* Use the `parent` field to point to the ID of the block directly above it in the stack.
* For the last block in a stack, `next` should be `null`.
4. **Handle `inputs` for parameters and substacks (CRITICAL DETAIL - PAY CLOSE ATTENTION TO EXAMPLES):**
* The value for any key within the `inputs` dictionary MUST always be an **array** of two elements: `[type_code, value_or_block_id]`.
* **STRICTLY FORBIDDEN MISTAKE:** DO NOT put an array like `["num", "value"]` or `["_edge_", null]` directly as the `value_or_block_id`. This is the source of past errors.
* The `type_code` (first element of the array) indicates the nature of the input:
* `1`: For a primitive value or a shadow block ID (e.g., a number, string, boolean, or reference to a shadow block). This is the most common type for direct values.
* `2`: For a block ID where another block is directly plugged into this input (e.g., an operator block connected to an `if` condition, or the first block of a C-block's substack).
* **Correct Example for Numerical Input (e.g., for `motion_movesteps` STEPS, or `motion_gotoxy` X/Y):**
If you need to input the number `10` into a block, you MUST create a separate `math_number` shadow block for it, and then reference its ID.
```json
// Main block using the number
"mainBlockID": {{
"opcode": "motion_movesteps",
"inputs": {{
"STEPS": [1, "shadowNumID"]
}},
// ... other fields
}},
// The separate shadow block for the number '10'
"shadowNumID": {{
"opcode": "math_number",
"fields": {{
"NUM": ["10", null]
}},
"shadow": true,
"parent": "mainBlockID",
"topLevel": false
}}
```
* **Correct Example for Dropdown/Menu Input (e.g., `sensing_touchingobject` with 'edge'):**
If you need to select 'edge' for the `touching ()?` block, you MUST create a separate `sensing_touchingobjectmenu` shadow block and reference its ID.
```json
// Main block using the dropdown selection
"touchingBlockID": {{
"opcode": "sensing_touchingobject",
"inputs": {{
"TOUCHINGOBJECTMENU": [1, "shadowEdgeMenuID"]
}},
// ... other fields
}},
// The separate shadow block for the 'edge' menu option
"shadowEdgeMenuID": {{
"opcode": "sensing_touchingobjectmenu",
"fields": {{
"TOUCHINGOBJECTMENU": ["_edge_", null]
}},
"shadow": true,
"parent": "touchingBlockID",
"topLevel": false
}}
```
* **Correct Example for C-block (e.g., `control_forever`):**
The `control_forever` block MUST have a `SUBSTACK` input pointing to the first block inside its loop. The value for `SUBSTACK` must be an array: `[2, "FIRST_BLOCK_IN_FOREVER_LOOP_ID"]`.
```json
"foreverBlockID": {{
"opcode": "control_forever",
"inputs": {{
"SUBSTACK": [2, "firstBlockInsideForeverID"]
}},
"next": null,
"parent": "blockAboveForeverID",
"shadow": false,
"topLevel": false
}},
"firstBlockInsideForeverID": {{
// ... definition of the first block inside the forever loop
"parent": "foreverBlockID",
"next": "secondBlockInsideForeverID" // if there's another block
}}
```
* **Correct Example for C-block with condition (e.g., `control_if`):**
The `control_if` block MUST have a `CONDITION` input (typically `type_code: 1` referencing a boolean reporter block) and a `SUBSTACK` input (`type_code: 2` referencing the first block inside the if-body).
```json
"ifBlockID": {{
"opcode": "control_if",
"inputs": {{
"CONDITION": [1, "conditionBlockID"],
"SUBSTACK": [2, "firstBlockInsideIfID"]
}},
"next": "blockAfterIfID",
"parent": "blockAboveIfID",
"shadow": false,
"topLevel": false
}},
"conditionBlockID": {{
"opcode": "sensing_touchingobject", // Example condition block
// ... definition for condition block, parent should be "ifBlockID"
}},
"firstBlockInsideIfID": {{
// ... definition of the first block inside the if body
"parent": "ifBlockID",
"next": null // or next block if more
}}
```
5. **Define ALL Shadow Blocks Separately (THIS IS ESSENTIAL):** Every time a block's input requires a number, string literal, or a selection from a dropdown/menu, you MUST define a **separate block entry** in the top-level blocks dictionary for that shadow. Each shadow block MUST have `"shadow": true` and `"topLevel": false`.
6. **`fields` for direct dropdown values/text:** Use the `fields` dictionary ONLY IF the block directly embeds a dropdown value or text field without an `inputs` connection (e.g., the `KEY_OPTION` field within the `event_whenkeypressed` shadow block itself, or the `variable` field on a `data_setvariableto` block). Example: `"fields": {{"KEY_OPTION": ["space", null]}}`.
7. **`topLevel: true` for hat blocks:** Only the starting (hat) block of a script should have `"topLevel": true`.
8. **Ensure Unique Block IDs:** Every block you generate (main blocks and shadow blocks) must have a unique ID within the entire script's block dictionary.
9. **Strictly Use Catalog Opcodes:** You MUST only use `opcode` values that are present in the provided `ALL_SCRATCH_BLOCKS_CATALOG`. Do NOT use unlisted opcodes like `motion_jump`.
10. **Return ONLY the JSON object representing all the blocks for THIS SINGLE SCRIPT.** Do NOT wrap it in a 'blocks' key or the full project JSON. The output should be a dictionary where keys are block IDs and values are block definitions. You need to ensure that the generated blocks address the provided `sprite_review_feedback_str` and integrate with or replace existing blocks as necessary to fulfill the `logic_sequence` for the `event_opcode`."""


def _build_block_json_correction_prompt(raw_response, error_json):
    """Return the prompt asking the JSON-resolver agent to repair ``raw_response``.

    Args:
        raw_response: The LLM output that failed JSON extraction.
        error_json: The ``json.JSONDecodeError`` raised while parsing it.
    """
    # NOTE(review): rule 6 below tells the model to embed literals inline
    # ([1, ["math_number", "10"]]), while the generation prompt forbids exactly
    # that and demands separate shadow blocks - confirm which is intended.
    return (
        "Your primary goal is to generate JSON that precisely defines Scratch 3.0 blocks. "
        "The output MUST be a single, valid JSON object, enclosed within a '```json' markdown block.\n"
        "**STRICT JSON SYNTAX RULES:**\n"
        "1. **NO EXTRA TEXT, COMMENTS, OR CONVERSATIONAL FILLER.** Only the JSON.\n"
        "2. All keys MUST be enclosed in double quotes.\n"
        "3. String values MUST be enclosed in double quotes. Internal double quotes must be escaped (e.g., `\\\"`). Newlines (`\\n`) and carriage returns (`\\r`) in strings must be escaped.\n"
        "4. No trailing commas.\n"
        "5. Correct nesting of braces `{}` and brackets `[]`.\n"
        "6. **CRITICAL: Understand and strictly follow the Scratch block input array format.**\n"
        " - When an input refers to another top-level block (like a variable block or a number block that is defined separately):\n"
        " `\"INPUT_NAME\": [1, \"ID_OF_REFERENCED_BLOCK\"]`\n"
        " - When an input *directly embeds* a literal value (like a number, string, or boolean shadow block):\n"
        " `\"INPUT_NAME\": [1, [\"math_number\", \"10\"]]` (for numbers)\n"
        " `\"INPUT_NAME\": [1, [\"text\", \"hello\"]]` (for strings)\n"
        " `\"INPUT_NAME\": [1, [\"boolean\", true]]` (for booleans)\n"
        " - **Do NOT** generate patterns like `some_id[\"value\", null]]` or `\"\"\": [1, some_id[\"\", null]]`.\n"
        " - Ensure `\"/X\"` and `\"Y\"` inputs for `motion_gotoxy` and `motion_glidesecstoxy` correctly reference a block ID or directly embed a number block.\n"
        "7. The 'logic' field should contain a plain string describing the action, without any embedded JSON or Scratch block syntax (like `\"forever\":\"` which breaks the string).\n\n"
        "Here is the problematic JSON string that needs correction, including the error details:\n"
        f"- **Error Details**: {error_json}\n"  # Keep the error details for context
        "```json\n"
        f"{raw_response}\n"
        "```\n"
        "Your corrected, valid JSON response:\n"
        "```json\n"
    )


def improvement_block_builder_node(state: GameState):
    """Regenerate scripts for each planned sprite using verification feedback.

    For every sprite in the action plan that exists in ``project_json`` and for
    every plan entry of that sprite, this node asks ``agent`` for the block JSON
    of one script, repairs unparsable output through ``agent_json_resolver``,
    positions top-level blocks, and merges the result into the sprite's
    ``blocks`` dict (same-ID blocks are overwritten).

    Reads from ``state``: ``project_json``, ``action_plan``,
    ``block_validation_feedback`` and ``review_block_feedback``.
    Writes ``state["project_json"]`` and dumps the state to ``debug_state.json``.

    Args:
        state: The workflow state.

    Returns:
        The same ``state`` object. It is returned unchanged when there is no
        action plan.

    Raises:
        Exception: Any error during generation/merging of a script is logged
            and re-raised.
    """
    logger.info("--- Running ImprovementBlockBuilderNode ---")
    project_json = state["project_json"]
    targets = project_json["targets"]
    sprite_map = {target["name"]: target for target in targets if not target["isStage"]}
    # The Stage is a target too; make it addressable by name like the sprites.
    stage_target = next((target for target in targets if target["isStage"]), None)
    if stage_target:
        sprite_map[stage_target["name"]] = stage_target

    # Hand-written block catalogs searched (in this order) by find_block_in_all.
    all_catalogs = [
        hat_block_data,
        boolean_block_data,
        c_block_data,
        cap_block_data,
        reporter_block_data,
        stack_block_data,
    ]

    block_verification_feedback = state.get("block_validation_feedback", "no feedback")
    # Combination of function-based and LLM-based issue findings.
    review_block_feedback = state.get("review_block_feedback", {})
    action_plan = state.get("action_plan", {})
    if not action_plan:
        logger.warning("No action plan found in state. Skipping ImprovementBlockBuilderNode.")
        return state

    script_y_offset = {}
    script_x_offset_per_sprite = {name: 0 for name in sprite_map}

    # [DO NOT REMOVE] The plan may arrive either wrapped in "action_overall_flow"
    # or as a bare {sprite_name: ...} mapping; accept both shapes.
    overall_flow = action_plan.get("action_overall_flow", {})
    if overall_flow == {}:
        plan_data = action_plan.items()
    else:
        plan_data = overall_flow.items()

    # Plan-entry keys holding opcode lists, in the order they are combined.
    opcode_categories = (
        "motion", "control", "operator", "sensing",
        "looks", "sound", "events", "data",
    )

    for sprite_name, sprite_improvements_data in plan_data:
        if sprite_name not in sprite_map:
            continue
        current_sprite_target = sprite_map[sprite_name]
        # Same dict object that receives the generated blocks below, so later
        # scripts see the blocks produced for earlier ones as context.
        existing_sprite_blocks = current_sprite_target.setdefault("blocks", {})
        script_y_offset.setdefault(sprite_name, 0)

        # Review feedback for this sprite. The fallback value may not be a
        # mapping (it defaults to a list), so only call .get() on a dict
        # instead of crashing with AttributeError.
        review_summary = review_block_feedback.get("review_summary", state.get(sprite_name, []))
        if isinstance(review_summary, dict):
            sprite_review_summary = review_summary.get(sprite_name, [])
        else:
            sprite_review_summary = []
        sprite_review_feedback_str = "\n".join([f"- {issue}" for issue in sprite_review_summary]) if sprite_review_summary else "No specific issues reported for this sprite in the review."

        for plan_entry in sprite_improvements_data.get("plans", []):
            event_opcode = plan_entry["event"]  # Expected to be an opcode
            logic_sequence = plan_entry["logic"]  # Semicolon-separated string

            # Collect the planned opcodes, de-duplicated in first-seen order so
            # the prompt is deterministic. A null/missing category counts as
            # empty and a bare string as a single opcode.
            collected = []
            for category in opcode_categories:
                values = plan_entry.get(category) or []
                if isinstance(values, str):
                    values = [values]
                collected.extend(values)
            needed_opcodes = list(dict.fromkeys(collected))

            # Merge the hand-written catalog entry with the runtime catalog
            # entry for each opcode; runtime fields override/add.
            combined_blocks = {}
            for op in needed_opcodes:
                catalog_def = find_block_in_all(op, all_catalogs) or {}
                runtime_def = ALL_SCRATCH_BLOCKS_CATALOG.get(op, {})
                combined_blocks[op] = {**catalog_def, **runtime_def}
            print("Combined blocks for this script:", json.dumps(combined_blocks, indent=2))

            llm_block_generation_prompt = _build_improvement_block_prompt(
                sprite_name,
                event_opcode,
                logic_sequence,
                combined_blocks,
                block_verification_feedback,
                sprite_review_feedback_str,
                existing_sprite_blocks,
            )

            try:
                response = agent.invoke({"messages": [{"role": "user", "content": llm_block_generation_prompt}]})
                raw_response = response["messages"][-1].content
                logger.info(f"Raw response from LLM [ImprovementBlockBuilderNode - {sprite_name} - {event_opcode}]: {raw_response[:500]}...")
                try:
                    generated_blocks = extract_json_from_llm_response(raw_response)
                except json.JSONDecodeError as error_json:
                    logger.error("Failed to extract JSON from LLM response. Attempting to correct the response.")
                    # Ask the JSON resolver agent to repair the response.
                    correction_prompt = _build_block_json_correction_prompt(raw_response, error_json)
                    correction_response = agent_json_resolver.invoke({"messages": [{"role": "user", "content": correction_prompt}]})
                    # Bound to a local first: reusing the same quote type
                    # inside an f-string is a SyntaxError before Python 3.12.
                    corrected_text = correction_response["messages"][-1].content
                    print(f"[JSON CORRECTOR RESPONSE AT IMPROVEMENTBLOCKBUILDER ]: {corrected_text}")
                    generated_blocks = extract_json_from_llm_response(corrected_text)

                if "blocks" in generated_blocks and isinstance(generated_blocks["blocks"], dict):
                    logger.warning(f"LLM returned nested 'blocks' key for {sprite_name}. Unwrapping.")
                    generated_blocks = generated_blocks["blocks"]

                # Position each top-level (hat) block, stacking scripts
                # vertically 150 units apart per sprite.
                for block_id, block_data in generated_blocks.items():
                    if block_data.get("topLevel"):
                        block_data["x"] = script_x_offset_per_sprite.get(sprite_name, 0)
                        block_data["y"] = script_y_offset[sprite_name]
                        script_y_offset[sprite_name] += 150  # Increment for next script

                # Merge into the sprite: new blocks overwrite existing blocks
                # with the same ID, all other existing blocks are kept.
                current_sprite_target["blocks"].update(generated_blocks)
                logger.info(f"Improvement blocks added/updated for sprite '{sprite_name}', script '{event_opcode}' by ImprovementBlockBuilderNode.")
            except Exception as e:
                logger.error(f"Error generating blocks for sprite '{sprite_name}', script '{event_opcode}': {e}")
                raise

    state["project_json"] = project_json  # Update the state with the modified project_json
    logger.info("Updated project JSON with improvement nodes.")
    print("Updated project JSON with improvement nodes:", json.dumps(project_json, indent=2))  # Print for direct visibility
    with open("debug_state.json", "w", encoding="utf-8") as f:
        json.dump(state, f, indent=2, ensure_ascii=False)
    return state
# Temporary time delay for handling the TPM issue
import time | |
def delay_for_tpm_node(state: GameState, *, delay_seconds: float = 80):
    """Pause the workflow for a while, then pass the state through unchanged.

    Temporary workaround node for TPM issues: it only sleeps and logs.

    Args:
        state: The workflow state; it is not read or modified.
        delay_seconds: How long to sleep, in seconds. Defaults to 80, the
            previously hard-coded value, so existing registrations behave the
            same.

    Returns:
        The same ``state`` object that was passed in.
    """
    logger.info("--- Running DelayForTPMNode ---")
    time.sleep(delay_seconds)  # Adjust the delay as needed
    logger.info("Delay completed.")
    return state
# Build the LangGraph workflow: a StateGraph over GameState. Nodes are
# registered first, then the linear "initial flow" edges are added. Edge order
# matters only in that every referenced node name must be registered here.
workflow = StateGraph(GameState)
# Add all nodes to the workflow
workflow.add_node("parse_query", parse_query_and_set_initial_positions)
workflow.add_node("game_description", game_description_node)
workflow.add_node("declarations_plan", declaration_planner_node)
workflow.add_node("add_declaration", declaration_builder_node)
# Six separately named delay nodes, all backed by the same function, so a
# pause can be inserted at six different points of the graph.
workflow.add_node("time_delay_1", delay_for_tpm_node)  # this is a temporary node to handle TPM issues
workflow.add_node("time_delay_2", delay_for_tpm_node)  # this is a temporary node to handle TPM issues
workflow.add_node("time_delay_3", delay_for_tpm_node)  # this is a temporary node to handle TPM issues
workflow.add_node("time_delay_4", delay_for_tpm_node)  # this is a temporary node to handle TPM issues
workflow.add_node("time_delay_5", delay_for_tpm_node)  # this is a temporary node to handle TPM issues
workflow.add_node("time_delay_6", delay_for_tpm_node)  # this is a temporary node to handle TPM issues
workflow.add_node("initial_plan_build", overall_planner_node)  # High-level planning node
workflow.add_node("plan_verifier", plan_verification_node)  # Verifies the high-level plan
workflow.add_node("refined_planner", refined_planner_node)  # Refines the action plan
workflow.add_node("block_builder", overall_block_builder_node)  # Builds blocks from a plan
workflow.add_node("block_verifier", block_verification_node)  # Verifies the generated blocks
workflow.add_node("improved_block_builder", improvement_block_builder_node)  # For specific block-level improvements
# Set the entry point
workflow.set_entry_point("game_description")
# Define the standard initial flow:
# game_description -> parse_query -> delay -> declarations_plan ->
# add_declaration -> delay -> initial_plan_build -> delay -> plan_verifier
workflow.add_edge("game_description", "parse_query")
workflow.add_edge("parse_query", "time_delay_1")
workflow.add_edge("time_delay_1", "declarations_plan")
workflow.add_edge("declarations_plan", "add_declaration")
workflow.add_edge("add_declaration", "time_delay_2")
workflow.add_edge("time_delay_2", "initial_plan_build")
workflow.add_edge("initial_plan_build", "time_delay_3")
workflow.add_edge("time_delay_3", "plan_verifier")
# Define the conditional logic after plan_verifier (for high-level plan issues) | |
def decide_next_step_after_plan_verification(state: GameState):
    """Choose the node that follows plan verification.

    Returns "refined_planner" when ``state["needs_improvement"]`` is truthy
    (the plan must be refined), otherwise "block_builder" (the plan is good
    and blocks can be built from it). A missing flag counts as False.
    """
    plan_needs_refinement = state.get("needs_improvement", False)
    return "refined_planner" if plan_needs_refinement else "block_builder"
# Route out of plan_verifier: the label returned by
# decide_next_step_after_plan_verification is looked up in the mapping below
# to pick the next node.
workflow.add_conditional_edges(
    "plan_verifier",
    decide_next_step_after_plan_verification,
    {
        "refined_planner": "refined_planner",  # Path if plan needs refinement
        "block_builder": "block_builder"  # Path if plan is approved, proceeds to block building
    }
)
# --- Plan refinement loop ---
# A refined plan goes (via a delay node) back to plan_verifier for
# re-verification; plan_verifier alone decides whether to refine again or to
# continue to block_builder, so there is no direct edge into block_builder here.
workflow.add_edge("refined_planner", "time_delay_4")
workflow.add_edge("time_delay_4", "plan_verifier")  # This closes the loop for plan refinement and re-verification.
# After blocks are built, they pass through a delay node and are then verified
workflow.add_edge("block_builder", "time_delay_5")
workflow.add_edge("time_delay_5", "block_verifier")
# Define the conditional logic after block_verifier (for generated blocks issues) | |
def decide_after_block_verification(state: GameState):
    """Choose the step that follows block verification.

    Returns "improved_block_builder" when ``state["needs_improvement"]`` is
    truthy (the generated blocks need fixes), otherwise the label "END" (the
    blocks are good and the workflow can finish). A missing flag counts as
    False.
    """
    blocks_need_fixes = state.get("needs_improvement", False)
    return "improved_block_builder" if blocks_need_fixes else "END"
# Route out of block_verifier: the label returned by
# decide_after_block_verification is looked up in the mapping below; the "END"
# label maps to LangGraph's END sentinel.
workflow.add_conditional_edges(
    "block_verifier",
    decide_after_block_verification,
    {
        "improved_block_builder": "improved_block_builder",  # Path if blocks need improvement
        "END": END  # Path if blocks are good
    }
)
# Create the loop: improved blocks pass through a delay node and are re-verified
workflow.add_edge("improved_block_builder", "time_delay_6")
workflow.add_edge("time_delay_6", "block_verifier")
# Compile the workflow graph; app_graph is the compiled object used from here on
app_graph = workflow.compile()
# # Build the LangGraph workflow | |
# workflow = StateGraph(GameState) | |
# # Add all nodes to the workflow | |
# workflow.add_node("parse_query", parse_query_and_set_initial_positions) | |
# workflow.add_node("game_description", game_description_node) | |
# workflow.add_node("declarations_plan", declaration_planner_node) | |
# workflow.add_node("add_declaration", declaration_builder_node) | |
# workflow.add_node("initial_plan_build", overall_planner_node) # High-level planning node | |
# workflow.add_node("plan_verifier", plan_verification_node) # Verifies the high-level plan | |
# workflow.add_node("refined_planner", refined_planner_node) # Refines the action plan | |
# workflow.add_node("block_builder", overall_block_builder_node) # Builds blocks from a plan | |
# workflow.add_node("block_verifier", block_verification_node) # Verifies the generated blocks | |
# workflow.add_node("improved_block_builder", improvement_block_builder_node) # For specific block-level improvements | |
# # Set the entry point | |
# workflow.set_entry_point("game_description") | |
# # Define the standard initial flow | |
# workflow.add_edge("game_description", "parse_query") | |
# workflow.add_edge("parse_query", "declarations_plan") | |
# workflow.add_edge("declarations_plan", "add_declaration") | |
# workflow.add_edge("add_declaration", "initial_plan_build") | |
# workflow.add_edge("initial_plan_build", "plan_verifier") | |
# # Define the conditional logic after plan_verifier (for high-level plan issues) | |
# def decide_next_step_after_plan_verification(state: GameState): | |
# if state.get("needs_improvement", False): | |
# # If the plan needs refinement, go to the refined_planner | |
# return "refined_planner" | |
# else: | |
# # If the plan is good, proceed to building blocks from this plan | |
# return "block_builder" | |
# workflow.add_conditional_edges( | |
# "plan_verifier", | |
# decide_next_step_after_plan_verification, | |
# { | |
# "refined_planner": "refined_planner", # Path if plan needs refinement | |
# "block_builder": "block_builder" # Path if plan is approved, proceeds to block building | |
# } | |
# ) | |
# # --- CRITICAL CHANGE FOR THE PLAN REFINEMENT LOOP --- | |
# # After refining the plan, it should go back to plan_verifier for re-verification. | |
# workflow.add_edge("refined_planner", "plan_verifier") # This closes the loop for plan refinement and re-verification. | |
# # After blocks are built, they need to be verified | |
# workflow.add_edge("block_builder", "block_verifier") | |
# # Define the conditional logic after block_verifier (for generated blocks issues) | |
# def decide_after_block_verification(state: GameState): | |
# if state.get("needs_improvement", False): | |
# # If blocks need improvement, go to improved_block_builder. | |
# # This assumes improved_block_builder handles specific block-level fixes. | |
# return "improved_block_builder" | |
# else: | |
# # If blocks are good, end the workflow | |
# return "END" | |
# workflow.add_conditional_edges( | |
# "block_verifier", | |
# decide_after_block_verification, | |
# { | |
# "improved_block_builder": "improved_block_builder", # Path if blocks need improvement | |
# "END": END # Path if blocks are good | |
# } | |
# ) | |
# # Create the loop: If blocks improved, re-verify them | |
# workflow.add_edge("improved_block_builder", "block_verifier") | |
# # Compile the workflow graph | |
# app_graph = workflow.compile() | |
# --- Serve the form ---
def index():
    """Render the ``index4.html`` template and return the resulting page."""
    # NOTE(review): no @app.route decorator is visible on this function in the
    # reviewed text - confirm it is registered as a Flask route.
    page = render_template("index4.html")
    return page
# --- List static assets for the front-end --- | |
def list_assets():
    """List the static backdrop, sprite and sound files for the front-end.

    Scans ``<static>/assets/backdrops``, ``.../sprites`` and ``.../sounds``
    and keeps only files with a recognised extension (case-insensitive).
    A directory that does not exist yields an empty list.

    Returns:
        A JSON response with ``backdrops``, ``sprites`` and ``sounds`` lists,
        or ``({"error": ...}, 500)`` if listing fails.
    """
    # NOTE(review): no @app.route decorator is visible on this function in the
    # reviewed text - confirm it is registered as a Flask route.
    image_suffixes = (".svg", ".png", ".jpg", ".jpeg")  # Common image formats
    audio_suffixes = (".wav", ".mp3", ".aiff")  # Add more formats as needed
    assets_root = os.path.join(app.static_folder, "assets")
    # Sub-folder name -> accepted suffixes; scanned in this order.
    accepted = {
        "backdrops": image_suffixes,
        "sprites": image_suffixes,
        "sounds": audio_suffixes,
    }
    listing = {kind: [] for kind in accepted}
    try:
        for kind, suffixes in accepted.items():
            folder = os.path.join(assets_root, kind)
            if not os.path.isdir(folder):
                continue
            listing[kind] = [
                entry for entry in os.listdir(folder)
                if entry.lower().endswith(suffixes)
            ]
        logger.info("Successfully listed static assets.")
    except Exception as e:
        logger.error(f"Error listing static assets: {e}")
        return jsonify({"error": "Failed to list assets"}), 500
    return jsonify(
        backdrops=listing["backdrops"],
        sprites=listing["sprites"],
        sounds=listing["sounds"],
    )
# --- Helper: build costume entries --- | |
def make_costume_entry(folder, filename, name):
    """Build the Scratch costume dict for an image asset.

    Args:
        folder: Sub-folder of ``<static>/assets`` containing the file
            (``"backdrops"`` gets the 240x180 centre for SVGs).
        filename: Asset file name; the part before the last dot becomes
            ``assetId`` and the part after it ``dataFormat``. A name without a
            dot yields an empty ``dataFormat`` instead of raising IndexError.
        name: Costume name stored in the entry.

    Returns:
        A dict with ``name``, ``bitmapResolution``, ``dataFormat``,
        ``assetId``, ``md5ext``, ``rotationCenterX`` and ``rotationCenterY``.
        The rotation centre is (240, 180) for backdrop SVGs, (0, 0) for other
        SVGs, half the pixel size for raster images, and (0, 0) when the size
        cannot be read or the extension is not a known image type.
    """
    asset_id, dot, data_format = filename.rpartition(".")
    if not dot:
        # No extension at all: keep the whole name as the asset id.
        asset_id, data_format = filename, ""
    entry = {
        "name": name,
        "bitmapResolution": 1,
        "dataFormat": data_format,
        "assetId": asset_id,
        "md5ext": filename,
    }
    lowered = filename.lower()
    if lowered.endswith(".svg"):
        # For SVGs, default Scratch values are typically 240x180 for stage, others 0,0
        center = (240, 180) if folder == "backdrops" else (0, 0)
    elif lowered.endswith((".png", ".jpg", ".jpeg")):
        try:
            path = os.path.join(app.static_folder, "assets", folder, filename)
            # Context manager closes the file handle Pillow keeps open.
            with Image.open(path) as img:
                w, h = img.size
            center = (w // 2, h // 2)
        except Exception as e:
            logger.warning(f"Could not determine image dimensions for {filename}: {e}. Setting center to 0,0.")
            center = (0, 0)
    else:
        logger.warning(f"Unknown asset type for {filename}. Setting center to 0,0.")
        center = (0, 0)  # Default if not an image
    entry["rotationCenterX"], entry["rotationCenterY"] = center
    return entry
# --- Helper: build sound entries --- | |
def make_sound_entry(filename, name):
    """Build the Scratch sound dict for an audio asset.

    Args:
        filename: Asset file name; the part before the last dot becomes
            ``assetId`` and the part after it ``dataFormat``. A name without a
            dot yields an empty ``dataFormat`` instead of raising IndexError.
        name: Sound name stored in the entry.

    Returns:
        A dict with ``name``, ``dataFormat``, ``rate``, ``sampleCount``,
        ``assetId`` and ``md5ext``. ``rate`` is fixed at 44100 and
        ``sampleCount`` at 0; neither is read from the audio file.
    """
    asset_id, dot, data_format = filename.rpartition(".")
    if not dot:
        # No extension at all: keep the whole name as the asset id.
        asset_id, data_format = filename, ""
    # For a real implementation, you might want to use a library like pydub to
    # get the sampleCount and duration if needed for more complex sound handling.
    return {
        "name": name,
        "dataFormat": data_format,
        "rate": 44100,  # Common sample rate for sounds
        "sampleCount": 0,  # This would typically be calculated from the audio file
        "assetId": asset_id,
        "md5ext": filename,
    }
# --- New endpoint to fetch project.json --- | |
def get_project(project_id):
    """Send ``generated_projects/<project_id>/project.json`` as a download.

    Args:
        project_id: Name of the project folder under ``generated_projects``.
            Presumably supplied by the client (e.g. via the URL) - it is
            treated as untrusted here.

    Returns:
        The file as an attachment named ``<project_id>.json``;
        ``({"error": "Project not found"}, 404)`` when the ID is not a plain
        folder name or the file does not exist; ``({"error": ...}, 500)`` on
        any other failure.
    """
    # NOTE(review): no @app.route decorator is visible on this function in the
    # reviewed text - confirm it is registered as a Flask route.
    # Security: the ID becomes part of a directory path, so it must be a
    # single plain path component. Reject "", ".", ".." and anything holding
    # a path separator to prevent escaping generated_projects/.
    if project_id in ("", ".", "..") or "/" in project_id or "\\" in project_id:
        logger.warning(f"Rejected invalid project ID: {project_id!r}")
        return jsonify({"error": "Project not found"}), 404
    project_folder = os.path.join("generated_projects", project_id)
    project_json_path = os.path.join(project_folder, "project.json")
    try:
        if os.path.exists(project_json_path):
            logger.info(f"Serving project.json for project ID: {project_id}")
            return send_from_directory(project_folder, "project.json", as_attachment=True, download_name=f"{project_id}.json")
        else:
            logger.warning(f"Project JSON not found for ID: {project_id}")
            return jsonify({"error": "Project not found"}), 404
    except Exception as e:
        logger.error(f"Error serving project.json for ID {project_id}: {e}")
        return jsonify({"error": "Failed to retrieve project"}), 500
# --- New endpoint to fetch assets --- | |
def get_asset(project_id, filename):
    """Send one asset file from ``generated_projects/<project_id>/``.

    Args:
        project_id: Name of the project folder under ``generated_projects``.
            Presumably supplied by the client (e.g. via the URL) - it is
            treated as untrusted here.
        filename: Name of the file inside the project folder.

    Returns:
        The file contents; ``({"error": "Asset not found"}, 404)`` when the
        project ID is not a plain folder name or the file does not exist;
        ``({"error": ...}, 500)`` on any other failure.
    """
    # NOTE(review): no @app.route decorator is visible on this function in the
    # reviewed text - confirm it is registered as a Flask route.
    # Security: the ID becomes the directory handed to send_from_directory, so
    # it must be a single plain path component. Without this check an ID of
    # ".." would serve files from the parent of generated_projects/.
    if project_id in ("", ".", "..") or "/" in project_id or "\\" in project_id:
        logger.warning(f"Rejected invalid project ID: {project_id!r}")
        return jsonify({"error": "Asset not found"}), 404
    project_folder = os.path.join("generated_projects", project_id)
    asset_path = os.path.join(project_folder, filename)
    try:
        if os.path.exists(asset_path):
            logger.info(f"Serving asset '{filename}' for project ID: {project_id}")
            return send_from_directory(project_folder, filename)
        else:
            logger.warning(f"Asset '{filename}' not found for project ID: {project_id}")
            return jsonify({"error": "Asset not found"}), 404
    except Exception as e:
        logger.error(f"Error serving asset '{filename}' for project ID {project_id}: {e}")
        return jsonify({"error": "Failed to retrieve asset"}), 500
# Demonstration only: export a diagram of the compiled workflow (app_graph)
# as a Mermaid-rendered PNG written to "langgraph_workflow.png" in the current
# working directory.
# NOTE(review): this appears to run at module level, i.e. on every import of
# this file - confirm that is intended.
# Best-effort: any failure is logged as a warning and otherwise ignored.
try:
    png_bytes = app_graph.get_graph().draw_mermaid_png()
    with open("langgraph_workflow.png", "wb") as f:
        f.write(png_bytes)
except Exception as e:
    logger.warning(f"Could not draw or save LangGraph workflow diagram: {e}. This might be expected if app_graph is a mock.")
# --- Modified `generate_game` Endpoint --- | |
def generate_game():
    """Create a project skeleton from the JSON request and run app_graph on it.

    The request body is expected to be a JSON object with these optional keys:

    * ``description``: free-text description passed to the workflow.
    * ``backdrops`` / ``sprites``: lists of ``{"filename", "name"}`` entries.
    * ``backdrop_sounds`` / ``sprite_sounds``: mappings from a backdrop or
      sprite name to a list of ``{"filename", "name"}`` sound entries.

    The function builds the initial project structure (a Stage target plus
    one target per sprite), writes it to
    ``generated_projects/<project_id>/project.json``, copies the referenced
    asset files from the app's static ``assets`` folders, invokes
    ``app_graph`` and overwrites ``project.json`` with the workflow's
    ``project_json`` result.

    Returns:
        JSON ``{"message", "project_id"}`` on success; JSON ``{"error"}``
        with status 400 when the body is not a JSON object or an asset entry
        is malformed or names a path rather than a plain file name; JSON
        ``{"error"}`` with status 500 when generation fails.
    """
    # silent=True yields None instead of aborting, so a missing or invalid
    # JSON body gets the same JSON error shape as every other failure here.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        logger.warning("Rejected game generation request without a JSON object body.")
        return jsonify(error="Request body must be a JSON object"), 400
    logger.info(f"Received payload: {json.dumps(payload, indent=2)}")

    desc = payload.get("description", "")
    # "or" also covers explicit nulls, which would otherwise break the loops.
    backdrops = payload.get("backdrops") or []
    sprites = payload.get("sprites") or []
    backdrop_sounds = payload.get("backdrop_sounds") or {}
    sprite_sounds = payload.get("sprite_sounds") or {}
    logger.info(f"Backdrops received: {backdrops}")
    logger.info(f"Sprites received: {sprites}")
    logger.info(f"Backdrop sounds received: {backdrop_sounds}")
    logger.info(f"Sprite sounds received: {sprite_sounds}")
    logger.info(f"Starting game generation for description: '{desc}'")

    # Asset file names come from the client and are joined into both the
    # source and the destination paths below. Reject anything that is not a
    # plain file name before any helper reads or copies a file, so a value
    # like "../../.env" cannot read or write outside the asset/project folders.
    try:
        requested_filenames = _requested_asset_filenames(
            backdrops, sprites, backdrop_sounds, sprite_sounds
        )
    except (AttributeError, KeyError, TypeError):
        logger.warning("Rejected payload with malformed asset entries.")
        return jsonify(error="Malformed asset entry: every asset needs a 'filename'"), 400
    unsafe_filenames = [
        name for name in requested_filenames if not _is_plain_asset_filename(name)
    ]
    if unsafe_filenames:
        logger.warning(f"Rejected unsafe asset filenames: {unsafe_filenames}")
        return jsonify(error="Asset filenames must not contain path components"), 400

    # 1) Initial skeleton generation
    project_skeleton = {
        "targets": [
            {
                "isStage": True,
                "name": "Stage",
                "objName": "Stage",
                "variables": {},
                "lists": {},
                "broadcasts": {},
                "blocks": {},
                "comments": {},
                # The last backdrop supplied is the one initially selected.
                "currentCostume": len(backdrops) - 1 if backdrops else 0,
                "costumes": [
                    make_costume_entry("backdrops", b["filename"], b["name"])
                    for b in backdrops
                ],
                "sounds": [],  # Filled below with the unique backdrop sounds.
                "volume": 100,
                "layerOrder": 0,
                "tempo": 60,
                "videoTransparency": 50,
                "videoState": "on",
                "textToSpeechLanguage": None,
            }
        ],
        "monitors": [],
        "extensions": [],
        "meta": {
            "semver": "3.0.0",
            "vm": "11.1.0",
            "agent": request.headers.get("User-Agent", ""),
        },
    }

    # The Stage gets the sounds of every listed backdrop, de-duplicated by
    # the "md5ext" value of the entry returned by make_sound_entry.
    stage_target_sounds = []
    seen_sound_md5ext = set()
    for b_data in backdrops:
        for s in backdrop_sounds.get(b_data["name"], []):
            sound_entry = make_sound_entry(s["filename"], s["name"])
            if sound_entry["md5ext"] not in seen_sound_md5ext:
                stage_target_sounds.append(sound_entry)
                seen_sound_md5ext.add(sound_entry["md5ext"])
    project_skeleton["targets"][0]["sounds"] = stage_target_sounds

    for idx, s_data in enumerate(sprites, start=1):
        costume = make_costume_entry("sprites", s_data["filename"], s_data["name"])
        sprite_name = s_data["name"]
        sprite_sounds_list = [
            make_sound_entry(s["filename"], s["name"])
            for s in sprite_sounds.get(sprite_name, [])
        ]
        project_skeleton["targets"].append({
            "isStage": False,
            "name": sprite_name,
            "objName": sprite_name,
            "variables": {},
            "lists": {},
            "broadcasts": {},
            "blocks": {},
            "comments": {},
            "currentCostume": 0,
            "costumes": [costume],
            "sounds": sprite_sounds_list,
            "volume": 100,
            # NOTE(review): idx already starts at 1, so the first sprite gets
            # layerOrder 2 while the Stage is 0 -- confirm the gap is intended.
            "layerOrder": idx + 1,
            "visible": True,
            "x": 0,
            "y": 0,
            "size": 100,
            "direction": 90,
            "draggable": False,
            "rotationStyle": "all around",
        })
    logger.info("Initial project skeleton created with sounds for stage and sprites.")

    project_id = str(uuid.uuid4())
    project_folder = os.path.join("generated_projects", project_id)
    try:
        os.makedirs(project_folder, exist_ok=True)

        # Save the initial skeleton; it is overwritten with the final result
        # once the workflow has finished.
        project_json_path = os.path.join(project_folder, "project.json")
        with open(project_json_path, "w", encoding="utf-8") as f:
            json.dump(project_skeleton, f, indent=2)
        logger.info(f"Initial project skeleton saved to {project_json_path}")

        # Copy every referenced asset next to project.json. Missing source
        # files are logged and skipped rather than failing the request.
        for b in backdrops:
            _copy_asset_into_project("backdrops", b["filename"], project_folder, "backdrop asset")
        for s in sprites:
            _copy_asset_into_project("sprites", s["filename"], project_folder, "sprite asset")
        for backdrop_name, sounds_list in backdrop_sounds.items():
            for s in sounds_list:
                _copy_asset_into_project(
                    "sounds", s["filename"], project_folder,
                    f"sound asset for backdrop '{backdrop_name}'",
                )
        for sprite_name, sounds_list in sprite_sounds.items():
            for s in sounds_list:
                _copy_asset_into_project(
                    "sounds", s["filename"], project_folder,
                    f"sound asset for sprite '{sprite_name}'",
                )
        logger.info("Assets (including sounds for backdrops and sprites) copied to project folder.")

        # Initial workflow state, passed to app_graph as a plain dict.
        initial_state_dict = {
            "project_json": project_skeleton,
            "description": desc,
            "project_id": project_id,
            "sprite_initial_positions": {},
            "action_plan": {},
            "improvement_plan": {},
            "needs_improvement": False,
            "plan_validation_feedback": {},
            "iteration_count": 0,
            "review_block_feedback": {},
            "declaration_plan": {},
        }
        # The result is read with dict-style access. If app_graph.invoke were
        # changed to take/return a GameState object, this call and the lookup
        # below would need to change with it.
        final_state_dict = app_graph.invoke(initial_state_dict)
        final_project_json = final_state_dict['project_json']

        # Save the *final* filled project JSON, overwriting the skeleton.
        with open(project_json_path, "w", encoding="utf-8") as f:
            json.dump(final_project_json, f, indent=2)
        logger.info(f"Final project JSON saved to {project_json_path}")
        return jsonify({"message": "Game generated successfully", "project_id": project_id})
    except Exception as e:
        # Top-level request boundary: log with traceback and report a 500.
        logger.error(f"Error during game generation workflow for project ID {project_id}: {e}", exc_info=True)
        return jsonify(error=f"Error generating game: {e}"), 500


def _requested_asset_filenames(backdrops, sprites, backdrop_sounds, sprite_sounds):
    """Return every ``"filename"`` value referenced by a generate_game payload."""
    filenames = [entry["filename"] for entry in backdrops]
    filenames += [entry["filename"] for entry in sprites]
    for sounds_by_owner in (backdrop_sounds, sprite_sounds):
        for sounds_list in sounds_by_owner.values():
            filenames += [sound["filename"] for sound in sounds_list]
    return filenames


def _is_plain_asset_filename(filename):
    """Return True when *filename* is a bare file name with no path component."""
    return (
        isinstance(filename, str)
        and filename not in ("", ".", "..")
        and "/" not in filename
        and "\\" not in filename
    )


def _copy_asset_into_project(asset_subfolder, filename, project_folder, description):
    """Copy one static asset into *project_folder*, warning if the source is missing."""
    src_path = os.path.join(app.static_folder, "assets", asset_subfolder, filename)
    dst_path = os.path.join(project_folder, filename)
    if os.path.exists(src_path):
        shutil.copy(src_path, dst_path)
    else:
        logger.warning(f"Source {description} not found: {src_path}")
if __name__ == "__main__":
    logger.info("Starting Flask application...")
    # Make sure the output folder and the static asset folders exist before
    # the development server starts handling requests.
    for required_dir in (
        "generated_projects",
        "static/assets/backdrops",
        "static/assets/sprites",
        "static/assets/sounds",
    ):
        os.makedirs(required_dir, exist_ok=True)
    app.run(debug=True, port=5000)