|
import { IChildProcessMessage, IReactFlowNode, IReactFlowObject, IRunChatflowMessageValue, INodeData } from './Interface' |
|
import { buildLangchain, constructGraphs, getEndingNode, getStartingNodes, resolveVariables } from './utils' |
|
|
|
/**
 * Executes one chatflow run on behalf of a parent process and reports the
 * outcome through `sendToParentProcess` using the message keys 'start',
 * 'error' and 'finish'.
 */
export class ChildProcess {
    /**
     * Handler registered for SIGTERM and SIGINT: schedules `process.exit(0)`
     * to run 50,000 ms after it is invoked.
     */
    static async stopChildProcess(): Promise<void> {
        setTimeout(() => {
            process.exit(0)
        }, 50000)
    }

    /**
     * Entry point of a run.
     *
     * Registers the termination-signal handlers, notifies the parent with a
     * 'start' message, then executes the chatflow. Any exception raised while
     * building or running the flow is forwarded to the parent as an 'error'
     * message (carrying the error's message text) instead of being left as an
     * unhandled rejection, so the parent always receives either 'finish' or
     * 'error' as long as the messages themselves can be sent.
     *
     * @param messageValue - Run request received from the parent process.
     * @returns Resolves once the final 'finish' or 'error' message has been sent.
     * @throws Rejects only when a message cannot be delivered to the parent.
     */
    async runChildProcess(messageValue: IRunChatflowMessageValue): Promise<void> {
        process.on('SIGTERM', ChildProcess.stopChildProcess)
        process.on('SIGINT', ChildProcess.stopChildProcess)

        await sendToParentProcess('start', '_')

        try {
            await this.executeChatflow(messageValue)
        } catch (error) {
            const reason = error instanceof Error ? error.message : String(error)
            await sendToParentProcess('error', reason)
        }
    }

    /**
     * Resolves the node to execute, runs it and sends the 'finish' message.
     *
     * When the request already carries `endingNodeData`, that node data is
     * executed as-is and the flow is not rebuilt. Otherwise the flow is parsed
     * from `chatflow.flowData`, validated, built with `buildLangchain`, and the
     * ending node's variables are resolved before execution. Validation
     * failures are reported to the parent as 'error' messages and end the run.
     */
    private async executeChatflow(messageValue: IRunChatflowMessageValue): Promise<void> {
        const { endingNodeData, chatflow, incomingInput, componentNodes } = messageValue

        let nodeToExecuteData: INodeData
        let addToChatFlowPool: Record<string, unknown> = {}

        if (endingNodeData) {
            nodeToExecuteData = endingNodeData
        } else {
            const parsedFlowData: IReactFlowObject = JSON.parse(chatflow.flowData)
            const { nodes, edges } = parsedFlowData

            // Directed graph: used to locate the ending node and to build the flow.
            const { graph, nodeDependencies } = constructGraphs(nodes, edges)
            const endingNodeId = getEndingNode(nodeDependencies, graph)
            if (!endingNodeId) {
                await sendToParentProcess('error', `Ending node must be either a Chain or Agent`)
                return
            }

            const endingNode = nodes.find((nd) => nd.id === endingNodeId)?.data
            if (!endingNode) {
                await sendToParentProcess('error', `Ending node must be either a Chain or Agent`)
                return
            }

            // If the ending node declares outputs, one of them must be the node itself.
            const hasOutputs = Boolean(endingNode.outputs && Object.keys(endingNode.outputs).length)
            if (hasOutputs && !Object.values(endingNode.outputs).includes(endingNode.name)) {
                await sendToParentProcess(
                    'error',
                    `Output of ${endingNode.label} (${endingNode.id}) must be ${endingNode.label}, can't be an Output Prediction`
                )
                return
            }

            // Second graph (third argument `true`): used to find the starting nodes from the ending node.
            const { graph: nonDirectedGraph } = constructGraphs(nodes, edges, true)
            const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId)

            const reactFlowNodes = await buildLangchain(
                startingNodeIds,
                nodes,
                graph,
                depthQueue,
                componentNodes,
                incomingInput.question,
                incomingInput.overrideConfig
            )

            const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId)
            if (!nodeToExecute) {
                await sendToParentProcess('error', `Node ${endingNodeId} not found`)
                return
            }

            nodeToExecuteData = resolveVariables(nodeToExecute.data, reactFlowNodes, incomingInput.question)

            // Returned to the parent alongside the result in the 'finish' message.
            addToChatFlowPool = {
                chatflowid: chatflow.id,
                nodeToExecuteData,
                startingNodes: nodes.filter((nd) => startingNodeIds.includes(nd.id)),
                overrideConfig: incomingInput.overrideConfig
            }
        }

        // Fail with a clear message instead of a TypeError when the component is not registered.
        const componentNode = componentNodes[nodeToExecuteData.name]
        if (!componentNode) {
            await sendToParentProcess('error', `Component node ${nodeToExecuteData.name} not found`)
            return
        }

        const nodeModule = await import(componentNode.filePath as string)
        const nodeInstance = new nodeModule.nodeClass()

        const result = await nodeInstance.run(nodeToExecuteData, incomingInput.question, { chatHistory: incomingInput.history })

        await sendToParentProcess('finish', { result, addToChatFlowPool })
    }
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
 * Sends a `{ key, value }` message to the parent process through
 * `process.send` and settles once the send callback has fired.
 *
 * @param key - Placed on the message's `key` field.
 * @param value - Placed on the message's `value` field.
 * @returns Resolves when the send callback reports no error.
 * @throws Rejects with the error handed to the send callback, or with an
 * `Error` when `process.send` is not available on this process.
 */
async function sendToParentProcess(key: string, value: any): Promise<void> {
    const send = process.send
    if (typeof send !== 'function') {
        throw new Error('Cannot send message to parent process: no IPC channel (process.send is unavailable)')
    }

    await new Promise<void>((resolve, reject) => {
        // Invoke with `process` as the receiver, exactly as `process.send(...)` would.
        send.call(process, { key, value }, (error: Error | null) => {
            if (error) {
                reject(error)
                return
            }
            resolve()
        })
    })
}
|
|
|
// Single ChildProcess instance used to service messages from the parent.
const childProcess = new ChildProcess()

/**
 * Handles a message from the parent process: a message whose key is 'start'
 * runs the chatflow with the message's value and then exits the process.
 * Messages with any other key are ignored.
 */
const onParentMessage = async (message: IChildProcessMessage): Promise<void> => {
    if (message.key !== 'start') return

    await childProcess.runChildProcess(message.value)
    process.exit()
}

process.on('message', onParentMessage)
|
|