File size: 11,356 Bytes
a522962
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
import json
import re
from typing import Any, Dict
import logging

logger = logging.getLogger(__name__)

# Placeholder block catalog used for demonstration; swap in the real opcode data.
# Every entry is a dict carrying at least an "opcode" and a "text" field.

# Hat blocks
hat_block_data = [
    dict(opcode="event_whenflagclicked", text="when green flag clicked"),
    dict(opcode="event_whenkeypressed", text="when [key] pressed"),
    dict(opcode="event_whenbroadcastreceived", text="when I receive [message]"),
]

# Boolean blocks
boolean_block_data = [
    dict(opcode="operator_gt", text="< ( ) > ( ) >"),
    dict(opcode="sensing_touchingobject", text="<touching [object]?>"),
    dict(opcode="operator_equals", text="< ( ) = ( ) >"),
]

# C blocks
c_block_data = [
    dict(opcode="control_forever", text="forever"),
    dict(opcode="control_if", text="if < > then"),
    dict(opcode="control_repeat", text="repeat ( )"),
]

# Cap blocks
cap_block_data = [
    dict(opcode="control_stop", text="stop [all]"),
]

# Reporter blocks
reporter_block_data = [
    dict(opcode="motion_xposition", text="(x position)"),
    dict(opcode="motion_yposition", text="(y position)"),
    dict(opcode="data_variable", text="(variable)"),
    dict(opcode="sensing_answer", text="(answer)"),
]

# Stack blocks
stack_block_data = [
    dict(opcode="motion_gotoxy", text="go to x: ( ) y: ( )"),
    dict(opcode="motion_changeyby", text="change y by ( )"),
    dict(opcode="motion_setx", text="set x to ( )"),
    dict(opcode="motion_glidesecstoxy", text="glide ( ) secs to x: ( ) y: ( )"),
    dict(opcode="data_setvariableto", text="set [variable] to ( )"),
    dict(opcode="looks_hide", text="hide"),
    dict(opcode="looks_show", text="show"),
    dict(opcode="event_broadcast", text="broadcast [message]"),
]

# Flatten every category into one list so lookups can scan a single sequence.
# The entries are shared with (not copied from) the per-category lists.
all_opcodes_list = []
for category_data in (
    hat_block_data,
    boolean_block_data,
    c_block_data,
    cap_block_data,
    reporter_block_data,
    stack_block_data,
):
    all_opcodes_list += category_data


def extract_json_from_llm_response(response_text: str) -> Dict[str, Any]:
    """Parse the JSON payload contained in an LLM response.

    The first fenced block tagged ``json`` is preferred. The tag is matched
    case-insensitively and any whitespace (including CRLF line endings) may
    surround the payload. When the response contains no such fence, the
    whole response is parsed as JSON.

    Args:
        response_text: Raw text returned by the LLM.

    Returns:
        The decoded JSON value (a dict when the payload is a JSON object).

    Raises:
        json.JSONDecodeError: If no candidate payload is valid JSON. The
            failure is logged before being re-raised.
    """
    fence_flags = re.DOTALL | re.IGNORECASE
    try:
        # Non-greedy: stop at the first closing fence so that a response with
        # several fenced blocks yields the first block instead of everything
        # between the first opening and the last closing fence.
        first_fence = re.search(r"```json\b\s*(.*?)\s*```", response_text, fence_flags)
        if first_fence is None:
            return json.loads(response_text)  # No code block: parse directly.
        try:
            return json.loads(first_fence.group(1))
        except json.JSONDecodeError:
            # The payload itself may contain a ``` sequence (e.g. code inside
            # a JSON string), which cuts the non-greedy match short. Retry
            # with the widest possible span before giving up.
            widest_fence = re.search(r"```json\b\s*(.*)```", response_text, fence_flags)
            return json.loads(widest_fence.group(1))
    except json.JSONDecodeError as e:
        logger.error("Failed to decode JSON: %s from response: %s", e, response_text)
        raise

# Output categories, in the order they appear in each plan's "opcode_counts".
_OPCODE_CATEGORIES = (
    "motion",
    "control",
    "operator",
    "sensing",
    "looks",
    "sounds",
    "events",
    "data",
)

# Maps the part of an opcode before its first "_" to an output category.
_CATEGORY_BY_OPCODE_PREFIX = {
    "motion": "motion",
    "control": "control",
    "operator": "operator",
    "sensing": "sensing",
    "looks": "looks",
    "sound": "sounds",   # Scratch 3 sound opcodes are spelled "sound_*".
    "sounds": "sounds",  # Prefix the previous implementation looked for.
    "event": "events",
    "data": "data",
}

# A "[name]" field or an empty "( )" input slot inside a block's display text.
_PLACEHOLDER_RE = re.compile(r"\[[^\]]*\]|\( \)")
_IF_HEADER_RE = re.compile(r"if <.+?> then", re.DOTALL)
_ELSE_LINE_RE = re.compile(r"^[ \t]*else\b", re.MULTILINE)
_FOREVER_RE = re.compile(r"\bforever\b")


