rohan13's picture
Flowise Changes
4114d85
import path from 'path'
import fs from 'fs'
import moment from 'moment'
import {
IComponentNodes,
IDepthQueue,
IExploredNode,
INodeDependencies,
INodeDirectedGraph,
INodeQueue,
IReactFlowEdge,
IReactFlowNode,
IVariableDict,
INodeData,
IOverrideConfig
} from '../Interface'
import { cloneDeep, get, omit, merge } from 'lodash'
import { ICommonObject, getInputVariables } from 'flowise-components'
import { scryptSync, randomBytes, timingSafeEqual } from 'crypto'
// Name of the built-in template variable: when a parameter accepts variables,
// `{{question}}` is substituted with the user's question (see getVariableValue)
const QUESTION_VAR_PREFIX = 'question'
/**
 * Resolve the home folder of the current user from the environment.
 * `USERPROFILE` is consulted on Windows and `HOME` on every other platform;
 * when that variable is not set, the current working directory is returned.
 * @returns {string}
 */
export const getUserHome = (): string => {
    const homeVariable = process.platform === 'win32' ? 'USERPROFILE' : 'HOME'
    const homeDir = process.env[homeVariable]
    // Only a missing variable triggers the fallback; an empty value is returned as-is
    return homeDir === undefined ? process.cwd() : homeDir
}
/**
 * Locate an installed package by probing the `node_modules` folders of the
 * parent directories of this file, from one up to five levels above it.
 * @param {string} packageName
 * @returns {string} the first existing path, or an empty string when none exists
 */
export const getNodeModulesPackagePath = (packageName: string): string => {
    const maxLevelsUp = 5
    const parentSegments: string[] = []
    for (let level = 1; level <= maxLevelsUp; level += 1) {
        // Each pass climbs one directory higher than the previous one
        parentSegments.push('..')
        const candidate = path.join(__dirname, ...parentSegments, 'node_modules', packageName)
        if (fs.existsSync(candidate)) return candidate
    }
    return ''
}
/**
 * Construct graph and node dependencies score
 *
 * `graph` maps each node id to the ids it points at (and, when `isNondirected`
 * is set, also to the ids pointing at it). `nodeDependencies` counts the
 * incoming edges of every node.
 *
 * Edges that reference a node id missing from `reactFlowNodes` are tolerated:
 * the id gets an adjacency list / counter on first use instead of yielding `NaN`.
 * @param {IReactFlowNode[]} reactFlowNodes
 * @param {IReactFlowEdge[]} reactFlowEdges
 * @param {boolean} isNondirected
 */
export const constructGraphs = (reactFlowNodes: IReactFlowNode[], reactFlowEdges: IReactFlowEdge[], isNondirected = false) => {
    const nodeDependencies = {} as INodeDependencies
    const graph = {} as INodeDirectedGraph
    const hasOwn = (obj: object, key: string): boolean => Object.prototype.hasOwnProperty.call(obj, key)

    // Every declared node starts without dependencies and without neighbours
    for (const { id } of reactFlowNodes) {
        nodeDependencies[id] = 0
        graph[id] = []
    }

    // Append `to` to the adjacency list of `from`, creating the list when needed
    const link = (from: string, to: string) => {
        if (hasOwn(graph, from)) {
            graph[from].push(to)
        } else {
            graph[from] = [to]
        }
    }

    for (const { source, target } of reactFlowEdges) {
        link(source, target)
        if (isNondirected) link(target, source)
        // A target that is not a declared node starts counting from zero rather than from undefined
        const currentCount = hasOwn(nodeDependencies, target) ? nodeDependencies[target] : 0
        nodeDependencies[target] = currentCount + 1
    }

    return { graph, nodeDependencies }
}
/**
 * Find the starting nodes of a flow by walking breadth-first away from the
 * ending node: the nodes visited at the greatest distance are the starting nodes.
 * The returned `depthQueue` holds, per reached node, the absolute difference
 * between its recorded distance and that greatest distance.
 * @param {INodeDirectedGraph} graph
 * @param {string} endNodeId
 */
