# Pipecat Multi-Threading Integration ## Overview This document explains how the multi-threaded telco agent is integrated with the Pipecat voice pipeline using WebRTC. ## Architecture ``` ┌─────────────────────────────────────────────────────────┐ │ Browser (WebRTC) │ └────────────────────┬────────────────────────────────────┘ │ ↓ Audio Stream ┌─────────────────────────────────────────────────────────┐ │ pipeline.py (FastAPI + Pipecat) │ │ │ │ ┌─────────────────────────────────────────────────┐ │ │ │ Pipeline: │ │ │ │ WebRTC → ASR → LangGraphLLMService → TTS → │ │ │ └─────────────────────────────────────────────────┘ │ │ ↓ │ │ langgraph_llm_service.py │ │ ↓ │ └────────────────────┬───────────────────────────────────┘ │ ↓ HTTP/WebSocket ┌─────────────────────────────────────────────────────────┐ │ LangGraph Server (langgraph dev) │ │ │ │ ┌──────────────────────────────────────────────────┐ │ │ │ react_agent.py (Multi-threaded) │ │ │ │ │ │ │ │ Main Thread: Handles long operations │ │ │ │ Secondary Thread: Handles interim queries │ │ │ │ │ │ │ │ Store: Coordinates between threads │ │ │ └──────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────┘ ``` ## How It Works ### 1. **LangGraphLLMService** (`langgraph_llm_service.py`) This service acts as a bridge between Pipecat's frame-based processing and LangGraph's agent. #### Key Changes: **a) Dual Thread Management:** ```python self._thread_id_main: Optional[str] = None # For long operations self._thread_id_secondary: Optional[str] = None # For interim queries ``` **b) Operation Status Checking:** ```python async def _check_long_operation_running(self) -> bool: """Check if a long operation is currently running via the store.""" # Queries LangGraph store for "running" status # Returns True if a long operation is in progress ``` **c) Automatic Routing:** ```python # Before each message, check if long operation is running long_operation_running = await self._check_long_operation_running() if long_operation_running: thread_type = "secondary" # Route to secondary thread else: thread_type = "main" # Route to main thread ``` **d) Input Format:** ```python # New multi-threaded format input_payload = { "messages": [{"type": "human", "content": text}], "thread_type": "main" or "secondary", "interim_messages_reset": bool, } # Config includes namespace for coordination config = { "configurable": { "user_email": self.user_email, "thread_id": thread_id, "namespace_for_memory": ["user@example.com", "tools_updates"], } } ``` ### 2. **Pipeline Configuration** (`pipeline.py`) ```python # Enable multi-threading for specific assistants enable_multi_threading = selected_assistant in ["telco-agent", "wire-transfer-agent"] llm = LangGraphLLMService( base_url=os.getenv("LANGGRAPH_BASE_URL", "http://127.0.0.1:2024"), assistant=selected_assistant, enable_multi_threading=enable_multi_threading, # NEW ) ``` ### 3. **React Agent** (`react_agent.py`) Already updated to handle multi-threaded input format (see `MULTI_THREAD_README.md`). ## Flow Example ### User says: "Close my contract" ``` 1. Browser (WebRTC) → Pipecat Pipeline 2. ASR converts to text: "Close my contract" 3. LangGraphLLMService receives text 4. Service checks store: No long operation running 5. Service sends to main thread: { "messages": [{"type": "human", "content": "Close my contract"}], "thread_type": "main", "interim_messages_reset": True } 6. Agent starts 50-second contract closure 7. Agent writes status to store: {"status": "running", "progress": 10} 8. TTS speaks: "Processing your contract closure..." ``` ### User says (5 seconds later): "What's the status?" ``` 1. Browser (WebRTC) → Pipecat Pipeline 2. ASR converts to text: "What's the status?" 3. LangGraphLLMService receives text 4. Service checks store: Long operation IS running ✓ 5. Service sends to secondary thread: { "messages": [{"type": "human", "content": "What's the status?"}], "thread_type": "secondary", "interim_messages_reset": False } 6. Secondary thread checks status tool 7. Agent responds: "Your request is 20% complete" 8. TTS speaks response 9. Main thread continues running in background ``` ### Main operation completes (50 seconds later) ``` 1. Main thread finishes contract closure 2. Agent synthesizes: result + interim conversation 3. Agent sets completion flag in store 4. TTS speaks: "Your contract has been closed..." 5. Service detects completion on next message 6. Routes future messages to main thread ``` ## Configuration ### Environment Variables ```bash # LangGraph Server LANGGRAPH_BASE_URL=http://127.0.0.1:2024 LANGGRAPH_ASSISTANT=telco-agent # User identification (for namespace) USER_EMAIL=test@example.com # Enable debug logging LANGGRAPH_DEBUG_STREAM=true ``` ### Enable/Disable Multi-Threading **For specific agents:** ```python # In pipeline.py enable_multi_threading = selected_assistant in ["telco-agent", "wire-transfer-agent"] ``` **Via environment variable (optional):** ```python enable_multi_threading = os.getenv("ENABLE_MULTI_THREADING", "true").lower() == "true" ``` **Disable for an agent:** ```python llm = LangGraphLLMService( assistant="some-other-agent", enable_multi_threading=False, # Use simple single-threaded mode ) ``` ## Store Keys Used The service queries these store keys for coordination: | Key | Purpose | Set By | |-----|---------|--------| | `working-tool-status-update` | Current tool progress | Agent's long-running tools | | `main_operation_complete` | Completion signal | Agent's main thread | | `secondary_interim_messages` | Interim conversation | Agent's secondary thread | ## Backward Compatibility When `enable_multi_threading=False`: - Uses single thread - Sends simple message format: `[HumanMessage(content=text)]` - No store coordination - Works with non-multi-threaded agents ## Benefits 1. **Non-Blocking Voice UX**: User can continue talking during long operations 2. **Transparent**: User doesn't need to know about threading 3. **Automatic Routing**: Service handles main/secondary routing automatically 4. **Store-Based**: No client-side coordination needed 5. **Backward Compatible**: Existing agents work without changes ## Testing ### With Web UI 1. Start LangGraph server: `langgraph dev` 2. Start pipeline: `python pipeline.py` 3. Open browser to `http://localhost:7860` 4. Select "Telco Agent" 5. Say: "Close my contract" → Confirm with "yes" 6. While processing, say: "What's the status?" 7. Agent should respond with progress while operation continues ### With Client Script ```bash # Terminal 1: Start LangGraph cd examples/voice_agent_multi_thread/agents langgraph dev # Terminal 2: Test with client cd examples/voice_agent_multi_thread/agents python telco_client.py --interactive ``` ## Troubleshooting ### Messages always go to main thread - Check that `enable_multi_threading=True` - Verify long-running tools are writing status to store - Check namespace matches: `("user_email", "tools_updates")` ### Secondary thread not responding - Ensure secondary thread has limited tool set - Check `SECONDARY_SYSTEM_PROMPT` in `react_agent.py` - Verify `check_status` tool is included ### Synthesis not working - Check `secondary_interim_messages` in store - Verify meaningful messages filter in agent - Check synthesis prompt in agent ## Performance - **Store queries**: ~10-20ms per check - **Thread switching**: Negligible (routing decision) - **Memory overhead**: Two threads vs one - **Latency impact**: Minimal (<50ms added per request) ## Future Enhancements 1. **Session persistence**: Store thread IDs in Redis 2. **Multiple long operations**: Queue system 3. **Progress streaming**: Real-time progress updates 4. **Cancellation**: User can cancel long operations 5. **Thread pooling**: Reuse secondary threads ## Related Files - `langgraph_llm_service.py` - Service implementation - `pipeline.py` - Pipeline configuration - `react_agent.py` - Multi-threaded agent - `tools.py` - Long-running tools with progress reporting - `helper_functions.py` - Store coordination utilities - `telco_client.py` - CLI test client ## Credits Implementation: Option 1 (Tool-Level Designation) Date: September 30, 2025