Skip to content

Commit

Permalink
Merge branch 'kafka-chromadb' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
serefyarar committed Feb 8, 2024
2 parents e54cfa0 + 0cbb5ed commit c0bd3ed
Show file tree
Hide file tree
Showing 13 changed files with 168 additions and 122 deletions.
5 changes: 5 additions & 0 deletions .env_example
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
REDIS_CONNECTION_STRING=redis://redis-master.env-dev:6379
CERAMIC_HOST=http://composedb.env-dev:7007
PORT=8000
KAFKA_HOST=kafka.env-dev:9092
INDEXER_DID_SESSION=
3 changes: 2 additions & 1 deletion api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
"version": "0.0.1",
"scripts": {
"api": "node --experimental-specifier-resolution=node --experimental-fetch src/packages/api.js",
"dev-api": "nodemon --experimental-fetch src/packages/api.js",
"dev-api": "nodemon --experimental-fetch src/packages/api.js",
"dev-kafka": "nodemon --experimental-fetch src/packages/consumer.js",
"kafka-consumer": "node --experimental-fetch src/packages/consumer.js",
"swagger": "npx swagger-typescript-api -p protocol.yaml -o ./src -n protocol.ts",
"reIndex": "node --experimental-fetch src/utils/reIndex.js"
Expand Down
65 changes: 53 additions & 12 deletions api/src/libs/kafka-indexer.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ if(process.env.NODE_ENV !== 'production'){
dotenv.config()
}

// Index Item (C)
export const createIndexItemEvent = async (id) => {
console.log("createIndexItemEvent", id)

Expand All @@ -17,10 +18,18 @@ export const createIndexItemEvent = async (id) => {
try {

const indexSession = await getPKPSessionForIndexer(indexItem.index);

await indexSession.did.authenticate();

// TODO:
// Crawl can be null due to some reasons,
// need to handle separately.
console.log("Indexer Item URL", `${process.env.LLM_INDEXER_HOST}/indexer/embedding`)
console.log("Indexer Item", indexItem.item.content)
console.log("Indexer Item", indexItem.item.title)

const embeddingResponse = await axios.post(`${process.env.LLM_INDEXER_HOST}/indexer/embedding`, {
text: indexItem.item.content
content: 'This is a test content'
})
const embeddingService = new EmbeddingService().setSession(indexSession)
const embedding = await embeddingService.createEmbedding({
Expand All @@ -39,22 +48,22 @@ export const createIndexItemEvent = async (id) => {
}

}

// Index item (UD)
export const updateIndexItemEvent = async (id) => {
console.log("updateIndexItemEvent", id)
}

export const updateWebPageEvent = async (id) => {
console.log("updateWebPageEvent", id)

const itemService = new ItemService()
const webPage = await itemService.getIndexItemById(id);
const indexItem = await itemService.getIndexItemById(id, false);


try {

const indexSession = await getPKPSession(webPage.index);
await indexSession.did.authenticate();
const indexSession = await getPKPSessionForIndexer(indexItem.index);

if (webPage.item.deletedAt) {
await indexSession.did.authenticate();

if (webPage.deletedAt) {
const deleteResponse = await axios.delete(`${process.env.LLM_INDEXER_HOST}/indexer/item:${webPage.item.id}`);

if (deleteResponse.status === 200) {
Expand All @@ -64,8 +73,6 @@ export const updateWebPageEvent = async (id) => {
}
}

console.log(`${JSON.stringify(webPage.item)} `)

const updateResponse = await axios.put(
`${process.env.LLM_INDEXER_HOST}/indexer/index`,
{
Expand All @@ -77,6 +84,40 @@ export const updateWebPageEvent = async (id) => {
} else {
console.log("IndexItem Update Failed.")
}

//
const embeddingResponse = await axios.post(`${process.env.LLM_INDEXER_HOST}/indexer/embedding`, {
content: indexItem.item.content
})
const embeddingService = new EmbeddingService().setSession(indexSession)
const embedding = await embeddingService.createEmbedding({
"indexId": indexItem.indexId,
"itemId": indexItem.itemId,
"modelName": embeddingResponse.data.model,
"category": "document",
"vector": embeddingResponse.data.vector,
"description": "Default document embeddings",
});

console.log("Embedding created", embedding.id)

} catch (e) {
console.log("Indexer updateIndexItemEvent error:", e.message);
}
}

export const updateWebPageEvent = async (id) => {
console.log("updateWebPageEvent", id)

const itemService = new ItemService()
const webPage = await itemService.getIndexItemById(id, false);

try {

const indexSession = await getPKPSession(webPage.index);
await indexSession.did.authenticate();

console.log("Indexer Item", webPage.deletedAt)

} catch (e) {
console.log(e)
Expand Down Expand Up @@ -119,11 +160,11 @@ export const createEmbeddingEvent = async (id) => {

try {
const indexResponse = await axios.post(`${process.env.LLM_INDEXER_HOST}/indexer/index`, payload)
console.log("IndexItem Indexed with it's content and embeddings.")
} catch (e) {
console.log(e)
}

console.log("IndexItem Indexed with it's content and embeddings.")

}

Expand Down
5 changes: 2 additions & 3 deletions api/src/libs/lit/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,6 @@ export const getPKPSession = async (session, index) => {
}
}



const authSig = {
signedMessage: SiweMessage.fromCacao(session.cacao).toMessage(),
address: getAddress(session.did.parent.split(":").pop()),
Expand All @@ -169,6 +167,7 @@ export const getPKPSession = async (session, index) => {
const litNodeClient = new LitJsSdk.LitNodeClientNodeJs({
litNetwork: 'cayenne',
debug: false,
// debug: true,
});
await litNodeClient.connect();
const signerFunctionV0 = CID.parse(index.signerFunction).toV0().toString();
Expand All @@ -177,7 +176,7 @@ export const getPKPSession = async (session, index) => {
authSig,
jsParams: {
authSig,
chain: "ethereum",
chain: "ethereum", // polygon
publicKey: index.signerPublicKey,
didKey: didKey.id,
nonce: randomString(10),
Expand Down
2 changes: 1 addition & 1 deletion api/src/packages/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async function start() {

await redis.connect()
const consumerItems = kafka.consumer({
groupId: `index-consumer-dev-12`,
groupId: `index-consumer-dev-13`,
sessionTimeout: 300000,
heartbeatInterval: 10000,
rebalanceTimeout: 3000,
Expand Down
1 change: 1 addition & 0 deletions indexer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"@nestjs/microservices": "^10.3.0",
"@nestjs/platform-express": "^10.0.0",
"@nestjs/swagger": "^7.2.0",
"@types/json-patch": "^0.0.33",
"axios": "^1.6.5",
"chromadb": "^1.7.3",
"class-transformer": "^0.5.1",
Expand Down
111 changes: 35 additions & 76 deletions indexer/src/app/modules/agent.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ export class Agent {
}
}

public async createRetrieverChain(chain_type: string = 'query-v0', index_ids: string[], model_type: string = 'OpenAI', page: number, skip: number, limit: number): Promise<any> {
public async createRetrieverChain(chain_type: string = 'query-v0', index_ids: string[], model_type: string = 'OpenAI', page: number, limit: number): Promise<any> {

switch (chain_type) {

case 'query-v0':
return this.createQueryRetriever(index_ids, model_type, page, skip, limit);
return this.createQueryRetriever(index_ids, model_type, page, limit);

default:
throw new Error('Chain type not supported');
Expand All @@ -67,23 +67,32 @@ export class Agent {
private async createRAGChain (chroma_indices: string[], model_type: string): Promise<any>{

let model: any;
if (model_type == 'OpenAI') { model = new ChatOpenAI({ modelName: process.env.MODEL_CHAT }) }
else if (model_type == 'MistralAI') { model = new ChatMistralAI({ modelName: process.env.MISTRAL_MODEL_CHAT, apiKey: process.env.MISTRAL_API_KEY }) }
if (model_type == 'OpenAI') {
model = new ChatOpenAI({
modelName: process.env.MODEL_CHAT,
streaming: true,
})
} else if (model_type == 'MistralAI') { model = new ChatMistralAI({ modelName: process.env.MISTRAL_MODEL_CHAT, apiKey: process.env.MISTRAL_API_KEY }) }

const vectorStore = await Chroma.fromExistingCollection(
new OpenAIEmbeddings({ modelName: process.env.MODEL_EMBEDDING}),
{
url: process.env.CHROMA_URL,
collectionName: process.env.CHROMA_COLLECTION_NAME,
filter: {
indexId: chroma_indices
indexId: {
$in: chroma_indices
}
}
});

const documentCount = await vectorStore.collection.count();
if (!documentCount) throw new Error('Vector store not found');

const retriever = vectorStore.asRetriever();


// TODO: Prior information context -> glossary, etc.

// Prompt link: https://langstream.ai/2023/10/13/rag-chatbot-with-conversation/
const questionPrompt = PromptTemplate.fromTemplate(`
Given the following conversation and a follow up question, rephrase the follow up question to be a standalone question, in its original language.
Expand Down Expand Up @@ -126,7 +135,6 @@ export class Agent {
}

// const selfAskPrompt = await pull("hwchase17/self-ask-with-search")

const standalone_question = RunnableSequence.from([
{
question: (input) => input.question,
Expand All @@ -137,7 +145,6 @@ export class Agent {
new StringOutputParser(),
]);


const answerChain = RunnableSequence.from([
{
context: RunnableSequence.from([
Expand All @@ -156,15 +163,12 @@ export class Agent {
context: (input) => {

// Logger.log(input.context, "ChatService:answerChain:inputContext");

const docs_with_metadata = input.context.docs.map((doc: any) => {
return JSON.stringify(doc.metadata) + "\n" + doc.pageContent
})

// const serialized = formatDocumentsAsString(input.context.docs)
const serialized = docs_with_metadata.join("\n");

Logger.log(serialized.length, "ChatService:answerChain:contextLength");
// Logger.log(serialized?.length, "ChatService:answerChain:contextLength");
return serialized
},
question: (input) => {
Expand All @@ -185,77 +189,17 @@ export class Agent {
}
])


const final_chain = standalone_question.pipe(answerChain);

return final_chain

}

private async createQueryRetriever (chroma_indices: string[], model_type: string, page: number, skip: number, limit: number) {
private async createQueryRetriever (chroma_indices: string[], model_type: string, page: number, limit: number) {

// Not implemented yet
// https://js.langchain.com/docs/modules/data_connection/retrievers/self_query/chroma-self-query

const attributeInfo: AttributeInfo[] = [
{
name: 'indexId',
description: "The id of the generated index",
type: "string",
},
{
name: 'indexTitle',
description: "The title of the generated index",
type: "string",
},
{
name: 'indexCreatedAt',
description: 'Date of index creation',
type: 'date',
},
{
name: 'indexUpdatedAt',
description: 'Date of last update of the index',
type: 'date',
},
{
name: "webPageId",
description: 'The id of the indexed document (html, docx, pdf, etc)',
type: 'string',
},
{
name: "webPageUrl",
description: 'The web url of the indexed document (html, docx, pdf, etc)',
type: 'string',
},
{
name: 'webPageTitle',
description: "The title of the indexed document (html, docx, pdf, etc)",
type: 'string',
},
{
name: "webPageCreatedAt",
description: 'The date of creation the indexed document',
type: 'string',
},
{
name: "webPageContent",
description: 'The content of the indexed document',
type: 'string',
},
{
name: "webPageUpdatedAt",
description: 'The date of last update of the indexed document',
type: 'string',
},
{
name: "webPageDeletedAt",
description: 'The date of deletion of the indexed document',
type: 'string',
},

];
const documentContents = 'Document metadata'

let model: any;
if (model_type == 'OpenAI' ) { model = new ChatOpenAI({ modelName: process.env.MODEL_CHAT }) }
Expand All @@ -275,14 +219,28 @@ export class Agent {
url: process.env.CHROMA_URL,
collectionName: process.env.CHROMA_COLLECTION_NAME,
filter: {
indexId: chroma_indices
$and: [
{
indexId: {
$in: chroma_indices
}
},
{
webPageContent: {
$exists: true
}
}
]
}
});


const final_chain = RunnableSequence.from([
{
documents: async (input) => {
// Get embeddings of the query
const queryEmbedding = await embeddings.embedQuery(input.query)
// Fetch most similar semantic content according to query
const docs = await vectorStore.collection.query({
queryEmbeddings: [queryEmbedding],
nResults: (page * limit),
Expand All @@ -292,9 +250,9 @@ export class Agent {
},
{
documents: (input) => {
// Return ids and similarities
const ids = input.documents?.ids[0]
const similarities = input.documents?.distances[0]

return ids.map(function(id, i) {
return {
id: id,
Expand All @@ -303,6 +261,7 @@ export class Agent {
});
}
},
// Add pagination to retrieved documents
RunnableLambda.from((input) => {
return input.documents.slice((page-1)*limit, page*limit)
})
Expand Down
Loading

0 comments on commit c0bd3ed

Please sign in to comment.