export const getStartingNodes = (graph: INodeDirectedGraph, endNodeId: string) => {
    const seen = new Set<string>()
    const pending: Array<[string, number]> = [[endNodeId, 0]]
    const distanceFromEnd: IDepthQueue = { [endNodeId]: 0 }
    let farthest = 0
    let startingNodeIds: string[] = []

    for (let next = pending.shift(); next !== undefined; next = pending.shift()) {
        const [nodeId, distance] = next
        if (seen.has(nodeId)) continue
        seen.add(nodeId)

        if (distance > farthest) {
            // A deeper level was reached: earlier candidates are no longer starting nodes
            farthest = distance
            startingNodeIds = [nodeId]
        } else if (distance === farthest) {
            startingNodeIds.push(nodeId)
        }

        graph[nodeId]
            .filter((adjacentId) => !seen.has(adjacentId))
            .forEach((adjacentId) => {
                pending.push([adjacentId, distance + 1])
                // The most recently queued distance wins for this node
                distanceFromEnd[adjacentId] = distance + 1
            })
    }

    // Flip the distances so that they are expressed relative to the farthest level
    const depthQueue: IDepthQueue = {}
    for (const nodeId of Object.keys(distanceFromEnd)) {
        depthQueue[nodeId] = Math.abs(distanceFromEnd[nodeId] - farthest)
    }

    return { startingNodeIds, depthQueue }
}
/**
 * Pick the ending node of a flow.
 * With exactly one entry in `nodeDependencies`, the last key of `graph` is used.
 * Otherwise the last node that has no outgoing edges and at least one
 * dependency is chosen. An empty string means no ending node was found.
 * @param {INodeDependencies} nodeDependencies
 * @param {INodeDirectedGraph} graph
 */
