"""Build, mutate and render the Jupyter notebook displayed by the agent UI.

The module keeps the notebook as a plain nbformat-v4 dictionary, offers helpers
to append chat messages, code cells, shell commands, web-search results and
status widgets, and renders the result to HTML through nbconvert.
"""

import copy
import datetime
import html
import json
import logging
import re

import nbformat
from jinja2 import DictLoader  # kept: other parts of the project may rely on it
from nbconvert import HTMLExporter
from traitlets.config import Config

# Configure logging for jupyter_handler module
logger = logging.getLogger(__name__)

system_template = """\

System

{}
"""

user_template = """\

User

{}
"""

assistant_thinking_template = """\

Assistant

{}
"""

assistant_final_answer_template = """
Assistant: Final answer: {}
"""

web_search_template = """

🔍 Web Search Results

Query: {query}
{quick_answer}
📚 Sources:
{sources}
"""

header_message = """

🔬 Eureka Agent

Built on top of Jupyter Agent 2

"""

shell_command_template = """
Terminal
$ {}
"""

shell_output_template = """
{}
"""

# NOTE(review): this snippet is removed from nbconvert's output with an exact
# string match, so its whitespace must equal what nbconvert emits — confirm.
bad_html_bad = """input[type="file"] { display: block; }"""

EXECUTING_WIDGET = """
Executing code...
"""

GENERATING_WIDGET = """
Generating...
"""

DONE_WIDGET = """
Generation complete
"""

STOPPED_WIDGET = """
Execution stopped by user
"""

ERROR_WIDGET = """
Execution failed - check error details above
"""

ERROR_HTML = """\
!
Error: {}
"""

STOPPED_SANDBOX_HTML = """
Sandbox stopped
Started: {start_time} Expired: {end_time}
"""

# Only the last of the former duplicate definitions is kept; it is the one
# that was effective at runtime.
TIMEOUT_HTML = """
Sandbox timeout: {total_seconds}s
Started: {start_time} Expires: {end_time}
"""

# Custom CSS for notebook styling including shell commands
custom_css = """ """

# Configure the exporter
config = Config()
html_exporter = HTMLExporter(config=config, template_name="classic")

# Name of the tool whose calls are rendered as executed code cells.
_CODE_TOOL_NAME = "add_and_execute_jupyter_code_cell"

# Markers delimiting serialized plot payloads inside stderr.
_PLOT_DATA_START = "__PLOT_DATA__"
_PLOT_DATA_END = "__END_PLOT_DATA__"

# Cell-metadata key tagging the sandbox countdown / stopped cell so it can be
# located again even when the template text contains no recognisable emoji.
_COUNTDOWN_METADATA_KEY = "countdown"
_COUNTDOWN_ACTIVE = "active"
_COUNTDOWN_STOPPED = "stopped"

# Render mode -> status widget appended to the rendered copy of the notebook.
_STATUS_WIDGETS = {
    "generating": GENERATING_WIDGET,
    "executing": EXECUTING_WIDGET,
    "done": DONE_WIDGET,
    "stopped": STOPPED_WIDGET,
    "error": ERROR_WIDGET,
}

# (result attribute, MIME type) pairs copied into a notebook output bundle.
_RESULT_MIME_TYPES = (
    ("text", "text/plain"),
    ("html", "text/html"),
    ("png", "image/png"),
    ("svg", "image/svg+xml"),
    ("jpeg", "image/jpeg"),
    ("pdf", "application/pdf"),
    ("latex", "text/latex"),
    ("json", "application/json"),
    ("javascript", "application/javascript"),
)

# Patterns used by JupyterNotebook._clean_markdown_formatting.
_BOLD_RE = re.compile(r'\*\*([^*]+)\*\*')
_ITALIC_RE = re.compile(r'(?<!\*)\*([^*]+)\*(?!\*)')
_EXTRA_ASTERISKS_RE = re.compile(r'\*{3,}')
_LINK_RE = re.compile(r'\[([^\]]+)\]\(([^)]+)\)')


