Pipecat Multi-Threading Integration

Overview

This document explains how the multi-threaded telco agent is integrated with the Pipecat voice pipeline using WebRTC.

Architecture

┌──────────────────────────────────────────────────────────┐
│                  Browser (WebRTC)                        │
└────────────────────┬─────────────────────────────────────┘
                     │
                     ↓ Audio Stream
┌──────────────────────────────────────────────────────────┐
│              pipeline.py (FastAPI + Pipecat)             │
│                                                          │
│  ┌────────────────────────────────────────────────────┐  │
│  │   Pipeline:                                        │  │
│  │   WebRTC → ASR → LangGraphLLMService → TTS →       │  │
│  └────────────────────────────────────────────────────┘  │
│                        ↓                                 │
│                langgraph_llm_service.py                  │
│                        ↓                                 │
└────────────────────┬─────────────────────────────────────┘
                     │
                     ↓ HTTP/WebSocket
┌──────────────────────────────────────────────────────────┐
│           LangGraph Server (langgraph dev)               │
│                                                          │
│  ┌────────────────────────────────────────────────────┐  │
│  │  react_agent.py (Multi-threaded)                   │  │
│  │                                                    │  │
│  │  Main Thread: Handles long operations              │  │
│  │  Secondary Thread: Handles interim queries         │  │
│  │                                                    │  │
│  │  Store: Coordinates between threads                │  │
│  └────────────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────────────┘

How It Works

1. LangGraphLLMService (langgraph_llm_service.py)

This service acts as a bridge between Pipecat's frame-based processing and the LangGraph agent.

Key Changes:

a) Dual Thread Management:

self._thread_id_main: Optional[str] = None      # For long operations
self._thread_id_secondary: Optional[str] = None  # For interim queries
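One way to picture the two IDs is as a lazily populated pair: each role gets its own thread the first time a message is routed to it and keeps it afterwards. This is a minimal sketch that assumes IDs are generated locally with uuid4; the real service may obtain thread IDs from the LangGraph server instead, and the `ThreadIds` class is hypothetical.

```python
import uuid
from dataclasses import dataclass
from typing import Optional


@dataclass
class ThreadIds:
    """Hypothetical holder for the two conversation threads used by the service."""

    main: Optional[str] = None       # long operations
    secondary: Optional[str] = None  # interim queries while a long operation runs

    def get(self, thread_type: str) -> str:
        """Return the ID for thread_type, creating it on first use."""
        if thread_type not in ("main", "secondary"):
            raise ValueError(f"unknown thread_type: {thread_type!r}")
        current = getattr(self, thread_type)
        if current is None:
            # Assumption: a local UUID stands in for a server-created thread ID.
            current = str(uuid.uuid4())
            setattr(self, thread_type, current)
        return current


ids = ThreadIds()
first = ids.get("main")
print(first == ids.get("main"))       # True: the ID is reused
print(first == ids.get("secondary"))  # False: each role has its own thread
```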

b) Operation Status Checking:

async def _check_long_operation_running(self) -> bool:
    """Check if a long operation is currently running via the store."""
    # Queries LangGraph store for "running" status
    # Returns True if a long operation is in progress
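The check itself can be small. The sketch below is synchronous for brevity and models the store as a plain dict keyed by (namespace, key), using the key names from the Store Keys table. It treats an operation as running when the status item reports "running" and no completion item has been written. The dict store, the completion-item rule, and the function name are assumptions; the real method is async and queries the LangGraph store.

```python
from typing import Any, Dict, Tuple

Namespace = Tuple[str, ...]
# Hypothetical in-memory stand-in for the LangGraph store: (namespace, key) -> value.
Store = Dict[Tuple[Namespace, str], Any]

STATUS_KEY = "working-tool-status-update"
COMPLETE_KEY = "main_operation_complete"


def long_operation_running(store: Store, namespace: Namespace) -> bool:
    """True while a tool reports status "running" and no completion item is stored."""
    status = store.get((namespace, STATUS_KEY))
    if status is None or status.get("status") != "running":
        return False
    # Assumption: any stored completion item means the main operation has finished
    # (and it is cleared when the next long operation starts).
    return store.get((namespace, COMPLETE_KEY)) is None


ns = ("test@example.com", "tools_updates")
store: Store = {}
print(long_operation_running(store, ns))  # False: nothing written yet
store[(ns, STATUS_KEY)] = {"status": "running", "progress": 10}
print(long_operation_running(store, ns))  # True
```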

c) Automatic Routing:

# Before each message, check if long operation is running
long_operation_running = await self._check_long_operation_running()

if long_operation_running:
    thread_type = "secondary"  # Route to secondary thread
else:
    thread_type = "main"       # Route to main thread

d) Input Format:

# New multi-threaded format
input_payload = {
    "messages": [{"type": "human", "content": text}],
    "thread_type": thread_type,  # "main" or "secondary", from the routing step above
    "interim_messages_reset": interim_messages_reset,  # bool
}

# Config includes the namespace used for coordination
config = {
    "configurable": {
        "user_email": self.user_email,
        "thread_id": thread_id,
        # e.g. ["user@example.com", "tools_updates"]
        "namespace_for_memory": [self.user_email, "tools_updates"],
    }
}
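Putting the payload and config together, a helper might look like the sketch below. It passes the routed thread type straight through and derives `interim_messages_reset` from it, following the flow example further down (True for main-thread requests, False for secondary ones). That rule is inferred from the example, and `build_request` is a hypothetical name.

```python
from typing import Any, Dict, Tuple


def build_request(
    text: str, thread_type: str, thread_id: str, user_email: str
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Build (input_payload, config) for one user utterance (hypothetical helper)."""
    if thread_type not in ("main", "secondary"):
        raise ValueError(f"unknown thread_type: {thread_type!r}")
    input_payload = {
        "messages": [{"type": "human", "content": text}],
        "thread_type": thread_type,
        # Inferred from the flow example: a new main-thread request starts a fresh
        # interim log, while secondary-thread messages add to the existing one.
        "interim_messages_reset": thread_type == "main",
    }
    config = {
        "configurable": {
            "user_email": user_email,
            "thread_id": thread_id,
            # Same namespace the long-running tools write their status under.
            "namespace_for_memory": [user_email, "tools_updates"],
        }
    }
    return input_payload, config


payload, cfg = build_request("Close my contract", "main", "thread-1", "test@example.com")
print(payload["interim_messages_reset"])  # True
```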

2. Pipeline Configuration (pipeline.py)

# Enable multi-threading for specific assistants
enable_multi_threading = selected_assistant in ["telco-agent", "wire-transfer-agent"]

llm = LangGraphLLMService(
    base_url=os.getenv("LANGGRAPH_BASE_URL", "http://127.0.0.1:2024"),
    assistant=selected_assistant,
    enable_multi_threading=enable_multi_threading,  # NEW
)

3. React Agent (react_agent.py)

The agent already handles the multi-threaded input format (see MULTI_THREAD_README.md).

Flow Example

User says: "Close my contract"

1. Browser (WebRTC) → Pipecat Pipeline
2. ASR converts to text: "Close my contract"
3. LangGraphLLMService receives text
4. Service checks store: No long operation running
5. Service sends to main thread:
   {
     "messages": [{"type": "human", "content": "Close my contract"}],
     "thread_type": "main",
     "interim_messages_reset": True
   }
6. Agent starts 50-second contract closure
7. Agent writes status to store: {"status": "running", "progress": 10}
8. TTS speaks: "Processing your contract closure..."

User says (5 seconds later): "What's the status?"

1. Browser (WebRTC) → Pipecat Pipeline
2. ASR converts to text: "What's the status?"
3. LangGraphLLMService receives text
4. Service checks store: Long operation IS running ✓
5. Service sends to secondary thread:
   {
     "messages": [{"type": "human", "content": "What's the status?"}],
     "thread_type": "secondary",
     "interim_messages_reset": False
   }
6. Secondary thread calls the check_status tool
7. Agent responds: "Your request is 20% complete"
8. TTS speaks response
9. Main thread continues running in background

Main operation completes (about 50 seconds after it started)

1. Main thread finishes contract closure
2. Agent synthesizes a reply from the operation result and the interim conversation
3. Agent sets completion flag in store
4. TTS speaks: "Your contract has been closed..."
5. Service detects completion on next message
6. Routes future messages to main thread

Configuration

Environment Variables

# LangGraph Server
LANGGRAPH_BASE_URL=http://127.0.0.1:2024
LANGGRAPH_ASSISTANT=telco-agent

# User identification (for namespace)
USER_EMAIL=test@example.com

# Enable debug logging
LANGGRAPH_DEBUG_STREAM=true
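A sketch of reading these variables in one place. Only LANGGRAPH_BASE_URL has a default in the source (in pipeline.py); here the other values are assumed to stay None when unset, and LANGGRAPH_DEBUG_STREAM is assumed to be parsed like the ENABLE_MULTI_THREADING example in the next section. `LangGraphSettings` and `load_settings` are hypothetical names.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class LangGraphSettings:
    base_url: str
    assistant: Optional[str]
    user_email: Optional[str]
    debug_stream: bool


def load_settings(env: Mapping[str, str] = os.environ) -> LangGraphSettings:
    """Collect the variables listed above (hypothetical helper)."""
    return LangGraphSettings(
        # Default matches the one used in pipeline.py.
        base_url=env.get("LANGGRAPH_BASE_URL", "http://127.0.0.1:2024"),
        assistant=env.get("LANGGRAPH_ASSISTANT"),  # assumption: no default
        user_email=env.get("USER_EMAIL"),          # assumption: no default
        debug_stream=env.get("LANGGRAPH_DEBUG_STREAM", "false").lower() == "true",
    )


settings = load_settings({"LANGGRAPH_ASSISTANT": "telco-agent"})
print(settings.base_url)  # http://127.0.0.1:2024
```

Passing the mapping explicitly keeps the function easy to test without touching the real process environment.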

Enable/Disable Multi-Threading

For specific agents:

# In pipeline.py
enable_multi_threading = selected_assistant in ["telco-agent", "wire-transfer-agent"]

Via environment variable (optional):

enable_multi_threading = os.getenv("ENABLE_MULTI_THREADING", "true").lower() == "true"

Disable for an agent:

llm = LangGraphLLMService(
    assistant="some-other-agent",
    enable_multi_threading=False,  # Use simple single-threaded mode
)
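The two switches can be combined so the environment variable acts as a global off switch while the allow-list still decides per assistant. This combination is a design choice for illustration; the source does not say pipeline.py does this, and `should_enable_multi_threading` is a hypothetical name.

```python
import os
from typing import Mapping

# Assistants that accept the multi-threaded input format (list taken from pipeline.py).
MULTI_THREADED_ASSISTANTS = frozenset({"telco-agent", "wire-transfer-agent"})


def should_enable_multi_threading(
    assistant: str, env: Mapping[str, str] = os.environ
) -> bool:
    """Enable only for allow-listed assistants, unless the env switch turns it off."""
    globally_enabled = env.get("ENABLE_MULTI_THREADING", "true").lower() == "true"
    return globally_enabled and assistant in MULTI_THREADED_ASSISTANTS


print(should_enable_multi_threading("telco-agent", {}))       # True
print(should_enable_multi_threading("some-other-agent", {}))  # False
```

Treating the variable as an off switch rather than an on switch means an agent that does not understand thread_type never receives the multi-threaded payload, whatever the environment says.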

Store Keys Used

The service queries these store keys for coordination:

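| Key                        | Purpose               | Set by                     |
|----------------------------|-----------------------|----------------------------|
| working-tool-status-update | Current tool progress | Agent's long-running tools |
| main_operation_complete    | Completion signal     | Agent's main thread        |
| secondary_interim_messages | Interim conversation  | Agent's secondary thread   |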
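For the writing side of this coordination, the sketch below shows how the agent could populate the three keys, with a plain dict keyed by (namespace, key) standing in for the LangGraph store. The status value shape follows the flow example ({"status": "running", "progress": ...}); the completion and interim-message shapes, and all function names, are assumptions for illustration.

```python
from typing import Any, Dict, List, Tuple

Namespace = Tuple[str, ...]
# Hypothetical in-memory stand-in for the LangGraph store: (namespace, key) -> value.
Store = Dict[Tuple[Namespace, str], Any]


def report_progress(store: Store, ns: Namespace, progress: int) -> None:
    """Long-running tool: publish its current progress."""
    store[(ns, "working-tool-status-update")] = {"status": "running", "progress": progress}


def record_interim(store: Store, ns: Namespace, role: str, content: str) -> None:
    """Secondary thread: append one message for the main thread's final synthesis."""
    log: List[Dict[str, str]] = store.setdefault((ns, "secondary_interim_messages"), [])
    log.append({"role": role, "content": content})


def mark_complete(store: Store, ns: Namespace) -> List[Dict[str, str]]:
    """Main thread: set the completion flag and return the interim log to synthesize."""
    store[(ns, "main_operation_complete")] = {"complete": True}
    return list(store.get((ns, "secondary_interim_messages"), []))


ns = ("test@example.com", "tools_updates")
store: Store = {}
report_progress(store, ns, 20)
record_interim(store, ns, "user", "What's the status?")
print(len(mark_complete(store, ns)))  # 1
```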

Backward Compatibility

When enable_multi_threading=False:

  • Uses a single thread
  • Sends the simple message format: [HumanMessage(content=text)]
  • No store coordination
  • Works with agents that are not multi-threaded

Benefits

  1. Non-Blocking Voice UX: User can continue talking during long operations
  2. Transparent: User doesn't need to know about threading
  3. Automatic Routing: Service handles main/secondary routing automatically
  4. Store-Based: No client-side coordination needed
  5. Backward Compatible: Existing agents work without changes

Testing

With Web UI

  1. Start LangGraph server: langgraph dev
  2. Start pipeline: python pipeline.py
  3. Open browser to http://localhost:7860
  4. Select "Telco Agent"
  5. Say: "Close my contract" → Confirm with "yes"
  6. While processing, say: "What's the status?"
  7. Agent should respond with progress while operation continues

With Client Script

# Terminal 1: Start LangGraph
cd examples/voice_agent_multi_thread/agents
langgraph dev

# Terminal 2: Test with client
cd examples/voice_agent_multi_thread/agents
python telco_client.py --interactive

Troubleshooting

Messages always go to main thread

  • Check that enable_multi_threading=True
  • Verify that long-running tools are writing status to the store
  • Check that the namespace matches (user_email, "tools_updates"), i.e. the configured user email followed by "tools_updates"

Secondary thread not responding

  • Ensure the secondary thread has a limited tool set
  • Check SECONDARY_SYSTEM_PROMPT in react_agent.py
  • Verify that the check_status tool is included

Synthesis not working

  • Check secondary_interim_messages in the store
  • Verify the agent's filter for meaningful messages
  • Check the synthesis prompt in the agent

Performance

  • Store queries: ~10-20ms per check
  • Thread switching: Negligible (routing decision)
  • Memory overhead: Two threads vs one
  • Latency impact: Minimal (<50ms added per request)

Future Enhancements

  1. Session persistence: Store thread IDs in Redis
  2. Multiple long operations: Queue system
  3. Progress streaming: Real-time progress updates
  4. Cancellation: User can cancel long operations
  5. Thread pooling: Reuse secondary threads

Related Files

  • langgraph_llm_service.py - Service implementation
  • pipeline.py - Pipeline configuration
  • react_agent.py - Multi-threaded agent
  • tools.py - Long-running tools with progress reporting
  • helper_functions.py - Store coordination utilities
  • telco_client.py - CLI test client

Credits

Implementation: Option 1 (Tool-Level Designation)
Date: September 30, 2025