export const getEndingNode = (nodeDependencies: INodeDependencies, graph: INodeDirectedGraph) => {
    const isSingleNodeFlow = Object.keys(nodeDependencies).length === 1
    let endingNodeId = ''
    for (const nodeId of Object.keys(graph)) {
        // Later matches overwrite earlier ones, so the last qualifying node wins
        if (isSingleNodeFlow || (!graph[nodeId].length && nodeDependencies[nodeId] > 0)) {
            endingNodeId = nodeId
        }
    }
    return endingNodeId
}
/**
* Build langchain from start to end
* Walks the flow breadth-first from the starting nodes. For every visited node the
* component module registered under the node's name is imported, instantiated and
* initialised; the result of `init` is stored in `data.instance` of a deep copy of the node.
* @param {string[]} startingNodeIds ids of the nodes the traversal is seeded with (depth 0)
* @param {IReactFlowNode[]} reactFlowNodes nodes of the flow; they are deep-cloned, not modified
* @param {INodeDirectedGraph} graph adjacency lists keyed by node id
* @param {IDepthQueue} depthQueue depth level per node id, used to pull in nodes of the next depth level
* @param {IComponentNodes} componentNodes component registry keyed by node name, providing `filePath`
* @param {string} question passed to variable resolution and to each node's `init`
* @param {ICommonObject} overrideConfig optional values that replace same-named node inputs before `init`
* @returns the cloned nodes, with `data.instance` set on every node that was initialised
*/
export const buildLangchain = async (
startingNodeIds: string[],
reactFlowNodes: IReactFlowNode[],
graph: INodeDirectedGraph,
depthQueue: IDepthQueue,
componentNodes: IComponentNodes,
question: string,
overrideConfig?: ICommonObject
) => {
const flowNodes = cloneDeep(reactFlowNodes)
// Create a Queue and add our initial node in it
const nodeQueue = [] as INodeQueue[]
const exploredNode = {} as IExploredNode
// In the case of infinite loop, only max 3 loops will be executed
const maxLoop = 3
for (let i = 0; i < startingNodeIds.length; i += 1) {
nodeQueue.push({ nodeId: startingNodeIds[i], depth: 0 })
exploredNode[startingNodeIds[i]] = { remainingLoop: maxLoop, lastSeenDepth: 0 }
}
while (nodeQueue.length) {
const { nodeId, depth } = nodeQueue.shift() as INodeQueue
const reactFlowNode = flowNodes.find((nd) => nd.id === nodeId)
const nodeIndex = flowNodes.findIndex((nd) => nd.id === nodeId)
// Ids without a matching node are skipped entirely: their neighbours are not queued from here
if (!reactFlowNode || reactFlowNode === undefined || nodeIndex < 0) continue
try {
// Load the component implementation registered for this node's name and instantiate its `nodeClass` export
const nodeInstanceFilePath = componentNodes[reactFlowNode.data.name].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
const newNodeInstance = new nodeModule.nodeClass()
// Work on a copy of the node data: apply overrides first, then resolve variables
let flowNodeData = cloneDeep(reactFlowNode.data)
if (overrideConfig) flowNodeData = replaceInputsWithConfig(flowNodeData, overrideConfig)
const reactFlowNodeData: INodeData = resolveVariables(flowNodeData, flowNodes, question)
// The initialised instance is stored on the cloned node so later nodes can reference it
flowNodes[nodeIndex].data.instance = await newNodeInstance.init(reactFlowNodeData, question)
} catch (e: any) {
console.error(e)
// NOTE(review): wrapping in `new Error(e)` stringifies the caught value into the message
// and produces a fresh stack trace - confirm this is preferred over rethrowing `e`
throw new Error(e)
}
// NOTE(review): this is the same array object as `graph[nodeId]`, so the `push` below also
// modifies the caller's graph - confirm that this is intended
const neighbourNodeIds = graph[nodeId]
const nextDepth = depth + 1
// Find other nodes that are on the same depth level
const sameDepthNodeIds = Object.keys(depthQueue).filter((key) => depthQueue[key] === nextDepth)
for (const id of sameDepthNodeIds) {
if (neighbourNodeIds.includes(id)) continue
neighbourNodeIds.push(id)
}
for (let i = 0; i < neighbourNodeIds.length; i += 1) {
const neighNodeId = neighbourNodeIds[i]
// If nodeId has been seen, cycle detected
if (Object.prototype.hasOwnProperty.call(exploredNode, neighNodeId)) {
const { remainingLoop, lastSeenDepth } = exploredNode[neighNodeId]
// Already queued for this depth level: nothing to do
if (lastSeenDepth === nextDepth) continue
// NOTE(review): `break` abandons all remaining neighbours of the current node, not only
// the exhausted one - confirm that `continue` was not intended
if (remainingLoop === 0) {
break
}
// Revisit the node at the new depth and consume one of its remaining loops
const remainingLoopMinusOne = remainingLoop - 1
exploredNode[neighNodeId] = { remainingLoop: remainingLoopMinusOne, lastSeenDepth: nextDepth }
nodeQueue.push({ nodeId: neighNodeId, depth: nextDepth })
} else {
// First time this node is reached: give it the full loop budget
exploredNode[neighNodeId] = { remainingLoop: maxLoop, lastSeenDepth: nextDepth }
nodeQueue.push({ nodeId: neighNodeId, depth: nextDepth })
}
}
}
return flowNodes
}
/**
 * Resolve `{{nodeId.some.path}}` variables inside a parameter value.
 *
 * The part of the path before the first '.' is treated as a node id; when a node
 * with that id exists, the variable stands for that node's `data.instance`.
 *
 * - `isAcceptVariable === false`: the whole value is replaced by the instance of
 *   the referenced node (which may be a non-string object).
 * - `isAcceptVariable === true`: every resolved variable (and `{{question}}`) is
 *   substituted textually inside the string.
 *
 * Values that are not strings (numbers, booleans, null, ...) contain nothing to
 * resolve and are returned untouched.
 * @param {string} paramValue
 * @param {IReactFlowNode[]} reactFlowNodes
 * @param {string} question
 * @param {boolean} isAcceptVariable
 * @returns {string}
 */
