"""Heuristic opcode counting for Scratch-like plan logic.

The module holds a small reference table of Scratch blocks (opcode plus the
text template the block is rendered with) and two entry points:

* ``extract_json_from_llm_response`` - pull a JSON payload out of an LLM reply.
* ``plan_opcode_counter_node`` - count, per plan, how often each known block
  appears in the plan's ``logic`` text.
"""

import json
import logging
import re
from typing import Any, Dict, List, Optional, Pattern, Tuple

logger = logging.getLogger(__name__)

# Dummy data for demonstration. Replace this with the actual opcode data.
# Each item must have at least an 'opcode' and a 'text' field.
hat_block_data = [
    {"opcode": "event_whenflagclicked", "text": "when green flag clicked"},
    {"opcode": "event_whenkeypressed", "text": "when [key] pressed"},
    {"opcode": "event_whenbroadcastreceived", "text": "when I receive [message]"},
]

boolean_block_data = [
    {"opcode": "operator_gt", "text": "< ( ) > ( ) >"},
    {"opcode": "sensing_touchingobject", "text": ""},
    {"opcode": "operator_equals", "text": "< ( ) = ( ) >"},
]

c_block_data = [
    {"opcode": "control_forever", "text": "forever"},
    {"opcode": "control_if", "text": "if < > then"},
    {"opcode": "control_repeat", "text": "repeat ( )"},
]

cap_block_data = [
    {"opcode": "control_stop", "text": "stop [all]"},
]

reporter_block_data = [
    {"opcode": "motion_xposition", "text": "(x position)"},
    {"opcode": "motion_yposition", "text": "(y position)"},
    {"opcode": "data_variable", "text": "(variable)"},
    {"opcode": "sensing_answer", "text": "(answer)"},
]

stack_block_data = [
    {"opcode": "motion_gotoxy", "text": "go to x: ( ) y: ( )"},
    {"opcode": "motion_changeyby", "text": "change y by ( )"},
    {"opcode": "motion_setx", "text": "set x to ( )"},
    {"opcode": "motion_glidesecstoxy", "text": "glide ( ) secs to x: ( ) y: ( )"},
    {"opcode": "data_setvariableto", "text": "set [variable] to ( )"},
    {"opcode": "looks_hide", "text": "hide"},
    {"opcode": "looks_show", "text": "show"},
    {"opcode": "event_broadcast", "text": "broadcast [message]"},
]

# Combine all block data into a single list for easier lookup.
all_opcodes_list = []
for category_data in [
    hat_block_data,
    boolean_block_data,
    c_block_data,
    cap_block_data,
    reporter_block_data,
    stack_block_data,
]:
    all_opcodes_list.extend(category_data)

# Output categories, in the order they appear in every plan's "opcode_counts".
_CATEGORIES = (
    "motion",
    "control",
    "operator",
    "sensing",
    "looks",
    "sounds",
    "events",
    "data",
)

# Opcode prefix (the part before the first "_") -> output category.
# Both "sound" (the prefix Scratch uses) and "sounds" are accepted.
_CATEGORY_BY_PREFIX = {
    "motion": "motion",
    "control": "control",
    "operator": "operator",
    "sensing": "sensing",
    "looks": "looks",
    "sound": "sounds",
    "sounds": "sounds",
    "event": "events",
    "data": "data",
}

# Non-greedy so that a reply with several fenced blocks yields the first one.
_JSON_FENCE_RE = re.compile(r"```json\s*(.*?)\s*```", re.DOTALL)
# Template placeholders that stand for "any input".
_PLACEHOLDER_RE = re.compile(r"(\[(?:key|message|object|variable)\]|\( \))")
# A scratchblocks-style dropdown arrow (" v") before "]" or at end of string.
_DROPDOWN_MARKER_RE = re.compile(r"\s+v(?=\]|$)")
_IF_RE = re.compile(r"if <.+?> then", re.DOTALL)
_ELSE_RE = re.compile(r"^[ \t]*else\b", re.MULTILINE)
_FOREVER_RE = re.compile(r"\bforever\b")


def extract_json_from_llm_response(response_text: str) -> Dict[str, Any]:
    """Extract the JSON payload from an LLM response string.

    If the response contains a fenced ``json`` code block, the content of the
    first such block is parsed; otherwise the whole response is parsed.

    Args:
        response_text: Raw text returned by the LLM.

    Returns:
        The decoded JSON value.

    Raises:
        json.JSONDecodeError: If the selected text is not valid JSON. The
            failure is logged before being re-raised.
    """
    try:
        fenced = _JSON_FENCE_RE.search(response_text)
        if fenced:
            return json.loads(fenced.group(1))
        return json.loads(response_text)  # No code block: try parsing directly.
    except json.JSONDecodeError as e:
        logger.error("Failed to decode JSON: %s from response: %s", e, response_text)
        raise


