director-ai / server /src /services /cliBridge.ts
algorembrant's picture
Upload 79 files
11f4e50 verified
import { spawn, ChildProcess } from 'child_process';
import { Server as SocketServer, Socket } from 'socket.io';
import { config } from '../config';
import { logger } from '../utils/logger';
import { Video } from '../models';
/**
 * Book-keeping record for one CLI child process started through the bridge.
 */
interface CLISession {
// Handle of the spawned child process (used for stdin writes and kill signals).
process: ChildProcess;
// Identifier of the user who started the session, as reported in the socket handshake.
userId: string;
// Optional id of the video record this run is linked to.
videoId?: string;
// Output collected so far: stdout chunks as-is, stderr chunks prefixed with "[stderr] ".
logs: string[];
}
// Registry of CLI sessions that are currently tracked, keyed by session id.
// Entries are added when a process is spawned and removed on close, error,
// cancel, or socket disconnect.
const activeSessions: Map<string, CLISession> = new Map();
/** Extracts a printable message from a value caught in a `catch` clause. */
function describeError(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/** Returns true when an optional payload field is absent (undefined/null) or a string. */
function isOptionalString(value: unknown): boolean {
  return value == null || typeof value === 'string';
}

/**
 * Wires a session's stdout/stderr to the socket and guards its stdin.
 *
 * stdout chunks are stored in the session log, emitted as `cli:stdout`, and then
 * handed to `onStdout` when provided. stderr chunks are stored with a
 * "[stderr] " prefix and emitted as `cli:stderr`.
 */
function forwardOutput(
  socket: Socket,
  sessionId: string,
  session: CLISession,
  onStdout?: (text: string) => void
): void {
  const child = session.process;

  child.stdout?.on('data', (chunk: Buffer) => {
    const text = chunk.toString();
    session.logs.push(text);
    socket.emit('cli:stdout', { sessionId, data: text });
    onStdout?.(text);
  });

  child.stderr?.on('data', (chunk: Buffer) => {
    const text = chunk.toString();
    session.logs.push(`[stderr] ${text}`);
    socket.emit('cli:stderr', { sessionId, data: text });
  });

  // Writing to the stdin of a process that already exited (or never started)
  // raises an 'error' event on the stream; without a listener that would be an
  // unhandled error and take the whole server down.
  child.stdin?.on('error', (err: Error) => {
    logger.error(`CLI stdin error for session ${sessionId}: ${err.message}`);
  });
}

/**
 * Registers the socket.io handlers that bridge web clients to the CLI.
 *
 * Per connection the following events are handled:
 *  - `cli:execute`  spawn the CLI with arguments parsed from a command string
 *  - `cli:prompt`   spawn the CLI without arguments and pipe a prompt to stdin
 *  - `cli:input`    write a line to the stdin of a running session
 *  - `cli:cancel`   send SIGTERM to a running session
 *  - `disconnect`   terminate every session started by this socket
 *
 * Results are reported back through `cli:started`, `cli:stdout`, `cli:stderr`,
 * `cli:done`, `cli:cancelled` and `cli:error`.
 *
 * @param io socket.io server to attach the handlers to.
 */
export function initCLIBridge(io: SocketServer): void {
  io.on('connection', (socket: Socket) => {
    logger.info(`WebSocket connected: ${socket.id}`);

    // Authenticate the socket connection.
    // NOTE(review): only the *presence* of a token is checked here and userId is
    // taken verbatim from the client handshake. The token should be verified and
    // the user id derived from it (e.g. in an io.use() middleware) - TODO confirm
    // whether that already happens elsewhere.
    const token = socket.handshake.auth.token;
    if (!token) {
      socket.emit('cli:error', { message: 'Authentication required.' });
      socket.disconnect();
      return;
    }
    const userId: string = socket.handshake.auth.userId || 'anonymous';

    // Ids of the sessions started by THIS socket. Used to (a) stop one client
    // from driving or cancelling another client's process and (b) clean up
    // every session on disconnect regardless of how its id is prefixed.
    const ownedSessionIds = new Set<string>();

    const trackSession = (sessionId: string, session: CLISession): void => {
      activeSessions.set(sessionId, session);
      ownedSessionIds.add(sessionId);
    };

    const forgetSession = (sessionId: string): void => {
      activeSessions.delete(sessionId);
      ownedSessionIds.delete(sessionId);
    };

    const findOwnedSession = (sessionId: unknown): CLISession | undefined =>
      typeof sessionId === 'string' && ownedSessionIds.has(sessionId)
        ? activeSessions.get(sessionId)
        : undefined;

    // Handle incoming CLI commands from the webapp chatbox
    socket.on('cli:execute', (data: {
      command: string;
      videoId?: string;
      workingDir?: string;
    }) => {
      // Payloads come from the network; reject malformed ones instead of
      // throwing inside the handler.
      if (
        !data ||
        typeof data.command !== 'string' ||
        !isOptionalString(data.videoId) ||
        !isOptionalString(data.workingDir)
      ) {
        socket.emit('cli:error', { message: 'Invalid cli:execute payload.' });
        return;
      }

      const { command } = data;
      const videoId = data.videoId ?? undefined;
      const sessionId = `${socket.id}-${Date.now()}`;
      logger.info(`CLI command received from ${userId}: ${command}`);
      socket.emit('cli:started', { sessionId, command });

      try {
        // Spawn the real Antigravity CLI process.
        // shell is disabled on purpose: the arguments are user-supplied, and with
        // a shell they would be re-joined into one command line where characters
        // such as ';', '|' or '$()' run arbitrary commands. Without a shell each
        // parsed argument reaches the CLI verbatim.
        // NOTE(review): on Windows a .cmd/.bat launcher cannot be spawned without
        // a shell - config.cli.path must point at a real executable there.
        // NOTE(review): workingDir is client-controlled; consider restricting it
        // to an allow-listed root.
        const childProcess = spawn(config.cli.path, parseCommand(command), {
          cwd: data.workingDir || process.cwd(),
          shell: false,
          env: { ...process.env },
        });
        const session: CLISession = {
          process: childProcess,
          userId,
          videoId,
          logs: [],
        };
        trackSession(sessionId, session);

        // Stream output to the frontend in real time; when linked to a video,
        // stdout is also appended to that video's CLI log (fire-and-forget, the
        // helper handles its own errors).
        forwardOutput(
          socket,
          sessionId,
          session,
          videoId
            ? (text: string) => {
                void appendVideoLog(videoId, text);
              }
            : undefined
        );

        // Handle process completion
        childProcess.on('close', (code: number | null) => {
          session.logs.push(`Process exited with code ${code}`);
          socket.emit('cli:done', {
            sessionId,
            exitCode: code,
            logs: session.logs,
          });
          // Update video status based on exit code
          if (videoId) {
            void updateVideoStatus(videoId, code === 0 ? 'preview' : 'failed');
          }
          forgetSession(sessionId);
          logger.info(`CLI session ${sessionId} completed with code ${code}`);
        });

        childProcess.on('error', (err: Error) => {
          socket.emit('cli:error', {
            sessionId,
            message: `Failed to start CLI: ${err.message}`,
          });
          forgetSession(sessionId);
          logger.error(`CLI process error: ${err.message}`);
        });
      } catch (error: unknown) {
        const message = describeError(error);
        socket.emit('cli:error', {
          sessionId,
          message: `CLI execution failed: ${message}`,
        });
        logger.error(`CLI bridge error: ${message}`);
      }
    });

    // Allow sending input to a running CLI session (interactive mode)
    socket.on('cli:input', (data: { sessionId: string; input: string }) => {
      if (!data || typeof data.input !== 'string') {
        return;
      }
      const stdin = findOwnedSession(data.sessionId)?.process.stdin;
      if (stdin && stdin.writable) {
        stdin.write(data.input + '\n');
        logger.debug(`Input sent to session ${data.sessionId}: ${data.input}`);
      }
    });

    // Allow canceling a running CLI session
    socket.on('cli:cancel', (data: { sessionId: string }) => {
      const session = findOwnedSession(data?.sessionId);
      if (session) {
        session.process.kill('SIGTERM');
        forgetSession(data.sessionId);
        socket.emit('cli:cancelled', { sessionId: data.sessionId });
        logger.info(`CLI session ${data.sessionId} cancelled by user`);
      }
    });

    // Send freeform prompt to CLI (the novel approach)
    socket.on('cli:prompt', (data: { prompt: string; videoId?: string }) => {
      if (!data || typeof data.prompt !== 'string' || !isOptionalString(data.videoId)) {
        socket.emit('cli:error', { message: 'Invalid cli:prompt payload.' });
        return;
      }

      const { prompt } = data;
      const videoId = data.videoId ?? undefined;
      const sessionId = `prompt-${socket.id}-${Date.now()}`;
      logger.info(`Freeform prompt from ${userId}: ${prompt}`);
      socket.emit('cli:started', { sessionId, command: prompt });

      try {
        // For freeform prompts, we pipe the prompt directly to the CLI.
        // No user input reaches the command line here (the prompt only goes to
        // stdin), so the shell setting is left as it was.
        const childProcess = spawn(config.cli.path, [], {
          shell: true,
          env: { ...process.env },
        });
        const session: CLISession = {
          process: childProcess,
          userId,
          videoId,
          logs: [],
        };
        trackSession(sessionId, session);

        // Attach listeners (including the stdin error guard) before writing.
        forwardOutput(socket, sessionId, session);

        // Write the prompt to stdin and close it so the CLI sees end-of-input.
        if (childProcess.stdin) {
          childProcess.stdin.write(prompt + '\n');
          childProcess.stdin.end();
        }

        childProcess.on('close', (code: number | null) => {
          socket.emit('cli:done', {
            sessionId,
            exitCode: code,
            logs: session.logs,
          });
          forgetSession(sessionId);
        });

        childProcess.on('error', (err: Error) => {
          socket.emit('cli:error', {
            sessionId,
            message: `CLI error: ${err.message}`,
          });
          forgetSession(sessionId);
        });
      } catch (error: unknown) {
        socket.emit('cli:error', {
          sessionId,
          message: `Prompt failed: ${describeError(error)}`,
        });
      }
    });

    socket.on('disconnect', () => {
      // Clean up every session this socket started. Iterating the ownership set
      // (rather than matching id prefixes) also covers "prompt-..." sessions.
      for (const sessionId of [...ownedSessionIds]) {
        activeSessions.get(sessionId)?.process.kill('SIGTERM');
        forgetSession(sessionId);
      }
      logger.info(`WebSocket disconnected: ${socket.id}`);
    });
  });
}
/**
 * Splits a command string into an argument array.
 *
 * Rules:
 *  - Arguments are separated by runs of whitespace (spaces, tabs, newlines).
 *  - Text inside matching single or double quotes is kept verbatim, including
 *    whitespace and the other kind of quote; the quote characters themselves
 *    are dropped.
 *  - Quoted and unquoted pieces that touch are joined into one argument
 *    (`pre"a b"post` -> `prea bpost`).
 *  - An explicitly quoted empty string (`""` or `''`) yields an empty argument.
 *  - An unterminated quote is tolerated: whatever follows it becomes part of
 *    the last argument.
 *  - There is no backslash-escape handling.
 *
 * @param command raw command line as typed by the user.
 * @returns the parsed arguments, in order; empty for blank input.
 */
function parseCommand(command: string): string[] {
  const whitespace = /\s/;
  const parts: string[] = [];
  let current = '';
  // True once the current argument has started, even if it is still empty
  // (e.g. right after `""`), so empty quoted arguments are not lost.
  let hasToken = false;
  let quoteChar: string | null = null;

  for (const char of command) {
    if (quoteChar !== null) {
      if (char === quoteChar) {
        quoteChar = null;
      } else {
        current += char;
      }
    } else if (char === '"' || char === "'") {
      quoteChar = char;
      hasToken = true;
    } else if (whitespace.test(char)) {
      if (hasToken) {
        parts.push(current);
        current = '';
        hasToken = false;
      }
    } else {
      current += char;
      hasToken = true;
    }
  }

  // Flush the trailing argument. A dangling opening quote with nothing after
  // it does not count as an (empty) argument.
  if (current || (hasToken && quoteChar === null)) {
    parts.push(current);
  }
  return parts;
}
/**
 * Pushes one chunk of CLI output onto the `cliLog` array of a video record.
 * Failures are logged and swallowed so callers never see a rejection.
 */
async function appendVideoLog(videoId: string, log: string): Promise<void> {
  const pushEntry = { $push: { cliLog: log } };
  try {
    await Video.findByIdAndUpdate(videoId, pushEntry);
  } catch (failure) {
    logger.error(`Failed to append video log: ${failure}`);
  }
}
/**
 * Writes a new `status` value onto a video record and logs the change.
 * Failures are logged and swallowed so callers never see a rejection.
 */
async function updateVideoStatus(
  videoId: string,
  status: string
): Promise<void> {
  const changes = { status };
  try {
    await Video.findByIdAndUpdate(videoId, changes);
    const confirmation = `Video ${videoId} status updated to ${status}`;
    logger.info(confirmation);
  } catch (failure) {
    logger.error(`Failed to update video status: ${failure}`);
  }
}
// Expose the live session registry to other modules. The Map is shared by
// reference, so mutations made by importers are visible to the bridge.
export { activeSessions };