export const getVariableValue = (paramValue: string, reactFlowNodes: IReactFlowNode[], question: string, isAcceptVariable = false) => {
    // Inputs are not guaranteed to be strings at runtime; anything else is passed through
    if (typeof paramValue !== 'string') return paramValue

    let returnVal = paramValue
    const variableStack: { substr: string; startIdx: number }[] = []
    const variableDict = {} as IVariableDict
    let startIdx = 0
    const endIdx = returnVal.length - 1

    while (startIdx < endIdx) {
        const substr = returnVal.substring(startIdx, startIdx + 2)

        // Store the opening double curly bracket
        if (substr === '{{') {
            variableStack.push({ substr, startIdx: startIdx + 2 })
        }

        // Found the complete variable
        if (substr === '}}' && variableStack.length > 0 && variableStack[variableStack.length - 1].substr === '{{') {
            const variableStartIdx = variableStack[variableStack.length - 1].startIdx
            const variableEndIdx = startIdx
            const variableFullPath = returnVal.substring(variableStartIdx, variableEndIdx)

            if (isAcceptVariable && variableFullPath === QUESTION_VAR_PREFIX) {
                variableDict[`{{${variableFullPath}}}`] = question
            }

            // Everything before the first '.' is the id of the referenced node
            const [variableNodeId] = variableFullPath.split('.')
            const executedNode = reactFlowNodes.find((nd) => nd.id === variableNodeId)
            if (executedNode) {
                const variableValue = executedNode.data?.instance
                if (isAcceptVariable) {
                    variableDict[`{{${variableFullPath}}}`] = variableValue
                } else {
                    returnVal = variableValue
                    // A non-string instance cannot be scanned any further: it is the final result
                    if (typeof returnVal !== 'string') break
                }
            }
            variableStack.pop()
        }
        startIdx += 1
    }

    if (isAcceptVariable) {
        const variablePaths = Object.keys(variableDict)
        // NOTE(review): this is a plain lexicographic sort, not a sort by path length as the
        // previous comment claimed; with nested variables the replacement order may not be the
        // intended one - confirm the desired ordering before changing it
        variablePaths.sort()
        variablePaths.forEach((path) => {
            const variableValue = variableDict[path]
            // Replace all occurrence
            returnVal = returnVal.split(path).join(variableValue)
        })
    }
    return returnVal
}
/**
 * Tell whether the node's `vectorStoreRetriever` input refers to Faiss
 * (streaming is temporarily disabled for it).
 * A string input counts when it contains 'faiss'; an object input counts when
 * its `vectorStore` was constructed by a class named 'FaissStore'.
 * @param {INodeData} flowNodeData
 * @returns {boolean}
 */
export const isVectorStoreFaiss = (flowNodeData: INodeData) => {
    const retriever = flowNodeData.inputs ? flowNodeData.inputs.vectorStoreRetriever : undefined
    if (!retriever) return false

    switch (typeof retriever) {
        case 'string':
            // Unresolved reference, e.g. a variable naming a faiss node
            return retriever.includes('faiss')
        case 'object':
            // Resolved retriever instance: inspect the class of its vector store
            return Boolean(retriever.vectorStore) && retriever.vectorStore.constructor.name === 'FaissStore'
        default:
            return false
    }
}
/**
 * Resolve the variables of every input of a node and return the result on a copy
 * of the node data. Array inputs are resolved element by element; other inputs
 * honour the `acceptVariable` flag of the input param with the same name.
 * @param {INodeData} reactFlowNodeData
 * @param {IReactFlowNode[]} reactFlowNodes
 * @param {string} question
 * @returns {INodeData}
 */
