import { spawn, ChildProcess } from 'child_process'; import { Server as SocketServer, Socket } from 'socket.io'; import { config } from '../config'; import { logger } from '../utils/logger'; import { Video } from '../models'; interface CLISession { process: ChildProcess; userId: string; videoId?: string; logs: string[]; } const activeSessions: Map = new Map(); export function initCLIBridge(io: SocketServer): void { io.on('connection', (socket: Socket) => { logger.info(`WebSocket connected: ${socket.id}`); // Authenticate the socket connection const token = socket.handshake.auth.token; if (!token) { socket.emit('cli:error', { message: 'Authentication required.' }); socket.disconnect(); return; } let userId: string = socket.handshake.auth.userId || 'anonymous'; // Handle incoming CLI commands from the webapp chatbox socket.on('cli:execute', async (data: { command: string; videoId?: string; workingDir?: string; }) => { const { command, videoId, workingDir } = data; const sessionId = `${socket.id}-${Date.now()}`; logger.info(`CLI command received from ${userId}: ${command}`); socket.emit('cli:started', { sessionId, command }); try { // Spawn the real Antigravity CLI process const cliPath = config.cli.path; const args = parseCommand(command); const cwd = workingDir || process.cwd(); const childProcess = spawn(cliPath, args, { cwd, shell: true, env: { ...process.env }, }); const session: CLISession = { process: childProcess, userId, videoId, logs: [], }; activeSessions.set(sessionId, session); // Stream stdout to the frontend in real-time childProcess.stdout?.on('data', (chunk: Buffer) => { const text = chunk.toString(); session.logs.push(text); socket.emit('cli:stdout', { sessionId, data: text }); // If linked to a video, append to CLI log if (videoId) { appendVideoLog(videoId, text); } }); // Stream stderr to the frontend childProcess.stderr?.on('data', (chunk: Buffer) => { const text = chunk.toString(); session.logs.push(`[stderr] ${text}`); socket.emit('cli:stderr', { sessionId, data: text }); }); // Handle process completion childProcess.on('close', (code: number | null) => { const exitMessage = `Process exited with code ${code}`; session.logs.push(exitMessage); socket.emit('cli:done', { sessionId, exitCode: code, logs: session.logs, }); // Update video status based on exit code if (videoId) { updateVideoStatus(videoId, code === 0 ? 'preview' : 'failed'); } activeSessions.delete(sessionId); logger.info(`CLI session ${sessionId} completed with code ${code}`); }); childProcess.on('error', (err: Error) => { socket.emit('cli:error', { sessionId, message: `Failed to start CLI: ${err.message}`, }); activeSessions.delete(sessionId); logger.error(`CLI process error: ${err.message}`); }); } catch (error: any) { socket.emit('cli:error', { sessionId, message: `CLI execution failed: ${error.message}`, }); logger.error(`CLI bridge error: ${error.message}`); } }); // Allow sending input to a running CLI session (interactive mode) socket.on('cli:input', (data: { sessionId: string; input: string }) => { const session = activeSessions.get(data.sessionId); if (session && session.process.stdin) { session.process.stdin.write(data.input + '\n'); logger.debug(`Input sent to session ${data.sessionId}: ${data.input}`); } }); // Allow canceling a running CLI session socket.on('cli:cancel', (data: { sessionId: string }) => { const session = activeSessions.get(data.sessionId); if (session) { session.process.kill('SIGTERM'); activeSessions.delete(data.sessionId); socket.emit('cli:cancelled', { sessionId: data.sessionId }); logger.info(`CLI session ${data.sessionId} cancelled by user`); } }); // Send freeform prompt to CLI (the novel approach) socket.on('cli:prompt', async (data: { prompt: string; videoId?: string }) => { const { prompt, videoId } = data; const sessionId = `prompt-${socket.id}-${Date.now()}`; logger.info(`Freeform prompt from ${userId}: ${prompt}`); socket.emit('cli:started', { sessionId, command: prompt }); try { // For freeform prompts, we pipe the prompt directly to the CLI const childProcess = spawn(config.cli.path, [], { shell: true, env: { ...process.env }, }); const session: CLISession = { process: childProcess, userId, videoId, logs: [], }; activeSessions.set(sessionId, session); // Write the prompt to stdin if (childProcess.stdin) { childProcess.stdin.write(prompt + '\n'); childProcess.stdin.end(); } childProcess.stdout?.on('data', (chunk: Buffer) => { const text = chunk.toString(); session.logs.push(text); socket.emit('cli:stdout', { sessionId, data: text }); }); childProcess.stderr?.on('data', (chunk: Buffer) => { const text = chunk.toString(); session.logs.push(`[stderr] ${text}`); socket.emit('cli:stderr', { sessionId, data: text }); }); childProcess.on('close', (code: number | null) => { socket.emit('cli:done', { sessionId, exitCode: code, logs: session.logs, }); activeSessions.delete(sessionId); }); childProcess.on('error', (err: Error) => { socket.emit('cli:error', { sessionId, message: `CLI error: ${err.message}`, }); activeSessions.delete(sessionId); }); } catch (error: any) { socket.emit('cli:error', { sessionId, message: `Prompt failed: ${error.message}`, }); } }); socket.on('disconnect', () => { // Clean up any active sessions for this socket for (const [id, session] of activeSessions.entries()) { if (id.startsWith(socket.id)) { session.process.kill('SIGTERM'); activeSessions.delete(id); } } logger.info(`WebSocket disconnected: ${socket.id}`); }); }); } // Parse a command string into args array function parseCommand(command: string): string[] { const parts: string[] = []; let current = ''; let inQuote = false; let quoteChar = ''; for (const char of command) { if (inQuote) { if (char === quoteChar) { inQuote = false; } else { current += char; } } else if (char === '"' || char === "'") { inQuote = true; quoteChar = char; } else if (char === ' ') { if (current) { parts.push(current); current = ''; } } else { current += char; } } if (current) parts.push(current); return parts; } // Append log entry to a video record async function appendVideoLog(videoId: string, log: string): Promise { try { await Video.findByIdAndUpdate(videoId, { $push: { cliLog: log }, }); } catch (error) { logger.error(`Failed to append video log: ${error}`); } } // Update video status async function updateVideoStatus( videoId: string, status: string ): Promise { try { await Video.findByIdAndUpdate(videoId, { status }); logger.info(`Video ${videoId} status updated to ${status}`); } catch (error) { logger.error(`Failed to update video status: ${error}`); } } export { activeSessions };