Skip to content

Commit

Permalink
[MD] Add client management module and register getClient() to route…
Browse files Browse the repository at this point in the history
… handler context (opensearch-project#2121)

* Add client management module and register `getClient()` interface to route handler context
Signed-off-by: Zhongnan Su <szhongna@amazon.com>
  • Loading branch information
zhongnansu authored and kristenTian committed Sep 12, 2022
1 parent 851e3dc commit 1ded394
Show file tree
Hide file tree
Showing 14 changed files with 614 additions and 9 deletions.
2 changes: 2 additions & 0 deletions src/plugins/data_source/common/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,7 @@

export const PLUGIN_ID = 'dataSource';
export const PLUGIN_NAME = 'data_source';
export const DATA_SOURCE_SAVED_OBJECT_TYPE = 'data-source';
export const CREDENTIAL_SAVED_OBJECT_TYPE = 'credential';

export { Credential } from './credentials';
3 changes: 3 additions & 0 deletions src/plugins/data_source/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ export const configSchema = schema.object({
defaultValue: new Array(32).fill(0),
}),
}),
clientPool: schema.object({
size: schema.number({ defaultValue: 5 }),
}),
});

export type DataSourcePluginConfigType = TypeOf<typeof configSchema>;
29 changes: 29 additions & 0 deletions src/plugins/data_source/server/client/client_config.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
import { DataSourcePluginConfigType } from '../../config';
import { parseClientOptions } from './client_config';

const TEST_DATA_SOURCE_ENDPOINT = 'http://datasource.com';

const config = {
enabled: true,
clientPool: {
size: 5,
},
} as DataSourcePluginConfigType;

describe('parseClientOptions', () => {
test('include the ssl client configs as defaults', () => {
expect(parseClientOptions(config, TEST_DATA_SOURCE_ENDPOINT)).toEqual(
expect.objectContaining({
node: TEST_DATA_SOURCE_ENDPOINT,
ssl: {
requestCert: true,
rejectUnauthorized: true,
},
})
);
});
});
29 changes: 29 additions & 0 deletions src/plugins/data_source/server/client/client_config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

import { ClientOptions } from '@opensearch-project/opensearch';
import { DataSourcePluginConfigType } from '../../config';

/**
* Parse the client options from given data source config and endpoint
*
* @param config The config to generate the client options from.
* @param endpoint endpoint url of data source
*/
export function parseClientOptions(
// TODO: will use client configs, that comes from a merge result of user config and default opensearch client config,
config: DataSourcePluginConfigType,
endpoint: string
): ClientOptions {
const clientOptions: ClientOptions = {
node: endpoint,
ssl: {
requestCert: true,
rejectUnauthorized: true,
},
};

return clientOptions;
}
39 changes: 39 additions & 0 deletions src/plugins/data_source/server/client/client_pool.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

import { loggingSystemMock } from '../../../../core/server/mocks';
import { DataSourcePluginConfigType } from '../../config';
import { OpenSearchClientPool } from './client_pool';

const logger = loggingSystemMock.create();

describe('Client Pool', () => {
let service: OpenSearchClientPool;
let config: DataSourcePluginConfigType;

beforeEach(() => {
const mockLogger = logger.get('dataSource');
service = new OpenSearchClientPool(mockLogger);
config = {
enabled: true,
clientPool: {
size: 5,
},
} as DataSourcePluginConfigType;
});

afterEach(() => {
service.stop();
jest.clearAllMocks();
});

describe('setup()', () => {
test('exposes proper contract', async () => {
const setup = await service.setup(config);
expect(setup).toHaveProperty('getClientFromPool');
expect(setup).toHaveProperty('addClientToPool');
});
});
});
76 changes: 76 additions & 0 deletions src/plugins/data_source/server/client/client_pool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

import { Client } from '@opensearch-project/opensearch';
import LRUCache from 'lru-cache';
import { Logger } from 'src/core/server';
import { DataSourcePluginConfigType } from '../../config';

export interface OpenSearchClientPoolSetup {
getClientFromPool: (id: string) => Client | undefined;
addClientToPool: (endpoint: string, client: Client) => void;
}