export const resolveVariables = (reactFlowNodeData: INodeData, reactFlowNodes: IReactFlowNode[], question: string): INodeData => {
    let flowNodeData = cloneDeep(reactFlowNodeData)

    if (reactFlowNodeData.instance && isVectorStoreFaiss(reactFlowNodeData)) {
        // omit and merge because cloneDeep of instance gives "Illegal invocation" Exception
        const dataWithoutInstance = cloneDeep(omit(reactFlowNodeData, ['instance']))
        flowNodeData = merge(dataWithoutInstance, { instance: reactFlowNodeData.instance })
    }

    const inputs: ICommonObject = flowNodeData.inputs ?? {}
    for (const inputName in inputs) {
        const rawValue = inputs[inputName]

        if (Array.isArray(rawValue)) {
            // List inputs: each entry is resolved on its own, without the acceptVariable flag
            inputs[inputName] = Array.from(rawValue, (entry) => getVariableValue(entry, reactFlowNodes, question))
            continue
        }

        const matchingParam = reactFlowNodeData.inputParams.find((param) => param.name === inputName)
        const acceptsVariable = matchingParam?.acceptVariable ?? false
        inputs[inputName] = getVariableValue(rawValue, reactFlowNodes, question, acceptsVariable)
    }

    return flowNodeData
}
/**
 * Overwrite the node's inputs with the same-named values of the override config.
 * Only inputs that already exist on the node are considered, and a `null` or
 * `undefined` override leaves the input as it is. The node data is modified in
 * place and returned.
 * @param {INodeData} flowNodeData
 * @param {ICommonObject} overrideConfig
 * @returns {INodeData}
 */
export const replaceInputsWithConfig = (flowNodeData: INodeData, overrideConfig: ICommonObject) => {
    const inputs: ICommonObject = flowNodeData.inputs ?? {}
    for (const inputName in inputs) {
        const override = overrideConfig[inputName]
        if (override !== undefined && override !== null) {
            inputs[inputName] = override
        }
    }
    return flowNodeData
}
/**
 * Tell whether any starting node has an input that contains input variables,
 * in which case the flow has to be rebuilt.
 * User Question => Prompt_0 => LLMChain_0 => Prompt-1 => LLMChain_1
 * @param {IReactFlowNode[]} startingNodes
 * @returns {boolean}
 */
export const isStartNodeDependOnInput = (startingNodes: IReactFlowNode[]): boolean =>
    startingNodes.some((node) => {
        const inputs = node.data.inputs
        for (const inputName in inputs) {
            // One input with at least one variable is enough
            if (getInputVariables(inputs[inputName]).length > 0) return true
        }
        return false
    })
/**
 * Tell whether the override config is unchanged, i.e. the flow does not need a rebuild.
 *
 * A config that is missing or has no keys counts as "no config" in every branch,
 * so `{}` and `undefined` compare as equal.
 * - Internal requests never carry an override config, so they match only when no
 *   (non-empty) config is currently applied.
 * - Otherwise both configs must be absent, or serialise to the same JSON text
 *   (note that this comparison is sensitive to key order).
 * @param {boolean} isInternal
 * @param {ICommonObject} existingOverrideConfig
 * @param {ICommonObject} newOverrideConfig
 * @returns {boolean}
 */
export const isSameOverrideConfig = (
    isInternal: boolean,
    existingOverrideConfig?: ICommonObject,
    newOverrideConfig?: ICommonObject
): boolean => {
    const hasEntries = (config?: ICommonObject): boolean => !!config && Object.keys(config).length > 0

    const hasExisting = hasEntries(existingOverrideConfig)
    if (isInternal) return !hasExisting

    const hasNew = hasEntries(newOverrideConfig)
    // Neither side carries any override
    if (!hasExisting && !hasNew) return true
    // Exactly one side carries overrides
    if (hasExisting !== hasNew) return false
    // Both carry overrides: equal only when they serialise identically
    return JSON.stringify(existingOverrideConfig) === JSON.stringify(newOverrideConfig)
}
/**
 * Location of the JSON file holding the API keys: `api.json`, two directories
 * above the directory of this file.
 * @returns {string}
 */
export const getAPIKeyPath = (): string => {
    const twoLevelsUp = path.join(__dirname, '..', '..')
    return path.join(twoLevelsUp, 'api.json')
}
/**
 * Create a new API key: 32 random bytes encoded as base64.
 * @returns {string}
 */
export const generateAPIKey = (): string => randomBytes(32).toString('base64')
/**
 * Derive the secret stored for an API key.
 * The key is run through scrypt (64-byte output) with a fresh 8-byte random salt;
 * the result has the form `<hash as hex>.<salt as hex>`.
 * @param {string} apiKey
 * @returns {string}
 */
