Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MD] Register data source plugin interface to route handler context #2121

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/plugins/data_source/common/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,7 @@

export const PLUGIN_ID = 'dataSource';
export const PLUGIN_NAME = 'data_source';
export const DATA_SOURCE_SAVED_OBJECT_TYPE = 'data-source';
export const CREDENTIAL_SAVED_OBJECT_TYPE = 'credential';

export { Credential } from './credentials';
3 changes: 3 additions & 0 deletions src/plugins/data_source/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ export const configSchema = schema.object({
defaultValue: new Array(32).fill(0),
}),
}),
clientPool: schema.object({
size: schema.number({ defaultValue: 5 }),
}),
});

export type DataSourcePluginConfigType = TypeOf<typeof configSchema>;
29 changes: 29 additions & 0 deletions src/plugins/data_source/server/client/client_config.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
import { DataSourcePluginConfigType } from '../../config';
import { parseClientOptions } from './client_config';

const TEST_DATA_SOURCE_ENDPOINT = 'http://datasource.com';

const config = {
enabled: true,
clientPool: {
size: 5,
},
} as DataSourcePluginConfigType;

describe('parseClientOptions', () => {
test('include the ssl client configs as defaults', () => {
expect(parseClientOptions(config, TEST_DATA_SOURCE_ENDPOINT)).toEqual(
expect.objectContaining({
node: TEST_DATA_SOURCE_ENDPOINT,
ssl: {
requestCert: true,
rejectUnauthorized: true,
},
})
);
});
});
29 changes: 29 additions & 0 deletions src/plugins/data_source/server/client/client_config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

import { ClientOptions } from '@opensearch-project/opensearch';
import { DataSourcePluginConfigType } from '../../config';

/**
* Parse the client options from given data source config and endpoint
*
* @param config The config to generate the client options from.
* @param endpoint endpoint url of data source
*/
export function parseClientOptions(
// TODO: will use client configs, that comes from a merge result of user config and default opensearch client config,
config: DataSourcePluginConfigType,
endpoint: string
): ClientOptions {
const clientOptions: ClientOptions = {
node: endpoint,
ssl: {
requestCert: true,
rejectUnauthorized: true,
},
};

return clientOptions;
}
39 changes: 39 additions & 0 deletions src/plugins/data_source/server/client/client_pool.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

import { loggingSystemMock } from '../../../../core/server/mocks';
import { DataSourcePluginConfigType } from '../../config';
import { OpenSearchClientPool } from './client_pool';

const logger = loggingSystemMock.create();

describe('Client Pool', () => {
let service: OpenSearchClientPool;
let config: DataSourcePluginConfigType;

beforeEach(() => {
const mockLogger = logger.get('dataSource');
service = new OpenSearchClientPool(mockLogger);
config = {
enabled: true,
clientPool: {
size: 5,
},
} as DataSourcePluginConfigType;
});

afterEach(() => {
service.stop();
jest.clearAllMocks();
});

describe('setup()', () => {
test('exposes proper contract', async () => {
const setup = await service.setup(config);
expect(setup).toHaveProperty('getClientFromPool');
expect(setup).toHaveProperty('addClientToPool');
});
});
});
76 changes: 76 additions & 0 deletions src/plugins/data_source/server/client/client_pool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

import { Client } from '@opensearch-project/opensearch';
import LRUCache from 'lru-cache';
import { Logger } from 'src/core/server';
import { DataSourcePluginConfigType } from '../../config';

export interface OpenSearchClientPoolSetup {
getClientFromPool: (id: string) => Client | undefined;
addClientToPool: (endpoint: string, client: Client) => void;
}

