import pdb
import logging
import os
import errno
import gradio as gr
import queue
import threading
import time
import asyncio
import tempfile
from typing import Optional, List, Dict, Any, Union, Callable, Tuple, AsyncGenerator, TextIO
from pathlib import Path
from datetime import datetime as dt
from gradio.themes import Default, Soft, Glass, Monochrome, Ocean, Origin, Base, Citrus
import pandas as pd
from playwright.async_api import BrowserContext as PlaywrightBrowserContextType, Browser as PlaywrightBrowser
from playwright.async_api import Browser # For isinstance check
# from playwright.async_api import async_playwright # Ensure this is removed if only for old recording logic
import json # Add json import
from uuid import uuid4
import uvicorn
from fastapi import FastAPI, HTTPException, UploadFile, BackgroundTasks, Request
from fastapi.responses import RedirectResponse
from playwright.async_api import async_playwright
import aiofiles
from dotenv import load_dotenv
load_dotenv()
# --- Project-specific global imports needed by replay logic ---
from src.browser.custom_browser import CustomBrowser
from src.browser.custom_context import CustomBrowserContext
from src.browser.custom_context_config import CustomBrowserContextConfig as AppCustomBrowserContextConfig
from browser_use.browser.browser import BrowserConfig
from src.utils.trace_utils import get_upload_file_names_from_trace
from src.utils import user_input_functions
from browser_use.browser.context import BrowserContextWindowSize
# --- Global Logging Setup ---
from src.utils.replay_streaming_manager import start_replay_sync_api_in_thread, log_q as manager_log_q
# BasicConfig should still be called once in webui.py for general console logging
if not logging.getLogger().handlers and not logging.getLogger().hasHandlers():
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - [%(module)s.%(funcName)s:%(lineno)d] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
else:
if logging.getLogger().getEffectiveLevel() > logging.DEBUG:
logging.getLogger().setLevel(logging.DEBUG)
# --- Specific logger levels for DEBUG ---
logging.getLogger('src.utils.replayer').setLevel(logging.DEBUG)
logging.getLogger('src.controller.custom_controller').setLevel(logging.DEBUG)
logger = logging.getLogger(__name__) # Logger for webui.py itself
logger.info("WebUI: Base logging configured. UI log: ReplayStreamingManager.")
# --- Global Queues and Pipe Paths ---
RECORDER_EVENT_LOG_Q: asyncio.Queue[str] = asyncio.Queue() # This queue might become less central for raw recording
HOST_STATUS_LOG_Q: asyncio.Queue[str] = asyncio.Queue() # This queue is now KEY for the new recorder
HOST_STATUS_PIPE_PATH = "/tmp/rebrowse_host_status.pipe"
# --- NEW: Command Pipe Paths to sync with host.py ---
COMMAND_PIPE_PATH = "/tmp/rebrowse_ui_command.pipe"
RESPONSE_PIPE_PATH = "/tmp/rebrowse_ui_command_response.pipe"
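# Pipe protocol sketch (illustrative, based on the handlers below): host.py writes
# newline-delimited JSON events to the status pipe and plain-text commands to the
# command pipe; webui.py answers each command with one JSON line on the response pipe.
# From a shell, for example:
#   echo '{"type": "mousedown", "selector": "#submit", "url": "https://example.com"}' > /tmp/rebrowse_host_status.pipe
#   echo 'START_RECORDING' > /tmp/rebrowse_ui_command.pipe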
MANUAL_TRACES_DIR = "./tmp/input_tracking"
# --- NEW Global State for "Pipe-to-File" Recording ---
_RECORDING_ACTIVE: bool = False
_RECORDING_FILE_HANDLE: Optional[TextIO] = None
_RECORDING_ASYNC_TASK: Optional[asyncio.Task] = None
_CURRENT_RECORDING_FILE_PATH: Optional[str] = None # To store the path of the current recording
demo: Optional[gr.Blocks] = None
# --- Global Helper Functions (e.g. trace file listing) ---
def refresh_traces():
logger.info("refresh_traces called")
try:
files_details_list = user_input_functions.list_input_trace_files(MANUAL_TRACES_DIR)
df_rows = []
for item_dict in files_details_list:
if isinstance(item_dict, dict):
df_rows.append([
item_dict.get("name", "N/A"),
item_dict.get("created", "N/A"),
item_dict.get("size", "N/A"),
item_dict.get("events", "N/A")
])
if df_rows:
pandas_df = pd.DataFrame(df_rows, columns=["Name", "Created", "Size", "Events"])
return pandas_df, files_details_list
else:
logger.info("No trace files found or processed by refresh_traces.")
return pd.DataFrame(columns=["Name", "Created", "Size", "Events"]), []
except Exception as e:
logger.error(f"Error in refresh_traces: {e}", exc_info=True)
return pd.DataFrame(columns=["Name", "Created", "Size", "Events"]), []
# --- Global Browser/Context Variables ---
_ui_global_browser: Optional[CustomBrowser] = None
_ui_global_browser_context: Optional[CustomBrowserContext] = None
# _global_agent: Optional[Any] = None # This can be reviewed/removed if not used elsewhere
# The old _global_input_tracking_active (if it existed here) is replaced by the new ones below.
# --- NEW Global variables for Recording Feature ---
_global_input_tracking_active: bool = False
_last_manual_trace_path: Optional[str] = None
# Note: The old, separate _global_browser and _global_browser_context for recording have been removed.
# --- NEW Global variable for the replay-specific context ---
# This variable needs to be set by your UI logic when a suitable context is active.
GLOBAL_REPLAY_BROWSER_CTX: Optional[CustomBrowserContext] = None
# --- Global Helper Function for Replay Logic: context_is_closed ---
def context_is_closed(ctx: Optional[PlaywrightBrowserContextType]) -> bool:
"""Checks if a Playwright BrowserContext is closed."""
if not ctx: return True
try:
# Accessing pages on a closed context raises an error.
# Also check if pages list itself is None, which can happen if context was not properly initialized
# or if the underlying Playwright context object is in an invalid state.
if ctx.pages is None: # Explicitly check if pages attribute is None
logger.warning("context_is_closed: context.pages is None, treating as unusable/closed.")
return True
_ = ctx.pages # Trigger potential error if closed
return False
except Exception as e:
logger.debug(f"context_is_closed: Exception caught (likely closed context): {e}")
return True
# --- Global Helper Function for Replay Logic: ensure_browser_session ---
async def ensure_browser_session(
force_new_context_if_existing: bool = False,
) -> Tuple[Optional[CustomBrowser], Optional[CustomBrowserContext]]:
global _ui_global_browser, _ui_global_browser_context, logger, _browser_init_lock
async with _browser_init_lock:
        browser_needs_real_init = False
        if not _ui_global_browser:
            browser_needs_real_init = True
        elif not _ui_global_browser._actual_playwright_browser:
            _ui_global_browser = None
            browser_needs_real_init = True
        else:
            core_pw_object = _ui_global_browser._actual_playwright_browser
            if isinstance(core_pw_object, PlaywrightBrowser):
                if not core_pw_object.is_connected():
                    _ui_global_browser = None
                    browser_needs_real_init = True
            elif isinstance(core_pw_object, PlaywrightBrowserContextType):
                if context_is_closed(core_pw_object):
                    _ui_global_browser = None
                    browser_needs_real_init = True
            else:
                _ui_global_browser = None
                browser_needs_real_init = True
        if browser_needs_real_init:
            cdp_url = os.getenv("CHROME_CDP_URL")
            chrome_path = os.getenv("CHROME_PATH")
            cfg = BrowserConfig(
                headless=False,
                disable_security=True,
                cdp_url=cdp_url,
                chrome_instance_path=chrome_path,
                extra_chromium_args=[f"--window-size={1280},{1100}", "--disable-web-security"],
            )
            _ui_global_browser = CustomBrowser(config=cfg)
            try:
                await _ui_global_browser.async_init()
                if not _ui_global_browser._actual_playwright_browser:
                    raise Exception("async_init failed to attach a Playwright browser")
                # A context bound to the previous browser instance is stale; close and drop it.
                if _ui_global_browser_context and hasattr(_ui_global_browser_context, 'browser') and _ui_global_browser_context.browser != _ui_global_browser:
                    try:
                        await _ui_global_browser_context.close()
                    except Exception:
                        pass
                    _ui_global_browser_context = None
            except Exception as e:
                logger.error(f"Browser Init Fail: {e}", exc_info=True)
                _ui_global_browser = None
                return None, None
        if not _ui_global_browser:
            return None, None
        context_needs_recheck = False
        if not _ui_global_browser_context:
            context_needs_recheck = True
        elif hasattr(_ui_global_browser_context, 'browser') and _ui_global_browser_context.browser != _ui_global_browser:
            try:
                await _ui_global_browser_context.close()
            except Exception:
                pass
            _ui_global_browser_context = None
            context_needs_recheck = True
        elif not hasattr(_ui_global_browser_context, 'playwright_context') or context_is_closed(_ui_global_browser_context.playwright_context):
            _ui_global_browser_context = None
            context_needs_recheck = True
        if force_new_context_if_existing and _ui_global_browser_context:
            try:
                await _ui_global_browser_context.close()
            except Exception:
                pass
            _ui_global_browser_context = None
            context_needs_recheck = True
        if context_needs_recheck:
            try:
                cfg = AppCustomBrowserContextConfig(
                    enable_input_tracking=False,
                    browser_window_size=BrowserContextWindowSize(width=1280, height=1100),
                )
                # Prefer reusing an existing context when attached over CDP; otherwise create one.
                if _ui_global_browser.config and _ui_global_browser.config.cdp_url:
                    _ui_global_browser_context = await _ui_global_browser.reuse_existing_context(config=cfg)
                if not _ui_global_browser_context:
                    _ui_global_browser_context = await _ui_global_browser.new_context(config=cfg)
                if not (_ui_global_browser_context and _ui_global_browser_context.playwright_context):
                    raise Exception("Context link invalid")
            except Exception as e:
                logger.error(f"Context Establish Fail: {e}", exc_info=True)
                _ui_global_browser_context = None
        if _ui_global_browser_context and not _ui_global_browser_context.pages:
            try:
                await _ui_global_browser_context.new_page()
                if not _ui_global_browser_context.pages:
                    logger.error("Failed to create page")
            except Exception as e:
                logger.error(f"Error creating page: {e}", exc_info=True)
        if not (_ui_global_browser and _ui_global_browser_context and _ui_global_browser_context.pages):
            logger.warning("Session incomplete")
        return _ui_global_browser, _ui_global_browser_context
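# Usage sketch (mirrors stream_replay_ui below); callers must handle a (None, None) result:
#   browser, ctx = await ensure_browser_session()
#   if not browser or not ctx:
#       ...  # abort: browser session could not be established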
# Refactored to be a regular async function, sends logs to RECORDER_EVENT_LOG_Q
async def start_recording_logic():
global _RECORDING_ACTIVE, _RECORDING_FILE_HANDLE, _RECORDING_ASYNC_TASK, _CURRENT_RECORDING_FILE_PATH
global RECORDER_EVENT_LOG_Q, logger, MANUAL_TRACES_DIR
# Log to RECORDER_EVENT_LOG_Q for the Gradio UI "Record Status Logs" textbox
# This queue is separate from HOST_STATUS_LOG_Q used by the pipe writer
def _log_to_ui_q(msg: str, is_error: bool = False):
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
formatted_msg = f"[{timestamp}] [WebUI-Record] {msg}"
if is_error: logger.error(msg)
else: logger.info(msg)
try:
RECORDER_EVENT_LOG_Q.put_nowait(formatted_msg)
except asyncio.QueueFull:
logger.warning(f"RECORDER_EVENT_LOG_Q full for UI. Dropped: {formatted_msg}")
_log_to_ui_q("Attempting to start pipe-to-file recording...")
if _RECORDING_ACTIVE:
_log_to_ui_q("Pipe-to-file recording is already active.")
return
try:
if not os.path.exists(MANUAL_TRACES_DIR):
os.makedirs(MANUAL_TRACES_DIR, exist_ok=True)
_log_to_ui_q(f"Created recordings directory: {MANUAL_TRACES_DIR}")
# Generate filename (e.g., YYYYMMDD_HHMMSS_pipe_events.jsonl)
timestamp_str = dt.now().strftime("%Y%m%d_%H%M%S")
new_trace_filename = f"{timestamp_str}_pipe_events.jsonl" # Using .jsonl as it's line-delimited JSON
_CURRENT_RECORDING_FILE_PATH = str(Path(MANUAL_TRACES_DIR) / new_trace_filename)
_log_to_ui_q(f"Opening trace file for writing: {_CURRENT_RECORDING_FILE_PATH}")
_RECORDING_FILE_HANDLE = open(_CURRENT_RECORDING_FILE_PATH, "w", encoding="utf-8")
_RECORDING_ACTIVE = True
# Start the background task to write from HOST_STATUS_LOG_Q to the file
if _RECORDING_ASYNC_TASK and not _RECORDING_ASYNC_TASK.done():
_log_to_ui_q("Warning: Previous recording task was still present. Cancelling it.")
_RECORDING_ASYNC_TASK.cancel()
try: await _RECORDING_ASYNC_TASK # Allow cancellation to process
except asyncio.CancelledError:
_log_to_ui_q("Previous recording task cancelled.")
_RECORDING_ASYNC_TASK = asyncio.create_task(_pipe_to_file_writer())
_log_to_ui_q(f"Pipe-to-file recording started. Saving to: {_CURRENT_RECORDING_FILE_PATH}")
except Exception as e:
_log_to_ui_q(f"Exception starting recording: {e}", is_error=True)
logger.error(f"Exception in start_recording_logic: {e}", exc_info=True)
_RECORDING_ACTIVE = False # Ensure state is correct
if _RECORDING_FILE_HANDLE and not _RECORDING_FILE_HANDLE.closed:
_RECORDING_FILE_HANDLE.close()
_RECORDING_FILE_HANDLE = None
_CURRENT_RECORDING_FILE_PATH = None
if _RECORDING_ASYNC_TASK and not _RECORDING_ASYNC_TASK.done():
_RECORDING_ASYNC_TASK.cancel()
# No await here as we are in an exception handler already
# Refactored to be a regular async function, sends logs to RECORDER_EVENT_LOG_Q
async def stop_recording_logic():
global _RECORDING_ACTIVE, _RECORDING_FILE_HANDLE, _RECORDING_ASYNC_TASK, _CURRENT_RECORDING_FILE_PATH
global _last_manual_trace_path, RECORDER_EVENT_LOG_Q, logger
# Log to RECORDER_EVENT_LOG_Q for the Gradio UI "Record Status Logs" textbox
    # TODO: _log_to_ui_q is duplicated from start_recording_logic; extract a shared helper.
    # These UI status logs are separate from the events written to the pipe trace file.
def _log_to_ui_q(msg: str, is_error: bool = False):
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
formatted_msg = f"[{timestamp}] [WebUI-Record] {msg}"
if is_error: logger.error(msg)
else: logger.info(msg)
try:
RECORDER_EVENT_LOG_Q.put_nowait(formatted_msg)
except asyncio.QueueFull:
logger.warning(f"RECORDER_EVENT_LOG_Q full for UI. Dropped: {formatted_msg}")
_log_to_ui_q("Attempting to stop pipe-to-file recording...")
if not _RECORDING_ACTIVE and not _RECORDING_FILE_HANDLE:
_log_to_ui_q("Pipe-to-file recording was not active.")
return
try:
_RECORDING_ACTIVE = False # Signal the writer task to stop
_log_to_ui_q("Recording flag set to inactive.")
if _RECORDING_ASYNC_TASK and not _RECORDING_ASYNC_TASK.done():
_log_to_ui_q("Cancelling pipe-to-file writer task...")
_RECORDING_ASYNC_TASK.cancel()
try:
await _RECORDING_ASYNC_TASK
_log_to_ui_q("Writer task finished after cancellation.")
except asyncio.CancelledError:
_log_to_ui_q("Writer task successfully cancelled.")
except Exception as e_task_await:
_log_to_ui_q(f"Error awaiting writer task: {e_task_await}", is_error=True)
logger.error(f"Error awaiting _RECORDING_ASYNC_TASK: {e_task_await}", exc_info=True)
_RECORDING_ASYNC_TASK = None
if _RECORDING_FILE_HANDLE and not _RECORDING_FILE_HANDLE.closed:
# NEW: Before closing, flush any remaining messages that might still be in HOST_STATUS_LOG_Q
try:
flushed_count = 0
while not HOST_STATUS_LOG_Q.empty():
try:
pending_line = HOST_STATUS_LOG_Q.get_nowait()
_RECORDING_FILE_HANDLE.write(pending_line + "\n")
flushed_count += 1
HOST_STATUS_LOG_Q.task_done()
except asyncio.QueueEmpty:
break
if flushed_count:
_RECORDING_FILE_HANDLE.flush()
logger.info(f"[stop_recording_logic] Flushed {flushed_count} remaining lines from HOST_STATUS_LOG_Q before closing file.")
except Exception as e_flush:
logger.error(f"[stop_recording_logic] Error flushing remaining queue messages: {e_flush}", exc_info=True)
_log_to_ui_q(f"Closing trace file: {_CURRENT_RECORDING_FILE_PATH}")
_RECORDING_FILE_HANDLE.flush()
_RECORDING_FILE_HANDLE.close()
_log_to_ui_q("Trace file closed.")
_RECORDING_FILE_HANDLE = None
if _CURRENT_RECORDING_FILE_PATH:
_last_manual_trace_path = _CURRENT_RECORDING_FILE_PATH # Update for next UI cycle
_log_to_ui_q(f"Trace saved: {_last_manual_trace_path}")
            try:
                # Surface the saved file's details in the Record Status Log.
                info = user_input_functions.get_file_info(_last_manual_trace_path)
                _log_to_ui_q(f"Trace file info: {info}")
            except Exception:
                pass
else:
_log_to_ui_q("Recording stopped, but no current file path was set.", is_error=True)
_CURRENT_RECORDING_FILE_PATH = None # Clear for next recording
except Exception as e:
_log_to_ui_q(f"Error stopping recording: {e}", is_error=True)
logger.error(f"Exception in stop_recording_logic: {e}", exc_info=True)
# Try to revert to a safe state
_RECORDING_ACTIVE = False
# --- Replay UI ---
async def stream_replay_ui(
trace_path: str,
speed: float,
override_files_temp_list: Optional[List[Any]],
request: gr.Request
) -> AsyncGenerator[str, None]:
print("[WEBUI stream_replay_ui] Entered function.", flush=True)
global _ui_global_browser, _ui_global_browser_context, logger, manager_log_q
override_files_paths: List[str] = []
print(f"[WEBUI stream_replay_ui] trace_path: {trace_path}, speed: {speed}, override_files_temp_list: {override_files_temp_list}", flush=True)
if override_files_temp_list:
for temp_file in override_files_temp_list:
if hasattr(temp_file, 'name') and isinstance(temp_file.name, str):
override_files_paths.append(temp_file.name)
elif isinstance(temp_file, str):
override_files_paths.append(temp_file)
else:
logger.warning(f"stream_replay_ui: Skipping unexpected item type {type(temp_file)} in override_files_temp_list")
print(f"[WEBUI stream_replay_ui] Processed override_files_paths: {override_files_paths}", flush=True)
log_buffer = ""
def _accumulate_log(new_text: str) -> str:
nonlocal log_buffer
if log_buffer and not log_buffer.endswith("\n"):
log_buffer += "\n"
log_buffer += new_text
return log_buffer
print("[WEBUI stream_replay_ui] Right before first try...finally block.", flush=True)
try:
log_buffer = _accumulate_log(f"Initiating replay for: {Path(trace_path).name}")
yield log_buffer
except Exception as e_first_yield:
print(f"[WEBUI stream_replay_ui] ERROR during/after first yield (before session): {e_first_yield}", flush=True)
log_buffer = _accumulate_log(f"Error before starting: {e_first_yield}")
yield log_buffer
return
finally:
print("[WEBUI stream_replay_ui] After first yield attempt (inside finally).", flush=True)
logger.info(f"stream_replay_ui: Replay for '{trace_path}'. Ensuring browser session...")
print(f"[WEBUI stream_replay_ui] Ensuring browser session...", flush=True)
live_browser, live_context = await ensure_browser_session()
logger.debug(f"stream_replay_ui: After ensure_browser_session - live_browser: {type(live_browser)}, live_context: {type(live_context)}")
print(f"[WEBUI stream_replay_ui] ensure_browser_session returned: browser={type(live_browser)}, context={type(live_context)}", flush=True)
if not live_browser or not live_context:
err_msg = "Error: Failed to ensure browser session for replay. Check logs from ensure_browser_session."
logger.error(err_msg)
log_buffer = _accumulate_log(f"SESSION ERROR: {err_msg}")
yield log_buffer
print(f"[WEBUI stream_replay_ui] Yielded SESSION ERROR. Returning.", flush=True)
return
    log_buffer = _accumulate_log("🚀 Browser session ensured. Starting replay thread...")
yield log_buffer
print(f"[WEBUI stream_replay_ui] Yielded 'Browser session ensured'.", flush=True)
ui_async_q: asyncio.Queue[str] = asyncio.Queue()
done_event = threading.Event()
main_loop = asyncio.get_running_loop()
print(f"[WEBUI stream_replay_ui] Initialized ui_async_q, done_event, and got main_loop: {main_loop}.", flush=True)
cdp_url_for_thread = None
if live_browser and hasattr(live_browser, 'config') and live_browser.config and hasattr(live_browser.config, 'cdp_url') and live_browser.config.cdp_url:
cdp_url_for_thread = live_browser.config.cdp_url
print(f"[WEBUI stream_replay_ui] Retrieved CDP URL for thread: {cdp_url_for_thread}", flush=True)
else:
print("[WEBUI stream_replay_ui] ERROR: Could not retrieve cdp_url from live_browser.config.cdp_url.", flush=True)
if not cdp_url_for_thread:
err_msg_cdp = "Error: CDP URL for thread is not available. Cannot connect worker thread to browser."
logger.error(err_msg_cdp)
log_buffer = _accumulate_log(f"CDP ERROR: {err_msg_cdp}")
yield log_buffer
print(f"[WEBUI stream_replay_ui] Yielded CDP ERROR. Returning.", flush=True)
return
logger.debug(f"stream_replay_ui: Calling start_replay_sync_api_in_thread for {trace_path}")
print(f"[WEBUI stream_replay_ui] Calling start_replay_sync_api_in_thread with trace_path={trace_path}, speed={speed}, files={override_files_paths}", flush=True)
done_event = start_replay_sync_api_in_thread(
trace_path,
speed,
override_files_paths,
ui_async_q,
main_loop,
cdp_url_for_thread
)
print(f"[WEBUI stream_replay_ui] start_replay_sync_api_in_thread call completed. Returned done_event: {done_event}", flush=True)
log_buffer = _accumulate_log("--- Replay thread started ---")
yield log_buffer
print(f"[WEBUI stream_replay_ui] Yielded 'Replay thread started'. Beginning monitor loop.", flush=True)
loop_count = 0
while not done_event.is_set() or not ui_async_q.empty():
loop_count += 1
try:
while True:
line = ui_async_q.get_nowait()
log_buffer = _accumulate_log(line)
yield log_buffer
ui_async_q.task_done()
except asyncio.QueueEmpty:
pass
yield log_buffer
await asyncio.sleep(0.25)
logger.info("stream_replay_ui: Replay thread finished. Final log flush.")
print(f"[WEBUI stream_replay_ui] Monitor loop exited. Final log flush. done_event.is_set(): {done_event.is_set()}, ui_async_q.empty(): {ui_async_q.empty()}", flush=True)
while not ui_async_q.empty():
try:
line = ui_async_q.get_nowait()
log_buffer = _accumulate_log(line)
yield log_buffer
ui_async_q.task_done()
except asyncio.QueueEmpty:
print(f"[WEBUI stream_replay_ui] Final flush: ui_async_q is empty.", flush=True)
break
    log_buffer = _accumulate_log("--- Replay completed ✨ ---")
yield log_buffer
logger.info("stream_replay_ui: Streaming finished.")
print(f"[WEBUI stream_replay_ui] Yielded final 'Replay process fully completed'. Exiting function.", flush=True)
############### POLLING LOG SNAPSHOTS INSTEAD OF INFINITE STREAMS ###############
# Running infinite async generators via .load() blocks subsequent UI events because
# the front-end keeps them in a perpetual "running" state. Instead we expose
# *snapshot* functions that return the latest accumulated log text and let
# Gradio poll them every few hundred milliseconds.
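# Wiring sketch (see create_ui below for the actual hookup):
#   timer = gr.Timer(0.5)
#   timer.tick(poll_recorder_log, inputs=None, outputs=some_textbox, queue=False)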
_recorder_log_accum = ""
def poll_recorder_log() -> str:
global _recorder_log_accum, RECORDER_EVENT_LOG_Q
new_messages = []
while not RECORDER_EVENT_LOG_Q.empty():
try:
msg = RECORDER_EVENT_LOG_Q.get_nowait()
new_messages.append(msg)
logger.debug(f"[poll_recorder_log] new_messages: {new_messages}")
RECORDER_EVENT_LOG_Q.task_done()
except asyncio.QueueEmpty:
break
if new_messages:
if _recorder_log_accum and not _recorder_log_accum.endswith("\n"):
_recorder_log_accum += "\n"
_recorder_log_accum += "\n".join(new_messages)
return _recorder_log_accum.strip()
_host_log_accum = "[WebUI] Waiting for Native Host logs..."
def _format_host_log_line(line: str) -> str:
"""Helper to format a single log line to be more human-readable."""
try:
data = json.loads(line)
if not isinstance(data, dict):
return line
log_type = data.get("type")
if log_type == "mousedown":
selector = data.get("selector", "N/A")
url = data.get("url", "N/A")
            return f"🖱️: sel={selector}, 🌐: {url}"
elif log_type == "keydown":
key = data.get("key", "N/A")
url = data.get("url", "N/A")
            return f"⌨️: key='{key}', 🌐: {url}"
elif log_type == "cdp":
method = data.get("method")
if method == "Page.navigatedWithinDocument":
url = data.get("params", {}).get("url", "N/A")
                return f"🌐: {url}"
return line
return line
except (json.JSONDecodeError, AttributeError):
return line
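# Example (illustrative values): a raw pipe line such as
#   {"type": "keydown", "key": "Enter", "url": "https://example.com"}
# is rendered for the UI as
#   ⌨️: key='Enter', 🌐: https://example.com
# Non-JSON lines and unrecognized event types pass through unchanged.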
def poll_host_status_log() -> str: # called frequently by gr.Timer.tick()
global _host_log_accum, HOST_STATUS_LOG_Q
new_messages = []
while not HOST_STATUS_LOG_Q.empty():
try:
msg = HOST_STATUS_LOG_Q.get_nowait()
new_messages.append(msg)
logger.debug(f"[poll_host_status_log] new_messages: {new_messages}")
HOST_STATUS_LOG_Q.task_done()
except asyncio.QueueEmpty:
break
if new_messages:
if _host_log_accum == "[WebUI] Waiting for Native Host logs...":
_host_log_accum = ""
formatted_lines = [_format_host_log_line(line) for line in new_messages]
if _host_log_accum and not _host_log_accum.endswith("\n"):
_host_log_accum += "\n"
_host_log_accum += "\n".join(formatted_lines)
return _host_log_accum.strip()
###############################################################################
async def _read_host_pipe_task():
"""Creates and reads from a named pipe, putting messages into HOST_STATUS_LOG_Q."""
global HOST_STATUS_LOG_Q, HOST_STATUS_PIPE_PATH, logger
logger.info(f"[_read_host_pipe_task] Starting. Pipe path: {HOST_STATUS_PIPE_PATH}")
if os.path.exists(HOST_STATUS_PIPE_PATH):
try:
os.remove(HOST_STATUS_PIPE_PATH)
logger.info(f"[_read_host_pipe_task] Removed existing host status pipe: {HOST_STATUS_PIPE_PATH}")
except OSError as e:
logger.error(f"[_read_host_pipe_task] Error removing existing host status pipe {HOST_STATUS_PIPE_PATH}: {e}")
# Continue to try and create it anyway
try:
os.mkfifo(HOST_STATUS_PIPE_PATH)
logger.info(f"[_read_host_pipe_task] Created host status pipe: {HOST_STATUS_PIPE_PATH}")
await HOST_STATUS_LOG_Q.put(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] [WebUI] Named pipe {HOST_STATUS_PIPE_PATH} created successfully.")
except OSError as e:
logger.error(f"[_read_host_pipe_task] Failed to create host status pipe {HOST_STATUS_PIPE_PATH}: {e}. Host status will not be available.", exc_info=True)
await HOST_STATUS_LOG_Q.put(f"CRITICAL ERROR: [WebUI] Could not create named pipe {HOST_STATUS_PIPE_PATH}. Host logs disabled.")
return
logger.info(f"[_read_host_pipe_task] Listener loop started for {HOST_STATUS_PIPE_PATH}")
while True:
pipe_file = None # Ensure pipe_file is reset for each attempt to open
try:
logger.info(f"[_read_host_pipe_task] Attempting to open pipe for reading: {HOST_STATUS_PIPE_PATH} (this may block until writer connects)..." )
pipe_file = open(HOST_STATUS_PIPE_PATH, 'r') # Blocking open
logger.info(f"[_read_host_pipe_task] Pipe opened for reading: {HOST_STATUS_PIPE_PATH}")
await HOST_STATUS_LOG_Q.put(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] [WebUI] Pipe reader connected to {HOST_STATUS_PIPE_PATH}.")
while True:
# logger.debug(f"[_read_host_pipe_task] Waiting for line from pipe_file.readline()...")
line = pipe_file.readline()
# logger.debug(f"[_read_host_pipe_task] pipe_file.readline() returned: '{(line.strip() if line else "<EOF or empty line>")}'")
if not line:
logger.warning("[_read_host_pipe_task] Writer closed pipe or EOF detected. Re-opening pipe...")
await HOST_STATUS_LOG_Q.put(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] [WebUI] Pipe writer disconnected. Attempting to reconnect...")
break # Break inner loop to reopen the pipe
message = line.strip()
if message: # Ensure not just an empty line
# logger.debug(f"[_read_host_pipe_task] Received from pipe: '{message}'")
await HOST_STATUS_LOG_Q.put(message) # Put the raw message from host.py
# logger.debug(f"[_read_host_pipe_task] Message '{message}' put to HOST_STATUS_LOG_Q.")
except FileNotFoundError:
logger.error(f"[_read_host_pipe_task] Pipe {HOST_STATUS_PIPE_PATH} not found. Recreating...", exc_info=False) # Less noisy for frequent checks
await HOST_STATUS_LOG_Q.put(f"ERROR: [WebUI] Pipe {HOST_STATUS_PIPE_PATH} lost. Attempting to recreate.")
if os.path.exists(HOST_STATUS_PIPE_PATH):
try:
os.remove(HOST_STATUS_PIPE_PATH)
except OSError as e_remove_fnf:
logger.error(f"[_read_host_pipe_task] Error removing existing pipe during FileNotFoundError handling: {e_remove_fnf}")
try:
os.mkfifo(HOST_STATUS_PIPE_PATH)
logger.info(f"[_read_host_pipe_task] Recreated pipe {HOST_STATUS_PIPE_PATH} after FileNotFoundError.")
await HOST_STATUS_LOG_Q.put(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] [WebUI] Pipe {HOST_STATUS_PIPE_PATH} recreated.")
await asyncio.sleep(1) # Brief pause before retrying open in the main loop
except OSError as e_mkfifo_retry:
logger.error(f"[_read_host_pipe_task] Failed to recreate pipe {HOST_STATUS_PIPE_PATH}: {e_mkfifo_retry}. Retrying outer loop in 10s.", exc_info=True)
await HOST_STATUS_LOG_Q.put(f"CRITICAL ERROR: [WebUI] Failed to recreate pipe {HOST_STATUS_PIPE_PATH}. Retrying in 10s.")
await asyncio.sleep(10)
except Exception as e_pipe_read_outer:
logger.error(f"[_read_host_pipe_task] Unhandled error in pipe reading loop: {e_pipe_read_outer}", exc_info=True)
await HOST_STATUS_LOG_Q.put(f"ERROR: [WebUI] Pipe reading loop encountered: {e_pipe_read_outer}. Retrying in 5s.")
await asyncio.sleep(5)
finally:
if pipe_file:
try:
pipe_file.close()
logger.info(f"[_read_host_pipe_task] Closed pipe file handle for {HOST_STATUS_PIPE_PATH} in finally block.")
except Exception as e_close_finally:
logger.error(f"[_read_host_pipe_task] Error closing pipe in finally: {e_close_finally}")
            # If the loop was broken by readline() EOF or an error that closed pipe_file,
            # this sleep prevents a tight loop if open() immediately fails again.
await asyncio.sleep(1) # Wait a bit before retrying the main while True loop (re-opening pipe)
# --- Test function for demo.load ---
def _test_load_function():
    logging.getLogger(__name__).critical("[_test_load_function] SYNC TEST LOAD FUNCTION EXECUTED - CRITICAL LOG AT VERY TOP OF LOAD")
async def _async_test_load_function():
logging.getLogger(__name__).critical("[_async_test_load_function] ASYNC TEST LOAD FUNCTION EXECUTED CRITICAL LOG AT VERY TOP OF LOAD")
await asyncio.sleep(0.1) # Minimal async work
# --- Global UI Definitions ---
css = """
.gradio-container { width: 80% !important; max-width: 90% !important; margin-left: auto !important; margin-right: auto !important; }
"""
# Define the theme map globally
theme_map = {
"Default": Default(),
"Soft": Soft(),
"Citrus": Citrus(font=gr.themes.GoogleFont("Inter")),
"Monochrome": Monochrome(),
"Glass": Glass(),
"Ocean": Ocean(),
"Origin": Base()
}
# --- End Global UI Definitions ---
async def _write_to_response_pipe(response_data: dict):
global RESPONSE_PIPE_PATH, logger
try:
json_response = json.dumps(response_data)
logger.info(f"[_write_to_response_pipe] Attempting to open response pipe {RESPONSE_PIPE_PATH} for writing.")
fd = os.open(RESPONSE_PIPE_PATH, os.O_WRONLY | os.O_NONBLOCK)
with os.fdopen(fd, 'w') as pipe_writer:
pipe_writer.write(json_response + '\n')
pipe_writer.flush()
logger.info(f"[_write_to_response_pipe] Successfully wrote to {RESPONSE_PIPE_PATH}: {json_response}")
except FileNotFoundError:
logger.error(f"[_write_to_response_pipe] ERROR: Response pipe {RESPONSE_PIPE_PATH} not found. Host.py might not be ready or pipe was removed.")
except OSError as e:
        if e.errno == errno.ENXIO:  # ENXIO: no process has the read end of the pipe open
            logger.warning(f"[_write_to_response_pipe] Response pipe {RESPONSE_PIPE_PATH} has no reader. Host.py might not be listening.")
else:
logger.error(f"[_write_to_response_pipe] OSError writing to response pipe {RESPONSE_PIPE_PATH}: {e}", exc_info=True)
except Exception as e:
logger.error(f"[_write_to_response_pipe] Error writing to response pipe {RESPONSE_PIPE_PATH}: {e}", exc_info=True)
async def _process_command_from_pipe(command_str: str):
    global logger, _RECORDING_ACTIVE, _last_manual_trace_path, demo
response_payload = {"status": "unknown_command", "command": command_str, "message": "Command not recognized by webui.py"}
if command_str == "START_RECORDING":
try:
logger.info(f"[_process_command_from_pipe] received START from command pipe: {command_str}")
await start_recording_logic()
logger.info(f"[_process_command_from_pipe] _RECORDING_ACTIVE state: {_RECORDING_ACTIVE}")
if _RECORDING_ACTIVE:
response_payload = {"status": "recording_started", "command": command_str, "message": "Recording started successfully."}
except Exception as e:
logger.error(f"Error calling start_recording_logic from pipe: {e}", exc_info=True)
response_payload = {"status": "error", "command": command_str, "message": f"Exception during start_recording: {str(e)}"}
elif command_str == "STOP_RECORDING":
try:
logger.info(f"received STOP from command pipe: {command_str}")
await stop_recording_logic()
logger.info(f"[_process_command_from_pipe] LOG: Returned from stop_recording_logic. _RECORDING_ACTIVE state: {_RECORDING_ACTIVE}")
if not _RECORDING_ACTIVE:
response_payload = {
"status": "recording_stopped",
"command": command_str,
"message": "Recording stopped successfully via extension command.",
"filePath": _last_manual_trace_path
}
else:
response_payload = {"status": "error_stopping_recording", "command": command_str, "message": "Recording did not deactivate as expected. Check webui logs."}
except Exception as e:
logger.error(f"Error calling stop_recording_logic from pipe: {e}", exc_info=True)
response_payload = {"status": "error", "command": command_str, "message": f"Exception during stop_recording: {str(e)}"}
response_payload["source"] = "extension_command_response"
await _write_to_response_pipe(response_payload)
# TODO: No need to manually start a stream task; demo.load doesn't stream.
async def _listen_command_pipe():
"""Restored: Creates and listens on COMMAND_PIPE_PATH for commands from host.py."""
# VERY FIRST LINE - ABSOLUTE ENTRY TEST (KEEP THIS)
print("[_listen_command_pipe RESTORED] EXECUTION STARTED - PRINT STATEMENT", flush=True)
logging.getLogger(__name__).critical("[_listen_command_pipe RESTORED] EXECUTION STARTED - CRITICAL LOG")
global COMMAND_PIPE_PATH, logger
logger.info(f"[_listen_command_pipe RESTORED] Starting. Command pipe path: {COMMAND_PIPE_PATH}")
if os.path.exists(COMMAND_PIPE_PATH):
try:
os.remove(COMMAND_PIPE_PATH)
logger.info(f"[_listen_command_pipe RESTORED] Removed existing command pipe: {COMMAND_PIPE_PATH}")
except OSError as e:
logger.error(f"[_listen_command_pipe RESTORED] Error removing existing command pipe {COMMAND_PIPE_PATH}: {e}")
try:
os.mkfifo(COMMAND_PIPE_PATH)
logger.info(f"[_listen_command_pipe RESTORED] Created command pipe: {COMMAND_PIPE_PATH}")
except OSError as e:
logger.error(f"[_listen_command_pipe RESTORED] Failed to create command pipe {COMMAND_PIPE_PATH}: {e}. Extension commands will not be processed.", exc_info=True)
return # Exit if pipe creation fails
logger.info(f"[_listen_command_pipe RESTORED] Listener loop started for {COMMAND_PIPE_PATH}")
while True:
pipe_file_cmd = None
try:
logger.info(f"[_listen_command_pipe RESTORED] Attempting to open command pipe for reading: {COMMAND_PIPE_PATH} (blocks until writer)...")
# Blocking open in a thread is fine as this whole function runs in its own thread.
pipe_file_cmd = open(COMMAND_PIPE_PATH, 'r')
logger.info(f"[_listen_command_pipe RESTORED] Command pipe opened for reading: {COMMAND_PIPE_PATH}")
while True:
line = pipe_file_cmd.readline()
if not line:
logger.warning("[_listen_command_pipe RESTORED] Writer (host.py) closed command pipe or EOF. Re-opening...")
# Close the current pipe_file_cmd before breaking to reopen
if pipe_file_cmd:
try: pipe_file_cmd.close()
except Exception as e_close_inner: logger.error(f"[_listen_command_pipe RESTORED] Error closing pipe in inner loop: {e_close_inner}")
pipe_file_cmd = None # Avoid trying to close again in finally
break # Break inner loop to reopen the pipe in the outer loop
command = line.strip()
if command:
logger.info(f"[_listen_command_pipe RESTORED] Received command: '{command}'")
# Create an asyncio task to process the command concurrently.
# This needs to be run on an event loop that _process_command_from_pipe can use.
# Since _listen_command_pipe is already running in its own loop (bg_loop/command_pipe_loop),
# we can schedule tasks on that same loop.
loop = asyncio.get_running_loop() # Get the loop this thread is running
loop.create_task(_process_command_from_pipe(command))
# asyncio.create_task(_process_command_from_pipe(command)) # This might try to use the wrong loop if not careful
except FileNotFoundError:
logger.error(f"[_listen_command_pipe RESTORED] Command pipe {COMMAND_PIPE_PATH} not found during open/read. This shouldn't happen if created successfully. Retrying outer loop.")
# Pipe might have been deleted externally. The outer loop will try to recreate.
except Exception as e_cmd_pipe_outer:
logger.error(f"[_listen_command_pipe RESTORED] Unhandled error in outer loop: {e_cmd_pipe_outer}", exc_info=True)
# Avoid tight loop on persistent error
await asyncio.sleep(5) # Use await since this is an async function now
finally:
if pipe_file_cmd:
try: pipe_file_cmd.close()
except Exception as e_close: logger.error(f"[_listen_command_pipe RESTORED] Error closing command pipe in finally: {e_close}")
# If the pipe was closed (EOF) or an error occurred opening it, wait a bit before retrying the outer loop.
logger.info("[_listen_command_pipe RESTORED] End of outer loop, will pause and retry pipe open if necessary.")
await asyncio.sleep(1) # Use await
# --- Main UI ---
README_SNIPPET = """
<h2>🚀 Quick Start</h2>
### 1. Install the Chrome extension
```bash
curl -O https://huggingface.co/spaces/zk1tty/rebrowse/resolve/main/rebrowse_extension.zip
unzip rebrowse_extension.zip
```
Go to `chrome://extensions` → "Load unpacked" → select the extracted folder
### 2. Install Host App
```bash
/bin/bash -c "$(curl -fsSL https://huggingface.co/spaces/zk1tty/rebrowse/resolve/main/install_host.sh)"
```
<h2> Demos </h2>
### 1. Record
<video autoplay controls src="https://cdn-uploads.huggingface.co/production/uploads/66244188ea4f4ed06659f20c/qSRwgI_fGvYce8ySiI9Eq.qt"></video>
### 2. Replay
check out [YouTube](https://youtu.be/CjxfwRO0TC8)
<video autoplay controls src="https://huggingface.co/spaces/zk1tty/rebrowse/resolve/main/demo_replayer.mp4"></video>
"""
def create_ui(theme_name="Citrus"):
with gr.Blocks(theme=theme_map.get(theme_name, Default()), css=css) as demo:
print("[create_ui] PRINT: About to call demo.load for _test_load_function", flush=True)
demo.load(_test_load_function, inputs=None, outputs=None)
print("[create_ui] PRINT: About to call demo.load for _async_test_load_function", flush=True)
demo.load(_async_test_load_function, inputs=None, outputs=None)
print("[create_ui] PRINT: About to call demo.load for _listen_command_pipe (this line is just for context, actual load removed)", flush=True)
with gr.Tabs() as tabs:
            with gr.TabItem("👋🏻 Intro", id=0):
gr.Markdown(README_SNIPPET, elem_classes="readme-box")
            with gr.TabItem("📝 Record", id=1):
gr.Markdown("## Record")
# Record Status Logs
record_status_logs_output = gr.Textbox(
label="Record Status Log",
interactive=False,
lines=5,
max_lines=10,
autoscroll=True,
show_label=True
)
with gr.Row():
host_status_output_tb = gr.Textbox(
label="Native Host Process Logs",
interactive=False,
lines=10,
max_lines=20,
autoscroll=True,
show_label=True,
elem_id="host_status_logs_textbox"
)
# NOTE: Use gr.Timer for periodic polling, as demo.load(every=...) is deprecated in Gradio 4+
t_rec = gr.Timer(0.5)
t_host = gr.Timer(0.5)
t_rec.tick(
poll_recorder_log,
inputs=None,
outputs=record_status_logs_output,
queue=False,
show_progress="hidden",
)
t_host.tick(
poll_host_status_log,
inputs=None,
outputs=host_status_output_tb,
queue=False,
show_progress="hidden",
)
# with gr.Row():
# gr.Button("Pause polling").click(
# lambda: gr.Timer(active=False), None, [t_rec, t_host]
# )
# gr.Button("Resume polling").click(
# lambda: gr.Timer(active=True), None, [t_rec, t_host]
# )
gr.Markdown("## TEST: Log Streaming")
with gr.Row():
                    minimal_test_btn = gr.Button("▶️ Run Streaming bg log")
minimal_test_output = gr.Textbox(label="Minimal Test Output", interactive=False)
# ---- TESTING: Minimal Stream Test for health check. ----
async def minimal_stream_test_fn() -> AsyncGenerator[str, None]:
print("[MINIMAL_TEST] Entered minimal_stream_test_fn")
yield "Tick 1"
print("[MINIMAL_TEST] After yield 1")
await asyncio.sleep(1) # Simulate some async work
print("[MINIMAL_TEST] After sleep 1")
yield "Tick 1\nTick 2"
print("[MINIMAL_TEST] After yield 2")
await asyncio.sleep(1)
print("[MINIMAL_TEST] After sleep 2")
yield "Tick 1\nTick 2\nTick 3 (Done)"
print("[MINIMAL_TEST] Minimal test finished")
minimal_test_btn.click(
fn=minimal_stream_test_fn,
outputs=minimal_test_output,
queue=True,
                concurrency_limit=None  # no concurrency cap; performs like queue=False while still streaming
)
# ---- END: TEST: Log Streaming ----
            with gr.TabItem("▶️ Replay", id=2):
gr.Markdown("# Rebrowse Space")
with gr.Row():
file_u = gr.File(label="Upload Trace (.jsonl)")
traces_dd = gr.Dropdown(choices=[], label="My Traces", interactive=True)
replay_btn = gr.Button("Replay")
status_box = gr.Textbox(label="Live Status", lines=20)
def refresh_traces_for_dd():
import requests, os
base_url = os.getenv("HF_SPACE_URL", "http://127.0.0.1:7860")
try:
r = requests.get(f"{base_url}/api/trace/list")
r.raise_for_status()
return gr.update(choices=r.json())
except requests.exceptions.RequestException as e:
logger.error(f"Failed to refresh traces: {e}")
return gr.update(choices=[])
t_replay = gr.Timer(2)
t_replay.tick(refresh_traces_for_dd, None, traces_dd)
# --- client actions ---
def upload_trace_fn(file):
import requests, os
# Ensure you have HF_SPACE_URL in your env vars
base_url = os.getenv("HF_SPACE_URL")
if not base_url:
# Fallback for local development
base_url = "http://127.0.0.1:7860"
logger.warning(f"HF_SPACE_URL not set, falling back to {base_url}")
if not hasattr(file, 'name'):
logger.error("Upload file object has no 'name' attribute.")
return "Error: Invalid file object"
try:
with open(file.name, "rb") as f:
r = requests.post(
f"{base_url}/api/trace/upload",
files={"file": f}
)
r.raise_for_status()
return r.json().get("trace_id", "Unknown ID")
except requests.exceptions.RequestException as e:
logger.error(f"Failed to upload trace: {e}")
return f"Error: {e}"
except Exception as e:
logger.error(f"An unexpected error occurred during upload: {e}", exc_info=True)
return "An unexpected error occurred."
file_u.upload(upload_trace_fn, file_u, outputs=None)
def ask_replay(tid):
import requests, os
base_url = os.getenv("HF_SPACE_URL", "http://127.0.0.1:7860")
if not tid:
return "Please select a trace first."
try:
requests.post(f"{base_url}/api/trace/{tid}/replay_request")
return f"Replay requested for {tid}"
except requests.exceptions.RequestException as e:
logger.error(f"Failed to request replay for trace {tid}: {e}")
return f"Error requesting replay for {tid}: {e}"
replay_btn.click(ask_replay, traces_dd, status_box)
# --- Original positions of demo.load hooks ---
print("[create_ui] PRINT: Reaching original demo.load positions", flush=True)
return demo
# --- End: Main UI ---
# --- Main entry ----
def _run_async_in_thread(loop: asyncio.AbstractEventLoop, coro):
"""Helper function to set the event loop for the new thread and run a coroutine."""
asyncio.set_event_loop(loop)
loop.run_until_complete(coro)
# --- NEW Pipe-to-File Writer Coroutine ---
async def _pipe_to_file_writer():
"""Drains HOST_STATUS_LOG_Q and appends each line to the open trace file."""
global _RECORDING_FILE_HANDLE, _RECORDING_ACTIVE, HOST_STATUS_LOG_Q, logger
logger.info("[_pipe_to_file_writer] Started.")
while _RECORDING_ACTIVE and _RECORDING_FILE_HANDLE:
try:
# Get from HOST_STATUS_LOG_Q, which is fed by _read_host_pipe_task
line = await asyncio.wait_for(HOST_STATUS_LOG_Q.get(), timeout=1.0)
if _RECORDING_FILE_HANDLE and not _RECORDING_FILE_HANDLE.closed:
_RECORDING_FILE_HANDLE.write(line + "\n")
_RECORDING_FILE_HANDLE.flush() # Flush frequently to see live data in file
HOST_STATUS_LOG_Q.task_done()
else:
# This case means recording was stopped and handle closed while waiting for queue
logger.info("[_pipe_to_file_writer] Recording file handle closed or None while writer active. Exiting.")
HOST_STATUS_LOG_Q.task_done() # Still mark as done
break # Exit loop
except asyncio.TimeoutError:
# Just a timeout, check _RECORDING_ACTIVE again
if not _RECORDING_ACTIVE:
logger.info("[_pipe_to_file_writer] Recording became inactive during queue timeout. Exiting.")
break
continue # Continue loop if still active
except Exception as e:
logger.error(f"[_pipe_to_file_writer] Error: {e}", exc_info=True)
# Potentially stop recording on persistent error or just log and continue
await asyncio.sleep(0.1) # Brief pause before retrying get()
logger.info("[_pipe_to_file_writer] Exited.")
# --- API Implementation ---
TRACES_DIR = Path("/tmp/traces")
TRACES_DIR.mkdir(parents=True, exist_ok=True)
# Assumed location for the replay-request semaphore files used by /api/trace/{id}/replay_request.
REQ_DIR = TRACES_DIR / "replay_requests"
REQ_DIR.mkdir(parents=True, exist_ok=True)
async def async_file_iter(path: str):
async with aiofiles.open(path, 'r', encoding='utf-8') as f:
async for line in f:
yield line
async def apply_event(page, event, speed):
event_type = event.get("type")
logger.info(f"Applying event: {event_type}")
try:
if event_type == 'mousedown' and 'selector' in event:
await page.click(event['selector'], timeout=5000)
elif event_type == 'keydown' and 'key' in event:
await page.keyboard.press(event['key'])
elif event_type == 'cdp' and event.get('method') == 'Page.navigatedWithinDocument' and 'url' in event.get('params', {}):
await page.goto(event['params']['url'])
await asyncio.sleep(1.0 / speed)
except Exception as e:
logger.error(f"Failed to apply event {event}: {e}")
async def run_replay(path: str, speed: float):
logger.info(f"Starting replay for {path} with speed {speed}")
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
page = await browser.new_page()
try:
async for line in async_file_iter(path):
try:
event = json.loads(line)
await apply_event(page, event, speed)
except json.JSONDecodeError:
logger.warning(f"Skipping malformed JSON line in {path}: {line.strip()}")
finally:
await browser.close()
logger.info(f"Finished replay for {path}")
app = FastAPI()
@app.post("/api/trace/upload")
async def upload_trace(file: UploadFile):
    trace_id = Path(file.filename or "").stem or uuid4().hex
dest = TRACES_DIR / f"{trace_id}.jsonl"
dest.write_bytes(await file.read())
return {"trace_id": trace_id}
@app.get("/api/trace/list")
async def list_traces():
return [p.stem for p in TRACES_DIR.glob("*.jsonl")]
@app.post("/api/trace/{trace_id}/replay_request")
async def replay_request(trace_id: str):
(REQ_DIR / trace_id).touch() # empty file as semaphore
return {"status": "queued"}
@app.post("/api/trace/{trace_id}/status")
async def recv_status(trace_id: str, request: Request):
"""Optional: host.py posts logs here โ UI polls below."""
body = await request.body()
line_content = body.decode('utf-8')
log_file = TRACES_DIR / f"{trace_id}.log"
with open(log_file, "a") as f:
f.write(line_content + "\n")
return {"ok": True}
@app.post("/api/record")
async def api_record(payload: dict):
trace_id = payload.get("trace_id") or dt.utcnow().isoformat()
path = f"{TRACES_DIR}/{trace_id}.jsonl"
with open(path, "a") as f:
f.write(json.dumps(payload)+"\n")
return {"status": "ok", "trace_id": trace_id}
@app.post("/api/replay")
async def api_replay(req: dict):
trace_id = req.get("trace_id")
if not trace_id:
raise HTTPException(400, "trace_id is required")
speed = float(req.get("speed", 1.0))
path = f"{TRACES_DIR}/{trace_id}.jsonl"
if not os.path.exists(path):
raise HTTPException(404, "trace not found")
asyncio.create_task(run_replay(path, speed)) # fire-and-forget
return {"status": "started"}
_browser_init_lock = asyncio.Lock() # Add lock for ensure_browser_session
# --- 👻 Background tasks ---
# Daemons listening for Record commands: START_RECORDING / STOP_RECORDING
# Create and start the background event loop and task for _listen_command_pipe
command_pipe_loop = asyncio.new_event_loop()
command_pipe_thread = threading.Thread(
target=_run_async_in_thread,
args=(command_pipe_loop, _listen_command_pipe()), # Pass the loop and the coroutine object
daemon=True,
name="RebrowseCmdPipeLoop"
)
command_pipe_thread.start()
logger.info("Started _listen_command_pipe in a background daemon thread with its own event loop.")
# Start _read_host_pipe_task in a background event loop (similar to command pipe listener)
host_pipe_loop = asyncio.new_event_loop()
host_pipe_thread = threading.Thread(
target=_run_async_in_thread,
args=(host_pipe_loop, _read_host_pipe_task()),
daemon=True,
name="RebrowseHostPipeLoop"
)
host_pipe_thread.start()
logger.info("Started _read_host_pipe_task in a background daemon thread with its own event loop.")
# --- END: Background tasks ---
# -- Init: HF Space compatible UI --
demo = create_ui(theme_name="Citrus")
# Reuse the FastAPI instance created above so the /api/* routes remain registered.
# 1️⃣ put the Blocks under / so that the Space can see it.
gr.mount_gradio_app(app, demo, path="")
if not Path(MANUAL_TRACES_DIR).exists():
Path(MANUAL_TRACES_DIR).mkdir(parents=True, exist_ok=True)
logger.info(f"Created MANUAL_TRACES_DIR at: {Path(MANUAL_TRACES_DIR).resolve()}")
else:
logger.info(f"MANUAL_TRACES_DIR exists at: {Path(MANUAL_TRACES_DIR).resolve()}")
# -- END: Init: HF Space compatible UI --
# 2️⃣ keep demo.launch() *only* for local dev
if __name__ == "__main__":
# NO demo.launch() inside HF Space
# demo.launch(server_name="0.0.0.0", server_port=7860, debug=False, allowed_paths=[MANUAL_TRACES_DIR])
uvicorn.run(app, host="0.0.0.0", port=7860) |