export const generateSecretHash = (apiKey: string): string => {
    const salt = randomBytes(8).toString('hex')
    const derivedHex = (scryptSync(apiKey, salt, 64) as Buffer).toString('hex')
    return [derivedHex, salt].join('.')
}
/**
 * Verify a supplied API key against a stored secret of the form `<hash hex>.<salt>`.
 *
 * The supplied key is hashed with the stored salt and compared in constant time.
 * A stored secret that is malformed (missing salt, or a hash of a different
 * length than the derived one) is treated as a mismatch instead of raising,
 * since `timingSafeEqual` throws when the two buffers differ in length.
 * @param {string} storedKey
 * @param {string} suppliedKey
 * @returns {boolean}
 */
export const compareKeys = (storedKey: string, suppliedKey: string): boolean => {
    if (typeof storedKey !== 'string' || typeof suppliedKey !== 'string') return false

    const [hashedPassword, salt] = storedKey.split('.')
    if (!hashedPassword || !salt) return false

    const storedBuffer = Buffer.from(hashedPassword, 'hex')
    const suppliedBuffer = scryptSync(suppliedKey, salt, 64) as Buffer
    // Buffers of unequal length can never match and would make timingSafeEqual throw
    if (storedBuffer.length !== suppliedBuffer.length) return false

    return timingSafeEqual(storedBuffer, suppliedBuffer)
}
/**
 * Load the API keys from the key file.
 * When the file cannot be read or parsed for any reason, a single key named
 * 'DefaultKey' is generated, written to the key file and returned instead.
 * @returns {Promise<ICommonObject[]>}
 */
export const getAPIKeys = async (): Promise<ICommonObject[]> => {
    try {
        const rawKeys = await fs.promises.readFile(getAPIKeyPath(), 'utf8')
        return JSON.parse(rawKeys)
    } catch (error) {
        // Any failure ends up here and (re)creates the key file with one default key
        const apiKey = generateAPIKey()
        const defaultKeys = [
            {
                keyName: 'DefaultKey',
                apiKey,
                apiSecret: generateSecretHash(apiKey),
                createdAt: moment().format('DD-MMM-YY'),
                id: randomBytes(16).toString('hex')
            }
        ]
        await fs.promises.writeFile(getAPIKeyPath(), JSON.stringify(defaultKeys), 'utf8')
        return defaultKeys
    }
}
/**
 * Generate a new API key with the given name, append it to the stored keys and
 * persist the complete list.
 * @param {string} keyName
 * @returns {Promise<ICommonObject[]>} all keys, the new one last
 */
export const addAPIKey = async (keyName: string): Promise<ICommonObject[]> => {
    const currentKeys = await getAPIKeys()
    const apiKey = generateAPIKey()
    const newKey = {
        keyName,
        apiKey,
        apiSecret: generateSecretHash(apiKey),
        createdAt: moment().format('DD-MMM-YY'),
        id: randomBytes(16).toString('hex')
    }
    const allKeys = currentKeys.concat([newKey])
    await fs.promises.writeFile(getAPIKeyPath(), JSON.stringify(allKeys), 'utf8')
    return allKeys
}
/**
 * Rename the API key with the given id and persist the list.
 * @param {string} keyIdToUpdate
 * @param {string} newKeyName
 * @returns {Promise<ICommonObject[]>} all keys after the rename, or an empty array
 * (with nothing written) when no key has that id
 */
export const updateAPIKey = async (keyIdToUpdate: string, newKeyName: string): Promise<ICommonObject[]> => {
    const storedKeys = await getAPIKeys()
    const keyToRename = storedKeys.find(({ id }) => id === keyIdToUpdate)
    if (keyToRename === undefined) return []

    keyToRename.keyName = newKeyName
    await fs.promises.writeFile(getAPIKeyPath(), JSON.stringify(storedKeys), 'utf8')
    return storedKeys
}
/**
 * Remove the API key with the given id and persist the remaining keys.
 * The file is rewritten even when no key matched.
 * @param {string} keyIdToDelete
 * @returns {Promise<ICommonObject[]>} the keys that remain
 */
