File size: 5,639 Bytes
4114d85
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import { IChildProcessMessage, IReactFlowNode, IReactFlowObject, IRunChatflowMessageValue, INodeData } from './Interface'
import { buildLangchain, constructGraphs, getEndingNode, getStartingNodes, resolveVariables } from './utils'

export class ChildProcess {
    /**
     * Stop child process when app is killed
     */
    static async stopChildProcess() {
        setTimeout(() => {
            process.exit(0)
        }, 50000)
    }

    /**
     * Process prediction
     * @param {IRunChatflowMessageValue} messageValue
     * @return {Promise<void>}
     */
    async runChildProcess(messageValue: IRunChatflowMessageValue): Promise<void> {
        process.on('SIGTERM', ChildProcess.stopChildProcess)
        process.on('SIGINT', ChildProcess.stopChildProcess)

        await sendToParentProcess('start', '_')

        // Create a Queue and add our initial node in it
        const { endingNodeData, chatflow, incomingInput, componentNodes } = messageValue

        let nodeToExecuteData: INodeData
        let addToChatFlowPool: any = {}

        /* Don't rebuild the flow (to avoid duplicated upsert, recomputation) when all these conditions met:
         * - Node Data already exists in pool
         * - Still in sync (i.e the flow has not been modified since)
         * - Existing overrideConfig and new overrideConfig are the same
         * - Flow doesn't start with nodes that depend on incomingInput.question
         ***/
        if (endingNodeData) {
            nodeToExecuteData = endingNodeData
        } else {
            /*** Get chatflows and prepare data  ***/
            const flowData = chatflow.flowData
            const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
            const nodes = parsedFlowData.nodes
            const edges = parsedFlowData.edges

            /*** Get Ending Node with Directed Graph  ***/
            const { graph, nodeDependencies } = constructGraphs(nodes, edges)
            const directedGraph = graph
            const endingNodeId = getEndingNode(nodeDependencies, directedGraph)
            if (!endingNodeId) {
                await sendToParentProcess('error', `Ending node must be either a Chain or Agent`)
                return
            }

            const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data
            if (!endingNodeData) {
                await sendToParentProcess('error', `Ending node must be either a Chain or Agent`)
                return
            }

            if (
                endingNodeData.outputs &&
                Object.keys(endingNodeData.outputs).length &&
                !Object.values(endingNodeData.outputs).includes(endingNodeData.name)
            ) {
                await sendToParentProcess(
                    'error',
                    `Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction`
                )
                return
            }

            /*** Get Starting Nodes with Non-Directed Graph ***/
            const constructedObj = constructGraphs(nodes, edges, true)
            const nonDirectedGraph = constructedObj.graph
            const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId)

            /*** BFS to traverse from Starting Nodes to Ending Node ***/
            const reactFlowNodes = await buildLangchain(
                startingNodeIds,
                nodes,
                graph,
                depthQueue,
                componentNodes,
                incomingInput.question,
                incomingInput?.overrideConfig
            )

            const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId)
            if (!nodeToExecute) {
                await sendToParentProcess('error', `Node ${endingNodeId} not found`)
                return
            }

            const reactFlowNodeData: INodeData = resolveVariables(nodeToExecute.data, reactFlowNodes, incomingInput.question)
            nodeToExecuteData = reactFlowNodeData

            const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id))
            addToChatFlowPool = {
                chatflowid: chatflow.id,
                nodeToExecuteData,
                startingNodes,
                overrideConfig: incomingInput?.overrideConfig
            }
        }

        const nodeInstanceFilePath = componentNodes[nodeToExecuteData.name].filePath as string
        const nodeModule = await import(nodeInstanceFilePath)
        const nodeInstance = new nodeModule.nodeClass()

        const result = await nodeInstance.run(nodeToExecuteData, incomingInput.question, { chatHistory: incomingInput.history })

        await sendToParentProcess('finish', { result, addToChatFlowPool })
    }
}

/**
 * Send data back to parent process
 * @param {string} key Key of message
 * @param {*} value Value of message
 * @returns {Promise<void>}
 */
async function sendToParentProcess(key: string, value: any): Promise<void> {
    // tslint:disable-line:no-any
    return new Promise((resolve, reject) => {
        process.send!(
            {
                key,
                value
            },
            (error: Error) => {
                if (error) {
                    return reject(error)
                }
                resolve()
            }
        )
    })
}

const childProcess = new ChildProcess()

process.on('message', async (message: IChildProcessMessage) => {
    if (message.key === 'start') {
        await childProcess.runChildProcess(message.value)
        process.exit()
    }
})