# scratch_agent/utils/opcode_counter.py
# Hosting-page residue kept for provenance: "WebashalarForML's picture",
# "Upload 24 files", commit 3d3703f (verified).
import json
import re
from typing import Any, Dict
import logging
logger = logging.getLogger(__name__)
# Dummy data for demonstration. You should replace this with your actual opcode data.
# Each item should have at least an 'opcode' and 'text' field.
# Hat blocks: each entry pairs an opcode with the text it is written as.
hat_block_data = [
    {"opcode": "event_whenflagclicked", "text": "when green flag clicked"},
    {"opcode": "event_whenkeypressed", "text": "when [key] pressed"},
    {"opcode": "event_whenbroadcastreceived", "text": "when I receive [message]"},
]

# Boolean blocks.
boolean_block_data = [
    {"opcode": "operator_gt", "text": "< ( ) > ( ) >"},
    {"opcode": "sensing_touchingobject", "text": "<touching [object]?>"},
    {"opcode": "operator_equals", "text": "< ( ) = ( ) >"},
]

# C blocks.
c_block_data = [
    {"opcode": "control_forever", "text": "forever"},
    {"opcode": "control_if", "text": "if < > then"},
    {"opcode": "control_repeat", "text": "repeat ( )"},
]

# Cap blocks.
cap_block_data = [
    {"opcode": "control_stop", "text": "stop [all]"},
]

# Reporter blocks.
reporter_block_data = [
    {"opcode": "motion_xposition", "text": "(x position)"},
    {"opcode": "motion_yposition", "text": "(y position)"},
    {"opcode": "data_variable", "text": "(variable)"},
    {"opcode": "sensing_answer", "text": "(answer)"},
]

# Stack blocks.
stack_block_data = [
    {"opcode": "motion_gotoxy", "text": "go to x: ( ) y: ( )"},
    {"opcode": "motion_changeyby", "text": "change y by ( )"},
    {"opcode": "motion_setx", "text": "set x to ( )"},
    {"opcode": "motion_glidesecstoxy", "text": "glide ( ) secs to x: ( ) y: ( )"},
    {"opcode": "data_setvariableto", "text": "set [variable] to ( )"},
    {"opcode": "looks_hide", "text": "hide"},
    {"opcode": "looks_show", "text": "show"},
    {"opcode": "event_broadcast", "text": "broadcast [message]"},
]

# Flatten every category, in declaration order, into one lookup list.
all_opcodes_list = []
for category_data in (
    hat_block_data,
    boolean_block_data,
    c_block_data,
    cap_block_data,
    reporter_block_data,
    stack_block_data,
):
    all_opcodes_list += category_data
def extract_json_from_llm_response(response_text: str) -> Dict[str, Any]:
    """Extract and parse the JSON payload of an LLM response string.

    When the response contains a ```json fenced block, the contents of the
    first such block are parsed; otherwise the whole response is parsed.

    Args:
        response_text: Raw text returned by the LLM.

    Returns:
        The decoded JSON value.

    Raises:
        json.JSONDecodeError: If the selected text is not valid JSON. The
            failure is logged before the exception is re-raised.
    """
    # Non-greedy capture: stop at the FIRST closing fence so that any later
    # fenced block in the same response is not swallowed into the payload.
    # The fence lines tolerate trailing spaces/tabs and CRLF line endings.
    fenced = re.search(
        r"```json[ \t]*\r?\n(.*?)\r?\n[ \t]*```", response_text, re.DOTALL
    )
    payload = fenced.group(1) if fenced else response_text
    try:
        return json.loads(payload)
    except json.JSONDecodeError as e:
        logger.error("Failed to decode JSON: %s from response: %s", e, response_text)
        raise
# Node 9: annotate each plan with the exact count of every opcode used in its logic
# Category keys written to every plan, in output order.
_OPCODE_CATEGORIES = (
    "motion",
    "control",
    "operator",
    "sensing",
    "looks",
    "sounds",
    "events",
    "data",
)

# Opcode prefix -> category key. Scratch sound opcodes use the singular
# "sound_" prefix; the plural "sounds_" spelling is kept for compatibility.
_OPCODE_PREFIX_TO_CATEGORY = (
    ("motion_", "motion"),
    ("control_", "control"),
    ("operator_", "operator"),
    ("sensing_", "sensing"),
    ("looks_", "looks"),
    ("sound_", "sounds"),
    ("sounds_", "sounds"),
    ("event_", "events"),
    ("data_", "data"),
)

# Tokens in a block's "text" that stand for an arbitrary input.
_BLOCK_TEXT_PLACEHOLDERS = ("[key]", "[message]", "[object]", "[variable]", "( )")
_PLACEHOLDER_SPLIT_RE = re.compile(
    "(" + "|".join(re.escape(token) for token in _BLOCK_TEXT_PLACEHOLDERS) + ")"
)


def _block_text_to_pattern(block_text: str):
    """Compile a block's display text into a regex with wildcard inputs.

    The text is split on the placeholder tokens BEFORE escaping. Escaping
    first and then replacing (as was done previously) silently fails for
    "( )", because re.escape also escapes the space inside it.
    """
    pieces = [
        ".*?" if part in _BLOCK_TEXT_PLACEHOLDERS else re.escape(part)
        for part in _PLACEHOLDER_SPLIT_RE.split(block_text)
    ]
    return re.compile("".join(pieces))


