rohan13's picture
Flowise Changes
4114d85
import express, { Request, Response } from 'express'
import multer from 'multer'
import path from 'path'
import cors from 'cors'
import http from 'http'
import * as fs from 'fs'
import basicAuth from 'express-basic-auth'
import { Server } from 'socket.io'
import {
IChatFlow,
IncomingInput,
IReactFlowNode,
IReactFlowObject,
INodeData,
IDatabaseExport,
IRunChatflowMessageValue,
IChildProcessMessage
} from './Interface'
import {
getNodeModulesPackagePath,
getStartingNodes,
buildLangchain,
getEndingNode,
constructGraphs,
resolveVariables,
isStartNodeDependOnInput,
getAPIKeys,
addAPIKey,
updateAPIKey,
deleteAPIKey,
compareKeys,
mapMimeTypeToInputField,
findAvailableConfigs,
isSameOverrideConfig,
replaceAllAPIKeys,
isFlowValidForStream,
isVectorStoreFaiss
} from './utils'
import { cloneDeep } from 'lodash'
import { getDataSource } from './DataSource'
import { NodesPool } from './NodesPool'
import { ChatFlow } from './entity/ChatFlow'
import { ChatMessage } from './entity/ChatMessage'
import { ChatflowPool } from './ChatflowPool'
import { ICommonObject } from 'flowise-components'
import { fork } from 'child_process'
export class App {
app: express.Application
nodesPool: NodesPool
chatflowPool: ChatflowPool
AppDataSource = getDataSource()
constructor() {
this.app = express()
}
async initDatabase() {
// Initialize database
this.AppDataSource.initialize()
.then(async () => {
console.info('πŸ“¦[server]: Data Source has been initialized!')
// Initialize pools
this.nodesPool = new NodesPool()
await this.nodesPool.initialize()
this.chatflowPool = new ChatflowPool()
// Initialize API keys
await getAPIKeys()
})
.catch((err) => {
console.error('❌[server]: Error during Data Source initialization:', err)
})
}
async config(socketIO?: Server) {
// Limit is needed to allow sending/receiving base64 encoded string
this.app.use(express.json({ limit: '50mb' }))
this.app.use(express.urlencoded({ limit: '50mb', extended: true }))
// Allow access from *
this.app.use(cors())
if (process.env.FLOWISE_USERNAME && process.env.FLOWISE_PASSWORD) {
const username = process.env.FLOWISE_USERNAME
const password = process.env.FLOWISE_PASSWORD
const basicAuthMiddleware = basicAuth({
users: { [username]: password }
})
const whitelistURLs = ['/api/v1/prediction/', '/api/v1/node-icon/', '/api/v1/chatflows-streaming']
this.app.use((req, res, next) => {
if (req.url.includes('/api/v1/')) {
whitelistURLs.some((url) => req.url.includes(url)) ? next() : basicAuthMiddleware(req, res, next)
} else next()
})
}
const upload = multer({ dest: `${path.join(__dirname, '..', 'uploads')}/` })
// ----------------------------------------
// Nodes
// ----------------------------------------
// Get all component nodes
this.app.get('/api/v1/nodes', (req: Request, res: Response) => {
const returnData = []
for (const nodeName in this.nodesPool.componentNodes) {
const clonedNode = cloneDeep(this.nodesPool.componentNodes[nodeName])
returnData.push(clonedNode)
}
return res.json(returnData)
})
// Get specific component node via name
this.app.get('/api/v1/nodes/:name', (req: Request, res: Response) => {
if (Object.prototype.hasOwnProperty.call(this.nodesPool.componentNodes, req.params.name)) {
return res.json(this.nodesPool.componentNodes[req.params.name])
} else {
throw new Error(`Node ${req.params.name} not found`)
}
})
// Returns specific component node icon via name
this.app.get('/api/v1/node-icon/:name', (req: Request, res: Response) => {
if (Object.prototype.hasOwnProperty.call(this.nodesPool.componentNodes, req.params.name)) {
const nodeInstance = this.nodesPool.componentNodes[req.params.name]
if (nodeInstance.icon === undefined) {
throw new Error(`Node ${req.params.name} icon not found`)
}
if (nodeInstance.icon.endsWith('.svg') || nodeInstance.icon.endsWith('.png') || nodeInstance.icon.endsWith('.jpg')) {
const filepath = nodeInstance.icon
res.sendFile(filepath)
} else {
throw new Error(`Node ${req.params.name} icon is missing icon`)
}
} else {
throw new Error(`Node ${req.params.name} not found`)
}
})
// ----------------------------------------
// Chatflows
// ----------------------------------------
// Get all chatflows
this.app.get('/api/v1/chatflows', async (req: Request, res: Response) => {
const chatflows: IChatFlow[] = await this.AppDataSource.getRepository(ChatFlow).find()
return res.json(chatflows)
})
// Get specific chatflow via id
this.app.get('/api/v1/chatflows/:id', async (req: Request, res: Response) => {
const chatflow = await this.AppDataSource.getRepository(ChatFlow).findOneBy({
id: req.params.id
})
if (chatflow) return res.json(chatflow)
return res.status(404).send(`Chatflow ${req.params.id} not found`)
})
// Save chatflow
this.app.post('/api/v1/chatflows', async (req: Request, res: Response) => {
const body = req.body
const newChatFlow = new ChatFlow()
Object.assign(newChatFlow, body)
const chatflow = this.AppDataSource.getRepository(ChatFlow).create(newChatFlow)
const results = await this.AppDataSource.getRepository(ChatFlow).save(chatflow)
return res.json(results)
})
// Update chatflow
this.app.put('/api/v1/chatflows/:id', async (req: Request, res: Response) => {
const chatflow = await this.AppDataSource.getRepository(ChatFlow).findOneBy({
id: req.params.id
})
if (!chatflow) {
res.status(404).send(`Chatflow ${req.params.id} not found`)
return
}
const body = req.body
const updateChatFlow = new ChatFlow()
Object.assign(updateChatFlow, body)
this.AppDataSource.getRepository(ChatFlow).merge(chatflow, updateChatFlow)
const result = await this.AppDataSource.getRepository(ChatFlow).save(chatflow)
// Update chatflowpool inSync to false, to build Langchain again because data has been changed
this.chatflowPool.updateInSync(chatflow.id, false)
return res.json(result)
})
// Delete chatflow via id
this.app.delete('/api/v1/chatflows/:id', async (req: Request, res: Response) => {
const results = await this.AppDataSource.getRepository(ChatFlow).delete({ id: req.params.id })
return res.json(results)
})
// Check if chatflow valid for streaming
this.app.get('/api/v1/chatflows-streaming/:id', async (req: Request, res: Response) => {
const chatflow = await this.AppDataSource.getRepository(ChatFlow).findOneBy({
id: req.params.id
})
if (!chatflow) return res.status(404).send(`Chatflow ${req.params.id} not found`)
/*** Get Ending Node with Directed Graph ***/
const flowData = chatflow.flowData
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
const nodes = parsedFlowData.nodes
const edges = parsedFlowData.edges
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
const endingNodeId = getEndingNode(nodeDependencies, graph)
if (!endingNodeId) return res.status(500).send(`Ending node must be either a Chain or Agent`)
const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data
if (!endingNodeData) return res.status(500).send(`Ending node must be either a Chain or Agent`)
const obj = {
isStreaming: isFlowValidForStream(nodes, endingNodeData)
}
return res.json(obj)
})
// ----------------------------------------
// ChatMessage
// ----------------------------------------
// Get all chatmessages from chatflowid
this.app.get('/api/v1/chatmessage/:id', async (req: Request, res: Response) => {
const chatmessages = await this.AppDataSource.getRepository(ChatMessage).findBy({
chatflowid: req.params.id
})
return res.json(chatmessages)
})
// Add chatmessages for chatflowid
this.app.post('/api/v1/chatmessage/:id', async (req: Request, res: Response) => {
const body = req.body
const newChatMessage = new ChatMessage()
Object.assign(newChatMessage, body)
const chatmessage = this.AppDataSource.getRepository(ChatMessage).create(newChatMessage)
const results = await this.AppDataSource.getRepository(ChatMessage).save(chatmessage)
return res.json(results)
})
// Delete all chatmessages from chatflowid
this.app.delete('/api/v1/chatmessage/:id', async (req: Request, res: Response) => {
const results = await this.AppDataSource.getRepository(ChatMessage).delete({ chatflowid: req.params.id })
return res.json(results)
})
// ----------------------------------------
// Configuration
// ----------------------------------------
this.app.get('/api/v1/flow-config/:id', async (req: Request, res: Response) => {
const chatflow = await this.AppDataSource.getRepository(ChatFlow).findOneBy({
id: req.params.id
})
if (!chatflow) return res.status(404).send(`Chatflow ${req.params.id} not found`)
const flowData = chatflow.flowData
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
const nodes = parsedFlowData.nodes
const availableConfigs = findAvailableConfigs(nodes)
return res.json(availableConfigs)
})
// ----------------------------------------
// Export Load Chatflow & ChatMessage & Apikeys
// ----------------------------------------
this.app.get('/api/v1/database/export', async (req: Request, res: Response) => {
const chatmessages = await this.AppDataSource.getRepository(ChatMessage).find()
const chatflows = await this.AppDataSource.getRepository(ChatFlow).find()
const apikeys = await getAPIKeys()
const result: IDatabaseExport = {
chatmessages,
chatflows,
apikeys
}
return res.json(result)
})
this.app.post('/api/v1/database/load', async (req: Request, res: Response) => {
const databaseItems: IDatabaseExport = req.body
await this.AppDataSource.getRepository(ChatFlow).delete({})
await this.AppDataSource.getRepository(ChatMessage).delete({})
let error = ''
// Get a new query runner instance
const queryRunner = this.AppDataSource.createQueryRunner()
// Start a new transaction
await queryRunner.startTransaction()
try {
const chatflows: ChatFlow[] = databaseItems.chatflows
const chatmessages: ChatMessage[] = databaseItems.chatmessages
await queryRunner.manager.insert(ChatFlow, chatflows)
await queryRunner.manager.insert(ChatMessage, chatmessages)
await queryRunner.commitTransaction()
} catch (err: any) {
error = err?.message ?? 'Error loading database'
await queryRunner.rollbackTransaction()
} finally {
await queryRunner.release()
}
await replaceAllAPIKeys(databaseItems.apikeys)
if (error) return res.status(500).send(error)
return res.status(201).send('OK')
})
// ----------------------------------------
// Prediction
// ----------------------------------------
// Send input message and get prediction result (External)
this.app.post('/api/v1/prediction/:id', upload.array('files'), async (req: Request, res: Response) => {
await this.processPrediction(req, res, socketIO)
})
// Send input message and get prediction result (Internal)
this.app.post('/api/v1/internal-prediction/:id', async (req: Request, res: Response) => {
await this.processPrediction(req, res, socketIO, true)
})
// ----------------------------------------
// Marketplaces
// ----------------------------------------
// Get all chatflows for marketplaces
this.app.get('/api/v1/marketplaces', async (req: Request, res: Response) => {
const marketplaceDir = path.join(__dirname, '..', 'marketplaces')
const jsonsInDir = fs.readdirSync(marketplaceDir).filter((file) => path.extname(file) === '.json')
const templates: any[] = []
jsonsInDir.forEach((file, index) => {
const filePath = path.join(__dirname, '..', 'marketplaces', file)
const fileData = fs.readFileSync(filePath)
const fileDataObj = JSON.parse(fileData.toString())
const template = {
id: index,
name: file.split('.json')[0],
flowData: fileData.toString(),
description: fileDataObj?.description || ''
}
templates.push(template)
})
return res.json(templates)
})
// ----------------------------------------
// API Keys
// ----------------------------------------
// Get api keys
this.app.get('/api/v1/apikey', async (req: Request, res: Response) => {
const keys = await getAPIKeys()
return res.json(keys)
})
// Add new api key
this.app.post('/api/v1/apikey', async (req: Request, res: Response) => {
const keys = await addAPIKey(req.body.keyName)
return res.json(keys)
})
// Update api key
this.app.put('/api/v1/apikey/:id', async (req: Request, res: Response) => {
const keys = await updateAPIKey(req.params.id, req.body.keyName)
return res.json(keys)
})
// Delete new api key
this.app.delete('/api/v1/apikey/:id', async (req: Request, res: Response) => {
const keys = await deleteAPIKey(req.params.id)
return res.json(keys)
})
// ----------------------------------------
// Serve UI static
// ----------------------------------------
const packagePath = getNodeModulesPackagePath('flowise-ui')
const uiBuildPath = path.join(packagePath, 'build')
const uiHtmlPath = path.join(packagePath, 'build', 'index.html')
this.app.use('/', express.static(uiBuildPath))
// All other requests not handled will return React app
this.app.use((req, res) => {
res.sendFile(uiHtmlPath)
})
}
/**
* Validate API Key
* @param {Request} req
* @param {Response} res
* @param {ChatFlow} chatflow
*/
async validateKey(req: Request, res: Response, chatflow: ChatFlow) {
const chatFlowApiKeyId = chatflow.apikeyid
const authorizationHeader = (req.headers['Authorization'] as string) ?? (req.headers['authorization'] as string) ?? ''
if (chatFlowApiKeyId && !authorizationHeader) return res.status(401).send(`Unauthorized`)
const suppliedKey = authorizationHeader.split(`Bearer `).pop()
if (chatFlowApiKeyId && suppliedKey) {
const keys = await getAPIKeys()
const apiSecret = keys.find((key) => key.id === chatFlowApiKeyId)?.apiSecret
if (!compareKeys(apiSecret, suppliedKey)) return res.status(401).send(`Unauthorized`)
}
}
/**
* Start child process
* @param {ChatFlow} chatflow
* @param {IncomingInput} incomingInput
* @param {INodeData} endingNodeData
*/
async startChildProcess(chatflow: ChatFlow, incomingInput: IncomingInput, endingNodeData?: INodeData) {
try {
const controller = new AbortController()
const { signal } = controller
let childpath = path.join(__dirname, '..', 'dist', 'ChildProcess.js')
if (!fs.existsSync(childpath)) childpath = 'ChildProcess.ts'
const childProcess = fork(childpath, [], { signal })
const value = {
chatflow,
incomingInput,
componentNodes: cloneDeep(this.nodesPool.componentNodes),
endingNodeData
} as IRunChatflowMessageValue
childProcess.send({ key: 'start', value } as IChildProcessMessage)
let childProcessTimeout: NodeJS.Timeout
return new Promise((resolve, reject) => {
childProcess.on('message', async (message: IChildProcessMessage) => {
if (message.key === 'finish') {
const { result, addToChatFlowPool } = message.value as ICommonObject
if (childProcessTimeout) {
clearTimeout(childProcessTimeout)
}
if (Object.keys(addToChatFlowPool).length) {
const { chatflowid, nodeToExecuteData, startingNodes, overrideConfig } = addToChatFlowPool
this.chatflowPool.add(chatflowid, nodeToExecuteData, startingNodes, overrideConfig)
}
resolve(result)
}
if (message.key === 'start') {
if (process.env.EXECUTION_TIMEOUT) {
childProcessTimeout = setTimeout(async () => {
childProcess.kill()
resolve(undefined)
}, parseInt(process.env.EXECUTION_TIMEOUT, 10))
}
}
if (message.key === 'error') {
let errMessage = message.value as string
if (childProcessTimeout) {
clearTimeout(childProcessTimeout)
}
reject(errMessage)
}
})
})
} catch (err) {
console.error(err)
}
}
/**
* Process Prediction
* @param {Request} req
* @param {Response} res
* @param {Server} socketIO
* @param {boolean} isInternal
*/
async processPrediction(req: Request, res: Response, socketIO?: Server, isInternal = false) {
try {
const chatflowid = req.params.id
let incomingInput: IncomingInput = req.body
let nodeToExecuteData: INodeData
const chatflow = await this.AppDataSource.getRepository(ChatFlow).findOneBy({
id: chatflowid
})
if (!chatflow) return res.status(404).send(`Chatflow ${chatflowid} not found`)
if (!isInternal) {
await this.validateKey(req, res, chatflow)
}
let isStreamValid = false
const files = (req.files as any[]) || []
if (files.length) {
const overrideConfig: ICommonObject = { ...req.body }
for (const file of files) {
const fileData = fs.readFileSync(file.path, { encoding: 'base64' })
const dataBase64String = `data:${file.mimetype};base64,${fileData},filename:${file.filename}`
const fileInputField = mapMimeTypeToInputField(file.mimetype)
if (overrideConfig[fileInputField]) {
overrideConfig[fileInputField] = JSON.stringify([...JSON.parse(overrideConfig[fileInputField]), dataBase64String])
} else {
overrideConfig[fileInputField] = JSON.stringify([dataBase64String])
}
}
incomingInput = {
question: req.body.question ?? 'hello',
overrideConfig,
history: []
}
}
/* Don't rebuild the flow (to avoid duplicated upsert, recomputation) when all these conditions met:
* - Node Data already exists in pool
* - Still in sync (i.e the flow has not been modified since)
* - Existing overrideConfig and new overrideConfig are the same
* - Flow doesn't start with nodes that depend on incomingInput.question
***/
const isRebuildNeeded = () => {
return (
Object.prototype.hasOwnProperty.call(this.chatflowPool.activeChatflows, chatflowid) &&
this.chatflowPool.activeChatflows[chatflowid].inSync &&
isSameOverrideConfig(
isInternal,
this.chatflowPool.activeChatflows[chatflowid].overrideConfig,
incomingInput.overrideConfig
) &&
!isStartNodeDependOnInput(this.chatflowPool.activeChatflows[chatflowid].startingNodes)
)
}
if (process.env.EXECUTION_MODE === 'child') {
if (isRebuildNeeded()) {
nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData
try {
const result = await this.startChildProcess(chatflow, incomingInput, nodeToExecuteData)
return res.json(result)
} catch (error) {
return res.status(500).send(error)
}
} else {
try {
const result = await this.startChildProcess(chatflow, incomingInput)
return res.json(result)
} catch (error) {
return res.status(500).send(error)
}
}
} else {
/*** Get chatflows and prepare data ***/
const flowData = chatflow.flowData
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
const nodes = parsedFlowData.nodes
const edges = parsedFlowData.edges
if (isRebuildNeeded()) {
nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData
isStreamValid = isFlowValidForStream(nodes, nodeToExecuteData)
} else {
/*** Get Ending Node with Directed Graph ***/
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
const directedGraph = graph
const endingNodeId = getEndingNode(nodeDependencies, directedGraph)
if (!endingNodeId) return res.status(500).send(`Ending node must be either a Chain or Agent`)
const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data
if (!endingNodeData) return res.status(500).send(`Ending node must be either a Chain or Agent`)
if (
endingNodeData.outputs &&
Object.keys(endingNodeData.outputs).length &&
!Object.values(endingNodeData.outputs).includes(endingNodeData.name)
) {
return res
.status(500)
.send(
`Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction`
)
}
isStreamValid = isFlowValidForStream(nodes, endingNodeData)
/*** Get Starting Nodes with Non-Directed Graph ***/
const constructedObj = constructGraphs(nodes, edges, true)
const nonDirectedGraph = constructedObj.graph
const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId)
/*** BFS to traverse from Starting Nodes to Ending Node ***/
const reactFlowNodes = await buildLangchain(
startingNodeIds,
nodes,
graph,
depthQueue,
this.nodesPool.componentNodes,
incomingInput.question,
incomingInput?.overrideConfig
)
const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId)
if (!nodeToExecute) return res.status(404).send(`Node ${endingNodeId} not found`)
const reactFlowNodeData: INodeData = resolveVariables(nodeToExecute.data, reactFlowNodes, incomingInput.question)
nodeToExecuteData = reactFlowNodeData
const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id))
this.chatflowPool.add(chatflowid, nodeToExecuteData, startingNodes, incomingInput?.overrideConfig)
}
const nodeInstanceFilePath = this.nodesPool.componentNodes[nodeToExecuteData.name].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
const nodeInstance = new nodeModule.nodeClass()
isStreamValid = isStreamValid && !isVectorStoreFaiss(nodeToExecuteData)
const result = isStreamValid
? await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
chatHistory: incomingInput.history,
socketIO,
socketIOClientId: incomingInput.socketIOClientId
})
: await nodeInstance.run(nodeToExecuteData, incomingInput.question, { chatHistory: incomingInput.history })
return res.json(result)
}
} catch (e: any) {
return res.status(500).send(e.message)
}
}
async stopApp() {
try {
const removePromises: any[] = []
await Promise.all(removePromises)
} catch (e) {
console.error(`❌[server]: Flowise Server shut down error: ${e}`)
}
}
}
let serverApp: App | undefined
export async function start(): Promise<void> {
serverApp = new App()
const port = parseInt(process.env.PORT || '', 10) || 7860
const server = http.createServer(serverApp.app)
const io = new Server(server, {
cors: {
origin: '*'
}
})
await serverApp.initDatabase()
await serverApp.config(io)
server.listen(port, () => {
console.info(`⚑️[server]: Flowise Server is listening at ${port}`)
})
}
export function getInstance(): App | undefined {
return serverApp
}