Spaces:
Sleeping
Sleeping
File size: 8,086 Bytes
013de3c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 |
#!/Users/norikakizawa/.pyenv/versions/3.12.2/bin/python3
import json, sys, struct, asyncio, aiohttp, os, time
from pathlib import Path
# --- Constants ---
SPACE_URL = "https://zk1tty-rebrowse.hf.space"
TRACE_DIR = Path.home() / ".rebrowse" / "traces"
TRACE_DIR.mkdir(parents=True, exist_ok=True)
EMERGENCY_LOG_FILE = "/tmp/rebrowse_host_emergency.log"
# --- Emergency Logger ---
def emergency_log(message):
try:
with open(EMERGENCY_LOG_FILE, "a") as f:
f.write(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {message}\n")
except Exception:
pass
emergency_log("Host script started.")
# --- Native Messaging Helpers ---
def read_native_msg():
"""
Reads a message from stdin using the native messaging protocol.
This is a blocking function.
"""
try:
raw_len = sys.stdin.buffer.read(4)
if not raw_len:
emergency_log("read_native_msg: No raw_len, stdin may be closed.")
return None
length = struct.unpack("=I", raw_len)[0]
message_json = sys.stdin.buffer.read(length).decode("utf-8")
emergency_log(f"read_native_msg: Received message of length {length}. Content: {message_json[:300]}...")
return json.loads(message_json)
except Exception as e:
emergency_log(f"read_native_msg: Error reading message: {e}")
return None
def write_native_msg(msg: dict):
"""
Writes a message to stdout using the native messaging protocol.
"""
try:
encoded = json.dumps(msg).encode("utf-8")
packed_len = struct.pack("=I", len(encoded))
sys.stdout.buffer.write(packed_len)
sys.stdout.buffer.write(encoded)
sys.stdout.flush()
emergency_log(f"write_native_msg: Sent message type {msg.get('type')}. Content: {str(msg)[:300]}...")
except Exception as e:
emergency_log(f"write_native_msg: Error writing message: {e}")
# --- API Communication ---
async def push_trace(path):
emergency_log(f"push_trace: Uploading trace file {path}...")
async with aiohttp.ClientSession() as session:
try:
with open(path, "rb") as f:
data = aiohttp.FormData()
data.add_field('file', f, filename=os.path.basename(path), content_type='application/jsonl')
async with session.post(f"{SPACE_URL}/api/trace/upload", data=data) as response:
response.raise_for_status()
resp_json = await response.json()
trace_id = resp_json.get("trace_id")
emergency_log(f"push_trace: Successfully uploaded. Trace ID: {trace_id}")
write_native_msg({"type": "status", "message": f"Trace uploaded with ID: {trace_id}"})
return trace_id
except aiohttp.ClientError as e:
emergency_log(f"push_trace: Error uploading trace: {e}")
write_native_msg({"type": "error", "message": f"Failed to upload trace: {e}"})
return None
async def poll_replay(trace_id):
if not trace_id:
emergency_log("poll_replay: No trace_id provided, skipping polling.")
return
req_path = f"{SPACE_URL}/api/trace/.requests/{trace_id}"
emergency_log(f"poll_replay: Polling for replay request at {req_path}...")
write_native_msg({"type": "status", "message": f"Waiting for replay command for trace {trace_id}..."})
async with aiohttp.ClientSession() as session:
while True:
try:
async with session.get(req_path) as response:
if response.status == 200:
emergency_log(f"poll_replay: Received 200 OK. Starting replay for trace {trace_id}.")
write_native_msg({"type": "status", "message": f"Replay command received for trace {trace_id}."})
return
except aiohttp.ClientError as e:
emergency_log(f"poll_replay: Error during polling for {trace_id}: {e}. Trying again in 3s.")
await asyncio.sleep(3)
# --- Replay Logic ---
async def replay(trace_path):
emergency_log(f"replay: Starting replay for trace file {trace_path}")
write_native_msg({"type": "status", "message": f"Starting replay of {os.path.basename(trace_path)}..."})
loop = asyncio.get_running_loop()
try:
with open(trace_path) as f:
for line in f:
step = json.loads(line)
step_id = f"{time.time_ns()}"
step["id"] = step_id
step["type"] = "replay-step"
write_native_msg(step)
emergency_log(f"replay: Sent step {step_id}, waiting for ACK...")
while True:
# Use run_in_executor to call the blocking read_native_msg
incoming = await loop.run_in_executor(None, read_native_msg)
if incoming and incoming.get("type") == "ack" and incoming.get("id") == step_id:
emergency_log(f"replay: Received ACK for step {step_id}.")
break
elif incoming is None:
emergency_log("replay: read_native_msg returned None, stdin may be closed. Aborting replay.")
return
else:
emergency_log(f"replay: Received something other than ACK for {step_id}: {str(incoming)[:200]}...")
except FileNotFoundError:
emergency_log(f"replay: Trace file not found at {trace_path}")
write_native_msg({"type": "error", "message": f"Replay failed: trace file not found at {trace_path}"})
except Exception as e:
emergency_log(f"replay: Error during replay: {e}")
write_native_msg({"type": "error", "message": f"Replay failed: {e}"})
# --- Main Event Loop ---
async def main():
write_native_msg({"type": "status", "message": "Native host ready and listening for CDP."})
loop = asyncio.get_running_loop()
trace_lines = []
# The main loop now uses a blocking read in an executor to process messages sequentially.
while True:
msg = await loop.run_in_executor(None, read_native_msg)
if msg is None:
emergency_log("main: read_native_msg returned None, stdin probably closed. Exiting.")
break
if msg.get("type") == "user-event":
trace_lines.append(msg)
elif msg.get("type") == "stop-recording":
emergency_log("main: Received stop-recording command.")
write_native_msg({"type": "status", "message": "Stopping recording and saving trace..."})
# 1. Write file
tid = msg.get("trace_id") or str(int(time.time()))
path = TRACE_DIR / f"{tid}.jsonl"
emergency_log(f"main: Saving trace to {path}")
try:
with open(path, "w") as f:
for ev in trace_lines:
f.write(json.dumps(ev) + "\n")
trace_lines = [] # Clear for next recording
write_native_msg({"type": "status", "message": f"Trace saved locally at {path}."})
except IOError as e:
emergency_log(f"main: Error writing trace file {path}: {e}")
write_native_msg({"type": "error", "message": f"Failed to save trace file: {e}"})
continue
# 2. Upload
trace_id = await push_trace(path)
# 3. Poll for replay request
await poll_replay(trace_id)
# 4. Run replay
await replay(path)
write_native_msg({"type": "status", "message": "Replay finished."})
if __name__ == "__main__":
try:
emergency_log("__main__: Starting asyncio event loop.")
asyncio.run(main())
except KeyboardInterrupt:
emergency_log("__main__: KeyboardInterrupt caught.")
except Exception as e:
emergency_log(f"__main__: An unhandled exception occurred: {e}")
finally:
emergency_log("__main__: Exiting.") |