File size: 5,639 Bytes
4114d85 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
import { IChildProcessMessage, IReactFlowNode, IReactFlowObject, IRunChatflowMessageValue, INodeData } from './Interface'
import { buildLangchain, constructGraphs, getEndingNode, getStartingNodes, resolveVariables } from './utils'
export class ChildProcess {
/**
* Stop child process when app is killed
*/
static async stopChildProcess() {
setTimeout(() => {
process.exit(0)
}, 50000)
}
/**
* Process prediction
* @param {IRunChatflowMessageValue} messageValue
* @return {Promise<void>}
*/
async runChildProcess(messageValue: IRunChatflowMessageValue): Promise<void> {
process.on('SIGTERM', ChildProcess.stopChildProcess)
process.on('SIGINT', ChildProcess.stopChildProcess)
await sendToParentProcess('start', '_')
// Create a Queue and add our initial node in it
const { endingNodeData, chatflow, incomingInput, componentNodes } = messageValue
let nodeToExecuteData: INodeData
let addToChatFlowPool: any = {}
/* Don't rebuild the flow (to avoid duplicated upsert, recomputation) when all these conditions met:
* - Node Data already exists in pool
* - Still in sync (i.e the flow has not been modified since)
* - Existing overrideConfig and new overrideConfig are the same
* - Flow doesn't start with nodes that depend on incomingInput.question
***/
if (endingNodeData) {
nodeToExecuteData = endingNodeData
} else {
/*** Get chatflows and prepare data ***/
const flowData = chatflow.flowData
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
const nodes = parsedFlowData.nodes
const edges = parsedFlowData.edges
/*** Get Ending Node with Directed Graph ***/
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
const directedGraph = graph
const endingNodeId = getEndingNode(nodeDependencies, directedGraph)
if (!endingNodeId) {
await sendToParentProcess('error', `Ending node must be either a Chain or Agent`)
return
}
const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data
if (!endingNodeData) {
await sendToParentProcess('error', `Ending node must be either a Chain or Agent`)
return
}
if (
endingNodeData.outputs &&
Object.keys(endingNodeData.outputs).length &&
!Object.values(endingNodeData.outputs).includes(endingNodeData.name)
) {
await sendToParentProcess(
'error',
`Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction`
)
return
}
/*** Get Starting Nodes with Non-Directed Graph ***/
const constructedObj = constructGraphs(nodes, edges, true)
const nonDirectedGraph = constructedObj.graph
const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId)
/*** BFS to traverse from Starting Nodes to Ending Node ***/
const reactFlowNodes = await buildLangchain(
startingNodeIds,
nodes,
graph,
depthQueue,
componentNodes,
incomingInput.question,
incomingInput?.overrideConfig
)
const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId)
if (!nodeToExecute) {
await sendToParentProcess('error', `Node ${endingNodeId} not found`)
return
}
const reactFlowNodeData: INodeData = resolveVariables(nodeToExecute.data, reactFlowNodes, incomingInput.question)
nodeToExecuteData = reactFlowNodeData
const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id))
addToChatFlowPool = {
chatflowid: chatflow.id,
nodeToExecuteData,
startingNodes,
overrideConfig: incomingInput?.overrideConfig
}
}
const nodeInstanceFilePath = componentNodes[nodeToExecuteData.name].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
const nodeInstance = new nodeModule.nodeClass()
const result = await nodeInstance.run(nodeToExecuteData, incomingInput.question, { chatHistory: incomingInput.history })
await sendToParentProcess('finish', { result, addToChatFlowPool })
}
}
/**
* Send data back to parent process
* @param {string} key Key of message
* @param {*} value Value of message
* @returns {Promise<void>}
*/
async function sendToParentProcess(key: string, value: any): Promise<void> {
// tslint:disable-line:no-any
return new Promise((resolve, reject) => {
process.send!(
{
key,
value
},
(error: Error) => {
if (error) {
return reject(error)
}
resolve()
}
)
})
}
const childProcess = new ChildProcess()
process.on('message', async (message: IChildProcessMessage) => {
if (message.key === 'start') {
await childProcess.runChildProcess(message.value)
process.exit()
}
})
|