Spaces:
Runtime error
Runtime error
File size: 11,356 Bytes
a522962 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 |
import json
import re
from typing import Any, Dict
import logging
logger = logging.getLogger(__name__)
# Dummy data for demonstration. You should replace this with your actual opcode data.
# Each item should have at least an 'opcode' and 'text' field.
hat_block_data = [
{"opcode": "event_whenflagclicked", "text": "when green flag clicked"},
{"opcode": "event_whenkeypressed", "text": "when [key] pressed"},
{"opcode": "event_whenbroadcastreceived", "text": "when I receive [message]"},
]
boolean_block_data = [
{"opcode": "operator_gt", "text": "< ( ) > ( ) >"},
{"opcode": "sensing_touchingobject", "text": "<touching [object]?>"},
{"opcode": "operator_equals", "text": "< ( ) = ( ) >"},
]
c_block_data = [
{"opcode": "control_forever", "text": "forever"},
{"opcode": "control_if", "text": "if < > then"},
{"opcode": "control_repeat", "text": "repeat ( )"},
]
cap_block_data = [
{"opcode": "control_stop", "text": "stop [all]"},
]
reporter_block_data = [
{"opcode": "motion_xposition", "text": "(x position)"},
{"opcode": "motion_yposition", "text": "(y position)"},
{"opcode": "data_variable", "text": "(variable)"},
{"opcode": "sensing_answer", "text": "(answer)"},
]
stack_block_data = [
{"opcode": "motion_gotoxy", "text": "go to x: ( ) y: ( )"},
{"opcode": "motion_changeyby", "text": "change y by ( )"},
{"opcode": "motion_setx", "text": "set x to ( )"},
{"opcode": "motion_glidesecstoxy", "text": "glide ( ) secs to x: ( ) y: ( )"},
{"opcode": "data_setvariableto", "text": "set [variable] to ( )"},
{"opcode": "looks_hide", "text": "hide"},
{"opcode": "looks_show", "text": "show"},
{"opcode": "event_broadcast", "text": "broadcast [message]"},
]
# Combine all block data into a single list for easier lookup
all_opcodes_list = []
for category_data in [
hat_block_data,
boolean_block_data,
c_block_data,
cap_block_data,
reporter_block_data,
stack_block_data,
]:
all_opcodes_list.extend(category_data)
def extract_json_from_llm_response(response_text: str) -> Dict[str, Any]:
"""Extracts JSON from an LLM response string."""
try:
json_match = re.search(r"```json\n(.*)\n```", response_text, re.DOTALL)
if json_match:
return json.loads(json_match.group(1))
return json.loads(response_text) # Try parsing directly if no code block
except json.JSONDecodeError as e:
logger.error(f"Failed to decode JSON: {e} from response: {response_text}")
raise
# Node 9:plan with exact count of the opcode used per logic
def plan_opcode_counter_node(state: Dict[str, Any]) -> Dict[str, Any]:
"""
For each plan in state["action_plan"]["action_overall_flow"], calls the LLM agent
to analyze the `logic` string and return a list of {opcode, count} for each category.
"""
logger.info("=== Running OPCODE COUTER LOGIC with LLM counts ===")
game_description = state.get("description", "No game description provided.")
sprite_name = {target["name"]: target["name"] for target in state["project_json"]["targets"]} # Adjusted for direct use
action_flow = state.get("action_plan", {}).get("action_overall_flow", {})
if not action_flow:
logger.warning("No action_overall_flow found; skipping.")
return state
# Prepare block reference strings for the prompt
hat_description = "Blocks that start a script when an event happens."
hat_opcodes_functionalities = "\n".join([f"- {block['opcode']}: {block['text']}" for block in hat_block_data])
boolean_description = "Blocks that report a true or false value and fit into hexagonal inputs."
boolean_opcodes_functionalities = "\n".join([f"- {block['opcode']}: {block['text']}" for block in boolean_block_data])
c_description = "Blocks that run scripts inside them repeatedly or conditionally."
c_opcodes_functionalities = "\n".join([f"- {block['opcode']}: {block['text']}" for block in c_block_data])
cap_description = "Blocks that end a script."
cap_opcodes_functionalities = "\n".join([f"- {block['opcode']}: {block['text']}" for block in cap_block_data])
reporter_description = "Blocks that report a value (number or string) and fit into rounded inputs."
reporter_opcodes_functionalities = "\n".join([f"- {block['opcode']}: {block['text']}" for block in reporter_block_data])
stack_description = "Blocks that perform a main action in a script."
stack_opcodes_functionalities = "\n".join([f"- {block['opcode']}: {block['text']}" for block in stack_block_data])
refined_flow: Dict[str, Any] = {}
for sprite, sprite_data in action_flow.items(): # Use .items() for direct iteration
refined_plans = []
for plan in sprite_data.get("plans", []):
logic = plan.get("logic", "")
event = plan.get("event", "")
# This is where the core change for counting opcodes will happen.
# We will use the 'logic' string to determine the actual opcodes and their counts.
opcode_counts = {
"motion": [],
"control": [],
"operator": [],
"sensing": [],
"looks": [],
"sounds": [],
"events": [],
"data": [],
}
# Initialize a dictionary to hold counts for each opcode
temp_opcode_counts = {}
# Add the event block explicitly
if event:
event_opcode = event.replace('v', '').strip() # Clean the event string
temp_opcode_counts[event_opcode] = temp_opcode_counts.get(event_opcode, 0) + 1
# Iterate through all known opcodes and check if their 'text' appears in the logic
for block_info in all_opcodes_list:
opcode = block_info["opcode"]
# Use a more robust regex for matching, accounting for variable names or block inputs
# We need to be careful with common words that are also part of opcodes, e.g., "if"
# A more robust solution might involve parsing the Scratch-like logic more deeply.
# For now, let's try to match the "text" from the block definition.
# Escape special characters in the block text for regex
block_text_escaped = re.escape(block_info["text"])
# Replace placeholders like [key], [object], ( ) with regex wildcards
block_text_pattern = block_text_escaped.replace(r"\[key\]", r".*?").replace(r"\[message\]", r".*?").replace(r"\[object\]", r".*?").replace(r"\( \)", r".*?")
block_text_pattern = block_text_pattern.replace(r"\[variable\]", r".*?")
# For blocks that might have variations in text (e.g., if-then, if-then-else)
if opcode == "control_if":
if_regex = r"if <.+?> then"
if_else_regex = r"if <.+?> then\n.*else"
if re.search(if_else_regex, logic, re.DOTALL):
temp_opcode_counts["control_if_else"] = temp_opcode_counts.get("control_if_else", 0) + 1
elif re.search(if_regex, logic, re.DOTALL):
temp_opcode_counts["control_if"] = temp_opcode_counts.get("control_if", 0) + 1
continue # Skip general matching for control_if
if opcode == "control_forever" and "forever" in logic:
temp_opcode_counts[opcode] = temp_opcode_counts.get(opcode, 0) + 1
continue # Skip general matching
# General regex match for other blocks
# We need to make sure we're not just matching substrings of other blocks
# A simple word boundary or line-by-line check might be better
# For now, a simple count of occurrences of the "text" within the logic
# will be used, but this is a simplification.
count = len(re.findall(block_text_pattern, logic))
if count > 0:
temp_opcode_counts[opcode] = temp_opcode_counts.get(opcode, 0) + count
# Fill the opcode_counts for each category based on temp_opcode_counts
def add_to_category(category_list, opcode_name, count):
if count > 0:
category_list.append({"opcode": opcode_name, "count": count})
for opcode, count in temp_opcode_counts.items():
if opcode.startswith("motion_"):
add_to_category(opcode_counts["motion"], opcode, count)
elif opcode.startswith("control_"):
add_to_category(opcode_counts["control"], opcode, count)
elif opcode.startswith("operator_"):
add_to_category(opcode_counts["operator"], opcode, count)
elif opcode.startswith("sensing_"):
add_to_category(opcode_counts["sensing"], opcode, count)
elif opcode.startswith("looks_"):
add_to_category(opcode_counts["looks"], opcode, count)
elif opcode.startswith("sounds_"):
add_to_category(opcode_counts["sounds"], opcode, count)
elif opcode.startswith("event_"):
add_to_category(opcode_counts["events"], opcode, count)
elif opcode.startswith("data_"):
add_to_category(opcode_counts["data"], opcode, count)
# Assign the new opcode_counts to the plan
plan["opcode_counts"] = opcode_counts
# The original plan structure also had categories as direct keys.
# You can choose to keep this or remove it, depending on your downstream needs.
# If you want to keep it, you'd populate them based on opcode_counts.
# For simplicity, let's keep the new 'opcode_counts' key as requested.
# Clear previous lists if you are relying solely on 'opcode_counts'
plan["motion"] = []
plan["control"] = []
plan["operator"] = []
plan["sensing"] = []
plan["looks"] = []
plan["sounds"] = []
plan["events"] = []
plan["data"] = []
# Populate the individual lists based on the newly calculated opcode_counts if needed
for category, opcodes_list in opcode_counts.items():
for item in opcodes_list:
# Append just the opcode string to the category list
plan[category].extend([item['opcode']] * item['count'])
refined_plans.append(plan)
refined_flow[sprite] = {
"description": sprite_data.get("description", ""),
"plans": refined_plans
}
state["temporary_node"] = refined_flow
print(f"[OPCODE COUTER LOGIC]: {refined_flow}")
logger.info("=== OPCODE COUTER LOGIC completed ===")
return state |