From 37e9bb4f95b9a49e71ecb9f8c6fd7b4a07c8fe7f Mon Sep 17 00:00:00 2001 From: Maicon Matsubara Date: Sun, 24 Nov 2024 16:32:00 -0300 Subject: [PATCH 1/6] Added redis open connection if it is closed --- .../RedisBackedChatMemory/RedisBackedChatMemory.ts | 2 ++ start.js | 13 +++++++++++++ 2 files changed, 15 insertions(+) create mode 100644 start.js diff --git a/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts b/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts index 4d6ac3c729b..d80a89697c2 100644 --- a/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts +++ b/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts @@ -203,6 +203,8 @@ class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods { ): Promise { if (!this.redisClient) return [] + if (this.redisClient.status == 'end' || this.redisClient.status == 'close') await this.redisClient.connect() + const id = overrideSessionId ? overrideSessionId : this.sessionId const rawStoredMessages = await this.redisClient.lrange(id, this.windowSize ? this.windowSize * -1 : 0, -1) const orderedMessages = rawStoredMessages.reverse().map((message) => JSON.parse(message)) diff --git a/start.js b/start.js new file mode 100644 index 00000000000..7cec7d9d7c9 --- /dev/null +++ b/start.js @@ -0,0 +1,13 @@ +require('dotenv').config() +const flowise = require('flowise') + +flowise.start({ + PORT: process.env.PORT, + DEBUG: process.env.DEBUG === 'true', + DATABASE_PATH: process.env.DATABASE_PATH, + APIKEY_PATH: process.env.APIKEY_PATH, + SECRETKEY_PATH: process.env.SECRETKEY_PATH, + LOG_PATH: process.env.LOG_PATH, + BLOB_STORAGE_PATH: process.env.BLOB_STORAGE_PATH, + SHOW_COMMUNITY_NODES: process.env.SHOW_COMMUNITY_NODES +}) From 7e69d249706388145546b9692dd36e1bebeedc89 Mon Sep 17 00:00:00 2001 From: Maicon Matsubara Date: Sun, 24 Nov 2024 16:36:16 -0300 Subject: [PATCH 2/6] Removed unecessary modification --- start.js | 13 ------------- 1 file changed, 13 deletions(-) delete mode 100644 start.js diff --git a/start.js b/start.js deleted file mode 100644 index 7cec7d9d7c9..00000000000 --- a/start.js +++ /dev/null @@ -1,13 +0,0 @@ -require('dotenv').config() -const flowise = require('flowise') - -flowise.start({ - PORT: process.env.PORT, - DEBUG: process.env.DEBUG === 'true', - DATABASE_PATH: process.env.DATABASE_PATH, - APIKEY_PATH: process.env.APIKEY_PATH, - SECRETKEY_PATH: process.env.SECRETKEY_PATH, - LOG_PATH: process.env.LOG_PATH, - BLOB_STORAGE_PATH: process.env.BLOB_STORAGE_PATH, - SHOW_COMMUNITY_NODES: process.env.SHOW_COMMUNITY_NODES -}) From 0b932acc171303cd316a82009b66cb83b395902f Mon Sep 17 00:00:00 2001 From: Maicon Matsubara Date: Sun, 24 Nov 2024 19:00:00 -0300 Subject: [PATCH 3/6] Added check connection in all methods --- .../RedisBackedChatMemory/RedisBackedChatMemory.ts | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts b/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts index d80a89697c2..5bad550c4fb 100644 --- a/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts +++ b/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts @@ -196,6 +196,10 @@ class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods { this.sessionTTL = fields.sessionTTL } + async checkRedisConnection() { + if (this.redisClient.status == 'end' || this.redisClient.status == 'close') await this.redisClient.connect() + } + async getChatMessages( overrideSessionId = '', returnBaseMessages = false, @@ -203,7 +207,7 @@ class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods { ): Promise { if (!this.redisClient) return [] - if (this.redisClient.status == 'end' || this.redisClient.status == 'close') await this.redisClient.connect() + this.checkRedisConnection() const id = overrideSessionId ? overrideSessionId : this.sessionId const rawStoredMessages = await this.redisClient.lrange(id, this.windowSize ? this.windowSize * -1 : 0, -1) @@ -218,6 +222,8 @@ class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods { async addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId = ''): Promise { if (!this.redisClient) return + this.checkRedisConnection() + const id = overrideSessionId ? overrideSessionId : this.sessionId const input = msgArray.find((msg) => msg.type === 'userMessage') const output = msgArray.find((msg) => msg.type === 'apiMessage') @@ -240,6 +246,8 @@ class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods { async clearChatMessages(overrideSessionId = ''): Promise { if (!this.redisClient) return + this.checkRedisConnection() + const id = overrideSessionId ? overrideSessionId : this.sessionId await this.redisClient.del(id) await this.clear() From 6540153e495a1a26cca3b0eab37ceb112015867b Mon Sep 17 00:00:00 2001 From: Maicon Matsubara Date: Sun, 24 Nov 2024 19:05:31 -0300 Subject: [PATCH 4/6] Renamed method --- .../memory/RedisBackedChatMemory/RedisBackedChatMemory.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts b/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts index 5bad550c4fb..98ad3f70aa1 100644 --- a/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts +++ b/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts @@ -196,7 +196,7 @@ class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods { this.sessionTTL = fields.sessionTTL } - async checkRedisConnection() { + async openClosedConnection() { if (this.redisClient.status == 'end' || this.redisClient.status == 'close') await this.redisClient.connect() } @@ -207,7 +207,7 @@ class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods { ): Promise { if (!this.redisClient) return [] - this.checkRedisConnection() + this.openClosedConnection() const id = overrideSessionId ? overrideSessionId : this.sessionId const rawStoredMessages = await this.redisClient.lrange(id, this.windowSize ? this.windowSize * -1 : 0, -1) @@ -222,7 +222,7 @@ class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods { async addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId = ''): Promise { if (!this.redisClient) return - this.checkRedisConnection() + this.openClosedConnection() const id = overrideSessionId ? overrideSessionId : this.sessionId const input = msgArray.find((msg) => msg.type === 'userMessage') @@ -246,7 +246,7 @@ class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods { async clearChatMessages(overrideSessionId = ''): Promise { if (!this.redisClient) return - this.checkRedisConnection() + this.openClosedConnection() const id = overrideSessionId ? overrideSessionId : this.sessionId await this.redisClient.del(id) From cc719318ee88a8b4421b12571cd7b61f2a4ad0f0 Mon Sep 17 00:00:00 2001 From: Maicon Matsubara Date: Sun, 24 Nov 2024 19:08:53 -0300 Subject: [PATCH 5/6] added await on method call --- .../memory/RedisBackedChatMemory/RedisBackedChatMemory.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts b/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts index 98ad3f70aa1..18f25e60313 100644 --- a/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts +++ b/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts @@ -207,7 +207,7 @@ class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods { ): Promise { if (!this.redisClient) return [] - this.openClosedConnection() + await this.openClosedConnection() const id = overrideSessionId ? overrideSessionId : this.sessionId const rawStoredMessages = await this.redisClient.lrange(id, this.windowSize ? this.windowSize * -1 : 0, -1) @@ -222,7 +222,7 @@ class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods { async addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId = ''): Promise { if (!this.redisClient) return - this.openClosedConnection() + await this.openClosedConnection() const id = overrideSessionId ? overrideSessionId : this.sessionId const input = msgArray.find((msg) => msg.type === 'userMessage') @@ -246,7 +246,7 @@ class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods { async clearChatMessages(overrideSessionId = ''): Promise { if (!this.redisClient) return - this.openClosedConnection() + await this.openClosedConnection() const id = overrideSessionId ? overrideSessionId : this.sessionId await this.redisClient.del(id) From d22f9392adb4c09708040c6dd5a90daf17739cc3 Mon Sep 17 00:00:00 2001 From: Maicon Matsubara Date: Wed, 27 Nov 2024 09:44:52 -0300 Subject: [PATCH 6/6] Refactor Redis connection handling: remove singleton pattern, ensure connections are opened and closed per operation. --- .../RedisBackedChatMemory.ts | 178 ++++++------------ 1 file changed, 56 insertions(+), 122 deletions(-) diff --git a/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts b/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts index 18f25e60313..c60b9431fb2 100644 --- a/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts +++ b/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts @@ -1,7 +1,5 @@ import { Redis, RedisOptions } from 'ioredis' -import { isEqual } from 'lodash' import { BufferMemory, BufferMemoryInput } from 'langchain/memory' -import { RedisChatMessageHistory, RedisChatMessageHistoryInput } from '@langchain/community/stores/message/ioredis' import { mapStoredMessageToChatMessage, BaseMessage, AIMessage, HumanMessage } from '@langchain/core/messages' import { INode, INodeData, INodeParams, ICommonObject, MessageType, IMessage, MemoryMethods, FlowiseMemory } from '../../../src/Interface' import { @@ -12,42 +10,6 @@ import { mapChatMessageToBaseMessage } from '../../../src/utils' -let redisClientSingleton: Redis -let redisClientOption: RedisOptions -let redisClientUrl: string - -const getRedisClientbyOption = (option: RedisOptions) => { - if (!redisClientSingleton) { - // if client doesn't exists - redisClientSingleton = new Redis(option) - redisClientOption = option - return redisClientSingleton - } else if (redisClientSingleton && !isEqual(option, redisClientOption)) { - // if client exists but option changed - redisClientSingleton.quit() - redisClientSingleton = new Redis(option) - redisClientOption = option - return redisClientSingleton - } - return redisClientSingleton -} - -const getRedisClientbyUrl = (url: string) => { - if (!redisClientSingleton) { - // if client doesn't exists - redisClientSingleton = new Redis(url) - redisClientUrl = url - return redisClientSingleton - } else if (redisClientSingleton && url !== redisClientUrl) { - // if client exists but option changed - redisClientSingleton.quit() - redisClientSingleton = new Redis(url) - redisClientUrl = url - return redisClientSingleton - } - return redisClientSingleton -} - class RedisBackedChatMemory_Memory implements INode { label: string name: string @@ -114,11 +76,11 @@ class RedisBackedChatMemory_Memory implements INode { } async init(nodeData: INodeData, _: string, options: ICommonObject): Promise { - return await initalizeRedis(nodeData, options) + return await initializeRedis(nodeData, options) } } -const initalizeRedis = async (nodeData: INodeData, options: ICommonObject): Promise => { +const initializeRedis = async (nodeData: INodeData, options: ICommonObject): Promise => { const sessionTTL = nodeData.inputs?.sessionTTL as number const memoryKey = nodeData.inputs?.memoryKey as string const sessionId = nodeData.inputs?.sessionId as string @@ -127,77 +89,55 @@ const initalizeRedis = async (nodeData: INodeData, options: ICommonObject): Prom const credentialData = await getCredentialData(nodeData.credential ?? '', options) const redisUrl = getCredentialParam('redisUrl', credentialData, nodeData) - let client: Redis - - if (!redisUrl || redisUrl === '') { - const username = getCredentialParam('redisCacheUser', credentialData, nodeData) - const password = getCredentialParam('redisCachePwd', credentialData, nodeData) - const portStr = getCredentialParam('redisCachePort', credentialData, nodeData) - const host = getCredentialParam('redisCacheHost', credentialData, nodeData) - const sslEnabled = getCredentialParam('redisCacheSslEnabled', credentialData, nodeData) - - const tlsOptions = sslEnabled === true ? { tls: { rejectUnauthorized: false } } : {} - - client = getRedisClientbyOption({ - port: portStr ? parseInt(portStr) : 6379, - host, - username, - password, - ...tlsOptions - }) - } else { - client = getRedisClientbyUrl(redisUrl) - } - - let obj: RedisChatMessageHistoryInput = { - sessionId, - client - } - - if (sessionTTL) { - obj = { - ...obj, - sessionTTL - } - } - - const redisChatMessageHistory = new RedisChatMessageHistory(obj) + const redisOptions = redisUrl + ? redisUrl + : ({ + port: parseInt(getCredentialParam('redisCachePort', credentialData, nodeData) || '6379'), + host: getCredentialParam('redisCacheHost', credentialData, nodeData), + username: getCredentialParam('redisCacheUser', credentialData, nodeData), + password: getCredentialParam('redisCachePwd', credentialData, nodeData), + tls: getCredentialParam('redisCacheSslEnabled', credentialData, nodeData) ? { rejectUnauthorized: false } : undefined + } as RedisOptions) const memory = new BufferMemoryExtended({ memoryKey: memoryKey ?? 'chat_history', - chatHistory: redisChatMessageHistory, sessionId, windowSize, - redisClient: client, - sessionTTL + sessionTTL, + redisOptions }) return memory } interface BufferMemoryExtendedInput { - redisClient: Redis sessionId: string windowSize?: number sessionTTL?: number + redisOptions: RedisOptions | string } class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods { sessionId = '' - redisClient: Redis windowSize?: number sessionTTL?: number + redisOptions: RedisOptions | string constructor(fields: BufferMemoryInput & BufferMemoryExtendedInput) { super(fields) this.sessionId = fields.sessionId - this.redisClient = fields.redisClient this.windowSize = fields.windowSize this.sessionTTL = fields.sessionTTL + this.redisOptions = fields.redisOptions } - async openClosedConnection() { - if (this.redisClient.status == 'end' || this.redisClient.status == 'close') await this.redisClient.connect() + private async withRedisClient(fn: (client: Redis) => Promise): Promise { + const client = typeof this.redisOptions === 'string' ? new Redis(this.redisOptions) : new Redis(this.redisOptions) + try { + return await fn(client) + } finally { + await client.quit() + } } async getChatMessages( @@ -205,52 +145,46 @@ class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods { returnBaseMessages = false, prependMessages?: IMessage[] ): Promise { - if (!this.redisClient) return [] - - await this.openClosedConnection() - - const id = overrideSessionId ? overrideSessionId : this.sessionId - const rawStoredMessages = await this.redisClient.lrange(id, this.windowSize ? this.windowSize * -1 : 0, -1) - const orderedMessages = rawStoredMessages.reverse().map((message) => JSON.parse(message)) - const baseMessages = orderedMessages.map(mapStoredMessageToChatMessage) - if (prependMessages?.length) { - baseMessages.unshift(...(await mapChatMessageToBaseMessage(prependMessages))) - } - return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages) + return this.withRedisClient(async (client) => { + const id = overrideSessionId ? overrideSessionId : this.sessionId + const rawStoredMessages = await client.lrange(id, this.windowSize ? this.windowSize * -1 : 0, -1) + const orderedMessages = rawStoredMessages.reverse().map((message) => JSON.parse(message)) + const baseMessages = orderedMessages.map(mapStoredMessageToChatMessage) + if (prependMessages?.length) { + baseMessages.unshift(...(await mapChatMessageToBaseMessage(prependMessages))) + } + return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages) + }) } async addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId = ''): Promise { - if (!this.redisClient) return - - await this.openClosedConnection() - - const id = overrideSessionId ? overrideSessionId : this.sessionId - const input = msgArray.find((msg) => msg.type === 'userMessage') - const output = msgArray.find((msg) => msg.type === 'apiMessage') - - if (input) { - const newInputMessage = new HumanMessage(input.text) - const messageToAdd = [newInputMessage].map((msg) => msg.toDict()) - await this.redisClient.lpush(id, JSON.stringify(messageToAdd[0])) - if (this.sessionTTL) await this.redisClient.expire(id, this.sessionTTL) - } + await this.withRedisClient(async (client) => { + const id = overrideSessionId ? overrideSessionId : this.sessionId + const input = msgArray.find((msg) => msg.type === 'userMessage') + const output = msgArray.find((msg) => msg.type === 'apiMessage') + + if (input) { + const newInputMessage = new HumanMessage(input.text) + const messageToAdd = [newInputMessage].map((msg) => msg.toDict()) + await client.lpush(id, JSON.stringify(messageToAdd[0])) + if (this.sessionTTL) await client.expire(id, this.sessionTTL) + } - if (output) { - const newOutputMessage = new AIMessage(output.text) - const messageToAdd = [newOutputMessage].map((msg) => msg.toDict()) - await this.redisClient.lpush(id, JSON.stringify(messageToAdd[0])) - if (this.sessionTTL) await this.redisClient.expire(id, this.sessionTTL) - } + if (output) { + const newOutputMessage = new AIMessage(output.text) + const messageToAdd = [newOutputMessage].map((msg) => msg.toDict()) + await client.lpush(id, JSON.stringify(messageToAdd[0])) + if (this.sessionTTL) await client.expire(id, this.sessionTTL) + } + }) } async clearChatMessages(overrideSessionId = ''): Promise { - if (!this.redisClient) return - - await this.openClosedConnection() - - const id = overrideSessionId ? overrideSessionId : this.sessionId - await this.redisClient.del(id) - await this.clear() + await this.withRedisClient(async (client) => { + const id = overrideSessionId ? overrideSessionId : this.sessionId + await client.del(id) + await this.clear() + }) } }