diff --git a/src/plugins/data_source/common/index.ts b/src/plugins/data_source/common/index.ts index bf5c6b1b019..a2ae4186834 100644 --- a/src/plugins/data_source/common/index.ts +++ b/src/plugins/data_source/common/index.ts @@ -5,5 +5,7 @@ export const PLUGIN_ID = 'dataSource'; export const PLUGIN_NAME = 'data_source'; +export const DATA_SOURCE_SAVED_OBJECT_TYPE = 'data-source'; +export const CREDENTIAL_SAVED_OBJECT_TYPE = 'credential'; export { Credential } from './credentials'; diff --git a/src/plugins/data_source/config.ts b/src/plugins/data_source/config.ts index 5a8a654cd05..95e8bc3f96b 100644 --- a/src/plugins/data_source/config.ts +++ b/src/plugins/data_source/config.ts @@ -29,6 +29,9 @@ export const configSchema = schema.object({ defaultValue: new Array(32).fill(0), }), }), + clientPool: schema.object({ + size: schema.number({ defaultValue: 5 }), + }), }); export type DataSourcePluginConfigType = TypeOf; diff --git a/src/plugins/data_source/server/client/client_config.test.ts b/src/plugins/data_source/server/client/client_config.test.ts new file mode 100644 index 00000000000..39a3607ccba --- /dev/null +++ b/src/plugins/data_source/server/client/client_config.test.ts @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +import { DataSourcePluginConfigType } from '../../config'; +import { parseClientOptions } from './client_config'; + +const TEST_DATA_SOURCE_ENDPOINT = 'http://datasource.com'; + +const config = { + enabled: true, + clientPool: { + size: 5, + }, +} as DataSourcePluginConfigType; + +describe('parseClientOptions', () => { + test('include the ssl client configs as defaults', () => { + expect(parseClientOptions(config, TEST_DATA_SOURCE_ENDPOINT)).toEqual( + expect.objectContaining({ + node: TEST_DATA_SOURCE_ENDPOINT, + ssl: { + requestCert: true, + rejectUnauthorized: true, + }, + }) + ); + }); +}); diff --git a/src/plugins/data_source/server/client/client_config.ts b/src/plugins/data_source/server/client/client_config.ts new file mode 100644 index 00000000000..5973e5a0813 --- /dev/null +++ b/src/plugins/data_source/server/client/client_config.ts @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { ClientOptions } from '@opensearch-project/opensearch'; +import { DataSourcePluginConfigType } from '../../config'; + +/** + * Parse the client options from given data source config and endpoint + * + * @param config The config to generate the client options from. + * @param endpoint endpoint url of data source + */ +export function parseClientOptions( + // TODO: will use client configs, that comes from a merge result of user config and default opensearch client config, + config: DataSourcePluginConfigType, + endpoint: string +): ClientOptions { + const clientOptions: ClientOptions = { + node: endpoint, + ssl: { + requestCert: true, + rejectUnauthorized: true, + }, + }; + + return clientOptions; +} diff --git a/src/plugins/data_source/server/client/client_pool.test.ts b/src/plugins/data_source/server/client/client_pool.test.ts new file mode 100644 index 00000000000..92320e9610a --- /dev/null +++ b/src/plugins/data_source/server/client/client_pool.test.ts @@ -0,0 +1,39 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { loggingSystemMock } from '../../../../core/server/mocks'; +import { DataSourcePluginConfigType } from '../../config'; +import { OpenSearchClientPool } from './client_pool'; + +const logger = loggingSystemMock.create(); + +describe('Client Pool', () => { + let service: OpenSearchClientPool; + let config: DataSourcePluginConfigType; + + beforeEach(() => { + const mockLogger = logger.get('dataSource'); + service = new OpenSearchClientPool(mockLogger); + config = { + enabled: true, + clientPool: { + size: 5, + }, + } as DataSourcePluginConfigType; + }); + + afterEach(() => { + service.stop(); + jest.clearAllMocks(); + }); + + describe('setup()', () => { + test('exposes proper contract', async () => { + const setup = await service.setup(config); + expect(setup).toHaveProperty('getClientFromPool'); + expect(setup).toHaveProperty('addClientToPool'); + }); + }); +}); diff --git a/src/plugins/data_source/server/client/client_pool.ts b/src/plugins/data_source/server/client/client_pool.ts new file mode 100644 index 00000000000..fe6f20e51ae --- /dev/null +++ b/src/plugins/data_source/server/client/client_pool.ts @@ -0,0 +1,76 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { Client } from '@opensearch-project/opensearch'; +import LRUCache from 'lru-cache'; +import { Logger } from 'src/core/server'; +import { DataSourcePluginConfigType } from '../../config'; + +export interface OpenSearchClientPoolSetup { + getClientFromPool: (id: string) => Client | undefined; + addClientToPool: (endpoint: string, client: Client) => void; +} + +/** + * OpenSearch client pool. + * + * This client pool uses an LRU cache to manage OpenSearch Js client objects. + * It reuse TPC connections for each OpenSearch endpoint. + */ +export class OpenSearchClientPool { + // LRU cache + // key: data source endpoint url + // value: OpenSearch client object + private cache?: LRUCache; + private isClosed = false; + + constructor(private logger: Logger) {} + + public async setup(config: DataSourcePluginConfigType) { + const logger = this.logger; + const { size } = config.clientPool; + + this.cache = new LRUCache({ + max: size, + maxAge: 15 * 60 * 1000, // by default, TCP connection times out in 15 minutes + + async dispose(endpoint, client) { + try { + await client.close(); + } catch (error: any) { + // log and do nothing since we are anyways evicting the client object from cache + logger.warn( + `Error closing OpenSearch client when removing from client pool: ${error.message}` + ); + } + }, + }); + this.logger.info(`Created data source client pool of size ${size}`); + + const getClientFromPool = (endpoint: string) => { + return this.cache!.get(endpoint); + }; + + const addClientToPool = (endpoint: string, client: Client) => { + this.cache!.set(endpoint, client); + }; + + return { + getClientFromPool, + addClientToPool, + }; + } + + start() {} + + // close all clients in the pool + async stop() { + if (this.isClosed) { + return; + } + this.isClosed = true; + await Promise.all(this.cache!.values().map((client) => client.close())); + } +} diff --git a/src/plugins/data_source/server/client/configure_client.test.mocks.ts b/src/plugins/data_source/server/client/configure_client.test.mocks.ts new file mode 100644 index 00000000000..38a585ff202 --- /dev/null +++ b/src/plugins/data_source/server/client/configure_client.test.mocks.ts @@ -0,0 +1,18 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +export const ClientMock = jest.fn(); +jest.doMock('@opensearch-project/opensearch', () => { + const actual = jest.requireActual('@opensearch-project/opensearch'); + return { + ...actual, + Client: ClientMock, + }; +}); + +export const parseClientOptionsMock = jest.fn(); +jest.doMock('./client_config', () => ({ + parseClientOptions: parseClientOptionsMock, +})); diff --git a/src/plugins/data_source/server/client/configure_client.test.ts b/src/plugins/data_source/server/client/configure_client.test.ts new file mode 100644 index 00000000000..70dd078d4fd --- /dev/null +++ b/src/plugins/data_source/server/client/configure_client.test.ts @@ -0,0 +1,135 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { SavedObjectsClientContract } from '../../../../core/server'; +import { loggingSystemMock, savedObjectsClientMock } from '../../../../core/server/mocks'; +import { DATA_SOURCE_SAVED_OBJECT_TYPE, CREDENTIAL_SAVED_OBJECT_TYPE } from '../../common'; +import { + CredentialMaterialsType, + CredentialSavedObjectAttributes, +} from '../../common/credentials/types'; +import { DataSourceAttributes } from '../../common/data_sources'; +import { DataSourcePluginConfigType } from '../../config'; +import { ClientMock, parseClientOptionsMock } from './configure_client.test.mocks'; +import { OpenSearchClientPoolSetup } from './client_pool'; +import { configureClient } from './configure_client'; +import { ClientOptions } from '@opensearch-project/opensearch'; +// eslint-disable-next-line @osd/eslint/no-restricted-paths +import { opensearchClientMock } from '../../../../core/server/opensearch/client/mocks'; + +const DATA_SOURCE_ID = 'a54b76ec86771ee865a0f74a305dfff8'; +const CREDENETIAL_ID = 'a54dsaadasfasfwe22d23d23d2453df3'; + +describe('configureClient', () => { + let logger: ReturnType; + let config: DataSourcePluginConfigType; + let savedObjectsMock: jest.Mocked; + let clientPoolSetup: OpenSearchClientPoolSetup; + let clientOptions: ClientOptions; + let dataSourceAttr: DataSourceAttributes; + let dsClient: ReturnType; + + beforeEach(() => { + dsClient = opensearchClientMock.createInternalClient(); + logger = loggingSystemMock.createLogger(); + savedObjectsMock = savedObjectsClientMock.create(); + + config = { + enabled: true, + clientPool: { + size: 5, + }, + } as DataSourcePluginConfigType; + clientOptions = { + nodes: 'http://localhost', + ssl: { + requestCert: true, + rejectUnauthorized: true, + }, + } as ClientOptions; + dataSourceAttr = { + title: 'title', + endpoint: 'http://localhost', + noAuth: false, + } as DataSourceAttributes; + + clientPoolSetup = { + getClientFromPool: jest.fn(), + addClientToPool: jest.fn(), + }; + + const crendentialAttr = { + title: 'cred', + credentialMaterials: { + credentialMaterialsType: CredentialMaterialsType.UsernamePasswordType, + credentialMaterialsContent: { + username: 'username', + password: 'password', + }, + }, + } as CredentialSavedObjectAttributes; + + savedObjectsMock.get + .mockResolvedValueOnce({ + id: DATA_SOURCE_ID, + type: DATA_SOURCE_SAVED_OBJECT_TYPE, + attributes: dataSourceAttr, + references: [{ name: 'user', type: CREDENTIAL_SAVED_OBJECT_TYPE, id: CREDENETIAL_ID }], + }) + .mockResolvedValueOnce({ + id: CREDENETIAL_ID, + type: CREDENTIAL_SAVED_OBJECT_TYPE, + attributes: crendentialAttr, + references: [], + }); + + ClientMock.mockImplementation(() => { + return dsClient; + }); + }); + + afterEach(() => { + ClientMock.mockReset(); + }); + // TODO: mark as skip until we fix the issue of mocking "@opensearch-project/opensearch" + test('configure client with noAuth == true, will call new Client() to create client', async () => { + savedObjectsMock.get.mockReset().mockResolvedValueOnce({ + id: DATA_SOURCE_ID, + type: DATA_SOURCE_SAVED_OBJECT_TYPE, + attributes: { ...dataSourceAttr, noAuth: true }, + references: [], + }); + + parseClientOptionsMock.mockReturnValue(clientOptions); + + const client = await configureClient( + DATA_SOURCE_ID, + savedObjectsMock, + clientPoolSetup, + config, + logger + ); + + expect(parseClientOptionsMock).toHaveBeenCalled(); + expect(ClientMock).toHaveBeenCalledTimes(1); + expect(ClientMock).toHaveBeenCalledWith(clientOptions); + expect(savedObjectsMock.get).toHaveBeenCalledTimes(1); + expect(client).toBe(dsClient.child.mock.results[0].value); + }); + + test('configure client with noAuth == false, will first call new Client()', async () => { + const client = await configureClient( + DATA_SOURCE_ID, + savedObjectsMock, + clientPoolSetup, + config, + logger + ); + + expect(ClientMock).toHaveBeenCalledTimes(1); + expect(savedObjectsMock.get).toHaveBeenCalledTimes(2); + expect(client).toBe(dsClient.child.mock.results[0].value); + }); +}); diff --git a/src/plugins/data_source/server/client/configure_client.ts b/src/plugins/data_source/server/client/configure_client.ts new file mode 100644 index 00000000000..15a824dabae --- /dev/null +++ b/src/plugins/data_source/server/client/configure_client.ts @@ -0,0 +1,124 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { Client } from '@opensearch-project/opensearch'; +import { + Logger, + SavedObject, + SavedObjectsClientContract, + SavedObjectsErrorHelpers, +} from '../../../../../src/core/server'; +import { DATA_SOURCE_SAVED_OBJECT_TYPE, CREDENTIAL_SAVED_OBJECT_TYPE } from '../../common'; +import { CredentialSavedObjectAttributes } from '../../common/credentials/types'; +import { DataSourceAttributes } from '../../common/data_sources'; +import { DataSourcePluginConfigType } from '../../config'; +import { parseClientOptions } from './client_config'; +import { OpenSearchClientPoolSetup } from './client_pool'; + +export const configureClient = async ( + dataSourceId: string, + savedObjects: SavedObjectsClientContract, + openSearchClientPoolSetup: OpenSearchClientPoolSetup, + config: DataSourcePluginConfigType, + logger: Logger +): Promise => { + const dataSource = await getDataSource(dataSourceId, savedObjects); + const rootClient = getRootClient(dataSource.attributes, config, openSearchClientPoolSetup); + + return getQueryClient(rootClient, dataSource, savedObjects); +}; + +export const getDataSource = async ( + dataSourceId: string, + savedObjects: SavedObjectsClientContract +): Promise> => { + try { + const dataSource = await savedObjects.get( + DATA_SOURCE_SAVED_OBJECT_TYPE, + dataSourceId + ); + return dataSource; + } catch (error: any) { + // it will cause 500 error when failed to get saved objects, need to handle such error gracefully + throw SavedObjectsErrorHelpers.createBadRequestError(error.message); + } +}; + +export const getCredential = async ( + credentialId: string, + savedObjects: SavedObjectsClientContract +): Promise> => { + try { + const credential = await savedObjects.get( + CREDENTIAL_SAVED_OBJECT_TYPE, + credentialId + ); + return credential; + } catch (error: any) { + // it will cause 500 error when failed to get saved objects, need to handle such error gracefully + throw SavedObjectsErrorHelpers.createBadRequestError(error.message); + } +}; + +/** + * Create a child client object with given auth info. + * + * @param rootClient root client for the connection with given data source endpoint. + * @param dataSource data source saved object + * @param savedObjects scoped saved object client + * @returns child client. + */ +const getQueryClient = async ( + rootClient: Client, + dataSource: SavedObject, + savedObjects: SavedObjectsClientContract +): Promise => { + if (dataSource.attributes.noAuth) { + return rootClient.child(); + } else { + const credential = await getCredential(dataSource.references[0].id, savedObjects); + return getBasicAuthClient(rootClient, credential.attributes); + } +}; + +/** + * Gets a root client object of the OpenSearch endpoint. + * Will attempt to get from cache, if cache miss, create a new one and load into cache. + * + * @param dataSourceAttr data source saved objects attributes. + * @param config data source config + * @returns OpenSearch client for the given data source endpoint. + */ +const getRootClient = ( + dataSourceAttr: DataSourceAttributes, + config: DataSourcePluginConfigType, + { getClientFromPool, addClientToPool }: OpenSearchClientPoolSetup +): Client => { + const endpoint = dataSourceAttr.endpoint; + const cachedClient = getClientFromPool(endpoint); + if (cachedClient) { + return cachedClient; + } else { + const clientOptions = parseClientOptions(config, endpoint); + + const client = new Client(clientOptions); + addClientToPool(endpoint, client); + + return client; + } +}; + +const getBasicAuthClient = ( + rootClient: Client, + credentialAttr: CredentialSavedObjectAttributes +): Client => { + const { username, password } = credentialAttr.credentialMaterials.credentialMaterialsContent; + return rootClient.child({ + auth: { + username, + password, + }, + }); +}; diff --git a/src/plugins/data_source/server/client/index.ts b/src/plugins/data_source/server/client/index.ts new file mode 100644 index 00000000000..8adc96115b9 --- /dev/null +++ b/src/plugins/data_source/server/client/index.ts @@ -0,0 +1,7 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +export { OpenSearchClientPool } from './client_pool'; +export { configureClient } from './configure_client'; diff --git a/src/plugins/data_source/server/data_source_service.test.ts b/src/plugins/data_source/server/data_source_service.test.ts new file mode 100644 index 00000000000..53dfb6f273e --- /dev/null +++ b/src/plugins/data_source/server/data_source_service.test.ts @@ -0,0 +1,38 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { loggingSystemMock } from '../../../core/server/mocks'; +import { DataSourcePluginConfigType } from '../config'; +import { DataSourceService } from './data_source_service'; + +const logger = loggingSystemMock.create(); + +describe('Data Source Service', () => { + let service: DataSourceService; + let config: DataSourcePluginConfigType; + + beforeEach(() => { + const mockLogger = logger.get('dataSource'); + service = new DataSourceService(mockLogger); + config = { + enabled: true, + clientPool: { + size: 5, + }, + } as DataSourcePluginConfigType; + }); + + afterEach(() => { + service.stop(); + jest.clearAllMocks(); + }); + + describe('setup()', () => { + test('exposes proper contract', async () => { + const setup = await service.setup(config); + expect(setup).toHaveProperty('getDataSourceClient'); + }); + }); +}); diff --git a/src/plugins/data_source/server/data_source_service.ts b/src/plugins/data_source/server/data_source_service.ts new file mode 100644 index 00000000000..5bb25492f59 --- /dev/null +++ b/src/plugins/data_source/server/data_source_service.ts @@ -0,0 +1,47 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { Logger, OpenSearchClient, SavedObjectsClientContract } from '../../../../src/core/server'; +import { DataSourcePluginConfigType } from '../config'; +import { OpenSearchClientPool, configureClient } from './client'; +export interface DataSourceServiceSetup { + getDataSourceClient: ( + dataSourceId: string, + // this saved objects client is used to fetch data source on behalf of users, caller should pass scoped saved objects client + savedObjects: SavedObjectsClientContract + ) => Promise; +} +export class DataSourceService { + private readonly openSearchClientPool: OpenSearchClientPool; + + constructor(private logger: Logger) { + this.openSearchClientPool = new OpenSearchClientPool(logger); + } + + async setup(config: DataSourcePluginConfigType) { + const openSearchClientPoolSetup = await this.openSearchClientPool.setup(config); + + const getDataSourceClient = async ( + dataSourceId: string, + savedObjects: SavedObjectsClientContract + ): Promise => { + return configureClient( + dataSourceId, + savedObjects, + openSearchClientPoolSetup, + config, + this.logger + ); + }; + + return { getDataSourceClient }; + } + + start() {} + + stop() { + this.openSearchClientPool.stop(); + } +} diff --git a/src/plugins/data_source/server/plugin.ts b/src/plugins/data_source/server/plugin.ts index 5a4ed403457..82a9ae65e6a 100644 --- a/src/plugins/data_source/server/plugin.ts +++ b/src/plugins/data_source/server/plugin.ts @@ -4,20 +4,28 @@ */ import { first } from 'rxjs/operators'; - -import { PluginInitializerContext, CoreSetup, CoreStart, Plugin, Logger } from 'src/core/server'; import { dataSource, credential, CredentialSavedObjectsClientWrapper } from './saved_objects'; import { DataSourcePluginConfigType } from '../config'; - +import { + PluginInitializerContext, + CoreSetup, + CoreStart, + Plugin, + Logger, + IContextProvider, + RequestHandler, +} from '../../../../src/core/server'; +import { DataSourceService, DataSourceServiceSetup } from './data_source_service'; import { DataSourcePluginSetup, DataSourcePluginStart } from './types'; - import { CryptographyClient } from './cryptography'; export class DataSourcePlugin implements Plugin { private readonly logger: Logger; + private readonly dataSourceService: DataSourceService; constructor(private initializerContext: PluginInitializerContext) { - this.logger = this.initializerContext.logger.get(); + this.logger = this.initializerContext.logger.get('dataSource'); + this.dataSourceService = new DataSourceService(this.logger); } public async setup(core: CoreSetup) { @@ -29,9 +37,11 @@ export class DataSourcePlugin implements Plugin(); + const config: DataSourcePluginConfigType = await config$.pipe(first()).toPromise(); + + // Fetch configs used to create credential saved objects client wrapper + const { wrappingKeyName, wrappingKeyNamespace, wrappingKey } = config.encryption; // Create credential saved objects client wrapper const credentialSavedObjectsClientWrapper = new CredentialSavedObjectsClientWrapper( @@ -45,6 +55,14 @@ export class DataSourcePlugin implements Plugin, 'dataSource'> => { + return (context, req) => { + return { + opensearch: { + getClient: (dataSourceId: string) => { + try { + return dataSourceService.getDataSourceClient( + dataSourceId, + context.core.savedObjects.client + ); + } catch (error: any) { + logger.error( + `Fail to get data source client for dataSourceId: [${dataSourceId}]. Detail: ${error.messages}` + ); + throw error; + } + }, + }, + }; + }; + }; } diff --git a/src/plugins/data_source/server/types.ts b/src/plugins/data_source/server/types.ts index a70c8c1bf19..bad309b4b87 100644 --- a/src/plugins/data_source/server/types.ts +++ b/src/plugins/data_source/server/types.ts @@ -3,6 +3,19 @@ * SPDX-License-Identifier: Apache-2.0 */ +import { OpenSearchClient } from 'src/core/server'; + +export interface DataSourcePluginRequestContext { + opensearch: { + getClient: (dataSourceId: string) => Promise; + }; +} +declare module 'src/core/server' { + interface RequestHandlerContext { + dataSource: DataSourcePluginRequestContext; + } +} + // eslint-disable-next-line @typescript-eslint/no-empty-interface export interface DataSourcePluginSetup {} // eslint-disable-next-line @typescript-eslint/no-empty-interface