def _compile_block_pattern(text: str) -> Pattern[str]:
    """Turn a block text template into a regex with placeholders as wildcards."""
    # re.split with a capturing group puts literals at even indices and the
    # captured placeholders at odd ones. Literals are escaped individually so
    # the placeholder tokens never pass through re.escape (which escapes the
    # space inside "( )" and would make a later textual replace miss it).
    pieces = _PLACEHOLDER_RE.split(text)
    return re.compile(
        "".join(
            ".*?" if index % 2 else re.escape(piece)
            for index, piece in enumerate(pieces)
        )
    )


def _count_opcodes(
    logic: str,
    event: str,
    block_patterns: List[Tuple[str, Optional[Pattern[str]]]],
) -> Dict[str, int]:
    """Count occurrences of every known block in ``logic``, plus the event."""
    counts: Dict[str, int] = {}

    # Only a dropdown arrow and surrounding whitespace are stripped; the
    # opcode itself is left untouched.
    event_opcode = _DROPDOWN_MARKER_RE.sub("", event).strip() if event else ""
    if event_opcode:
        counts[event_opcode] = 1

    for opcode, pattern in block_patterns:
        if opcode == "control_if":
            # Every "else" line belongs to exactly one "if", so the number of
            # else lines is the number of if/else blocks.
            if_total = len(_IF_RE.findall(logic))
            if_else = min(if_total, len(_ELSE_RE.findall(logic)))
            found = {"control_if_else": if_else, "control_if": if_total - if_else}
        elif opcode == "control_forever":
            found = {opcode: len(_FOREVER_RE.findall(logic))}
        elif pattern is None:
            # Blocks without a text template cannot be located in the logic.
            continue
        else:
            # NOTE: wildcard templates can overlap (e.g. "set x to ( )" and
            # "set [variable] to ( )"), so these counts are heuristic.
            found = {opcode: len(pattern.findall(logic))}

        for name, occurrences in found.items():
            if occurrences <= 0:
                continue
            if name == event_opcode:
                # The hat block written in the logic is the same block as the
                # plan's event; do not count it twice.
                counts[name] = max(counts[name], occurrences)
            else:
                counts[name] = counts.get(name, 0) + occurrences
    return counts


# Node 9: plan with exact count of the opcodes used per logic
def plan_opcode_counter_node(state: Dict[str, Any]) -> Dict[str, Any]:
    """Annotate every plan with the opcodes found in its ``logic`` text.

    For each plan in ``state["action_plan"]["action_overall_flow"]`` the
    ``logic`` string is matched against the text templates in
    ``all_opcodes_list`` (no LLM is called). Each plan dict is updated in
    place with:

    * ``"opcode_counts"``: category -> list of ``{"opcode", "count"}`` dicts;
    * one key per category holding the opcode repeated ``count`` times.

    Opcodes whose prefix maps to no known category are ignored.

    Args:
        state: Pipeline state. Only ``"action_plan"`` is read.

    Returns:
        The same ``state`` object. When an action flow is present, the
        per-sprite result is stored under ``state["temporary_node"]``;
        otherwise ``state`` is returned unchanged.
    """
    logger.info("=== Running OPCODE COUTER LOGIC with LLM counts ===")

    action_flow = state.get("action_plan", {}).get("action_overall_flow", {})
    if not action_flow:
        logger.warning("No action_overall_flow found; skipping.")
        return state

    # Compile each template once per call instead of once per plan.
    block_patterns: List[Tuple[str, Optional[Pattern[str]]]] = [
        (
            block["opcode"],
            _compile_block_pattern(block["text"]) if block["text"] else None,
        )
        for block in all_opcodes_list
    ]

    refined_flow: Dict[str, Any] = {}
    for sprite, sprite_data in action_flow.items():
        refined_plans = []
        for plan in sprite_data.get("plans", []):
            counts = _count_opcodes(
                plan.get("logic") or "",
                plan.get("event") or "",
                block_patterns,
            )

            opcode_counts: Dict[str, List[Dict[str, Any]]] = {
                category: [] for category in _CATEGORIES
            }
            for opcode, count in counts.items():
                prefix, separator, _ = opcode.partition("_")
                category = _CATEGORY_BY_PREFIX.get(prefix) if separator else None
                if category is not None and count > 0:
                    opcode_counts[category].append({"opcode": opcode, "count": count})

            plan["opcode_counts"] = opcode_counts
            # Flat per-category lists, overwriting whatever the plan held before.
            for category, entries in opcode_counts.items():
                plan[category] = [
                    entry["opcode"]
                    for entry in entries
                    for _ in range(entry["count"])
                ]
            refined_plans.append(plan)

        refined_flow[sprite] = {
            "description": sprite_data.get("description", ""),
            "plans": refined_plans,
        }

    state["temporary_node"] = refined_flow
    print(f"[OPCODE COUTER LOGIC]: {refined_flow}")
    logger.info("=== OPCODE COUTER LOGIC completed ===")
    return state