File size: 8,086 Bytes
013de3c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
#!/Users/norikakizawa/.pyenv/versions/3.12.2/bin/python3
import json, sys, struct, asyncio, aiohttp, os, time
from pathlib import Path

# --- Constants ---
SPACE_URL = "https://zk1tty-rebrowse.hf.space"
TRACE_DIR = Path.home() / ".rebrowse" / "traces"
TRACE_DIR.mkdir(parents=True, exist_ok=True)
EMERGENCY_LOG_FILE = "/tmp/rebrowse_host_emergency.log"

# --- Emergency Logger ---
def emergency_log(message):
    try:
        with open(EMERGENCY_LOG_FILE, "a") as f:
            f.write(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {message}\n")
    except Exception:
        pass

emergency_log("Host script started.")

# --- Native Messaging Helpers ---
def read_native_msg():
    """
    Reads a message from stdin using the native messaging protocol.
    This is a blocking function.
    """
    try:
        raw_len = sys.stdin.buffer.read(4)
        if not raw_len:
            emergency_log("read_native_msg: No raw_len, stdin may be closed.")
            return None
        length = struct.unpack("=I", raw_len)[0]
        message_json = sys.stdin.buffer.read(length).decode("utf-8")
        emergency_log(f"read_native_msg: Received message of length {length}. Content: {message_json[:300]}...")
        return json.loads(message_json)
    except Exception as e:
        emergency_log(f"read_native_msg: Error reading message: {e}")
        return None

def write_native_msg(msg: dict):
    """
    Writes a message to stdout using the native messaging protocol.
    """
    try:
        encoded = json.dumps(msg).encode("utf-8")
        packed_len = struct.pack("=I", len(encoded))
        sys.stdout.buffer.write(packed_len)
        sys.stdout.buffer.write(encoded)
        sys.stdout.flush()
        emergency_log(f"write_native_msg: Sent message type {msg.get('type')}. Content: {str(msg)[:300]}...")
    except Exception as e:
        emergency_log(f"write_native_msg: Error writing message: {e}")

# --- API Communication ---
async def push_trace(path):
    emergency_log(f"push_trace: Uploading trace file {path}...")
    async with aiohttp.ClientSession() as session:
        try:
            with open(path, "rb") as f:
                data = aiohttp.FormData()
                data.add_field('file', f, filename=os.path.basename(path), content_type='application/jsonl')
                async with session.post(f"{SPACE_URL}/api/trace/upload", data=data) as response:
                    response.raise_for_status()
                    resp_json = await response.json()
                    trace_id = resp_json.get("trace_id")
                    emergency_log(f"push_trace: Successfully uploaded. Trace ID: {trace_id}")
                    write_native_msg({"type": "status", "message": f"Trace uploaded with ID: {trace_id}"})
                    return trace_id
        except aiohttp.ClientError as e:
            emergency_log(f"push_trace: Error uploading trace: {e}")
            write_native_msg({"type": "error", "message": f"Failed to upload trace: {e}"})
            return None

async def poll_replay(trace_id):
    if not trace_id:
        emergency_log("poll_replay: No trace_id provided, skipping polling.")
        return
    req_path = f"{SPACE_URL}/api/trace/.requests/{trace_id}"
    emergency_log(f"poll_replay: Polling for replay request at {req_path}...")
    write_native_msg({"type": "status", "message": f"Waiting for replay command for trace {trace_id}..."})
    async with aiohttp.ClientSession() as session:
        while True:
            try:
                async with session.get(req_path) as response:
                    if response.status == 200:
                        emergency_log(f"poll_replay: Received 200 OK. Starting replay for trace {trace_id}.")
                        write_native_msg({"type": "status", "message": f"Replay command received for trace {trace_id}."})
                        return
            except aiohttp.ClientError as e:
                emergency_log(f"poll_replay: Error during polling for {trace_id}: {e}. Trying again in 3s.")
            await asyncio.sleep(3)

# --- Replay Logic ---
async def replay(trace_path):
    emergency_log(f"replay: Starting replay for trace file {trace_path}")
    write_native_msg({"type": "status", "message": f"Starting replay of {os.path.basename(trace_path)}..."})
    loop = asyncio.get_running_loop()
    try:
        with open(trace_path) as f:
            for line in f:
                step = json.loads(line)
                step_id = f"{time.time_ns()}"
                step["id"] = step_id
                step["type"] = "replay-step"
                
                write_native_msg(step)
                emergency_log(f"replay: Sent step {step_id}, waiting for ACK...")

                while True:
                    # Use run_in_executor to call the blocking read_native_msg
                    incoming = await loop.run_in_executor(None, read_native_msg)
                    if incoming and incoming.get("type") == "ack" and incoming.get("id") == step_id:
                        emergency_log(f"replay: Received ACK for step {step_id}.")
                        break
                    elif incoming is None:
                        emergency_log("replay: read_native_msg returned None, stdin may be closed. Aborting replay.")
                        return
                    else:
                        emergency_log(f"replay: Received something other than ACK for {step_id}: {str(incoming)[:200]}...")
    except FileNotFoundError:
        emergency_log(f"replay: Trace file not found at {trace_path}")
        write_native_msg({"type": "error", "message": f"Replay failed: trace file not found at {trace_path}"})
    except Exception as e:
        emergency_log(f"replay: Error during replay: {e}")
        write_native_msg({"type": "error", "message": f"Replay failed: {e}"})

# --- Main Event Loop ---
async def main():
    write_native_msg({"type": "status", "message": "Native host ready and listening for CDP."})
    loop = asyncio.get_running_loop()
    
    trace_lines = []

    # The main loop now uses a blocking read in an executor to process messages sequentially.
    while True:
        msg = await loop.run_in_executor(None, read_native_msg)
        if msg is None:
            emergency_log("main: read_native_msg returned None, stdin probably closed. Exiting.")
            break

        if msg.get("type") == "user-event":
            trace_lines.append(msg)

        elif msg.get("type") == "stop-recording":
            emergency_log("main: Received stop-recording command.")
            write_native_msg({"type": "status", "message": "Stopping recording and saving trace..."})
            
            # 1. Write file
            tid = msg.get("trace_id") or str(int(time.time()))
            path = TRACE_DIR / f"{tid}.jsonl"
            emergency_log(f"main: Saving trace to {path}")
            try:
                with open(path, "w") as f:
                    for ev in trace_lines:
                        f.write(json.dumps(ev) + "\n")
                trace_lines = [] # Clear for next recording
                write_native_msg({"type": "status", "message": f"Trace saved locally at {path}."})
            except IOError as e:
                emergency_log(f"main: Error writing trace file {path}: {e}")
                write_native_msg({"type": "error", "message": f"Failed to save trace file: {e}"})
                continue

            # 2. Upload
            trace_id = await push_trace(path)

            # 3. Poll for replay request
            await poll_replay(trace_id)

            # 4. Run replay
            await replay(path)
            
            write_native_msg({"type": "status", "message": "Replay finished."})

if __name__ == "__main__":
    try:
        emergency_log("__main__: Starting asyncio event loop.")
        asyncio.run(main())
    except KeyboardInterrupt:
        emergency_log("__main__: KeyboardInterrupt caught.")
    except Exception as e:
        emergency_log(f"__main__: An unhandled exception occurred: {e}")
    finally:
        emergency_log("__main__: Exiting.")