def _clean_event_opcode(event: str) -> str:
    """Return the event string without surrounding whitespace or ' v' tokens.

    NOTE(review): presumably the standalone "v" is a dropdown marker that can
    trail the event string -- confirm against the planner output. Letters
    inside the opcode itself (e.g. "event_whenflagclicked") are left intact.
    """
    return re.sub(r"\s+v\b", "", event).strip()


def _category_for(opcode: str):
    """Return the category key for an opcode, or None if its prefix is unknown."""
    for prefix, category in _OPCODE_PREFIX_TO_CATEGORY:
        if opcode.startswith(prefix):
            return category
    return None


def _count_opcodes(logic: str, event: str, block_patterns) -> Dict[str, int]:
    """Count how often each known block appears in one plan's logic text.

    `block_patterns` is a sequence of (opcode, compiled pattern) pairs.
    The result maps opcode -> count, with the plan's event opcode first.
    """
    counts: Dict[str, int] = {}

    event_opcode = _clean_event_opcode(event)
    if event_opcode:
        # Reserve the first slot for the event; its final count is settled
        # after the text scan so the hat block is not counted twice.
        counts[event_opcode] = 0

    for opcode, pattern in block_patterns:
        if opcode == "control_if":
            # Heuristic: "if ... then" followed anywhere by "else" is treated
            # as one if/else block, otherwise as one plain if block. At most
            # one is recorded per plan.
            if re.search(r"if <.+?> then\n.*else", logic, re.DOTALL):
                counts["control_if_else"] = counts.get("control_if_else", 0) + 1
            elif re.search(r"if <.+?> then", logic, re.DOTALL):
                counts["control_if"] = counts.get("control_if", 0) + 1
            continue

        if opcode == "control_forever":
            # Presence check only: recorded at most once per plan.
            if "forever" in logic:
                counts[opcode] = counts.get(opcode, 0) + 1
            continue

        # Plain text match. Wildcard inputs can overlap between blocks
        # (e.g. "set [variable] to ( )" also matches "set x to (5)"), so the
        # counts are an approximation rather than a real parse.
        occurrences = len(pattern.findall(logic))
        if occurrences > 0:
            counts[opcode] = counts.get(opcode, 0) + occurrences

    # The plan's event block exists even when the logic text does not spell
    # it out, so make sure it is counted at least once.
    if event_opcode and counts[event_opcode] == 0:
        counts[event_opcode] = 1

    return counts


def plan_opcode_counter_node(state: Dict[str, Any]) -> Dict[str, Any]:
    """Annotate every plan in the action flow with per-category opcode counts.

    For each plan under state["action_plan"]["action_overall_flow"], the
    plan's `logic` text is matched against the known block texts in
    `all_opcodes_list` (regex matching; no LLM call is made here) and the
    plan's `event` is counted as its hat block.

    Each plan dict is updated in place with:
      * "opcode_counts": {category: [{"opcode": ..., "count": ...}, ...]}
      * one key per category holding the opcode repeated `count` times.

    Args:
        state: Pipeline state. A missing or None "action_plan", "logic" or
            "event" is treated as empty.

    Returns:
        The same `state` object. When an action flow is present, the refined
        flow is stored under state["temporary_node"]; otherwise the state is
        returned unchanged.
    """
    logger.info("=== Running OPCODE COUTER LOGIC with LLM counts ===")

    action_flow = (state.get("action_plan") or {}).get("action_overall_flow", {})
    if not action_flow:
        logger.warning("No action_overall_flow found; skipping.")
        return state

    # Compile the block patterns once instead of once per plan.
    block_patterns = [
        (block["opcode"], _block_text_to_pattern(block["text"]))
        for block in all_opcodes_list
    ]

    refined_flow: Dict[str, Any] = {}
    for sprite, sprite_data in action_flow.items():
        refined_plans = []
        for plan in sprite_data.get("plans", []):
            logic = plan.get("logic") or ""
            event = plan.get("event") or ""

            opcode_counts = {category: [] for category in _OPCODE_CATEGORIES}
            for opcode, count in _count_opcodes(logic, event, block_patterns).items():
                category = _category_for(opcode)
                # Opcodes with an unrecognised prefix are dropped.
                if category is not None and count > 0:
                    opcode_counts[category].append({"opcode": opcode, "count": count})

            plan["opcode_counts"] = opcode_counts

            # Legacy per-category keys: the opcode string repeated `count`
            # times, replacing whatever the plan previously held there.
            for category, entries in opcode_counts.items():
                plan[category] = [
                    entry["opcode"] for entry in entries for _ in range(entry["count"])
                ]

            refined_plans.append(plan)

        refined_flow[sprite] = {
            "description": sprite_data.get("description", ""),
            "plans": refined_plans,
        }

    state["temporary_node"] = refined_flow
    print(f"[OPCODE COUTER LOGIC]: {refined_flow}")
    logger.info("=== OPCODE COUTER LOGIC completed ===")
    return state