# Pipecat Multi-Threading Integration

## Overview

This document explains how the multi-threaded telco agent is integrated with the Pipecat voice pipeline using WebRTC.
## Architecture

```
┌───────────────────────────────────────────────────────────┐
│                     Browser (WebRTC)                      │
└──────────────────────────┬────────────────────────────────┘
                           │
                           │ Audio Stream
┌──────────────────────────▼────────────────────────────────┐
│              pipeline.py (FastAPI + Pipecat)              │
│                                                           │
│  ┌─────────────────────────────────────────────────────┐  │
│  │ Pipeline:                                           │  │
│  │   WebRTC → ASR → LangGraphLLMService → TTS          │  │
│  └─────────────────────────────────────────────────────┘  │
│                           │                               │
│              langgraph_llm_service.py                     │
│                           │                               │
└──────────────────────────┬────────────────────────────────┘
                           │
                           │ HTTP/WebSocket
┌──────────────────────────▼────────────────────────────────┐
│             LangGraph Server (langgraph dev)              │
│                                                           │
│  ┌─────────────────────────────────────────────────────┐  │
│  │        react_agent.py (Multi-threaded)              │  │
│  │                                                     │  │
│  │  Main Thread:      Handles long operations          │  │
│  │  Secondary Thread: Handles interim queries          │  │
│  │                                                     │  │
│  │  Store: Coordinates between threads                 │  │
│  └─────────────────────────────────────────────────────┘  │
└───────────────────────────────────────────────────────────┘
```
## How It Works

### 1. `LangGraphLLMService` (`langgraph_llm_service.py`)

This service acts as a bridge between Pipecat's frame-based processing and the LangGraph agent.

Key changes:
**a) Dual thread management:**

```python
self._thread_id_main: Optional[str] = None       # For long operations
self._thread_id_secondary: Optional[str] = None  # For interim queries
```
**b) Operation status checking:**

```python
async def _check_long_operation_running(self) -> bool:
    """Check if a long operation is currently running via the store."""
    # Queries the LangGraph store for "running" status.
    # Returns True if a long operation is in progress.
```
**c) Automatic routing:**

```python
# Before each message, check whether a long operation is running
long_operation_running = await self._check_long_operation_running()
if long_operation_running:
    thread_type = "secondary"  # Route to the secondary thread
else:
    thread_type = "main"       # Route to the main thread
```
**d) Input format:**

```python
# New multi-threaded format
input_payload = {
    "messages": [{"type": "human", "content": text}],
    "thread_type": "main",           # or "secondary"
    "interim_messages_reset": True,  # bool
}

# Config includes the namespace used for coordination
config = {
    "configurable": {
        "user_email": self.user_email,
        "thread_id": thread_id,
        "namespace_for_memory": ["user@example.com", "tools_updates"],
    }
}
```
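The payload and config above can be assembled with small helpers. This is an illustrative sketch; the helper names are not from the service, and it assumes the interim buffer is reset exactly when a new main-thread run starts.

```python
# Sketch: assemble the multi-threaded input payload and run config.
# Helper names are illustrative, not the service's actual methods.

def build_input(text: str, thread_type: str) -> dict:
    """Build the multi-threaded input payload for a run."""
    return {
        "messages": [{"type": "human", "content": text}],
        "thread_type": thread_type,  # "main" or "secondary"
        # Assumption: reset the interim buffer only when a main run starts.
        "interim_messages_reset": thread_type == "main",
    }

def build_config(user_email: str, thread_id: str) -> dict:
    """Build the run config carrying the coordination namespace."""
    return {
        "configurable": {
            "user_email": user_email,
            "thread_id": thread_id,
            "namespace_for_memory": [user_email, "tools_updates"],
        }
    }

payload = build_input("Close my contract", "main")
assert payload["interim_messages_reset"] is True

cfg = build_config("test@example.com", "thread-1")
assert cfg["configurable"]["namespace_for_memory"] == ["test@example.com", "tools_updates"]
```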
### 2. Pipeline Configuration (`pipeline.py`)

```python
# Enable multi-threading for specific assistants
enable_multi_threading = selected_assistant in ["telco-agent", "wire-transfer-agent"]

llm = LangGraphLLMService(
    base_url=os.getenv("LANGGRAPH_BASE_URL", "http://127.0.0.1:2024"),
    assistant=selected_assistant,
    enable_multi_threading=enable_multi_threading,  # NEW
)
```
### 3. React Agent (`react_agent.py`)

Already updated to handle the multi-threaded input format (see MULTI_THREAD_README.md).
## Flow Example

### User says: "Close my contract"

1. Browser (WebRTC) → Pipecat pipeline
2. ASR converts speech to text: "Close my contract"
3. `LangGraphLLMService` receives the text
4. Service checks the store: no long operation running
5. Service sends to the main thread:

   ```json
   {
     "messages": [{"type": "human", "content": "Close my contract"}],
     "thread_type": "main",
     "interim_messages_reset": true
   }
   ```

6. Agent starts the 50-second contract closure
7. Agent writes status to the store: `{"status": "running", "progress": 10}`
8. TTS speaks: "Processing your contract closure..."
### User says (5 seconds later): "What's the status?"

1. Browser (WebRTC) → Pipecat pipeline
2. ASR converts speech to text: "What's the status?"
3. `LangGraphLLMService` receives the text
4. Service checks the store: a long operation IS running
5. Service sends to the secondary thread:

   ```json
   {
     "messages": [{"type": "human", "content": "What's the status?"}],
     "thread_type": "secondary",
     "interim_messages_reset": false
   }
   ```

6. Secondary thread calls the status-check tool
7. Agent responds: "Your request is 20% complete"
8. TTS speaks the response
9. Main thread continues running in the background
### Main operation completes (50 seconds later)

1. Main thread finishes the contract closure
2. Agent synthesizes the result together with the interim conversation
3. Agent sets the completion flag in the store
4. TTS speaks: "Your contract has been closed..."
5. Service detects completion on the next message
6. Future messages are routed to the main thread again
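Steps 5-6 of the completion sequence can be sketched as a routing function that consumes the completion flag. This is a sketch under assumptions: a plain dict stands in for the store, and clearing the flag on first observation is an illustrative choice, not confirmed behavior of the service.

```python
# Sketch: route back to the main thread once the completion flag
# appears. A dict stands in for the LangGraph store (assumption).

def route_with_completion(store: dict) -> str:
    """Return the target thread, consuming the completion flag if set."""
    if store.pop("main_operation_complete", False):
        # Completion observed: clear stale status, return to main thread.
        store.pop("working-tool-status-update", None)
        return "main"
    running = store.get("working-tool-status-update", {}).get("status") == "running"
    return "secondary" if running else "main"

store = {"working-tool-status-update": {"status": "running", "progress": 80}}
assert route_with_completion(store) == "secondary"  # still in flight
store["main_operation_complete"] = True             # agent signals completion
assert route_with_completion(store) == "main"       # routed back to main
assert route_with_completion(store) == "main"       # flag consumed; stays on main
```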
## Configuration

### Environment Variables

```bash
# LangGraph server
LANGGRAPH_BASE_URL=http://127.0.0.1:2024
LANGGRAPH_ASSISTANT=telco-agent

# User identification (for the store namespace)
USER_EMAIL=test@example.com

# Enable debug logging
LANGGRAPH_DEBUG_STREAM=true
```
### Enable/Disable Multi-Threading

For specific agents:

```python
# In pipeline.py
enable_multi_threading = selected_assistant in ["telco-agent", "wire-transfer-agent"]
```

Via an environment variable (optional):

```python
enable_multi_threading = os.getenv("ENABLE_MULTI_THREADING", "true").lower() == "true"
```

Disable for an agent:

```python
llm = LangGraphLLMService(
    assistant="some-other-agent",
    enable_multi_threading=False,  # Use simple single-threaded mode
)
```
## Store Keys Used

The service queries these store keys for coordination:

| Key | Purpose | Set By |
|---|---|---|
| `working-tool-status-update` | Current tool progress | Agent's long-running tools |
| `main_operation_complete` | Completion signal | Agent's main thread |
| `secondary_interim_messages` | Interim conversation | Agent's secondary thread |
## Backward Compatibility

When `enable_multi_threading=False`, the service:

- Uses a single thread
- Sends the simple message format: `[HumanMessage(content=text)]`
- Performs no store coordination
- Works with non-multi-threaded agents
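The compatibility switch can be sketched as a single branch on the flag: the legacy path produces the plain message payload and skips store coordination entirely. The function name is illustrative, not from the service.

```python
# Sketch: select the input format based on the multi-threading flag.
# Function name is illustrative (assumption).

def build_run_input(text: str, enable_multi_threading: bool,
                    long_op_running: bool = False) -> dict:
    if not enable_multi_threading:
        # Simple format understood by non-multi-threaded agents.
        return {"messages": [{"type": "human", "content": text}]}
    thread_type = "secondary" if long_op_running else "main"
    return {
        "messages": [{"type": "human", "content": text}],
        "thread_type": thread_type,
        "interim_messages_reset": thread_type == "main",
    }

legacy = build_run_input("Hello", enable_multi_threading=False)
assert "thread_type" not in legacy  # no threading fields in legacy mode

modern = build_run_input("What's the status?", True, long_op_running=True)
assert modern["thread_type"] == "secondary"
```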
## Benefits

- **Non-blocking voice UX**: the user can keep talking during long operations
- **Transparent**: the user doesn't need to know about threading
- **Automatic routing**: the service handles main/secondary routing itself
- **Store-based**: no client-side coordination needed
- **Backward compatible**: existing agents work without changes
## Testing

### With the Web UI

1. Start the LangGraph server: `langgraph dev`
2. Start the pipeline: `python pipeline.py`
3. Open a browser at `http://localhost:7860`
4. Select "Telco Agent"
5. Say: "Close my contract", then confirm with "yes"
6. While it is processing, say: "What's the status?"
7. The agent should report progress while the operation continues
### With the Client Script

```bash
# Terminal 1: Start LangGraph
cd examples/voice_agent_multi_thread/agents
langgraph dev

# Terminal 2: Test with the client
cd examples/voice_agent_multi_thread/agents
python telco_client.py --interactive
```
## Troubleshooting

### Messages always go to the main thread

- Check that `enable_multi_threading=True`
- Verify the long-running tools are writing status to the store
- Check that the namespace matches: `("user_email", "tools_updates")`

### Secondary thread not responding

- Ensure the secondary thread has the limited tool set
- Check `SECONDARY_SYSTEM_PROMPT` in `react_agent.py`
- Verify the `check_status` tool is included

### Synthesis not working

- Check `secondary_interim_messages` in the store
- Verify the meaningful-messages filter in the agent
- Check the synthesis prompt in the agent
## Performance

- Store queries: ~10-20 ms per check
- Thread switching: negligible (a routing decision)
- Memory overhead: two threads instead of one
- Latency impact: minimal (<50 ms added per request)
## Future Enhancements
- Session persistence: Store thread IDs in Redis
- Multiple long operations: Queue system
- Progress streaming: Real-time progress updates
- Cancellation: User can cancel long operations
- Thread pooling: Reuse secondary threads
## Related Files

- `langgraph_llm_service.py` - Service implementation
- `pipeline.py` - Pipeline configuration
- `react_agent.py` - Multi-threaded agent
- `tools.py` - Long-running tools with progress reporting
- `helper_functions.py` - Store coordination utilities
- `telco_client.py` - CLI test client
## Credits

Implementation: Option 1 (Tool-Level Designation)
Date: September 30, 2025