diff --git a/api/src/controllers/discovery.js b/api/src/controllers/discovery.js index 1b38fd65..9bf9a86a 100644 --- a/api/src/controllers/discovery.js +++ b/api/src/controllers/discovery.js @@ -5,6 +5,7 @@ import { CeramicClient } from "@ceramicnetwork/http-client"; import RedisClient from "../clients/redis.js"; import { flattenSources } from "../utils/helpers.js"; import { DIDService } from "../services/did.js"; +import { DIDSession } from "did-session"; const redis = RedisClient.getInstance(); const pubSubClient = RedisClient.getPubSubInstance(); @@ -74,6 +75,81 @@ export const chat = async (req, res, next) => { } }; +const handleMessage = async (query, message, channel) => { + try { + const chatRequest = { + prompt: ` + For the given information, determine whether the new field might be relevant to the user's intent based on the question. You are a personal assistant whose task is to classify whether the new information is relevant or not. + If the information field might be NOT relevant, respond with "NOT_RELEVANT", and don't output anything else. If the information field might be relevant and new to the question, output an update in assistant tone for the user based on the information field. + You can use a conversational format (hypertext, links, etc.) in your message to extend the metadata of the information. + Do not add any HTML tags or title fields. Use links when useful. "Cast" means Farcaster Cast, so write your message accordingly. + Also, try to use a personal assistant tone, don't sell. + ---------------- + QUESTION: {query} + ---------------- + INFORMATION: {message} + `, + input: { + query, + message, + }, + }; + + const resp = await axios.post( + `${process.env.LLM_INDEXER_HOST}/chat/external`, + chatRequest, + { + responseType: "text", + }, + ); + + if (resp.data && !resp.data.includes("NOT_RELEVANT")) { + return resp.data; + } else { + console.log(`Not relevant`); + return false; + } + } catch (error) { + console.error("An error occurred:", error.message); + //throw new Error("Internal Server Error"); + } +}; + +export const updates = async (req, res, next) => { + const definition = req.app.get("runtimeDefinition"); + const { query, sources } = req.body; + + const session = await DIDSession.fromSession(req.query.session); + if (!session || !session.isAuthorized()) { + return res.status(401).json({ error: "Unauthorized" }); + } + + res.setHeader("Content-Type", "text/event-stream"); + res.setHeader("Cache-Control", "no-cache"); + res.setHeader("Connection", "keep-alive"); + + const didService = new DIDService(definition); + const reqIndexIds = await flattenSources(sources, didService); + const reqIndexChannels = reqIndexIds.map((id) => `indexStream:${id}`); + + await pubSubClient.subscribe(reqIndexChannels, async (message, channel) => { + const response = await handleMessage(query, message, channel); + + if (response) { + const channelType = channel.replace(`indexStream:${reqIndexIds[0]}:`, ""); + res.write( + `data: ${JSON.stringify({ channel: channelType, data: JSON.parse(response) })}\n\n`, + ); + } + }); + + // Cleanup on client disconnect + req.on("close", () => { + pubSubClient.unsubscribe(reqIndexChannels); + res.end(); + }); +}; + export const search = async (req, res, next) => { try { const searchRequest = { diff --git a/api/src/packages/api.js b/api/src/packages/api.js index 6ac66b21..05c4c3b7 100644 --- a/api/src/packages/api.js +++ b/api/src/packages/api.js @@ -378,6 +378,17 @@ app.post( discoveryController.questions, ); +app.get( + "/discovery/updates", + validator.body( + Joi.object({ + sources: Joi.array().items(Joi.string()).required(), + query: Joi.string().min(1).optional(), + }), + ), + discoveryController.updates, +); + app.post( "/web2/webpage", authCheckMiddleware,