# NOTE: "Spaces: No application file" -- residue from the hosting page this file was copied from; not part of the module.
import json
import logging
import re
from collections import Counter
from typing import Any, Dict

logger = logging.getLogger(__name__)
# Dummy data for demonstration. You should replace this with your actual opcode data.
# Each item should have at least an 'opcode' and 'text' field. In 'text', "[name]"
# marks a dropdown/field placeholder and "( )" marks an empty value slot.

# Blocks that start a script.
hat_block_data = [
    {"opcode": "event_whenflagclicked", "text": "when green flag clicked"},
    {"opcode": "event_whenkeypressed", "text": "when [key] pressed"},
    {"opcode": "event_whenbroadcastreceived", "text": "when I receive [message]"},
]

# Blocks that report true/false.
boolean_block_data = [
    {"opcode": "operator_gt", "text": "< ( ) > ( ) >"},
    {"opcode": "sensing_touchingobject", "text": "<touching [object]?>"},
    {"opcode": "operator_equals", "text": "< ( ) = ( ) >"},
]

# Blocks that wrap other blocks (loops / conditionals).
c_block_data = [
    {"opcode": "control_forever", "text": "forever"},
    {"opcode": "control_if", "text": "if < > then"},
    {"opcode": "control_repeat", "text": "repeat ( )"},
]

# Blocks that end a script.
cap_block_data = [
    {"opcode": "control_stop", "text": "stop [all]"},
]

# Blocks that report a value.
reporter_block_data = [
    {"opcode": "motion_xposition", "text": "(x position)"},
    {"opcode": "motion_yposition", "text": "(y position)"},
    {"opcode": "data_variable", "text": "(variable)"},
    {"opcode": "sensing_answer", "text": "(answer)"},
]

# Blocks that perform an action.
stack_block_data = [
    {"opcode": "motion_gotoxy", "text": "go to x: ( ) y: ( )"},
    {"opcode": "motion_changeyby", "text": "change y by ( )"},
    {"opcode": "motion_setx", "text": "set x to ( )"},
    {"opcode": "motion_glidesecstoxy", "text": "glide ( ) secs to x: ( ) y: ( )"},
    {"opcode": "data_setvariableto", "text": "set [variable] to ( )"},
    {"opcode": "looks_hide", "text": "hide"},
    {"opcode": "looks_show", "text": "show"},
    {"opcode": "event_broadcast", "text": "broadcast [message]"},
]

# Combine all block data into a single list for easier lookup. The entries are
# the same dict objects as in the per-category lists, in category order.
all_opcodes_list = [
    block
    for category_data in (
        hat_block_data,
        boolean_block_data,
        c_block_data,
        cap_block_data,
        reporter_block_data,
        stack_block_data,
    )
    for block in category_data
]
def extract_json_from_llm_response(response_text: str) -> Dict[str, Any]:
    """Extract and parse the JSON payload of an LLM response string.

    If the response contains a ```json fenced block, the content of the first
    such block is parsed. If that fails, the widest span from the first opening
    fence to the last closing fence is tried, which covers payloads that
    themselves contain triple backticks. Without any ```json fence the whole
    response is parsed as JSON.

    Args:
        response_text: Raw text returned by the model.

    Returns:
        The value produced by ``json.loads`` for the selected payload.

    Raises:
        json.JSONDecodeError: If none of the candidate payloads is valid JSON.
            The failure is logged before it is raised.
    """
    candidates = []
    # Lazy pattern first (isolates the first fenced block when several exist),
    # then the greedy pattern as a fallback. "\s*" tolerates CRLF line endings
    # and padding around the payload.
    for fence_pattern in (r"```json\s*(.*?)\s*```", r"```json\s*(.*)\s*```"):
        fenced = re.search(fence_pattern, response_text, re.DOTALL)
        if fenced and fenced.group(1) not in candidates:
            candidates.append(fenced.group(1))
    if not candidates:
        # No code block: try parsing the response directly.
        candidates.append(response_text)

    last_error = None
    for payload in candidates:
        try:
            return json.loads(payload)
        except json.JSONDecodeError as e:
            last_error = e

    logger.error("Failed to decode JSON: %s from response: %s", last_error, response_text)
    raise last_error
# Node 9: plan with exact count of the opcodes used per logic

# Opcode prefixes accepted for each category key written into a plan. The key
# order is the order in which categories appear in "opcode_counts".
_CATEGORY_PREFIXES = {
    "motion": ("motion_",),
    "control": ("control_",),
    "operator": ("operator_",),
    "sensing": ("sensing_",),
    "looks": ("looks_",),
    # Scratch spells its sound opcodes "sound_*"; "sounds_" is still accepted
    # because that is the prefix this node checked previously.
    "sounds": ("sound_", "sounds_"),
    "events": ("event_",),
    "data": ("data_",),
}

# Placeholder tokens inside a block's display text: "[name]" and empty "( )".
_PLACEHOLDER_RE = re.compile(r"\[[^\]]*\]|\(\s*\)")
_IF_RE = re.compile(r"\bif <.+?> then", re.DOTALL)
_ELSE_RE = re.compile(r"^[ \t]*else\b", re.MULTILINE)
_FOREVER_RE = re.compile(r"\bforever\b")
# A whitespace-delimited standalone "v" token.
_DROPDOWN_MARKER_RE = re.compile(r"\s+v\b")