/**
* OpenSearch client pool.
*
* This client pool uses an LRU cache to manage OpenSearch Js client objects.
* It reuse TPC connections for each OpenSearch endpoint.
*/
export class OpenSearchClientPool {
// LRU cache
// key: data source endpoint url
// value: OpenSearch client object
private cache?: LRUCache<string, Client>;
private isClosed = false;

constructor(private logger: Logger) {}

public async setup(config: DataSourcePluginConfigType) {
const logger = this.logger;
const { size } = config.clientPool;

this.cache = new LRUCache({
max: size,
maxAge: 15 * 60 * 1000, // by default, TCP connection times out in 15 minutes

async dispose(endpoint, client) {
try {
await client.close();
} catch (error: any) {
// log and do nothing since we are anyways evicting the client object from cache
logger.warn(
`Error closing OpenSearch client when removing from client pool: ${error.message}`
);
}
},
});
this.logger.info(`Created data source client pool of size ${size}`);

const getClientFromPool = (endpoint: string) => {
return this.cache!.get(endpoint);
};

const addClientToPool = (endpoint: string, client: Client) => {
this.cache!.set(endpoint, client);
};

return {
getClientFromPool,
addClientToPool,
};
}

start() {}

// close all clients in the pool
async stop() {
if (this.isClosed) {
return;
}
this.isClosed = true;
await Promise.all(this.cache!.values().map((client) => client.close()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

export const ClientMock = jest.fn();
jest.doMock('@opensearch-project/opensearch', () => {
const actual = jest.requireActual('@opensearch-project/opensearch');
return {
...actual,
Client: ClientMock,
};
});

export const parseClientOptionsMock = jest.fn();
jest.doMock('./client_config', () => ({
parseClientOptions: parseClientOptionsMock,
}));
135 changes: 135 additions & 0 deletions src/plugins/data_source/server/client/configure_client.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

import { SavedObjectsClientContract } from '../../../../core/server';
import { loggingSystemMock, savedObjectsClientMock } from '../../../../core/server/mocks';
import { DATA_SOURCE_SAVED_OBJECT_TYPE, CREDENTIAL_SAVED_OBJECT_TYPE } from '../../common';
import {
CredentialMaterialsType,
CredentialSavedObjectAttributes,
} from '../../common/credentials/types';
import { DataSourceAttributes } from '../../common/data_sources';
import { DataSourcePluginConfigType } from '../../config';
import { ClientMock, parseClientOptionsMock } from './configure_client.test.mocks';
import { OpenSearchClientPoolSetup } from './client_pool';
import { configureClient } from './configure_client';
import { ClientOptions } from '@opensearch-project/opensearch';
// eslint-disable-next-line @osd/eslint/no-restricted-paths
import { opensearchClientMock } from '../../../../core/server/opensearch/client/mocks';

const DATA_SOURCE_ID = 'a54b76ec86771ee865a0f74a305dfff8';
const CREDENETIAL_ID = 'a54dsaadasfasfwe22d23d23d2453df3';

describe('configureClient', () => {
let logger: ReturnType<typeof loggingSystemMock.createLogger>;
let config: DataSourcePluginConfigType;
let savedObjectsMock: jest.Mocked<SavedObjectsClientContract>;
let clientPoolSetup: OpenSearchClientPoolSetup;
let clientOptions: ClientOptions;
let dataSourceAttr: DataSourceAttributes;
let dsClient: ReturnType<typeof opensearchClientMock.createInternalClient>;

beforeEach(() => {
dsClient = opensearchClientMock.createInternalClient();
logger = loggingSystemMock.createLogger();
savedObjectsMock = savedObjectsClientMock.create();

config = {
enabled: true,
clientPool: {
size: 5,
},
} as DataSourcePluginConfigType;
clientOptions = {
nodes: 'http://localhost',
ssl: {
requestCert: true,
rejectUnauthorized: true,
},
} as ClientOptions;
dataSourceAttr = {
title: 'title',
endpoint: 'http://localhost',
noAuth: false,
} as DataSourceAttributes;

clientPoolSetup = {
getClientFromPool: jest.fn(),
addClientToPool: jest.fn(),
};

const crendentialAttr = {
title: 'cred',
credentialMaterials: {
credentialMaterialsType: CredentialMaterialsType.UsernamePasswordType,
credentialMaterialsContent: {
username: 'username',
password: 'password',
},
},
} as CredentialSavedObjectAttributes;

savedObjectsMock.get
.mockResolvedValueOnce({
id: DATA_SOURCE_ID,
type: DATA_SOURCE_SAVED_OBJECT_TYPE,
attributes: dataSourceAttr,
references: [{ name: 'user', type: CREDENTIAL_SAVED_OBJECT_TYPE, id: CREDENETIAL_ID }],
})
.mockResolvedValueOnce({
id: CREDENETIAL_ID,
type: CREDENTIAL_SAVED_OBJECT_TYPE,
attributes: crendentialAttr,
references: [],
});

ClientMock.mockImplementation(() => {
return dsClient;
});
});

afterEach(() => {
ClientMock.mockReset();
});
// TODO: mark as skip until we fix the issue of mocking "@opensearch-project/opensearch"
test('configure client with noAuth == true, will call new Client() to create client', async () => {
savedObjectsMock.get.mockReset().mockResolvedValueOnce({
id: DATA_SOURCE_ID,
type: DATA_SOURCE_SAVED_OBJECT_TYPE,
attributes: { ...dataSourceAttr, noAuth: true },
references: [],
});

parseClientOptionsMock.mockReturnValue(clientOptions);

const client = await configureClient(
DATA_SOURCE_ID,
savedObjectsMock,
clientPoolSetup,
config,
logger
);

expect(parseClientOptionsMock).toHaveBeenCalled();
expect(ClientMock).toHaveBeenCalledTimes(1);
expect(ClientMock).toHaveBeenCalledWith(clientOptions);
expect(savedObjectsMock.get).toHaveBeenCalledTimes(1);
expect(client).toBe(dsClient.child.mock.results[0].value);
});

test('configure client with noAuth == false, will first call new Client()', async () => {
const client = await configureClient(
DATA_SOURCE_ID,
savedObjectsMock,
clientPoolSetup,
config,
logger
);

expect(ClientMock).toHaveBeenCalledTimes(1);
expect(savedObjectsMock.get).toHaveBeenCalledTimes(2);
expect(client).toBe(dsClient.child.mock.results[0].value);
});
});
Loading