export const deleteAPIKey = async (keyIdToDelete: string): Promise<ICommonObject[]> => {
    const storedKeys = await getAPIKeys()
    const remainingKeys = storedKeys.filter(({ id }) => id !== keyIdToDelete)
    const serialisedKeys = JSON.stringify(remainingKeys)
    await fs.promises.writeFile(getAPIKeyPath(), serialisedKeys, 'utf8')
    return remainingKeys
}
/**
 * Overwrite the key file with the given list of API keys.
 * Failures are logged to the console and not propagated to the caller.
 * @param {ICommonObject[]} content
 * @returns {Promise<void>}
 */
export const replaceAllAPIKeys = async (content: ICommonObject[]): Promise<void> => {
    try {
        const serialisedKeys = JSON.stringify(content)
        await fs.promises.writeFile(getAPIKeyPath(), serialisedKeys, 'utf8')
    } catch (error) {
        console.error(error)
    }
}
/**
 * Translate the MIME type of an uploaded file into the name of the node input
 * that receives it.
 * @param {string} mimeType
 * @returns {string} the input field name, or an empty string for unsupported MIME types
 */
export const mapMimeTypeToInputField = (mimeType: string) => {
    const inputFieldByMimeType = {
        'text/plain': 'txtFile',
        'application/pdf': 'pdfFile',
        'application/json': 'jsonFile',
        'text/csv': 'csvFile',
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document': 'docxFile'
    } as const

    // Own-property check so that names such as 'constructor' are not looked up on the prototype
    if (!Object.prototype.hasOwnProperty.call(inputFieldByMimeType, mimeType)) return ''
    return inputFieldByMimeType[mimeType as keyof typeof inputFieldByMimeType]
}
/**
 * Find all available input params config
 *
 * Collects one entry per distinct overridable input param across all nodes.
 * Params of type 'password' and 'options' are left out; params of type 'file'
 * are exposed under the name 'files' with their `fileType` (when present) as type.
 * Nodes without `inputParams` contribute nothing.
 * @param {IReactFlowNode[]} reactFlowNodes
 * @returns {IOverrideConfig[]}
 */
export const findAvailableConfigs = (reactFlowNodes: IReactFlowNode[]) => {
    const configs: IOverrideConfig[] = []
    // Serialised form of every config collected so far, for constant-time duplicate checks
    const seenConfigs = new Set<string>()

    for (const flowNode of reactFlowNodes) {
        for (const inputParam of flowNode.data.inputParams ?? []) {
            if (inputParam.type === 'password' || inputParam.type === 'options') continue

            const isFile = inputParam.type === 'file'
            const obj: IOverrideConfig = {
                node: flowNode.data.label,
                label: inputParam.label,
                name: isFile ? 'files' : inputParam.name,
                type: isFile ? (inputParam.fileType ?? inputParam.type) : inputParam.type
            }

            const serialised = JSON.stringify(obj)
            if (seenConfigs.has(serialised)) continue
            seenConfigs.add(serialised)
            configs.push(obj)
        }
    }
    return configs
}
/**
 * Decide whether the answer of a flow can be streamed.
 * Streaming requires at least one 'Chat Models' or 'LLMs' node, every such node
 * to be one of the streamable models listed below, the ending node to be of
 * category 'Chains', and the ending node not to use a Faiss vector store.
 * @param {IReactFlowNode[]} reactFlowNodes
 * @param {INodeData} endingNodeData
 * @returns {boolean}
 */
export const isFlowValidForStream = (reactFlowNodes: IReactFlowNode[], endingNodeData: INodeData) => {
    const streamableModels = new Map<string, string[]>([
        ['Chat Models', ['azureChatOpenAI', 'chatOpenAI', 'chatAnthropic']],
        ['LLMs', ['azureOpenAI', 'openAI']]
    ])

    let hasModelNode = false
    for (const { data } of reactFlowNodes) {
        const supportedNames = streamableModels.get(data.category)
        // Nodes of other categories do not influence the decision
        if (supportedNames === undefined) continue
        // A single model without streaming support rules streaming out
        if (!supportedNames.includes(data.name)) return false
        hasModelNode = true
    }

    if (!hasModelNode) return false
    if (endingNodeData.category !== 'Chains') return false
    return !isVectorStoreFaiss(endingNodeData)
}