def _build_block_pattern(block_text):
    """Compile a regex matching one use of a block's display text.

    Literal pieces are escaped individually and joined with a lazy wildcard in
    place of each placeholder. Escaping the whole text first and replacing the
    escaped placeholders afterwards does not work for "( )", because
    ``re.escape`` also escapes the space. Returns None when the text has no
    literal content, since such a pattern would match everywhere.
    """
    literal_parts = _PLACEHOLDER_RE.split(block_text)
    if not any(part.strip() for part in literal_parts):
        return None
    body = ".*?".join(re.escape(part) for part in literal_parts)
    # Guard word-like edges so "show" does not match inside "shown".
    if block_text[0].isalnum():
        body = r"(?<!\w)" + body
    if block_text[-1].isalnum():
        body += r"(?!\w)"
    return re.compile(body)


def _count_opcodes(logic, event, matchers):
    """Return a Counter of opcode -> occurrences for one plan."""
    counts = Counter()

    if event:
        # Only a standalone " v" token is removed; removing every "v" would
        # corrupt opcodes such as "event_whenflagclicked".
        # NOTE(review): presumably the marker is the scratchblocks dropdown
        # arrow -- confirm against the planner's event format.
        event_opcode = _DROPDOWN_MARKER_RE.sub("", event).strip()
        if event_opcode:
            counts[event_opcode] += 1

    # NOTE(review): if `logic` also contains the hat block's text, the event is
    # counted here a second time; confirm whether that is intended.
    for opcode, pattern in matchers:
        if opcode == "control_if":
            # Every "if <...> then" is an if-block; each line starting with
            # "else" upgrades one of them to if-else.
            total_ifs = len(_IF_RE.findall(logic))
            with_else = min(total_ifs, len(_ELSE_RE.findall(logic)))
            if with_else:
                counts["control_if_else"] += with_else
            if total_ifs > with_else:
                counts["control_if"] += total_ifs - with_else
            continue

        if opcode == "control_forever":
            occurrences = len(_FOREVER_RE.findall(logic))
        elif pattern is not None:
            occurrences = len(pattern.findall(logic))
        else:
            occurrences = 0
        if occurrences:
            counts[opcode] += occurrences

    return counts


def _group_by_category(counts):
    """Bucket {opcode: count} into per-category lists of {"opcode", "count"}."""
    grouped = {category: [] for category in _CATEGORY_PREFIXES}
    for opcode, count in counts.items():
        for category, prefixes in _CATEGORY_PREFIXES.items():
            if opcode.startswith(prefixes):
                grouped[category].append({"opcode": opcode, "count": count})
                break
    return grouped


def plan_opcode_counter_node(state: Dict[str, Any]) -> Dict[str, Any]:
    """Count the opcodes used by every plan's `logic` text.

    For each plan under state["action_plan"]["action_overall_flow"][sprite]["plans"],
    the plan's `event` is counted once and the display text of every entry in
    `all_opcodes_list` is searched for in the plan's `logic` string (regex
    matching; no LLM call is made). Each plan dict is updated in place:

    * plan["opcode_counts"]: {category: [{"opcode": ..., "count": ...}, ...]}
    * plan[category]: flat list with each opcode repeated `count` times, for
      motion, control, operator, sensing, looks, sounds, events and data.

    The per-sprite result ({"description", "plans"}) is stored in
    state["temporary_node"].

    Args:
        state: Graph state; only "action_plan" is read.

    Returns:
        The same `state` object. It is returned unchanged when there is no
        action_overall_flow.
    """
    logger.info("=== Running OPCODE COUTER LOGIC with LLM counts ===")

    action_flow = state.get("action_plan", {}).get("action_overall_flow", {})
    if not action_flow:
        logger.warning("No action_overall_flow found; skipping.")
        return state

    # The block patterns do not depend on the plan, so compile them once.
    matchers = [
        (block_info["opcode"], _build_block_pattern(block_info["text"]))
        for block_info in all_opcodes_list
    ]

    refined_flow: Dict[str, Any] = {}
    for sprite, sprite_data in action_flow.items():
        refined_plans = []
        for plan in sprite_data.get("plans", []):
            # "or ''" also covers keys that are present but set to None.
            logic = plan.get("logic") or ""
            event = plan.get("event") or ""

            opcode_counts = _group_by_category(_count_opcodes(logic, event, matchers))
            plan["opcode_counts"] = opcode_counts

            # Keep the legacy per-category keys in sync with opcode_counts.
            for category, entries in opcode_counts.items():
                plan[category] = [
                    entry["opcode"] for entry in entries for _ in range(entry["count"])
                ]
            refined_plans.append(plan)

        refined_flow[sprite] = {
            "description": sprite_data.get("description", ""),
            "plans": refined_plans,
        }

    state["temporary_node"] = refined_flow
    print(f"[OPCODE COUTER LOGIC]: {refined_flow}")
    logger.info("=== OPCODE COUTER LOGIC completed ===")
    return state