import type { IncomingMessage } from "http"; import type { WebSocket } from "ws"; import { LangGraphAgent } from "./langgraph-agent"; import { HumanMessage, AIMessage, BaseMessage } from "@langchain/core/messages"; import { consoleBuffer } from "./console-buffer"; import { mcpClientManager } from "./mcp-client"; import { virtualFileSystem } from "../services/virtual-fs"; export interface WebSocketMessage { type: | "chat" | "error" | "status" | "stream" | "stream_start" | "stream_token" | "stream_end" | "auth" | "editor_update" | "editor_sync" | "tool_execution" | "console_sync" | "abort" | "clear_conversation"; payload: { content?: string; role?: string; chunk?: string; token?: string; error?: string; processing?: boolean; connected?: boolean; message?: string; toolName?: string; toolArgs?: Record; toolResult?: string; id?: string; type?: string; messageId?: string; }; timestamp: number; } class WebSocketManager { private connections: Map< WebSocket, { token?: string; agent?: LangGraphAgent; messages?: BaseMessage[]; abortController?: AbortController; } > = new Map(); handleConnection(ws: WebSocket, _request: IncomingMessage) { this.connections.set(ws, {}); this.sendMessage(ws, { type: "status", payload: { connected: true, message: "Connected to agent server" }, timestamp: Date.now(), }); ws.on("message", async (data) => { try { const message = JSON.parse(data.toString()) as WebSocketMessage; await this.handleMessage(ws, message); } catch (error) { console.error("Error handling WebSocket message:", error); this.sendError(ws, "Failed to process message"); } }); ws.on("close", async () => { await mcpClientManager.cleanup(); this.connections.delete(ws); }); ws.on("error", (error) => { console.error("WebSocket error:", error); this.connections.delete(ws); }); } private async handleMessage(ws: WebSocket, message: WebSocketMessage) { const connectionData = this.connections.get(ws); switch (message.type) { case "auth": if (message.payload.token && connectionData) { connectionData.token = message.payload.token; try { connectionData.agent = new LangGraphAgent(); await connectionData.agent.initialize(message.payload.token, ws); connectionData.messages = []; this.sendMessage(ws, { type: "status", payload: { message: "Authenticated successfully", connected: true, }, timestamp: Date.now(), }); } catch (error) { console.error("Authentication failed:", error); this.sendError( ws, "Authentication failed. Please sign in with Hugging Face.", ); } } else { this.sendError(ws, "No authentication token provided"); } break; case "console_sync": if ( message.payload.id && message.payload.type && message.payload.message ) { consoleBuffer.addMessage({ id: message.payload.id, type: message.payload.type as "log" | "warn" | "error" | "info", message: message.payload.message, timestamp: message.timestamp, }); } break; case "abort": if (connectionData?.abortController) { connectionData.abortController.abort(); this.sendMessage(ws, { type: "status", payload: { processing: false, message: "Conversation stopped" }, timestamp: Date.now(), }); } break; case "editor_sync": if (message.payload.content) { virtualFileSystem.updateGameContent( message.payload.content as string, ); } break; case "clear_conversation": if (connectionData) { connectionData.messages = []; this.sendMessage(ws, { type: "status", payload: { message: "Conversation history cleared" }, timestamp: Date.now(), }); } break; case "chat": try { if (!connectionData?.agent) { throw new Error( "Authentication required. Please sign in with Hugging Face.", ); } const userMessage = message.payload.content; if (!userMessage) { throw new Error("No message content provided"); } connectionData.abortController = new AbortController(); this.sendMessage(ws, { type: "status", payload: { processing: true }, timestamp: Date.now(), }); if (!connectionData.messages) { connectionData.messages = []; } connectionData.messages.push(new HumanMessage(userMessage)); const messageId = `msg_${Date.now()}`; this.sendMessage(ws, { type: "stream_start", payload: { messageId }, timestamp: Date.now(), }); const response = await connectionData.agent.processMessage( userMessage, connectionData.messages.slice(0, -1), (chunk: string) => { this.sendMessage(ws, { type: "stream_token", payload: { token: chunk, messageId, }, timestamp: Date.now(), }); }, messageId, connectionData.abortController.signal, ); connectionData.messages.push(new AIMessage(response)); this.sendMessage(ws, { type: "stream_end", payload: { messageId, content: response, }, timestamp: Date.now(), }); this.sendMessage(ws, { type: "status", payload: { processing: false }, timestamp: Date.now(), }); } catch (error) { console.error("Error processing chat message:", error); if (error instanceof Error && error.name === "AbortError") { this.sendMessage(ws, { type: "stream_end", payload: { messageId: `msg_${Date.now()}`, content: "", }, timestamp: Date.now(), }); } else { this.sendError( ws, error instanceof Error ? error.message : "Unknown error", ); } } finally { if (connectionData) { connectionData.abortController = undefined; } } break; default: this.sendError(ws, `Unknown message type: ${message.type}`); } } private sendMessage(ws: WebSocket, message: WebSocketMessage) { if (ws.readyState === ws.OPEN) { ws.send(JSON.stringify(message)); } } private sendError(ws: WebSocket, error: string) { this.sendMessage(ws, { type: "error", payload: { error }, timestamp: Date.now(), }); } broadcast(message: WebSocketMessage) { for (const [ws] of this.connections) { this.sendMessage(ws, message); } } } export const wsManager = new WebSocketManager();