class JupyterNotebook:
    """In-memory nbformat-v4 notebook with helpers for the agent conversation.

    Attributes:
        data: The notebook dictionary (``metadata``, ``nbformat``, ``cells``).
        exec_count: Number of executed code cells added so far.
        code_cell_counter: Second value returned by ``create_base_notebook``
            (always 0 in this module).
        countdown_info: ``None`` or a dict with ``start_time``/``end_time``
            describing the sandbox lifetime.
    """

    def __init__(self, messages=None, session_state_data=None):
        """Create a notebook from session state or from chat messages.

        Args:
            messages: Optional list of chat message dicts used to build the
                notebook when no session state is supplied.
            session_state_data: Optional dict; when it contains
                ``"notebook_data"`` that notebook dict is used directly
                (not copied).
        """
        self.exec_count = 0
        # Set on every path so the attribute always exists.
        self.code_cell_counter = 0
        self.countdown_info = None

        # If session_state_data is provided, use it directly
        if session_state_data and "notebook_data" in session_state_data:
            logger.info("Initializing JupyterNotebook from session state")
            self.data = session_state_data["notebook_data"]
            # Count existing code cells to maintain execution count
            self.exec_count = self._count_executed_cells(self.data)
            logger.info(
                f"JupyterNotebook initialized from session state with "
                f"{len(self.data.get('cells', []))} cells, exec_count={self.exec_count}"
            )
            return

        # Legacy initialization path
        if messages is None:
            messages = []
        logger.debug(f"Initializing JupyterNotebook with {len(messages)} messages")
        self.data, self.code_cell_counter = self.create_base_notebook(messages)
        logger.info(f"JupyterNotebook initialized with {len(self.data['cells'])} cells")

    @staticmethod
    def _count_executed_cells(notebook_data):
        """Return how many code cells in *notebook_data* have an execution count."""
        return sum(
            1
            for cell in notebook_data.get("cells", [])
            if cell.get("cell_type") == "code" and cell.get("execution_count")
        )

    def create_base_notebook(self, messages):
        """Build the notebook skeleton and replay *messages* into it.

        Args:
            messages: List of chat message dicts with a ``role`` key. Assistant
                messages may carry ``tool_calls``; code-execution calls are
                paired with the tool message that immediately follows.

        Returns:
            Tuple ``(notebook_dict, 0)``. ``self.data`` is set as a side effect.
        """
        logger.debug("Creating base notebook structure")
        base_notebook = {
            "metadata": {
                "kernel_info": {"name": "python3"},
                "language_info": {
                    "name": "python",
                    "version": "3.12",
                },
            },
            "nbformat": 4,
            "nbformat_minor": 0,
            "cells": [],
        }

        # Add header
        base_notebook["cells"].append({
            "cell_type": "markdown",
            "metadata": {},
            "source": header_message,
        })
        logger.debug("Added header cell to notebook")

        # Set initial data
        self.data = base_notebook

        # Add empty code cell if no messages
        if not messages:
            self.data["cells"].append({
                "cell_type": "code",
                "execution_count": None,
                "metadata": {},
                "source": "",
                "outputs": [],
            })
            logger.debug("Added empty code cell for new notebook")
            return self.data, 0

        # Process messages using existing methods
        logger.info(f"Processing {len(messages)} messages for notebook creation")
        i = 0
        while i < len(messages):
            message = messages[i]
            role = message["role"]
            logger.debug(f"Processing message {i+1}/{len(messages)}: {role}")

            if role == "system":
                logger.debug("Adding system message as markdown")
                self.add_markdown(message["content"], "system")
            elif role == "user":
                logger.debug("Adding user message as markdown")
                self.add_markdown(message["content"], "user")
            elif role == "assistant":
                if "tool_calls" in message:
                    i = self._replay_tool_calls(messages, i)
                else:
                    # Regular assistant message
                    logger.debug("Adding regular assistant message")
                    self.add_markdown(message["content"], "assistant")
            elif role == "tool":
                # Tool messages are consumed together with the assistant
                # message that issued the call; a stray one is ignored.
                logger.debug("Skipping tool message (should have been processed with tool_calls)")

            i += 1

        return self.data, 0

    def _replay_tool_calls(self, messages, index):
        """Replay the tool calls of ``messages[index]`` and return the new index.

        The returned index points at the last message consumed, so the caller
        can keep advancing with ``index + 1``.
        """
        message = messages[index]
        logger.debug(f"Processing assistant message with {len(message['tool_calls'])} tool calls")

        # Add assistant thinking if there's content
        if message.get("content"):
            logger.debug("Adding assistant thinking content")
            self.add_markdown(message["content"], "assistant")

        for tool_call in message["tool_calls"]:
            if tool_call["function"]["name"] != _CODE_TOOL_NAME:
                continue

            call_id = tool_call["id"]
            logger.debug(f"Processing code execution tool call: {call_id}")
            tool_args = json.loads(tool_call["function"]["arguments"])
            code = tool_args["code"]
            logger.debug(f"Code cell contains {len(code)} characters")

            # The response is expected right after; guard against a truncated
            # history where the tool call is the final message.
            next_index = index + 1
            tool_message = messages[next_index] if next_index < len(messages) else None
            if (
                tool_message is not None
                and tool_message.get("role") == "tool"
                and tool_message.get("tool_call_id") == call_id
            ):
                logger.debug(f"Found matching tool response for {call_id}")
                # Use the raw execution if available, otherwise fall back to empty list
                execution = tool_message.get("raw_execution", [])
                self.add_code_execution(code, execution, parsed=True)
                logger.debug(f"Added code execution cell with {len(execution)} outputs")
                index = next_index  # Skip the tool message since we just processed it
            else:
                logger.warning(f"No matching tool response found for tool call {call_id}")

        return index

    def _find_countdown_cell(self, include_stopped):
        """Return the index of the countdown cell, or ``None`` if absent.

        A cell matches when it is markdown and either carries the countdown
        metadata tag or contains the timer emoji in its source. With
        *include_stopped* the "stopped" variants match as well.
        """
        for index, cell in enumerate(self.data["cells"]):
            if cell.get("cell_type") != "markdown":
                continue
            state = (cell.get("metadata") or {}).get(_COUNTDOWN_METADATA_KEY)
            source = str(cell.get("source", ""))
            if state == _COUNTDOWN_ACTIVE or "⏱" in source:
                return index
            if include_stopped and (state == _COUNTDOWN_STOPPED or "⏹" in source):
                return index
        return None

    def _update_countdown_cell(self):
        """Refresh the sandbox countdown cell from ``self.countdown_info``.

        While the sandbox is alive the countdown cell is updated in place, or
        inserted at position 1 if missing. Once expired, an existing countdown
        cell is replaced by the stopped message; nothing is inserted.

        ``start_time``/``end_time`` are compared against an aware UTC "now".
        """
        if not self.countdown_info:
            logger.debug("No countdown info available, skipping countdown update")
            return

        logger.debug("Updating countdown cell")
        start_time = self.countdown_info['start_time']
        end_time = self.countdown_info['end_time']
        current_time = datetime.datetime.now(datetime.timezone.utc)
        remaining_time = end_time - current_time

        start_display = start_time.strftime("%H:%M")
        end_display = end_time.strftime("%H:%M")

        # Show stopped message if expired
        if remaining_time.total_seconds() <= 0:
            logger.info("Sandbox has expired, showing stopped message")
            stopped_cell = {
                "cell_type": "markdown",
                "metadata": {_COUNTDOWN_METADATA_KEY: _COUNTDOWN_STOPPED},
                "source": STOPPED_SANDBOX_HTML.format(
                    start_time=start_display,
                    end_time=end_display,
                ),
            }
            position = self._find_countdown_cell(include_stopped=True)
            if position is not None:
                self.data["cells"][position] = stopped_cell
                logger.debug(f"Updated countdown cell at position {position} with stopped message")
            return

        # Calculate current progress
        total_duration = end_time - start_time
        elapsed_time = current_time - start_time
        duration_seconds = total_duration.total_seconds()
        if duration_seconds != 0:
            current_progress = (elapsed_time.total_seconds() / duration_seconds) * 100
        else:
            # Zero-length window: avoid dividing by zero.
            current_progress = 0.0
        current_progress = max(0, min(100, current_progress))
        logger.debug(f"Countdown progress: {current_progress:.1f}% ({remaining_time.total_seconds():.0f}s remaining)")

        remaining_seconds = int(remaining_time.total_seconds())

        # Generate unique ID to avoid CSS conflicts when updating
        unique_id = int(current_time.timestamp() * 1000) % 100000

        # Calculate total timeout duration in seconds
        total_seconds = int(duration_seconds)

        # Extra keyword arguments are ignored by str.format when the template
        # does not reference them.
        countdown_html = TIMEOUT_HTML.format(
            start_time=start_display,
            end_time=end_display,
            current_progress=current_progress,
            remaining_seconds=remaining_seconds,
            unique_id=unique_id,
            total_seconds=total_seconds,
        )

        countdown_cell = {
            "cell_type": "markdown",
            "metadata": {_COUNTDOWN_METADATA_KEY: _COUNTDOWN_ACTIVE},
            "source": countdown_html,
        }

        position = self._find_countdown_cell(include_stopped=False)
        if position is not None:
            # Update existing countdown cell
            self.data["cells"][position] = countdown_cell
            logger.debug(f"Updated existing countdown cell at position {position}")
        else:
            # Insert new countdown cell at position 1 (after header)
            self.data["cells"].insert(1, countdown_cell)
            logger.debug("Inserted new countdown cell at position 1")

    def add_sandbox_countdown(self, start_time, end_time):
        """Record the sandbox lifetime; the cell itself is drawn on render."""
        logger.info(f"Adding sandbox countdown: {start_time} to {end_time}")
        # Store the countdown info for later updates
        self.countdown_info = {
            'start_time': start_time,
            'end_time': end_time,
            'cell_index': 1,  # Remember where we put it
        }

    def add_code_execution(self, code, execution, parsed=False):
        """Append a code cell together with its outputs.

        Args:
            code: Source of the cell.
            execution: Either a list of notebook outputs (``parsed=True``) or
                an execution object accepted by ``parse_exec_result_nb``.
            parsed: Whether *execution* is already in notebook output format.
        """
        self.exec_count += 1
        logger.debug(f"Adding code execution cell #{self.exec_count} with {len(code)} chars of code")
        outputs = execution if parsed else self.parse_exec_result_nb(execution)
        logger.debug(f"Code execution has {len(outputs)} outputs")
        self.data["cells"].append({
            "cell_type": "code",
            "execution_count": self.exec_count,
            "metadata": {},
            "source": code,
            "outputs": outputs,
        })

    def add_code(self, code):
        """Add a code cell without execution results"""
        self.exec_count += 1
        logger.debug(f"Adding code cell #{self.exec_count} with {len(code)} chars (no execution)")
        self.data["cells"].append({
            "cell_type": "code",
            "execution_count": self.exec_count,
            "metadata": {},
            "source": code,
            "outputs": [],
        })

    def _last_cell_is_code(self):
        """Return True when the notebook ends with a code cell."""
        cells = self.data["cells"]
        return bool(cells) and cells[-1]["cell_type"] == "code"

    def append_execution(self, execution):
        """Append execution results to the immediate previous cell if it's a code cell

        Raises:
            ValueError: If the last cell is missing or is not a code cell.
        """
        if not self._last_cell_is_code():
            logger.error("Cannot append execution: previous cell is not a code cell")
            raise ValueError("Cannot append execution: previous cell is not a code cell")
        outputs = self.parse_exec_result_nb(execution)
        self.data["cells"][-1]["outputs"] = outputs
        logger.debug(f"Appended {len(outputs)} outputs to last code cell")

    def has_execution_error(self, execution):
        """Check if an execution result contains an error"""
        has_error = execution.error is not None
        logger.debug(f"Execution error check: {has_error}")
        return has_error

    def has_execution_warnings(self, execution):
        """Check if an execution result contains warnings (stderr output but no error)"""
        # bool() so callers get True/False rather than the stderr container.
        has_warnings = execution.error is None and bool(execution.logs.stderr)
        logger.debug(f"Execution warning check: {has_warnings}")
        return has_warnings

    def update_last_code_cell(self, code):
        """Update the source code of the last code cell

        Raises:
            ValueError: If the last cell is missing or is not a code cell.
        """
        if not self._last_cell_is_code():
            logger.error("Cannot update: last cell is not a code cell")
            raise ValueError("Cannot update: last cell is not a code cell")
        logger.debug(f"Updating last code cell with {len(code)} chars")
        last_cell = self.data["cells"][-1]
        last_cell["source"] = code
        # Clear previous outputs when updating code
        last_cell["outputs"] = []
        logger.debug("Cleared previous outputs from updated code cell")

    def get_last_cell_type(self):
        """Get the type of the last cell, or None if no cells exist"""
        if self.data["cells"]:
            cell_type = self.data["cells"][-1]["cell_type"]
            logger.debug(f"Last cell type: {cell_type}")
            return cell_type
        logger.debug("No cells exist, returning None")
        return None

    def add_markdown(self, markdown, role="markdown"):
        """Append a markdown cell formatted for *role*.

        Args:
            markdown: Message text; ``None`` is treated as empty text.
            role: ``"system"``, ``"user"``, ``"assistant"``, or anything else
                for raw markdown.
        """
        # Messages that only carry tool calls may have content=None.
        markdown = markdown or ""
        logger.debug(f"Adding markdown cell with role '{role}' ({len(markdown)} chars)")
        if role == "system":
            system_message = markdown if markdown else "default"
            clean_message = self._clean_markdown_formatting(system_message)
            markdown_formatted = system_template.format(clean_message)
        elif role == "user":
            clean_message = self._clean_markdown_formatting(markdown)
            markdown_formatted = user_template.format(clean_message)
        elif role == "assistant":
            clean_message = self._clean_markdown_formatting(markdown)
            markdown_formatted = assistant_thinking_template.format(clean_message)
            # Escape literal think tags so they are displayed as text instead
            # of being interpreted as unknown HTML elements.
            markdown_formatted = markdown_formatted.replace('<think>', '&lt;think&gt;')
            markdown_formatted = markdown_formatted.replace('</think>', '&lt;/think&gt;')
        else:
            # Default case for raw markdown
            markdown_formatted = self._clean_markdown_formatting(markdown)

        self.data["cells"].append({
            "cell_type": "markdown",
            "metadata": {},
            "source": markdown_formatted,
        })

    def add_shell_command(self, command):
        """Add a shell command cell with terminal-style formatting"""
        logger.debug(f"Adding shell command cell: '{command}'")
        # Format command with terminal-style template
        shell_formatted = shell_command_template.format(self._clean_shell_command(command))
        self.data["cells"].append({
            "cell_type": "markdown",
            "metadata": {"shell_command": True, "command": command},
            "source": shell_formatted,
        })

    def append_shell_execution(self, execution):
        """Append shell execution results to the notebook with terminal styling"""
        logger.debug("Appending shell execution results")
        # Format the shell output using terminal styling
        output_content = self._format_shell_output(execution)
        shell_output_formatted = shell_output_template.format(output_content)

        # Wrap in a div with shell-output class for styling
        shell_output_with_class = f'<div class="shell-output">\n{shell_output_formatted}\n</div>'

        # Add the output as a new markdown cell
        self.data["cells"].append({
            "cell_type": "markdown",
            "metadata": {"shell_output": True},
            "source": shell_output_with_class,
        })
        logger.debug("Added shell output cell to notebook")

    def _clean_shell_command(self, command):
        """Clean and escape shell command for display"""
        if not command:
            return ""
        # HTML-escape &, <, > and both quote characters.
        return html.escape(command, quote=True)

    @staticmethod
    def _strip_plot_data(stderr_text):
        """Remove the plot-data payload from *stderr_text* when both markers exist.

        The text is returned unchanged when either marker is missing; otherwise
        the span from the start marker through the end marker is cut out and
        the remainder is stripped.
        """
        plot_start = stderr_text.find(_PLOT_DATA_START)
        plot_end = stderr_text.find(_PLOT_DATA_END)
        if plot_start == -1 or plot_end == -1:
            return stderr_text
        cleaned = stderr_text[:plot_start] + stderr_text[plot_end + len(_PLOT_DATA_END):]
        return cleaned.strip()

    def _format_shell_output(self, execution):
        """Format shell execution output for terminal-style display

        Returns:
            HTML-escaped text combining stdout, stderr, error details and
            result texts, or ``"No output"`` when there is nothing to show.
        """
        output_parts = []

        # Add stdout if present
        if execution.logs.stdout:
            stdout_text = ''.join(execution.logs.stdout).strip()
            if stdout_text:
                output_parts.append(stdout_text)

        # Add stderr if present (but filter out plot data)
        if execution.logs.stderr:
            stderr_text = self._strip_plot_data(''.join(execution.logs.stderr).strip())
            if stderr_text:
                output_parts.append(f"STDERR:\n{stderr_text}")

        # Add error information if present
        if execution.error:
            error_text = f"ERROR: {execution.error.name}: {execution.error.value}"
            if execution.error.traceback:
                error_text += f"\n{execution.error.traceback}"
            output_parts.append(error_text)

        # Add execution results if present (for shell commands that produce results)
        if execution.results:
            output_parts.extend(
                result.text.strip() for result in execution.results if result.text
            )

        # Join all output parts
        final_output = '\n\n'.join(output_parts) if output_parts else "No output"

        # HTML-escape &, < and > (quotes are left untouched).
        final_output = html.escape(final_output, quote=False)

        logger.debug(f"Formatted shell output: {len(final_output)} chars")
        return final_output

    def add_error(self, error_message):
        """Add an error message cell to the notebook"""
        logger.warning(f"Adding error cell: {error_message}")
        error_html = ERROR_HTML.format(error_message)
        self.data["cells"].append({
            "cell_type": "markdown",
            "metadata": {},
            "source": error_html,
        })

    def add_final_answer(self, answer):
        """Append the assistant's final answer as a markdown cell."""
        logger.info(f"Adding final answer cell ({len(answer)} chars)")
        self.data["cells"].append({
            "cell_type": "markdown",
            "metadata": {},
            "source": assistant_final_answer_template.format(answer),
        })

    def add_web_search_result(self, query, quick_answer=None, sources=None):
        """Add a web search result cell with dropdown UI

        Args:
            query: The search query text.
            quick_answer: Optional short answer shown above the sources.
            sources: Optional list of dicts with ``title``, ``url`` and
                ``relevance`` keys (all optional per source).
        """
        logger.info(f"Adding web search result for query: {query}")

        # Format quick answer section
        quick_answer_html = ""
        if quick_answer:
            # Clean up markdown formatting in quick answer
            clean_answer = self._clean_markdown_formatting(quick_answer)
            quick_answer_html = f"""
💡 Quick Answer:
{clean_answer}
"""

        # Format sources section
        sources_html = ""
        if sources:
            source_items = []
            for i, source in enumerate(sources, 1):
                title = self._clean_markdown_formatting(source.get('title', f'Source {i}'))
                url = source.get('url', '#')
                relevance = source.get('relevance', 0.0)
                source_item = f"""
{i}. {title} Relevance: {relevance:.2f}
{url}
"""
                source_items.append(source_item)
            sources_html = "".join(source_items)

        # Format the complete web search result
        web_search_html = web_search_template.format(
            query=self._clean_markdown_formatting(query),
            quick_answer=quick_answer_html,
            sources=sources_html,
        )

        self.data["cells"].append({
            "cell_type": "markdown",
            "metadata": {},
            "source": web_search_html,
        })

    def _clean_markdown_formatting(self, text):
        """Clean up markdown formatting issues like excessive ** characters

        Returns an empty string for falsy input.
        """
        if not text:
            return ""

        # NOTE(review): emphasis markers and link targets are dropped rather
        # than converted to HTML tags — confirm this is the intended output.
        # Handle bold text: **text** -> text
        text = _BOLD_RE.sub(r'\1', text)
        # Handle italic text: *text* -> text (single asterisks only)
        text = _ITALIC_RE.sub(r'\1', text)
        # Clean up any remaining multiple asterisks
        text = _EXTRA_ASTERISKS_RE.sub('**', text)
        # Handle line breaks: the text is embedded in HTML, so use <br>
        text = text.replace('\n', '<br>')
        # Handle links [text](url) -> text
        text = _LINK_RE.sub(r'\1', text)

        return text

    def parse_exec_result_nb(self, execution):
        """Convert an E2B Execution object to Jupyter notebook cell output format

        Returns:
            List of notebook output dicts (stream, error, execute_result and
            display_data entries).
        """
        logger.debug("Parsing execution result for notebook format")
        outputs = []

        if execution.logs.stdout:
            stdout_text = ''.join(execution.logs.stdout)
            logger.debug(f"Adding stdout output ({len(stdout_text)} chars)")
            outputs.append({
                'output_type': 'stream',
                'name': 'stdout',
                'text': stdout_text,
            })

        if execution.logs.stderr:
            # Filter out plot data from stderr before displaying
            stderr_text = self._strip_plot_data(''.join(execution.logs.stderr))
            # Only add stderr output if there's content after filtering
            if stderr_text:
                logger.debug(f"Adding stderr output ({len(stderr_text)} chars)")
                outputs.append({
                    'output_type': 'stream',
                    'name': 'stderr',
                    'text': stderr_text,
                })

        if execution.error:
            logger.debug(f"Adding error output: {execution.error.name}: {execution.error.value}")
            # A missing traceback is treated as empty instead of crashing.
            traceback_text = execution.error.traceback or ""
            outputs.append({
                'output_type': 'error',
                'ename': execution.error.name,
                'evalue': execution.error.value,
                'traceback': traceback_text.split('\n'),
            })

        for i, result in enumerate(execution.results):
            logger.debug(f"Processing execution result {i+1}/{len(execution.results)}")
            output = {
                'output_type': 'execute_result' if result.is_main_result else 'display_data',
                'metadata': {},
                'data': {},
            }

            for attribute, mime_type in _RESULT_MIME_TYPES:
                value = getattr(result, attribute, None)
                if value:
                    output['data'][mime_type] = value

            if result.is_main_result and execution.execution_count is not None:
                output['execution_count'] = execution.execution_count

            if output['data']:
                logger.debug(f"Added result output with data types: {list(output['data'].keys())}")
                outputs.append(output)
            else:
                logger.debug("Skipping result with no data")

        logger.debug(f"Parsed execution result into {len(outputs)} outputs")
        return outputs

    def filter_base64_images(self, message):
        """Filter out base64 encoded images from message content

        Image and PDF payloads under ``message['nbformat'][*]['data']`` are
        blanked in place; the same *message* object is returned.
        """
        if isinstance(message, dict) and 'nbformat' in message:
            for output in message['nbformat']:
                if 'data' in output:
                    for key in list(output['data'].keys()):
                        if key.startswith('image/') or key == 'application/pdf':
                            output['data'][key] = ''
        return message

    def render(self, mode="default"):
        """Render the notebook to an HTML string.

        Args:
            mode: ``"default"`` or one of ``"generating"``, ``"executing"``,
                ``"done"``, ``"stopped"``, ``"error"``; the latter append a
                status widget to the rendered copy only.

        Returns:
            The HTML document produced by nbconvert, with custom CSS injected.

        Raises:
            ValueError: If *mode* is not one of the accepted values.
        """
        logger.debug(f"Rendering notebook in '{mode}' mode with {len(self.data['cells'])} cells")

        # Validate before touching any state.
        if mode != "default" and mode not in _STATUS_WIDGETS:
            logger.error(f"Invalid render mode: {mode}")
            raise ValueError(f"Render mode should be generating, executing, done, stopped, or error. Given: {mode}.")

        if self.countdown_info is not None:
            self._update_countdown_cell()

        # Status widgets are added to a copy so they never persist.
        render_data = copy.deepcopy(self.data)

        if mode != "default":
            logger.debug(f"Adding {mode} widget to render")
            render_data["cells"].append({
                "cell_type": "markdown",
                "metadata": {},
                "source": _STATUS_WIDGETS[mode],
            })

        notebook = nbformat.from_dict(render_data)
        notebook_body, _ = html_exporter.from_notebook_node(notebook)
        notebook_body = notebook_body.replace(bad_html_bad, "")
        logger.debug(f"Rendered notebook HTML ({len(notebook_body)} chars)")

        # make code font a bit smaller with custom css
        if "</head>" in notebook_body:
            notebook_body = notebook_body.replace("</head>", f"{custom_css}</head>")
            logger.debug("Applied custom CSS to notebook")

        return notebook_body

    @classmethod
    def from_session_state(cls, session_state_data):
        """Create JupyterNotebook instance from session state data"""
        return cls(session_state_data=session_state_data)

    def get_session_notebook_data(self):
        """Get notebook data in format suitable for session state

        A deep copy is returned so later edits to the notebook do not leak
        into the stored snapshot.
        """
        return copy.deepcopy(self.data)

    def update_from_session_state(self, session_state_data):
        """Update notebook data from session state

        Does nothing when *session_state_data* has no ``"notebook_data"`` key.
        """
        if "notebook_data" in session_state_data:
            # Deep copy so the notebook and the session state stay independent.
            self.data = copy.deepcopy(session_state_data["notebook_data"])
            # Update execution count based on existing cells
            self.exec_count = self._count_executed_cells(self.data)
            logger.debug(
                f"Updated notebook from session state: "
                f"{len(self.data.get('cells', []))} cells, exec_count={self.exec_count}"
            )


