Skip to content

Commit

Permalink
[Obs AI Assistant] Extract recallFromConnectors from KB service (#1…
Browse files Browse the repository at this point in the history
…86796)

Minor refactor to extract `recallFromConnectors` from the KB service.
This makes parts of it easier to unit test and
reduces the size of the KB service.
  • Loading branch information
sorenlouv authored Jun 25, 2024
1 parent bb6be5c commit 5903654
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ export function registerElasticsearchFunction({
},
},
async ({ arguments: { method, path, body } }) => {
const response = await (
await resources.context.core
).elasticsearch.client.asCurrentUser.transport.request({
const esClient = (await resources.context.core).elasticsearch.client;
const response = esClient.asCurrentUser.transport.request({
method,
path,
body,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ export function registerGetDatasetInfoFunction({
async ({ arguments: { index }, messages, chat }, signal) => {
const coreContext = await resources.context.core;

const esClient = coreContext.elasticsearch.client.asCurrentUser;
const esClient = coreContext.elasticsearch.client;
const savedObjectsClient = coreContext.savedObjects.client;

let indices: string[] = [];

try {
const body = await esClient.indices.resolveIndex({
const body = await esClient.asCurrentUser.indices.resolveIndex({
name: index === '' ? '*' : index,
expand_wildcards: 'open',
});
Expand Down Expand Up @@ -81,7 +81,7 @@ export function registerGetDatasetInfoFunction({
const relevantFieldNames = await getRelevantFieldNames({
index,
messages,
esClient,
esClient: esClient.asCurrentUser,
dataViews: await resources.plugins.dataViews.start(),
savedObjectsClient,
signal,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ import type { Logger } from '@kbn/logging';
import type { TaskManagerStartContract } from '@kbn/task-manager-plugin/server';
import pLimit from 'p-limit';
import pRetry from 'p-retry';
import { isEmpty, map, orderBy } from 'lodash';
import { map, orderBy } from 'lodash';
import { encode } from 'gpt-tokenizer';
import { MlTrainedModelDeploymentNodesStats } from '@elastic/elasticsearch/lib/api/types';
import { aiAssistantSearchConnectorIndexPattern } from '../../../common';
import { INDEX_QUEUED_DOCUMENTS_TASK_ID, INDEX_QUEUED_DOCUMENTS_TASK_TYPE } from '..';
import { KnowledgeBaseEntry, KnowledgeBaseEntryRole, UserInstruction } from '../../../common/types';
import type { ObservabilityAIAssistantResourceNames } from '../types';
import { getAccessQuery } from '../util/get_access_query';
import { getCategoryQuery } from '../util/get_category_query';
import { recallFromConnectors } from './recall_from_connectors';

interface Dependencies {
esClient: { asInternalUser: ElasticsearchClient };
Expand Down Expand Up @@ -348,122 +348,6 @@ export class KnowledgeBaseService {
}));
}

/**
 * Resolves the indices that connector recall should search.
 *
 * Precedence:
 *  1. the `aiAssistantSearchConnectorIndexPattern` UI setting, split on commas, when non-empty;
 *  2. the index names reported by `GET _connector`;
 *  3. `search-*` when no connector index is found.
 *
 * @param esClient scoped client; only `asCurrentUser` is used
 * @param uiSettingsClient used to read the custom index pattern setting
 * @returns the index names/patterns to search (never empty)
 */
private async getConnectorIndices(
  esClient: { asCurrentUser: ElasticsearchClient },
  uiSettingsClient: IUiSettingsClient
): Promise<string[]> {
  // improve performance by running this in parallel with the `uiSettingsClient` request
  const responsePromise = esClient.asCurrentUser.transport.request({
    method: 'GET',
    path: '_connector',
    querystring: {
      filter_path: 'results.index_name',
    },
  });

  // The request is abandoned when the custom setting wins (or when reading the setting throws).
  // Mark a rejection as handled so it cannot surface as an unhandled promise rejection;
  // awaiting `responsePromise` below still rethrows the original error.
  responsePromise.catch(() => {});

  const customSearchConnectorIndex = await uiSettingsClient.get<string>(
    aiAssistantSearchConnectorIndexPattern
  );

  if (customSearchConnectorIndex) {
    return customSearchConnectorIndex.split(',');
  }

  const response = (await responsePromise) as {
    results?: Array<{ index_name?: string | null }>;
  };

  // Defensive: skip entries that carry no index name so they never reach the `index` parameter.
  const connectorIndices = (response.results ?? [])
    .map((result) => result.index_name)
    .filter((indexName): indexName is string => Boolean(indexName));

  // preserve backwards compatibility with 8.14 (may not be needed in the future)
  if (isEmpty(connectorIndices)) {
    return ['search-*'];
  }

  return connectorIndices;
}

/**
 * Recalls documents from the search connector indices for the given queries.
 *
 * Field caps are used to find the `sparse_vector` fields under `ml.inference.`; one
 * `text_expansion` clause is then built per (field, query) pair, filtered to documents whose
 * `<field>_expanded.model_id` equals `modelId`.
 *
 * @param queries texts to search for, each with an optional boost (defaults to 1)
 * @param esClient scoped client; all requests run as the current user
 * @param uiSettingsClient forwarded to `getConnectorIndices` to resolve the indices
 * @param modelId model used for the text expansion and for the model filter
 * @returns up to 20 hits with the stringified `_source` as `text`; empty when no vector field exists
 */
private async recallFromConnectors({
  queries,
  esClient,
  uiSettingsClient,
  modelId,
}: {
  queries: Array<{ text: string; boost?: number }>;
  esClient: { asCurrentUser: ElasticsearchClient };
  uiSettingsClient: IUiSettingsClient;
  modelId: string;
}): Promise<RecalledEntry[]> {
  const inferencePrefix = 'ml.inference.';
  const vectorSuffix = '_expanded.predicted_value';

  const indices = await this.getConnectorIndices(esClient, uiSettingsClient);

  const { fields } = await esClient.asCurrentUser.fieldCaps({
    index: indices,
    fields: `${inferencePrefix}*`,
    allow_no_indices: true,
    types: ['sparse_vector'],
    filters: '-metadata,-parent',
  });

  // Reduce `ml.inference.<name>_expanded.predicted_value` to the bare `<name>`.
  const vectorFieldNames = Object.keys(fields).map((fullName) =>
    fullName.replace(vectorSuffix, '').replace(inferencePrefix, '')
  );

  if (vectorFieldNames.length === 0) {
    return [];
  }

  // One clause per (field, query): score by text expansion, keep only docs embedded by `modelId`.
  const toClause = (name: string, text: string, boost: number) => ({
    bool: {
      should: [
        {
          text_expansion: {
            [`${inferencePrefix}${name}${vectorSuffix}`]: {
              model_text: text,
              model_id: modelId,
              boost,
            },
          },
        },
      ],
      filter: [
        {
          term: {
            [`${inferencePrefix}${name}_expanded.model_id`]: modelId,
          },
        },
      ],
    },
  });

  const clauses = vectorFieldNames.flatMap((name) =>
    queries.map(({ text, boost = 1 }) => toClause(name, text, boost))
  );

  const { hits } = await esClient.asCurrentUser.search<unknown>({
    index: indices,
    query: { bool: { should: clauses } },
    size: 20,
    _source: { exclude: ['_*', 'ml*'] },
  });

  return hits.hits.map(({ _source, _score, _id }) => ({
    text: JSON.stringify(_source),
    score: _score!,
    is_correction: false,
    id: _id,
  }));
}

recall = async ({
user,
queries,
Expand Down Expand Up @@ -499,7 +383,7 @@ export class KnowledgeBaseService {
}
throw error;
}),
this.recallFromConnectors({
recallFromConnectors({
esClient,
uiSettingsClient,
queries,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import { IUiSettingsClient } from '@kbn/core-ui-settings-server';
import { isEmpty } from 'lodash';
import { RecalledEntry } from '.';
import { aiAssistantSearchConnectorIndexPattern } from '../../../common';

/**
 * Recalls documents from the search connector indices for the given queries.
 *
 * Field caps are used to find the `sparse_vector` fields under `ml.inference.`; one
 * `text_expansion` clause is then built per (field, query) pair, filtered to documents whose
 * `<field>_expanded.model_id` equals `modelId`.
 *
 * @param queries texts to search for, each with an optional boost (defaults to 1)
 * @param esClient scoped client; all requests run as the current user
 * @param uiSettingsClient forwarded to `getConnectorIndices` to resolve the indices
 * @param modelId model used for the text expansion and for the model filter
 * @returns up to 20 hits with the stringified `_source` as `text`; empty when no vector field exists
 */
export async function recallFromConnectors({
  queries,
  esClient,
  uiSettingsClient,
  modelId,
}: {
  queries: Array<{ text: string; boost?: number }>;
  esClient: { asCurrentUser: ElasticsearchClient };
  uiSettingsClient: IUiSettingsClient;
  modelId: string;
}): Promise<RecalledEntry[]> {
  const inferencePrefix = 'ml.inference.';
  const vectorSuffix = '_expanded.predicted_value';

  const indices = await getConnectorIndices(esClient, uiSettingsClient);

  const { fields } = await esClient.asCurrentUser.fieldCaps({
    index: indices,
    fields: `${inferencePrefix}*`,
    allow_no_indices: true,
    types: ['sparse_vector'],
    filters: '-metadata,-parent',
  });

  // Reduce `ml.inference.<name>_expanded.predicted_value` to the bare `<name>`.
  const vectorFieldNames = Object.keys(fields).map((fullName) =>
    fullName.replace(vectorSuffix, '').replace(inferencePrefix, '')
  );

  if (vectorFieldNames.length === 0) {
    return [];
  }

  // One clause per (field, query): score by text expansion, keep only docs embedded by `modelId`.
  const toClause = (name: string, text: string, boost: number) => ({
    bool: {
      should: [
        {
          text_expansion: {
            [`${inferencePrefix}${name}${vectorSuffix}`]: {
              model_text: text,
              model_id: modelId,
              boost,
            },
          },
        },
      ],
      filter: [
        {
          term: {
            [`${inferencePrefix}${name}_expanded.model_id`]: modelId,
          },
        },
      ],
    },
  });

  const clauses = vectorFieldNames.flatMap((name) =>
    queries.map(({ text, boost = 1 }) => toClause(name, text, boost))
  );

  const { hits } = await esClient.asCurrentUser.search<unknown>({
    index: indices,
    query: { bool: { should: clauses } },
    size: 20,
    _source: { exclude: ['_*', 'ml*'] },
  });

  return hits.hits.map(({ _source, _score, _id }) => ({
    text: JSON.stringify(_source),
    score: _score!,
    is_correction: false,
    id: _id,
  }));
}

/**
 * Resolves the indices that connector recall should search.
 *
 * Precedence:
 *  1. the `aiAssistantSearchConnectorIndexPattern` UI setting, split on commas, when non-empty;
 *  2. the index names reported by `GET _connector`;
 *  3. `search-*` when no connector index is found.
 *
 * @param esClient scoped client; only `asCurrentUser` is used
 * @param uiSettingsClient used to read the custom index pattern setting
 * @returns the index names/patterns to search (never empty)
 */
async function getConnectorIndices(
  esClient: { asCurrentUser: ElasticsearchClient },
  uiSettingsClient: IUiSettingsClient
): Promise<string[]> {
  // improve performance by running this in parallel with the `uiSettingsClient` request
  const responsePromise = esClient.asCurrentUser.transport.request({
    method: 'GET',
    path: '_connector',
    querystring: {
      filter_path: 'results.index_name',
    },
  });

  // The request is abandoned when the custom setting wins (or when reading the setting throws).
  // Mark a rejection as handled so it cannot surface as an unhandled promise rejection;
  // awaiting `responsePromise` below still rethrows the original error.
  responsePromise.catch(() => {});

  const customSearchConnectorIndex = await uiSettingsClient.get<string>(
    aiAssistantSearchConnectorIndexPattern
  );

  if (customSearchConnectorIndex) {
    return customSearchConnectorIndex.split(',');
  }

  const response = (await responsePromise) as {
    results?: Array<{ index_name?: string | null }>;
  };

  // Defensive: skip entries that carry no index name so they never reach the `index` parameter.
  const connectorIndices = (response.results ?? [])
    .map((result) => result.index_name)
    .filter((indexName): indexName is string => Boolean(indexName));

  // preserve backwards compatibility with 8.14 (may not be needed in the future)
  if (isEmpty(connectorIndices)) {
    return ['search-*'];
  }

  return connectorIndices;
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
"@kbn/features-plugin",
"@kbn/cloud-plugin",
"@kbn/serverless",
"@kbn/core-elasticsearch-server",
"@kbn/core-ui-settings-server",
],
"exclude": ["target/**/*"]
}

0 comments on commit 5903654

Please sign in to comment.