def _block_text_to_regex(block_text: str) -> "re.Pattern":
    """Compile a block's display text into a regex tolerating filled-in inputs.

    Each placeholder becomes a non-greedy wildcard; the text between
    placeholders is matched literally. Literal pieces are escaped one by one,
    so the result does not depend on how ``re.escape`` renders spaces.
    """
    literal_parts = _PLACEHOLDER_RE.split(block_text)
    return re.compile(".*?".join(re.escape(part) for part in literal_parts))


def _count_if_blocks(logic: str) -> tuple:
    """Return ``(plain_if_count, if_else_count)`` for a logic script.

    Every ``else`` line is taken to belong to one ``if <...> then`` header, so
    the number of if/else blocks is the number of ``else`` lines (capped at
    the number of headers) and the remaining headers are plain ifs.
    """
    if_headers = len(_IF_HEADER_RE.findall(logic))
    with_else = min(if_headers, len(_ELSE_LINE_RE.findall(logic)))
    return if_headers - with_else, with_else


def _count_opcodes(logic: str, event: str, block_patterns: list) -> Dict[str, int]:
    """Count how often each catalog block appears in ``logic``.

    ``block_patterns`` is a list of ``(opcode, compiled_pattern)`` pairs in
    catalog order. The plan's ``event`` opcode is guaranteed a count of at
    least one, without double counting when its text is also part of
    ``logic``.
    """
    counts: Dict[str, int] = {}

    def bump(opcode: str, amount: int) -> None:
        if amount > 0:
            counts[opcode] = counts.get(opcode, 0) + amount

    for opcode, pattern in block_patterns:
        if opcode == "control_if":
            # "if" has two shapes (if-then / if-then-else) sharing one header.
            plain_ifs, if_elses = _count_if_blocks(logic)
            bump("control_if_else", if_elses)
            bump("control_if", plain_ifs)
        elif opcode == "control_forever":
            bump(opcode, len(_FOREVER_RE.findall(logic)))
        else:
            # NOTE: purely textual matching. Templates that overlap (e.g.
            # "set x to ( )" and "set [variable] to ( )") can both match the
            # same line; a real parser would be needed to disambiguate.
            bump(opcode, len(pattern.findall(logic)))

    if event and event not in counts:
        counts[event] = 1
    return counts


def _category_for_opcode(opcode: str) -> Any:
    """Return the output category for ``opcode``, or ``None`` if unrecognised."""
    prefix, separator, _ = opcode.partition("_")
    if not separator:
        return None
    return _CATEGORY_BY_OPCODE_PREFIX.get(prefix)


# Node 9: plan with the exact count of each opcode used per logic
def plan_opcode_counter_node(state: Dict[str, Any]) -> Dict[str, Any]:
    """Count the opcodes used by every plan's ``logic`` script.

    For each plan under ``state["action_plan"]["action_overall_flow"]`` the
    display text of every block in ``all_opcodes_list`` is searched for in the
    plan's ``logic`` string (plain regex matching; no LLM call is made). Each
    plan dict is updated in place with:

    * ``"opcode_counts"``: ``{category: [{"opcode": ..., "count": ...}, ...]}``
    * one key per category holding the opcode repeated ``count`` times.

    The per-sprite result is stored in ``state["temporary_node"]``.

    Args:
        state: Graph state. Only ``action_plan`` is read.

    Returns:
        The same ``state`` object. It is returned unchanged when there is no
        action flow to process.
    """
    logger.info("=== Running OPCODE COUTER LOGIC with LLM counts ===")

    action_flow = (state.get("action_plan") or {}).get("action_overall_flow") or {}
    if not action_flow:
        logger.warning("No action_overall_flow found; skipping.")
        return state

    # Compile every block template once instead of once per plan.
    block_patterns = [
        (block["opcode"], _block_text_to_regex(block["text"]))
        for block in all_opcodes_list
    ]

    refined_flow: Dict[str, Any] = {}
    for sprite, sprite_data in action_flow.items():
        refined_plans = []
        for plan in sprite_data.get("plans") or []:
            logic = plan.get("logic") or ""
            # Only trim whitespace: removing characters from the opcode itself
            # (the old code deleted every "v") corrupts names such as
            # "event_whenflagclicked" and makes them fall out of the result.
            event = (plan.get("event") or "").strip()

            counts = _count_opcodes(logic, event, block_patterns)

            opcode_counts: Dict[str, Any] = {
                category: [] for category in _OPCODE_CATEGORIES
            }
            for opcode, count in counts.items():
                category = _category_for_opcode(opcode)
                if category is not None:
                    opcode_counts[category].append({"opcode": opcode, "count": count})

            plan["opcode_counts"] = opcode_counts
            # Flat per-category lists: each opcode repeated `count` times.
            for category, entries in opcode_counts.items():
                plan[category] = [
                    entry["opcode"]
                    for entry in entries
                    for _ in range(entry["count"])
                ]

            refined_plans.append(plan)

        refined_flow[sprite] = {
            "description": sprite_data.get("description", ""),
            "plans": refined_plans,
        }

    state["temporary_node"] = refined_flow
    logger.debug("[OPCODE COUTER LOGIC]: %s", refined_flow)
    logger.info("=== OPCODE COUTER LOGIC completed ===")
    return state