AttnTrace / app.py
SecureLLMSys's picture
update
165ea49
# Acknowledgement: This demo code is adapted from the original Hugging Face Space "ContextCite"
# (https://huggingface.co/spaces/contextcite/context-cite).
import os
from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Any, Optional
import gradio as gr
import numpy as np
import spaces
import nltk
import base64
import traceback
from src.utils import split_into_sentences as split_into_sentences_utils
# --- AttnTrace imports (from app_full.py) ---
from src.models import create_model
from src.attribution import AttnTraceAttribution
from src.prompts import wrap_prompt
from gradio_highlightedtextbox import HighlightedTextbox
from examples import run_example_1, run_example_2, run_example_3, run_example_4, run_example_5, run_example_6
from functools import partial
from nltk.tokenize import sent_tokenize
# Load original app constants
APP_TITLE = '<div class="app-title"><span class="brand">AttnTrace: </span><span class="subtitle">Attention-based Context Traceback for Long-Context LLMs</span></div>'
APP_DESCRIPTION = """AttnTrace traces a model's generated statements back to specific parts of the context using attention-based traceback. Try it out with Meta-Llama-3.1-8B-Instruct here! See the [[paper](https://arxiv.org/abs/2508.03793)] and [[code](https://github.com/Wang-Yanting/AttnTrace)] for more!
Maintained by the AttnTrace team."""
# NEW_TEXT = """Long-context large language models (LLMs), such as Gemini-2.5-Pro and Claude-Sonnet-4, are increasingly used to empower advanced AI systems, including retrieval-augmented generation (RAG) pipelines and autonomous agents. In these systems, an LLM receives an instruction along with a context—often consisting of texts retrieved from a knowledge database or memory—and generates a response that is contextually grounded by following the instruction. Recent studies have designed solutions to trace back to a subset of texts in the context that contributes most to the response generated by the LLM. These solutions have numerous real-world applications, including performing post-attack forensic analysis and improving the interpretability and trustworthiness of LLM outputs. While significant efforts have been made, state-of-the-art solutions such as TracLLM often lead to a high computation cost, e.g., it takes TracLLM hundreds of seconds to perform traceback for a single response-context pair. In this work, we propose {\name}, a new context traceback method based on the attention weights produced by an LLM for a prompt. To effectively utilize attention weights, we introduce two techniques designed to enhance the effectiveness of {\name}, and we provide theoretical insights for our design choice. %Moreover, we perform both theoretical analysis and empirical evaluation to demonstrate their effectiveness.
# We also perform a systematic evaluation for {\name}. The results demonstrate that {\name} is more accurate and efficient than existing state-of-the-art context traceback methods. We also show {\name} can improve state-of-the-art methods in detecting prompt injection under long contexts through the attribution-before-detection paradigm. As a real-world application, we demonstrate that {\name} can effectively pinpoint injected instructions in a paper designed to manipulate LLM-generated reviews.
# The code and data will be open-sourced. """
# EDIT_TEXT = "Feel free to edit!"
GENERATE_CONTEXT_TOO_LONG_TEXT = (
'<em style="color: red;">Context is too long for the current model.</em>'
)
ATTRIBUTE_CONTEXT_TOO_LONG_TEXT = '<em style="color: red;">Context is too long for the current traceback method.</em>'
CONTEXT_LINES = 20
CONTEXT_MAX_LINES = 40
SELECTION_DEFAULT_TEXT = "Click on a sentence in the response to traceback!"
SELECTION_DEFAULT_VALUE = [(SELECTION_DEFAULT_TEXT, None)]
SOURCES_INFO = 'These are the texts that contribute most to the response.'
# SOURCES_IN_CONTEXT_INFO = (
# "This shows the important sentences highlighted within their surrounding context from the text above. Colors indicate ranking: Red (1st), Orange (2nd), Golden (3rd), Yellow (4th-5th), Light (6th+)."
# )
MODEL_PATHS = [
"meta-llama/Meta-Llama-3.1-8B-Instruct",
]
MAX_TOKENS = {
"meta-llama/Meta-Llama-3.1-8B-Instruct": 131072,
}
DEFAULT_MODEL_PATH = MODEL_PATHS[0]
EXPLANATION_LEVELS = ["sentence", "paragraph", "text segment"]
DEFAULT_EXPLANATION_LEVEL = "sentence"
class WorkflowState(Enum):
WAITING_TO_GENERATE = 0
WAITING_TO_SELECT = 1
READY_TO_ATTRIBUTE = 2
@dataclass
class State:
workflow_state: WorkflowState
context: str
query: str
response: str
start_index: int
end_index: int
scores: np.ndarray
answer: str
highlighted_context: str
full_response: str
explained_response_part: str
last_query_used: str = ""
# --- Dynamic Model and Attribution Management ---
current_llm = None
current_attr = None
current_model_path = None
current_explanation_level = None
current_api_key = None
current_top_k = 3 # Add top-k tracking
current_B = 30 # Add B parameter tracking
current_q = 0.4 # Add q parameter tracking
def update_configuration(explanation_level, top_k, B, q):
"""Update the global configuration and reinitialize attribution if needed"""
global current_explanation_level, current_top_k, current_B, current_q, current_attr, current_llm
# Convert parameters to appropriate types
top_k = int(top_k)
B = int(B)
q = float(q)
# Check if configuration has changed
config_changed = (current_explanation_level != explanation_level or
current_top_k != top_k or
current_B != B or
current_q != q)
if config_changed:
print(f"🔄 Updating configuration: explanation_level={explanation_level}, top_k={top_k}, B={B}, q={q}")
current_explanation_level = explanation_level
current_top_k = top_k
current_B = B
current_q = q
# Reset both model and attribution to force complete reinitialization
current_llm = None
current_attr = None
# Reinitialize with new configuration
try:
llm, attr, error_msg = initialize_model_and_attr()
if llm is not None and attr is not None:
return gr.update(value=f"✅ Configuration updated: {explanation_level} level, top-{top_k}, B={B}, q={q}")
else:
return gr.update(value=f"❌ Error reinitializing: {error_msg}")
except Exception as e:
return gr.update(value=f"❌ Error updating configuration: {str(e)}")
else:
return gr.update(value="ℹ️ Configuration unchanged")
def initialize_model_and_attr():
"""Initialize model and attribution with default configuration"""
global current_llm, current_attr, current_model_path, current_explanation_level, current_api_key, current_top_k, current_B, current_q
try:
# Check if we need to reinitialize the model
need_model_update = (current_llm is None or
current_model_path != DEFAULT_MODEL_PATH or
current_api_key != os.getenv("HF_TOKEN"))
# Check if we need to update attribution
need_attr_update = (current_attr is None or
current_explanation_level != (current_explanation_level or DEFAULT_EXPLANATION_LEVEL) or
need_model_update)
if need_model_update:
print(f"Initializing model: {DEFAULT_MODEL_PATH}")
effective_api_key = os.getenv("HF_TOKEN")
current_llm = create_model(model_path=DEFAULT_MODEL_PATH, api_key=effective_api_key, device="cuda")
current_model_path = DEFAULT_MODEL_PATH
current_api_key = effective_api_key
if need_attr_update:
# Use current configuration or defaults
explanation_level = current_explanation_level or DEFAULT_EXPLANATION_LEVEL
top_k = current_top_k or 3
B = current_B or 30
q = current_q or 0.4
if "segment" in explanation_level:
explanation_level = "segment"
print(f"Initializing context traceback with explanation level: {explanation_level}, top_k: {top_k}, B: {B}, q: {q}")
current_attr = AttnTraceAttribution(
current_llm,
explanation_level= explanation_level,
K=top_k,
q=q,
B=B
)
current_explanation_level = explanation_level
return current_llm, current_attr, None
except Exception as e:
error_msg = f"Error initializing model/traceback: {str(e)}"
print(error_msg)
traceback.print_exc()
return None, None, error_msg
# Remove immediate initialization - let lazy initialization work
llm, attr, error_msg = initialize_model_and_attr() # Commented out to avoid main-thread CUDA initialization
# Images replaced with CSS textures and gradients - no longer needed
def clear_state():
return State(
workflow_state=WorkflowState.WAITING_TO_GENERATE,
context="",
query="",
response="",
start_index=0,
end_index=0,
scores=np.array([]),
answer="",
highlighted_context="",
full_response="",
explained_response_part="",
last_query_used=""
)
def load_an_example(example_loader_func, state: State):
context, query = example_loader_func()
# Update both UI and state
state.context = context
state.query = query
state.workflow_state = WorkflowState.WAITING_TO_GENERATE
# Clear previous results
state.response = ""
state.answer = ""
state.full_response = ""
state.explained_response_part = ""
print(f"Loaded example - Context: {len(context)} chars, Query: {query[:50]}...")
return (
context, # basic_context_box
query, # basic_query_box
state,
"", # response_input_box - clear it
gr.update(value=[("Click the 'Generate/Use Response' button above to see response text here for traceback analysis.", None)]), # basic_response_box - keep visible
gr.update(selected=0) # basic_context_tabs - switch to first tab
)
def get_max_tokens(model_path: str):
return MAX_TOKENS.get(model_path, 2048) # Default fallback
def get_scroll_js_code(elem_id):
return f"""
function scrollToElement() {{
const element = document.getElementById("{elem_id}");
element.scrollIntoView({{ behavior: "smooth", block: "nearest" }});
}}
"""
def basic_update(context: str, query: str, state: State):
state.context = context
state.query = query
state.workflow_state = WorkflowState.WAITING_TO_GENERATE
return (
gr.update(value=[("Click the 'Generate/Use Response' button above to see response text here for traceback analysis.", None)]), # basic_response_box - keep visible
gr.update(selected=0), # basic_context_tabs - switch to first tab
state,
)
@spaces.GPU
def generate_model_response(state: State):
# Validate inputs first with debug info
print(f"Validation - Context length: {len(state.context) if state.context else 0}")
print(f"Validation - Query: {state.query[:50] if state.query else 'empty'}...")
if not state.context or not state.context.strip():
print("❌ Validation failed: No context")
return state, gr.update(value=[("❌ Please enter context before generating response! If you just changed configuration, try reloading an example.", None)], visible=True)
if not state.query or not state.query.strip():
print("❌ Validation failed: No query")
return state, gr.update(value=[("❌ Please enter a query before generating response! If you just changed configuration, try reloading an example.", None)], visible=True)
# Initialize model and attribution with current configuration
print(f"🔧 Generating response with explanation_level: {current_explanation_level or DEFAULT_EXPLANATION_LEVEL}, top_k: {current_top_k or 3}")
llm, attr, error_msg = initialize_model_and_attr()
if llm is None or attr is None:
error_text = error_msg if error_msg else "Model initialization failed!"
return state, gr.update(value=[(f"❌ {error_text}", None)], visible=True)
prompt = wrap_prompt(state.query, [state.context])
print(f"Generated prompt for {DEFAULT_MODEL_PATH}: {prompt[:200]}...") # Debug log
# Check context length
if len(prompt.split()) > get_max_tokens(DEFAULT_MODEL_PATH) - 512:
return state, gr.update(value=[(GENERATE_CONTEXT_TOO_LONG_TEXT, None)], visible=True)
answer = llm.query(prompt)
print(f"Model response: {answer}") # Debug log
state.response = answer
state.answer = answer
state.full_response = answer
state.workflow_state = WorkflowState.WAITING_TO_SELECT
return state, gr.update(visible=False)
def split_into_sentences(text: str):
def rule_based_split(text):
sentences = []
start = 0
for i, char in enumerate(text):
if char in ".?。":
if i + 1 == len(text) or text[i + 1] == " ":
sentences.append(text[start:i + 1].strip())
start = i + 1
if start < len(text):
sentences.append(text[start:].strip())
return sentences
lines = text.splitlines()
sentences = []
for line in lines:
#sentences.extend(sent_tokenize(line))
sentences.extend(rule_based_split(line))
separators = []
cur_start = 0
for sentence in sentences:
cur_end = text.find(sentence, cur_start)
separators.append(text[cur_start:cur_end])
cur_start = cur_end + len(sentence)
return sentences, separators
def basic_highlight_response(
response: str, selected_index: int, num_sources: int = -1
):
sentences, separators = split_into_sentences(response)
ht = []
if num_sources == -1:
citations_text = "Traceback!"
elif num_sources == 0:
citations_text = "No important text!"
else:
citations_text = f"[{','.join(str(i) for i in range(1, num_sources + 1))}]"
for i, (sentence, separator) in enumerate(zip(sentences, separators)):
label = citations_text if i == selected_index else "Traceback"
# Hack to ignore punctuation
if len(sentence) >= 4:
ht.append((separator + sentence, label))
else:
ht.append((separator + sentence, None))
color_map = {"Click to cite!": "blue", citations_text: "yellow"}
return gr.HighlightedText(value=ht, color_map=color_map)
def basic_highlight_response_with_visibility(
response: str, selected_index: int, num_sources: int = -1, visible: bool = True
):
"""Version of basic_highlight_response that also sets visibility"""
sentences, separators = split_into_sentences(response)
ht = []
if num_sources == -1:
citations_text = "Traceback!"
elif num_sources == 0:
citations_text = "No important text!"
else:
citations_text = f"[{','.join(str(i) for i in range(1, num_sources + 1))}]"
for i, (sentence, separator) in enumerate(zip(sentences, separators)):
label = citations_text if i == selected_index else "Traceback"
# Hack to ignore punctuation
if len(sentence) >= 4:
ht.append((separator + sentence, label))
else:
ht.append((separator + sentence, None))
color_map = {"Click to cite!": "blue", citations_text: "yellow"}
return gr.update(value=ht, color_map=color_map, visible=visible)
def basic_update_highlighted_response(evt: gr.SelectData, state: State):
response_update = basic_highlight_response(state.response, evt.index)
return response_update, state
def unified_response_handler(response_text: str, state: State):
"""Handle both LLM generation and manual input based on whether text is provided"""
# Check if instruction has changed from what was used to generate current response
instruction_changed = hasattr(state, 'last_query_used') and state.last_query_used != state.query
# If response_text is empty, whitespace, or instruction changed, generate from LLM
if not response_text or not response_text.strip() or instruction_changed:
if instruction_changed:
print("📝 Instruction changed, generating new response from LLM...")
else:
print("🤖 Generating response from LLM...")
# Validate inputs first
if not state.context or not state.context.strip():
return (
state,
response_text, # Keep current text box content
gr.update(visible=False), # Keep response box hidden
gr.update(value=[("❌ Please enter context before generating response!", None)], visible=True)
)
if not state.query or not state.query.strip():
return (
state,
response_text, # Keep current text box content
gr.update(visible=False), # Keep response box hidden
gr.update(value=[("❌ Please enter a query before generating response!", None)], visible=True)
)
# Initialize model and generate response
llm, attr, error_msg = initialize_model_and_attr()
if llm is None:
error_text = error_msg if error_msg else "Model initialization failed!"
return (
state,
response_text, # Keep current text box content
gr.update(visible=False), # Keep response box hidden
gr.update(value=[(f"❌ {error_text}", None)], visible=True)
)
prompt = wrap_prompt(state.query, [state.context])
# Check context length
if len(prompt.split()) > get_max_tokens(DEFAULT_MODEL_PATH) - 512:
return (
state,
response_text, # Keep current text box content
gr.update(visible=False), # Keep response box hidden
gr.update(value=[(GENERATE_CONTEXT_TOO_LONG_TEXT, None)], visible=True)
)
# Generate response
answer = llm.query(prompt)
print(f"Generated response: {answer[:100]}...")
# Update state and UI
state.response = answer
state.answer = answer
state.full_response = answer
state.last_query_used = state.query # Track which query was used for this response
state.workflow_state = WorkflowState.WAITING_TO_SELECT
# Create highlighted response and show it
response_update = basic_highlight_response_with_visibility(state.response, -1, visible=True)
return (
state,
answer, # Put generated response in text box
response_update, # Update clickable response content
gr.update(visible=False) # Hide error box
)
else:
# Use provided text as manual response
print("✏️ Using manual response...")
manual_text = response_text.strip()
# Update state with manual response
state.response = manual_text
state.answer = manual_text
state.full_response = manual_text
state.last_query_used = state.query # Track current query for this response
state.workflow_state = WorkflowState.WAITING_TO_SELECT
# Create highlighted response for selection
response_update = basic_highlight_response_with_visibility(state.response, -1, visible=True)
return (
state,
manual_text, # Keep text in text box
response_update, # Update clickable response content
gr.update(visible=False) # Hide error box
)
def get_color_by_rank(rank, total_items):
"""Get color based purely on rank position for better visual distinction"""
if total_items == 0:
return "#F0F0F0", "rgba(240, 240, 240, 0.8)"
# Pure ranking-based color assignment for clear visual hierarchy
if rank == 1: # Highest importance - Strong Red
bg_color = "#FF4444" # Bright red
rgba_color = "rgba(255, 68, 68, 0.9)"
elif rank == 2: # Second highest - Orange
bg_color = "#FF8C42" # Bright orange
rgba_color = "rgba(255, 140, 66, 0.8)"
elif rank == 3: # Third highest - Golden Yellow
bg_color = "#FFD93D" # Golden yellow
rgba_color = "rgba(255, 217, 61, 0.8)"
elif rank <= 5: # 4th-5th - Light Yellow
bg_color = "#FFF280" # Standard yellow
rgba_color = "rgba(255, 242, 128, 0.7)"
else: # Lower importance - Very Light Yellow
bg_color = "#FFF9C4" # Very light yellow
rgba_color = "rgba(255, 249, 196, 0.6)"
return bg_color, rgba_color
@spaces.GPU
def basic_get_scores_and_sources_full_response(state: State):
"""Traceback the entire response instead of a selected segment"""
# Use the entire response as the explained part
state.explained_response_part = state.full_response
# Attribution using default configuration
llm, attr, error_msg = initialize_model_and_attr()
if attr is None:
error_text = error_msg if error_msg else "Traceback initialization failed!"
return (
gr.update(value=[("", None)], visible=False),
gr.update(selected=0),
gr.update(visible=False),
gr.update(value=""),
gr.update(value=[(f"❌ {error_text}", None)], visible=True),
state,
)
try:
# Validate attribution inputs
if not state.context or not state.context.strip():
return (
gr.update(value=[("", None)], visible=False),
gr.update(selected=0),
gr.update(visible=False),
gr.update(value=""),
gr.update(value=[("❌ No context available for traceback!", None)], visible=True),
state,
)
if not state.query or not state.query.strip():
return (
gr.update(value=[("", None)], visible=False),
gr.update(selected=0),
gr.update(visible=False),
gr.update(value=""),
gr.update(value=[("❌ No query available for traceback!", None)], visible=True),
state,
)
if not state.full_response or not state.full_response.strip():
return (
gr.update(value=[("", None)], visible=False),
gr.update(selected=0),
gr.update(visible=False),
gr.update(value=""),
gr.update(value=[("❌ No response available for traceback!", None)], visible=True),
state,
)
print(f"start full response traceback with explanation_level: {DEFAULT_EXPLANATION_LEVEL}")
print(f"context length: {len(state.context)}, query: {state.query[:100]}...")
print(f"full response: {state.full_response[:100]}...")
print(f"tracing entire response (length: {len(state.full_response)} chars)")
texts, important_ids, importance_scores, _, _ = attr.attribute(
state.query, [state.context], state.full_response, state.full_response
)
print("end full response traceback")
print(f"explanation_level: {DEFAULT_EXPLANATION_LEVEL}")
print(f"texts count: {len(texts)} (how context was segmented)")
if len(texts) > 0:
print(f"sample text segments: {[text[:50] + '...' if len(text) > 50 else text for text in texts[:3]]}")
print(f"important_ids: {important_ids}")
print("importance_scores: ", importance_scores)
if not importance_scores:
return (
gr.update(value=[("", None)], visible=False),
gr.update(selected=0),
gr.update(visible=False),
gr.update(value=""),
gr.update(value=[("❌ No traceback scores generated for full response!", None)], visible=True),
state,
)
state.scores = np.array(importance_scores)
# Highlighted sources with ranking-based colors
highlighted_text = []
sorted_indices = np.argsort(state.scores)[::-1]
total_sources = len(important_ids)
for rank, i in enumerate(sorted_indices):
source_text = texts[important_ids[i]]
_ = get_color_by_rank(rank + 1, total_sources)
highlighted_text.append(
(
source_text,
f"rank_{rank+1}",
)
)
# In-context highlights with ranking-based colors - show ALL text
in_context_highlighted_text = []
ranks = {important_ids[i]: rank for rank, i in enumerate(sorted_indices)}
for i in range(len(texts)):
source_text = texts[i]
# Skip or don't highlight segments that are only newlines or whitespace
if source_text.strip() == "":
# For whitespace-only segments, add them without highlighting
in_context_highlighted_text.append((source_text, None))
elif i in important_ids:
# Only highlight if the segment has actual content (not just newlines)
if source_text.strip(): # Has non-whitespace content
rank = ranks[i] + 1
# Split the segment to separate leading/trailing newlines from content
# This prevents newlines from being highlighted
leading_whitespace = ""
trailing_whitespace = ""
content = source_text
# Extract leading newlines/whitespace
while content and content[0] in ['\n', '\r', '\t', ' ']:
leading_whitespace += content[0]
content = content[1:]
# Extract trailing newlines/whitespace
while content and content[-1] in ['\n', '\r', '\t', ' ']:
trailing_whitespace = content[-1] + trailing_whitespace
content = content[:-1]
# Add the parts separately: whitespace unhighlighted, content highlighted
if leading_whitespace:
in_context_highlighted_text.append((leading_whitespace, None))
if content:
in_context_highlighted_text.append((content, f"rank_{rank}"))
if trailing_whitespace:
in_context_highlighted_text.append((trailing_whitespace, None))
else:
# Even if marked as important, don't highlight whitespace-only segments
in_context_highlighted_text.append((source_text, None))
else:
# Add unhighlighted text for non-important segments
in_context_highlighted_text.append((source_text, None))
# Enhanced color map with ranking-based colors
color_map = {}
for rank in range(len(important_ids)):
_, rgba_color = get_color_by_rank(rank + 1, total_sources)
color_map[f"rank_{rank+1}"] = rgba_color
dummy_update = gr.update(
value=f"AttnTrace_{state.response}_{state.start_index}_{state.end_index}"
)
attribute_error_update = gr.update(visible=False)
# Combine sources and highlighted context into a single display
# Sources at the top
combined_display = []
# Add sources header (no highlighting for UI elements)
combined_display.append(("═══ FULL RESPONSE TRACEBACK RESULTS ═══\n", None))
combined_display.append(("These are the text segments that contribute most to the entire response:\n\n", None))
# Add sources using available data
for rank, i in enumerate(sorted_indices):
if i < len(important_ids):
source_text = texts[important_ids[i]]
# Strip leading/trailing whitespace from source text to avoid highlighting newlines
clean_source_text = source_text.strip()
if clean_source_text: # Only add if there's actual content
# Add the source text with highlighting, then add spacing without highlighting
combined_display.append((clean_source_text, f"rank_{rank+1}"))
combined_display.append(("\n\n", None))
# Add separator (no highlighting for UI elements)
combined_display.append(("\n" + "═"*50 + "\n", None))
combined_display.append(("FULL CONTEXT WITH HIGHLIGHTS\n", None))
combined_display.append(("Scroll down to see the complete context with important segments highlighted:\n\n", None))
# Add highlighted context using in_context_highlighted_text
combined_display.extend(in_context_highlighted_text)
# Use only the ranking colors (no highlighting for UI elements)
enhanced_color_map = color_map.copy()
combined_sources_update = HighlightedTextbox(
value=combined_display, color_map=enhanced_color_map, visible=True
)
# Switch to the highlighted context tab and show results
basic_context_tabs_update = gr.update(selected=1)
basic_sources_in_context_tab_update = gr.update(visible=True)
return (
combined_sources_update,
basic_context_tabs_update,
basic_sources_in_context_tab_update,
dummy_update,
attribute_error_update,
state,
)
except Exception as e:
traceback.print_exc()
return (
gr.update(value=[("", None)], visible=False),
gr.update(selected=0),
gr.update(visible=False),
gr.update(value=""),
gr.update(value=[(f"❌ Error: {str(e)}", None)], visible=True),
state,
)
def basic_get_scores_and_sources(
evt: gr.SelectData,
highlighted_response: List[Dict[str, str]],
state: State,
):
# Get the selected sentence
print("highlighted_response: ", highlighted_response[evt.index])
selected_text = highlighted_response[evt.index]['token']
state.explained_response_part = selected_text
# Attribution using default configuration
llm, attr, error_msg = initialize_model_and_attr()
if attr is None:
error_text = error_msg if error_msg else "Traceback initialization failed!"
return (
gr.update(value=[("", None)], visible=False),
gr.update(selected=0),
gr.update(visible=False),
gr.update(value=""),
gr.update(value=[(f"❌ {error_text}", None)], visible=True),
state,
)
try:
# Validate attribution inputs
if not state.context or not state.context.strip():
return (
gr.update(value=[("", None)], visible=False),
gr.update(selected=0),
gr.update(visible=False),
gr.update(value=""),
gr.update(value=[("❌ No context available for traceback!", None)], visible=True),
state,
)
if not state.query or not state.query.strip():
return (
gr.update(value=[("", None)], visible=False),
gr.update(selected=0),
gr.update(visible=False),
gr.update(value=""),
gr.update(value=[("❌ No query available for traceback!", None)], visible=True),
state,
)
if not state.full_response or not state.full_response.strip():
return (
gr.update(value=[("", None)], visible=False),
gr.update(selected=0),
gr.update(visible=False),
gr.update(value=""),
gr.update(value=[("❌ No response available for traceback!", None)], visible=True),
state,
)
print(f"start traceback with explanation_level: {DEFAULT_EXPLANATION_LEVEL}")
print(f"context length: {len(state.context)}, query: {state.query[:100]}...")
print(f"response: {state.full_response[:100]}...")
print(f"selected part: {state.explained_response_part[:100]}...")
texts, important_ids, importance_scores, _, _ = attr.attribute(
state.query, [state.context], state.full_response, state.explained_response_part
)
print("end traceback")
print(f"explanation_level: {DEFAULT_EXPLANATION_LEVEL}")
print(f"texts count: {len(texts)} (how context was segmented)")
if len(texts) > 0:
print(f"sample text segments: {[text[:50] + '...' if len(text) > 50 else text for text in texts[:3]]}")
print(f"important_ids: {important_ids}")
print("importance_scores: ", importance_scores)
if not importance_scores:
return (
gr.update(value=[("", None)], visible=False),
gr.update(selected=0),
gr.update(visible=False),
gr.update(value=""),
gr.update(value=[("❌ No traceback scores generated! Try a different text segment.", None)], visible=True),
state,
)
state.scores = np.array(importance_scores)
# Highlighted sources with ranking-based colors
highlighted_text = []
sorted_indices = np.argsort(state.scores)[::-1]
total_sources = len(important_ids)
for rank, i in enumerate(sorted_indices):
source_text = texts[important_ids[i]]
_ = get_color_by_rank(rank + 1, total_sources)
highlighted_text.append(
(
source_text,
f"rank_{rank+1}",
)
)
# In-context highlights with ranking-based colors - show ALL text
in_context_highlighted_text = []
ranks = {important_ids[i]: rank for rank, i in enumerate(sorted_indices)}
for i in range(len(texts)):
source_text = texts[i]
# Skip or don't highlight segments that are only newlines or whitespace
if source_text.strip() == "":
# For whitespace-only segments, add them without highlighting
in_context_highlighted_text.append((source_text, None))
elif i in important_ids:
# Only highlight if the segment has actual content (not just newlines)
if source_text.strip(): # Has non-whitespace content
rank = ranks[i] + 1
# Split the segment to separate leading/trailing newlines from content
# This prevents newlines from being highlighted
leading_whitespace = ""
trailing_whitespace = ""
content = source_text
# Extract leading newlines/whitespace
while content and content[0] in ['\n', '\r', '\t', ' ']:
leading_whitespace += content[0]
content = content[1:]
# Extract trailing newlines/whitespace
while content and content[-1] in ['\n', '\r', '\t', ' ']:
trailing_whitespace = content[-1] + trailing_whitespace
content = content[:-1]
# Add the parts separately: whitespace unhighlighted, content highlighted
if leading_whitespace:
in_context_highlighted_text.append((leading_whitespace, None))
if content:
in_context_highlighted_text.append((content, f"rank_{rank}"))
if trailing_whitespace:
in_context_highlighted_text.append((trailing_whitespace, None))
else:
# Even if marked as important, don't highlight whitespace-only segments
in_context_highlighted_text.append((source_text, None))
else:
# Add unhighlighted text for non-important segments
in_context_highlighted_text.append((source_text, None))
# Enhanced color map with ranking-based colors
color_map = {}
for rank in range(len(important_ids)):
_, rgba_color = get_color_by_rank(rank + 1, total_sources)
color_map[f"rank_{rank+1}"] = rgba_color
dummy_update = gr.update(
value=f"AttnTrace_{state.response}_{state.start_index}_{state.end_index}"
)
attribute_error_update = gr.update(visible=False)
# Combine sources and highlighted context into a single display
# Sources at the top
combined_display = []
# Add sources header (no highlighting for UI elements)
combined_display.append(("═══ TRACEBACK RESULTS ═══\n", None))
combined_display.append(("These are the text segments that contribute most to the response:\n\n", None))
# Add sources using available data
for rank, i in enumerate(sorted_indices):
if i < len(important_ids):
source_text = texts[important_ids[i]]
# Strip leading/trailing whitespace from source text to avoid highlighting newlines
clean_source_text = source_text.strip()
if clean_source_text: # Only add if there's actual content
# Add the source text with highlighting, then add spacing without highlighting
combined_display.append((clean_source_text, f"rank_{rank+1}"))
combined_display.append(("\n\n", None))
# Add separator (no highlighting for UI elements)
combined_display.append(("\n" + "═"*50 + "\n", None))
combined_display.append(("FULL CONTEXT WITH HIGHLIGHTS\n", None))
combined_display.append(("Scroll down to see the complete context with important segments highlighted:\n\n", None))
# Add highlighted context using in_context_highlighted_text
combined_display.extend(in_context_highlighted_text)
# Use only the ranking colors (no highlighting for UI elements)
enhanced_color_map = color_map.copy()
combined_sources_update = HighlightedTextbox(
value=combined_display, color_map=enhanced_color_map, visible=True
)
# Switch to the highlighted context tab and show results
basic_context_tabs_update = gr.update(selected=1)
basic_sources_in_context_tab_update = gr.update(visible=True)
return (
combined_sources_update,
basic_context_tabs_update,
basic_sources_in_context_tab_update,
dummy_update,
attribute_error_update,
state,
)
except Exception as e:
traceback.print_exc()
return (
gr.update(value=[("", None)], visible=False),
gr.update(selected=0),
gr.update(visible=False),
gr.update(value=""),
gr.update(value=[(f"❌ Error: {str(e)}", None)], visible=True),
state,
)
def load_custom_css():
"""Load CSS from external file"""
try:
with open("assets/app_styles.css", "r") as f:
css_content = f.read()
return css_content
except FileNotFoundError:
print("Warning: CSS file not found, using minimal CSS")
return ""
except Exception as e:
print(f"Error loading CSS: {e}")
return ""
# Load CSS from external file
custom_css = load_custom_css()
theme = gr.themes.Citrus(
text_size="lg",
spacing_size="md",
)
with gr.Blocks(theme=theme, css=custom_css) as demo:
gr.Markdown(f"# {APP_TITLE}")
gr.Markdown(APP_DESCRIPTION, elem_classes="app-description")
# gr.Markdown(NEW_TEXT, elem_classes="app-description-2")
gr.Markdown("""
<div style="font-size: 18px;">
AttnTrace is an efficient context traceback method for long contexts (e.g., full papers). It is over 15× faster than the state-of-the-art context traceback method TracLLM. Compared to previous attention-based approaches, AttnTrace is more accurate, reliable, and memory-efficient.
""", elem_classes="feature-highlights")
# Feature highlights
gr.Markdown("""
<div style="font-size: 18px;">
AttnTrace can be used in many real-world applications, such as tracing back to:
- 📄 prompt injection instructions that manipulate LLM-generated paper reviews.
- 💻 malicious comment & code hiding in the codebase that misleads the AI coding assistant.
- 🤖 malicious instructions that mislead the action of the LLM agent.
- 🖋 source texts in the context from an AI summary.
- 🔍 evidence that supports the LLM-generated answer for a question.
- ❌ misinformation (corrupted knowledge) that manipulates LLM output for a question.
- And a lot more...
</div>
""", elem_classes="feature-highlights")
# Example buttons with topic-relevant images - moved here for better positioning
gr.Markdown("### 🚀 Try These Examples!", elem_classes="example-title")
with gr.Row(elem_classes=["example-button-container"]):
with gr.Column(scale=1):
example_1_btn = gr.Button(
"📄 Prompt Injection Attacks in AI Paper Review",
elem_classes=["example-button", "example-paper"],
elem_id="example_1_button",
scale=None,
size="sm"
)
with gr.Column(scale=1):
example_2_btn = gr.Button(
"💻 Malicious Comments & Code in Codebase",
elem_classes=["example-button", "example-movie"],
elem_id="example_2_button"
)
with gr.Column(scale=1):
example_3_btn = gr.Button(
"🤖 Malicious Instructions Misleading the LLM Agent",
elem_classes=["example-button", "example-code"],
elem_id="example_3_button"
)
with gr.Row(elem_classes=["example-button-container"]):
with gr.Column(scale=1):
example_4_btn = gr.Button(
"🖋 Source Texts for an AI Summary",
elem_classes=["example-button", "example-paper-alt"],
elem_id="example_4_button"
)
with gr.Column(scale=1):
example_5_btn = gr.Button(
"🔍 Evidence that Support Question Answering",
elem_classes=["example-button", "example-movie-alt"],
elem_id="example_5_button"
)
with gr.Column(scale=1):
example_6_btn = gr.Button(
"❌ Misinformation (Corrupted Knowledge) in Question Answering",
elem_classes=["example-button", "example-code-alt"],
elem_id="example_6_button"
)
state = gr.State(
value=clear_state()
)
# Create tabs for Demo and Configuration
with gr.Tabs() as main_tabs:
# Demo Tab
with gr.Tab("Demo", id="demo_tab"):
gr.Markdown(
"Enter your context and instruction below to try out AttnTrace! You can also click on the example buttons above to load pre-configured examples."
)
gr.Markdown(
'**Color Legend for Context Traceback (by ranking):** <span style="background-color: #FF4444; color: black; padding: 2px 6px; border-radius: 4px; font-weight: 600;">Red</span> = 1st (most important) | <span style="background-color: #FF8C42; color: black; padding: 2px 6px; border-radius: 4px; font-weight: 600;">Orange</span> = 2nd | <span style="background-color: #FFD93D; color: black; padding: 2px 6px; border-radius: 4px; font-weight: 600;">Golden</span> = 3rd | <span style="background-color: #FFF280; color: black; padding: 2px 6px; border-radius: 4px; font-weight: 600;">Yellow</span> = 4th-5th | <span style="background-color: #FFF9C4; color: black; padding: 2px 6px; border-radius: 4px; font-weight: 600;">Light</span> = 6th+'
)
# Top section: Wide Context box with tabs
with gr.Row():
with gr.Column(scale=1):
with gr.Tabs() as basic_context_tabs:
with gr.TabItem("Context", id=0):
basic_context_box = gr.Textbox(
placeholder="Enter context...",
show_label=False,
value="",
lines=6,
max_lines=6,
elem_id="basic_context_box",
autoscroll=False,
)
with gr.TabItem("Context with highlighted traceback results", id=1, visible=True) as basic_sources_in_context_tab:
basic_sources_in_context_box = HighlightedTextbox(
value=[("Click on a sentence in the response below to see highlighted traceback results here.", None)],
show_legend_label=False,
show_label=False,
show_legend=False,
interactive=False,
elem_id="basic_sources_in_context_box",
)
# Error messages
basic_generate_error_box = HighlightedTextbox(
show_legend_label=False,
show_label=False,
show_legend=False,
visible=False,
interactive=False,
container=False,
)
# Bottom section: Left (instruction + button + response), Right (response selection)
with gr.Row(equal_height=True):
# Left: Instruction + Button + Response
with gr.Column(scale=1):
basic_query_box = gr.Textbox(
label="Instruction",
placeholder="Enter an instruction...",
value="",
lines=3,
max_lines=3,
)
unified_response_button = gr.Button(
"Generate/Use Response",
variant="primary",
size="lg"
)
response_input_box = gr.Textbox(
label="Response (Editable)",
placeholder="Response will appear here after generation, or type your own response for traceback...",
lines=8,
max_lines=8,
info="Leave empty and click button to generate from LLM, or type your own response to use for traceback"
)
# Right: Response for attribution selection
with gr.Column(scale=1):
basic_response_box = gr.HighlightedText(
label="Click to select text for traceback!",
value=[("Click the 'Generate/Use Response' button on the left to see response text here for traceback analysis.", None)],
interactive=False,
combine_adjacent=False,
show_label=True,
show_legend=False,
elem_id="basic_response_box",
visible=True,
)
# Button for full response traceback
full_response_traceback_button = gr.Button(
"🔍 Traceback Entire Response",
variant="secondary",
size="sm"
)
# Hidden error box and dummy elements
basic_attribute_error_box = HighlightedTextbox(
show_legend_label=False,
show_label=False,
show_legend=False,
visible=False,
interactive=False,
container=False,
)
dummy_basic_sources_box = gr.Textbox(
visible=False, interactive=False, container=False
)
# Configuration Tab
with gr.Tab("Config", id="config_tab"):
gr.Markdown("## ⚙️ AttnTrace Configuration")
gr.Markdown("Configure the traceback analysis parameters to customize how AttnTrace processes your context and generates results.")
with gr.Row():
with gr.Column(scale=1):
explanation_level_dropdown = gr.Dropdown(
choices=["sentence", "paragraph", "text segment"],
value="sentence",
label="Explanation Level",
info="How to segment the context for traceback analysis"
)
with gr.Column(scale=1):
top_k_dropdown = gr.Dropdown(
choices=["3", "5", "10"],
value="3",
label="Top-N Value",
info="Number of most important text segments to highlight"
)
with gr.Row():
with gr.Column(scale=1):
B_slider = gr.Slider(
minimum=1,
maximum=100,
value=30,
step=5,
label="B Parameter",
info="Number of subsamples (higher = more accurate but slower)"
)
with gr.Column(scale=1):
q_slider = gr.Slider(
minimum=0.1,
maximum=1.0,
value=0.4,
step=0.1,
label="ρ Parameter",
info="Sub-sampling ratio (0.1-1.0)"
)
with gr.Row():
with gr.Column(scale=1):
apply_config_button = gr.Button(
"Apply Configuration",
variant="primary",
size="lg"
)
with gr.Column(scale=2):
config_status_text = gr.Textbox(
label="Configuration Status",
value="Ready to apply configuration",
interactive=False,
lines=2
)
gr.Markdown("### 📋 Current Configuration")
gr.Markdown("""
- **Explanation Level**: Determines how the context is segmented for analysis
- `sentence`: Analyze at sentence level (recommended for most cases)
- `paragraph`: Analyze at paragraph level (good for longer documents)
- `text segment`: Analyze at the level of 100-word text segments (ideal for non-standard document formats)
- **Top-N Value**: Number of most important text segments to highlight in results
- Higher values show more context but may be less focused
- Lower values provide more focused results but may miss some context
- **B Parameter**: Number of subsamples
- Higher values (50-100): More thorough analysis but slower
- Lower values (10-30): Faster analysis but may miss some important segments
- Default: 30 (good balance of speed and accuracy)
- **ρ Parameter**: Sub-sampling ratio (0.1-1.0)
**Note**: Configuration changes will take effect immediately for new traceback operations.
""")
gr.Markdown("### 🔄 Model Information")
gr.Markdown(f"""
- **Current Model**: {DEFAULT_MODEL_PATH}
- **Max Tokens**: {get_max_tokens(DEFAULT_MODEL_PATH):,}
- **Device**: CUDA (GPU accelerated)
""")
# Only a single (AttnTrace) method and model in this simplified version
def basic_clear_state():
state = clear_state()
return (
"", # basic_context_box
"", # basic_query_box
"", # response_input_box
gr.update(value=[("Click the 'Generate/Use Response' button above to see response text here for traceback analysis.", None)]), # basic_response_box - keep visible
gr.update(selected=0), # basic_context_tabs - switch to first tab
state,
)
# Defining behavior of various interactions for the demo tab only
def handle_demo_tab_selection(evt: gr.SelectData):
"""Handle tab selection - only clear state when switching to demo tab"""
if evt.index == 0: # Demo tab
return basic_clear_state()
else: # Configuration tab - no state change needed
return (
gr.update(), # basic_context_box
gr.update(), # basic_query_box
gr.update(), # response_input_box
gr.update(), # basic_response_box
gr.update(), # basic_context_tabs
gr.update(), # state
)
main_tabs.select(
fn=handle_demo_tab_selection,
inputs=[],
outputs=[
basic_context_box,
basic_query_box,
response_input_box,
basic_response_box,
basic_context_tabs,
state,
],
)
for component in [basic_context_box, basic_query_box]:
component.change(
basic_update,
[basic_context_box, basic_query_box, state],
[
basic_response_box,
basic_context_tabs,
state,
],
)
# Example button event handlers - now update both UI and state
outputs_for_examples = [
basic_context_box,
basic_query_box,
state,
response_input_box,
basic_response_box,
basic_context_tabs,
]
example_1_btn.click(
fn=partial(load_an_example, run_example_1),
inputs=[state],
outputs=outputs_for_examples
)
example_2_btn.click(
fn=partial(load_an_example, run_example_2),
inputs=[state],
outputs=outputs_for_examples
)
example_3_btn.click(
fn=partial(load_an_example, run_example_3),
inputs=[state],
outputs=outputs_for_examples
)
example_4_btn.click(
fn=partial(load_an_example, run_example_4),
inputs=[state],
outputs=outputs_for_examples
)
example_5_btn.click(
fn=partial(load_an_example, run_example_5),
inputs=[state],
outputs=outputs_for_examples
)
example_6_btn.click(
fn=partial(load_an_example, run_example_6),
inputs=[state],
outputs=outputs_for_examples
)
unified_response_button.click(
fn=lambda: None,
inputs=[],
outputs=[],
js=get_scroll_js_code("basic_response_box"),
)
basic_response_box.change(
fn=lambda: None,
inputs=[],
outputs=[],
js=get_scroll_js_code("basic_sources_in_context_box"),
)
# Add immediate tab switch on response selection
def immediate_tab_switch():
return (
gr.update(value=[("🔄 Processing traceback... Please wait...", None)]), # Show progress message
gr.update(selected=1), # Switch to annotation tab immediately
)
basic_response_box.select(
fn=immediate_tab_switch,
inputs=[],
outputs=[basic_sources_in_context_box, basic_context_tabs],
queue=False, # Execute immediately without queue
)
basic_response_box.select(
fn=basic_get_scores_and_sources,
inputs=[basic_response_box, state],
outputs=[
basic_sources_in_context_box,
basic_context_tabs,
basic_sources_in_context_tab,
dummy_basic_sources_box,
basic_attribute_error_box,
state,
],
show_progress="full",
)
basic_response_box.select(
fn=basic_update_highlighted_response,
inputs=[state],
outputs=[basic_response_box, state],
)
# Full response traceback button
full_response_traceback_button.click(
fn=immediate_tab_switch,
inputs=[],
outputs=[basic_sources_in_context_box, basic_context_tabs],
queue=False, # Execute immediately without queue
)
full_response_traceback_button.click(
fn=basic_get_scores_and_sources_full_response,
inputs=[state],
outputs=[
basic_sources_in_context_box,
basic_context_tabs,
basic_sources_in_context_tab,
dummy_basic_sources_box,
basic_attribute_error_box,
state,
],
show_progress="full",
)
dummy_basic_sources_box.change(
fn=lambda: None,
inputs=[],
outputs=[],
js=get_scroll_js_code("basic_sources_in_context_box"),
)
# Unified response handler
unified_response_button.click(
fn=unified_response_handler,
inputs=[response_input_box, state],
outputs=[state, response_input_box, basic_response_box, basic_generate_error_box]
)
# Configuration update handler
apply_config_button.click(
fn=update_configuration,
inputs=[explanation_level_dropdown, top_k_dropdown, B_slider, q_slider],
outputs=[config_status_text]
)
# gr.Markdown(
# "Please do not interact with elements while generation/attribution is in progress. This may cause errors. You can refresh the page if you run into issues because of this."
# )
demo.launch(show_api=False, share=True)