def main():
    """Create a mock notebook to test styling"""
    # Create mock messages
    mock_messages = [
        {"role": "system", "content": "You are a helpful AI assistant that can write and execute Python code."},
        {"role": "user", "content": "Can you help me create a simple plot of a sine wave?"},
        {"role": "assistant", "content": "I'll help you create a sine wave plot using matplotlib. **Let me search** for the *best practices* first."},
        {"role": "assistant", "tool_calls": [{"id": "call_1", "function": {"name": "add_and_execute_jupyter_code_cell", "arguments": '{"code": "import numpy as np\\nimport matplotlib.pyplot as plt\\n\\n# Create x values\\nx = np.linspace(0, 4*np.pi, 100)\\ny = np.sin(x)\\n\\n# Create the plot\\nplt.figure(figsize=(10, 6))\\nplt.plot(x, y, \'b-\', linewidth=2)\\nplt.title(\'Sine Wave\')\\nplt.xlabel(\'x\')\\nplt.ylabel(\'sin(x)\')\\nplt.grid(True)\\nplt.show()"}'}}]},
        {"role": "tool", "tool_call_id": "call_1", "raw_execution": [{"output_type": "stream", "name": "stdout", "text": "Plot created successfully!"}]},
    ]

    # Create notebook
    notebook = JupyterNotebook(mock_messages)

    # Add a web search result example to test the new UI
    mock_sources = [
        {
            "title": "**Matplotlib** Tutorial - Creating **Beautiful** Plots",
            "url": "https://matplotlib.org/stable/tutorials/introductory/pyplot.html",
            "relevance": 0.85,
        },
        {
            "title": "NumPy *Sine Wave* Generation **Best Practices**",
            "url": "https://numpy.org/doc/stable/reference/generated/numpy.sin.html",
            "relevance": 0.72,
        },
    ]

    notebook.add_web_search_result(
        query="**matplotlib** *sine wave* tutorial **best practices**",
        quick_answer="To create a **sine wave plot** with *matplotlib*, use `numpy.linspace()` to generate **x values** and `numpy.sin()` for *y values*. **Configure** the plot with *appropriate* labels and **styling** for better visualization.",
        sources=mock_sources,
    )

    # Add a timeout countdown (simulating a sandbox that started 2 minutes ago with 5 minute timeout)
    start_time = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(minutes=2)
    end_time = start_time + datetime.timedelta(minutes=5)
    notebook.add_sandbox_countdown(start_time, end_time)

    # Render and save
    html_output = notebook.render()
    with open("mock_notebook.html", "w", encoding="utf-8") as f:
        f.write(html_output)

    print("Mock notebook saved as 'mock_notebook.html'")
    print("Open it in your browser to see the improved web search UI and markdown formatting.")


def create_notebook_from_session_state(session_state):
    """Helper function to create JupyterNotebook from session state"""
    return JupyterNotebook.from_session_state(session_state)


if __name__ == "__main__":
    main()