From da54d2a06fa25204a713571d96bbe3c0978df9f0 Mon Sep 17 00:00:00 2001 From: serafettin Date: Sat, 1 Jun 2024 11:25:47 -0700 Subject: [PATCH] Iterate --- api/src/controllers/discovery.js | 42 +--------- api/src/libs/indexer.js | 81 +++++++++++++++---- .../site/indexes/AskIndexes/index.tsx | 29 ++++--- 3 files changed, 87 insertions(+), 65 deletions(-) diff --git a/api/src/controllers/discovery.js b/api/src/controllers/discovery.js index dee7540c..f4dbd3eb 100644 --- a/api/src/controllers/discovery.js +++ b/api/src/controllers/discovery.js @@ -63,7 +63,6 @@ export const chat = async (req, res, next) => { messages, }), ); - await redis.publish(`newSubscription`, id); } console.log("Stream ended", inferredCmd); res.end(); @@ -76,46 +75,9 @@ export const chat = async (req, res, next) => { }; export const updates = async (ws, req) => { - //const definition = req.app.get("runtimeDefinition"); - //const { id, messages, sources, ...rest } = req.body; - const { chatId } = req.params; - await pubSubClient.subscribe(`newUpdate:${chatId}`, async (itemId) => { - console.log(chatId, itemId, `newUpdate`); - console.log("New update triggered, fetching item data", itemId); - const subscriptionResp = await redis.hGet(`subscriptions`, chatId); - if (!subscriptionResp) { - return; - } - const { indexIds, messages } = JSON.parse(subscriptionResp); - const itemStream = await ceramic.loadStream(itemId); - const chatRequest = { - indexIds, - input: { - question: ` - Determine if the following information is relevant to the previous conversation. - If it is relevant, output a conversation simulating that you received real-time news for the user. - Use conversational output format suitable to data model, use bold texts and links when available. - Do not mention relevancy check, just share it as an update. - If it is not relevant, simply say "NOT_RELEVANT. - Information: ${JSON.stringify(itemStream.content)} - `, - chat_history: messages, - }, - }; - try { - let resp = await axios.post( - `${process.env.LLM_INDEXER_HOST}/chat/stream`, - chatRequest, - { - responseType: "text", - }, - ); - console.log("Update evaluation response", resp.data); - ws.send(resp.data); - } catch (e) { - console.log(e) - } + await pubSubClient.subscribe(`newUpdate:${chatId}`, async (data) => { + ws.send(data); }); }; export const search = async (req, res, next) => { diff --git a/api/src/libs/indexer.js b/api/src/libs/indexer.js index 05972876..41555fb9 100644 --- a/api/src/libs/indexer.js +++ b/api/src/libs/indexer.js @@ -49,7 +49,8 @@ class Indexer { // Check if the item is a webpage and has no content then return Exception if ( indexItem.item.__typename === "WebPage" && - (!indexItem.item.WebPage_content || indexItem.item.WebPage_content === "") + (!indexItem.item.WebPage_content || + indexItem.item.WebPage_content === "") ) { logger.warn( "Step [0]: No content found, createIndexItem event incomplete", @@ -71,6 +72,8 @@ class Indexer { return; } + await this.processAllSubscriptions(indexItem); + const embeddingResponse = await axios.post( `${process.env.LLM_INDEXER_HOST}/indexer/embeddings`, { @@ -187,6 +190,8 @@ class Indexer { `Step [2]: IndexItem UpdateEvent trigger for id: ${indexItem.id}`, ); + await this.processAllSubscriptions(indexItem); + const indexSession = await getPKPSessionForIndexer(indexItem.index); const embeddingService = new EmbeddingService( @@ -226,12 +231,8 @@ class Indexer { } } - async createEmbeddingEvent(id) { - logger.info(`Step [3]: createEmbeddingEvent trigger for id: ${id}`); - - const embeddingService = new EmbeddingService(this.definition); - const embedding = await embeddingService.getEmbeddingById(id); - const stream = await ceramic.loadStream(embedding.item.id); + async processAllSubscriptions(indexItem) { + const itemStream = await ceramic.loadStream(indexItem.itemId); const allSubscriptions = await redis.hGetAll(`subscriptions`); for (const [chatId, subscriptionPayload] of Object.entries( @@ -239,18 +240,68 @@ class Indexer { )) { // Parse the JSON string to an object let subscription = JSON.parse(subscriptionPayload); - console.log(embedding.item.id, embedding.index.id, subscription.indexIds); - if (subscription.indexIds.indexOf(embedding.index.id) >= 0) { - await redis.publish(`newUpdate:${chatId}`, embedding.item.id); + console.log(indexItem.item.id, indexItem.index.id, subscription.indexIds); + if (subscription.indexIds.indexOf(indexItem.index.id) >= 0) { + await this.processSubscription( + chatId, + subscription, + itemStream.content, + ); + } + } + } + async processSubscription(chatId, subscription, item) { + console.log(chatId, item, `newUpdate`); + console.log("New update triggered, fetching item data", item.id); + const subscriptionResp = await redis.hGet(`subscriptions`, chatId); + if (!subscriptionResp) { + return; + } + const { indexIds, messages } = subscription; + const chatRequest = { + indexIds, + input: { + question: ` + Determine if the following information is relevant to the previous conversation. + If it is relevant, output a conversation simulating that you received real-time news for the user. + Use conversational output format suitable to data model, use bold texts and links when available. + Do not mention relevancy check, just share it as an update. + If it is not relevant, simply say "NOT_RELEVANT". + Information: ${JSON.stringify(item)} + `, + chat_history: messages, + }, + }; + try { + let resp = await axios.post( + `${process.env.LLM_INDEXER_HOST}/chat/stream`, + chatRequest, + { + responseType: "text", + }, + ); + console.log("Update evaluation response", resp.data); + if (resp.data && !resp.data.includes("NOT_RELEVANT")) { + await redis.publish(`newUpdate:${chatId}`, resp.data); } + } catch (e) { + console.log(e); } + } + + async createEmbeddingEvent(id) { + logger.info(`Step [3]: createEmbeddingEvent trigger for id: ${id}`); + + const embeddingService = new EmbeddingService(this.definition); + const embedding = await embeddingService.getEmbeddingById(id); + const itemStream = await ceramic.loadStream(embedding.item.id); + const doc = { - ...stream.content, - id: stream.id.toString(), - controllerDID: stream.metadata.controller, + ...itemStream.content, + id: itemStream.id.toString(), + controllerDID: itemStream.metadata.controller, modelName: embedding.item.__typename, }; - console.log(doc); const payload = { indexId: embedding.index.id, indexTitle: embedding.index.title, @@ -273,7 +324,7 @@ class Indexer { } try { - const indexResponse = await axios.post( + await axios.post( `${process.env.LLM_INDEXER_HOST}/indexer/index?indexId=${embedding.index.id}`, payload, ); diff --git a/web-app/src/components/site/indexes/AskIndexes/index.tsx b/web-app/src/components/site/indexes/AskIndexes/index.tsx index f8fd2c0c..da4cc4f5 100644 --- a/web-app/src/components/site/indexes/AskIndexes/index.tsx +++ b/web-app/src/components/site/indexes/AskIndexes/index.tsx @@ -69,16 +69,6 @@ const AskIndexes: FC = ({ chatID, sources }) => { fetchDefaultQuestions(); }, [fetchDefaultQuestions]); - const socketUrl = `${process.env.NEXT_PUBLIC_API_URL!.replace(/^https/, "wss")}${API_ENDPOINTS.DISCOVERY_UPDATES.replace(":chatID", chatID)}`; - const ws = new WebSocket(socketUrl); - ws.onmessage = async (event) => { - console.log(event); - await append({ - id: chatID, - content: event.data, - role: "assistant", - }); - }; const handleEditClick = (message: Message, indexOfMessage: number) => { setEditingMessage(message); setEditingIndex(indexOfMessage); @@ -174,6 +164,25 @@ const AskIndexes: FC = ({ chatID, sources }) => { scrollToBottom(); }, [messages, isLoading, scrollToBottom]); + useEffect(() => { + const socketUrl = `${process.env.NEXT_PUBLIC_API_URL!.replace(/^https/, "wss")}${API_ENDPOINTS.DISCOVERY_UPDATES.replace(":chatID", chatID)}`; + const ws = new WebSocket(socketUrl); + + ws.onmessage = async (event) => { + setMessages([ + ...messages, + { + id: chatID, + content: event.data, + role: "assistant", + } as Message, + ] as Message[]); + }; + + return () => { + ws.close(); + }; + }, [chatID, setMessages, messages]); if (leftSectionIndexes.length === 0) { return ; }