/**
* OpenSearch client pool.
*
* This client pool uses an LRU cache to manage OpenSearch Js client objects.
* It reuse TPC connections for each OpenSearch endpoint.
*/
export class OpenSearchClientPool {
// LRU cache
// key: data source endpoint url
// value: OpenSearch client object
private cache?: LRUCache<string, Client>;
private isClosed = false;

constructor(private logger: Logger) {}

public async setup(config: DataSourcePluginConfigType) {
const logger = this.logger;
const { size } = config.clientPool;

this.cache = new LRUCache({
max: size,
maxAge: 15 * 60 * 1000, // by default, TCP connection times out in 15 minutes

async dispose(endpoint, client) {
try {
await client.close();
} catch (error: any) {
// log and do nothing since we are anyways evicting the client object from cache
logger.warn(
`Error closing OpenSearch client when removing from client pool: ${error.message}`
);
}
},
});
this.logger.info(`Created data source client pool of size ${size}`);

const getClientFromPool = (endpoint: string) => {
return this.cache!.get(endpoint);
};

const addClientToPool = (endpoint: string, client: Client) => {
this.cache!.set(endpoint, client);
};

return {
getClientFromPool,
addClientToPool,
};
}

start() {}

// close all clients in the pool
async stop() {
if (this.isClosed) {
return;
}
this.isClosed = true;
await Promise.all(this.cache!.values().map((client) => client.close()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

export const ClientMock = jest.fn();
jest.doMock('@opensearch-project/opensearch', () => {
const actual = jest.requireActual('@opensearch-project/opensearch');
return {
...actual,
Client: ClientMock,
};
});

export const parseClientOptionsMock = jest.fn();
jest.doMock('./client_config', () => ({
parseClientOptions: parseClientOptionsMock,
}));
135 changes: 135 additions & 0 deletions src/plugins/data_source/server/client/configure_client.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

import { SavedObjectsClientContract } from '../../../../core/server';
import { loggingSystemMock, savedObjectsClientMock } from '../../../../core/server/mocks';
import { DATA_SOURCE_SAVED_OBJECT_TYPE, CREDENTIAL_SAVED_OBJECT_TYPE } from '../../common';
import {
CredentialMaterialsType,
CredentialSavedObjectAttributes,
} from '../../common/credentials/types';
import { DataSourceAttributes } from '../../common/data_sources';
import { DataSourcePluginConfigType } from '../../config';
import { ClientMock, parseClientOptionsMock } from './configure_client.test.mocks';
import { OpenSearchClientPoolSetup } from './client_pool';
import { configureClient } from './configure_client';
import { ClientOptions } from '@opensearch-project/opensearch';
// eslint-disable-next-line @osd/eslint/no-restricted-paths
import { opensearchClientMock } from '../../../../core/server/opensearch/client/mocks';

const DATA_SOURCE_ID = 'a54b76ec86771ee865a0f74a305dfff8';
const CREDENETIAL_ID = 'a54dsaadasfasfwe22d23d23d2453df3';

describe('configureClient', () => {
let logger: ReturnType<typeof loggingSystemMock.createLogger>;
let config: DataSourcePluginConfigType;
let savedObjectsMock: jest.Mocked<SavedObjectsClientContract>;
let clientPoolSetup: OpenSearchClientPoolSetup;
let clientOptions: ClientOptions;
let dataSourceAttr: DataSourceAttributes;
let dsClient: ReturnType<typeof opensearchClientMock.createInternalClient>;

beforeEach(() => {
dsClient = opensearchClientMock.createInternalClient();
logger = loggingSystemMock.createLogger();
savedObjectsMock = savedObjectsClientMock.create();

config = {
enabled: true,
clientPool: {
size: 5,
},
} as DataSourcePluginConfigType;
clientOptions = {
nodes: 'http://localhost',
ssl: {
requestCert: true,
rejectUnauthorized: true,
},
} as ClientOptions;
dataSourceAttr = {
title: 'title',
endpoint: 'http://localhost',
noAuth: false,
} as DataSourceAttributes;

clientPoolSetup = {
getClientFromPool: jest.fn(),
addClientToPool: jest.fn(),
};

const crendentialAttr = {
title: 'cred',
credentialMaterials: {
credentialMaterialsType: CredentialMaterialsType.UsernamePasswordType,
credentialMaterialsContent: {
username: 'username',
password: 'password',
},
},
} as CredentialSavedObjectAttributes;

savedObjectsMock.get
.mockResolvedValueOnce({
id: DATA_SOURCE_ID,
type: DATA_SOURCE_SAVED_OBJECT_TYPE,
attributes: dataSourceAttr,
references: [{ name: 'user', type: CREDENTIAL_SAVED_OBJECT_TYPE, id: CREDENETIAL_ID }],
})
.mockResolvedValueOnce({
id: CREDENETIAL_ID,
type: CREDENTIAL_SAVED_OBJECT_TYPE,
attributes: crendentialAttr,
references: [],
});

ClientMock.mockImplementation(() => {
return dsClient;
});
});

afterEach(() => {
ClientMock.mockReset();
});
// TODO: mark as skip until we fix the issue of mocking "@opensearch-project/opensearch"
test('configure client with noAuth == true, will call new Client() to create client', async () => {
savedObjectsMock.get.mockReset().mockResolvedValueOnce({
id: DATA_SOURCE_ID,
type: DATA_SOURCE_SAVED_OBJECT_TYPE,
attributes: { ...dataSourceAttr, noAuth: true },
references: [],
});

parseClientOptionsMock.mockReturnValue(clientOptions);

const client = await configureClient(
DATA_SOURCE_ID,
savedObjectsMock,
clientPoolSetup,
config,
logger
);

expect(parseClientOptionsMock).toHaveBeenCalled();
expect(ClientMock).toHaveBeenCalledTimes(1);
expect(ClientMock).toHaveBeenCalledWith(clientOptions);
expect(savedObjectsMock.get).toHaveBeenCalledTimes(1);
expect(client).toBe(dsClient.child.mock.results[0].value);
});

test('configure client with noAuth == false, will first call new Client()', async () => {
const client = await configureClient(
DATA_SOURCE_ID,
savedObjectsMock,
clientPoolSetup,
config,
logger
);

expect(ClientMock).toHaveBeenCalledTimes(1);
expect(savedObjectsMock.get).toHaveBeenCalledTimes(2);
expect(client).toBe(dsClient.child.mock.results[0].value);
});
});
Loading

0 comments on commit 1ded394

Please sign in to comment.