Skip to content

Commit

Permalink
Iterate
Browse files Browse the repository at this point in the history
  • Loading branch information
serefyarar committed Jun 1, 2024
1 parent 49a3482 commit da54d2a
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 65 deletions.
42 changes: 2 additions & 40 deletions api/src/controllers/discovery.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ export const chat = async (req, res, next) => {
messages,
}),
);
await redis.publish(`newSubscription`, id);
}
console.log("Stream ended", inferredCmd);
res.end();
Expand All @@ -76,46 +75,9 @@ export const chat = async (req, res, next) => {
};

export const updates = async (ws, req) => {
//const definition = req.app.get("runtimeDefinition");
//const { id, messages, sources, ...rest } = req.body;

const { chatId } = req.params;
await pubSubClient.subscribe(`newUpdate:${chatId}`, async (itemId) => {
console.log(chatId, itemId, `newUpdate`);
console.log("New update triggered, fetching item data", itemId);
const subscriptionResp = await redis.hGet(`subscriptions`, chatId);
if (!subscriptionResp) {
return;
}
const { indexIds, messages } = JSON.parse(subscriptionResp);
const itemStream = await ceramic.loadStream(itemId);
const chatRequest = {
indexIds,
input: {
question: `
Determine if the following information is relevant to the previous conversation.
If it is relevant, output a conversation simulating that you received real-time news for the user.
Use conversational output format suitable to data model, use bold texts and links when available.
Do not mention relevancy check, just share it as an update.
If it is not relevant, simply say "NOT_RELEVANT.
Information: ${JSON.stringify(itemStream.content)}
`,
chat_history: messages,
},
};
try {
let resp = await axios.post(
`${process.env.LLM_INDEXER_HOST}/chat/stream`,
chatRequest,
{
responseType: "text",
},
);
console.log("Update evaluation response", resp.data);
ws.send(resp.data);
} catch (e) {
console.log(e)
}
await pubSubClient.subscribe(`newUpdate:${chatId}`, async (data) => {
ws.send(data);
});
};
export const search = async (req, res, next) => {
Expand Down
81 changes: 66 additions & 15 deletions api/src/libs/indexer.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ class Indexer {
// Check if the item is a webpage and has no content then return Exception
if (
indexItem.item.__typename === "WebPage" &&
(!indexItem.item.WebPage_content || indexItem.item.WebPage_content === "")
(!indexItem.item.WebPage_content ||
indexItem.item.WebPage_content === "")
) {
logger.warn(
"Step [0]: No content found, createIndexItem event incomplete",
Expand All @@ -71,6 +72,8 @@ class Indexer {
return;
}

await this.processAllSubscriptions(indexItem);

const embeddingResponse = await axios.post(
`${process.env.LLM_INDEXER_HOST}/indexer/embeddings`,
{
Expand Down Expand Up @@ -187,6 +190,8 @@ class Indexer {
`Step [2]: IndexItem UpdateEvent trigger for id: ${indexItem.id}`,
);

await this.processAllSubscriptions(indexItem);

const indexSession = await getPKPSessionForIndexer(indexItem.index);

const embeddingService = new EmbeddingService(
Expand Down Expand Up @@ -226,31 +231,77 @@ class Indexer {
}
}

async createEmbeddingEvent(id) {
logger.info(`Step [3]: createEmbeddingEvent trigger for id: ${id}`);

const embeddingService = new EmbeddingService(this.definition);
const embedding = await embeddingService.getEmbeddingById(id);
const stream = await ceramic.loadStream(embedding.item.id);
async processAllSubscriptions(indexItem) {
const itemStream = await ceramic.loadStream(indexItem.itemId);

const allSubscriptions = await redis.hGetAll(`subscriptions`);
for (const [chatId, subscriptionPayload] of Object.entries(
allSubscriptions,
)) {
// Parse the JSON string to an object
let subscription = JSON.parse(subscriptionPayload);
console.log(embedding.item.id, embedding.index.id, subscription.indexIds);
if (subscription.indexIds.indexOf(embedding.index.id) >= 0) {
await redis.publish(`newUpdate:${chatId}`, embedding.item.id);
console.log(indexItem.item.id, indexItem.index.id, subscription.indexIds);
if (subscription.indexIds.indexOf(indexItem.index.id) >= 0) {
await this.processSubscription(
chatId,
subscription,
itemStream.content,
);
}
}
}
async processSubscription(chatId, subscription, item) {
console.log(chatId, item, `newUpdate`);
console.log("New update triggered, fetching item data", item.id);
const subscriptionResp = await redis.hGet(`subscriptions`, chatId);
if (!subscriptionResp) {
return;
}
const { indexIds, messages } = subscription;
const chatRequest = {
indexIds,
input: {
question: `
Determine if the following information is relevant to the previous conversation.
If it is relevant, output a conversation simulating that you received real-time news for the user.
Use conversational output format suitable to data model, use bold texts and links when available.
Do not mention relevancy check, just share it as an update.
If it is not relevant, simply say "NOT_RELEVANT".
Information: ${JSON.stringify(item)}
`,
chat_history: messages,
},
};
try {
let resp = await axios.post(
`${process.env.LLM_INDEXER_HOST}/chat/stream`,
chatRequest,
{
responseType: "text",
},
);
console.log("Update evaluation response", resp.data);
if (resp.data && !resp.data.includes("NOT_RELEVANT")) {
await redis.publish(`newUpdate:${chatId}`, resp.data);
}
} catch (e) {
console.log(e);
}
}

async createEmbeddingEvent(id) {
logger.info(`Step [3]: createEmbeddingEvent trigger for id: ${id}`);

const embeddingService = new EmbeddingService(this.definition);
const embedding = await embeddingService.getEmbeddingById(id);
const itemStream = await ceramic.loadStream(embedding.item.id);

const doc = {
...stream.content,
id: stream.id.toString(),
controllerDID: stream.metadata.controller,
...itemStream.content,
id: itemStream.id.toString(),
controllerDID: itemStream.metadata.controller,
modelName: embedding.item.__typename,
};
console.log(doc);
const payload = {
indexId: embedding.index.id,
indexTitle: embedding.index.title,
Expand All @@ -273,7 +324,7 @@ class Indexer {
}

try {
const indexResponse = await axios.post(
await axios.post(
`${process.env.LLM_INDEXER_HOST}/indexer/index?indexId=${embedding.index.id}`,
payload,
);
Expand Down
29 changes: 19 additions & 10 deletions web-app/src/components/site/indexes/AskIndexes/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,6 @@ const AskIndexes: FC<AskIndexesProps> = ({ chatID, sources }) => {
fetchDefaultQuestions();
}, [fetchDefaultQuestions]);

const socketUrl = `${process.env.NEXT_PUBLIC_API_URL!.replace(/^https/, "wss")}${API_ENDPOINTS.DISCOVERY_UPDATES.replace(":chatID", chatID)}`;
const ws = new WebSocket(socketUrl);
ws.onmessage = async (event) => {
console.log(event);
await append({
id: chatID,
content: event.data,
role: "assistant",
});
};
const handleEditClick = (message: Message, indexOfMessage: number) => {
setEditingMessage(message);
setEditingIndex(indexOfMessage);
Expand Down Expand Up @@ -174,6 +164,25 @@ const AskIndexes: FC<AskIndexesProps> = ({ chatID, sources }) => {
scrollToBottom();
}, [messages, isLoading, scrollToBottom]);

useEffect(() => {
const socketUrl = `${process.env.NEXT_PUBLIC_API_URL!.replace(/^https/, "wss")}${API_ENDPOINTS.DISCOVERY_UPDATES.replace(":chatID", chatID)}`;
const ws = new WebSocket(socketUrl);

ws.onmessage = async (event) => {
setMessages([
...messages,
{
id: chatID,
content: event.data,
role: "assistant",
} as Message,
] as Message[]);
};

return () => {
ws.close();
};
}, [chatID, setMessages, messages]);
if (leftSectionIndexes.length === 0) {
return <NoIndexes tabKey={leftTabKey} />;
}
Expand Down

0 comments on commit da54d2a

Please sign in to comment.