Skip to content

Commit

Permalink
Migrate from legacy elasticsearch client to opensearch-js client in `…
Browse files Browse the repository at this point in the history
…osd-opensearch-archiver` package (opensearch-project#4142)

Signed-off-by: Manasvini B Suryanarayana <manasvis@amazon.com>
Signed-off-by: Josh Romero <rmerqg@amazon.com>
Co-authored-by: Josh Romero <rmerqg@amazon.com>
  • Loading branch information
manasvinibs and joshuarrrr authored Jun 27, 2023
1 parent d285ecb commit a44b09f
Show file tree
Hide file tree
Showing 22 changed files with 130 additions and 106 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Fix EUI/OUI type errors ([#3798](https://github.com/opensearch-project/OpenSearch-Dashboards/pull/3798))
- Remove unused Sass in `tile_map` plugin ([#4110](https://github.com/opensearch-project/OpenSearch-Dashboards/pull/4110))
- [Table Visualization] Remove custom styling for text-align:center in favor of OUI utility class. ([#4164](https://github.com/opensearch-project/OpenSearch-Dashboards/pull/4164))
- Migrate from legacy elasticsearch client to opensearch-js client in `osd-opensearch-archiver` package([#4142](https://github.com/opensearch-project/OpenSearch-Dashboards/pull/4142))
- Replace the use of `bluebird` in `saved_objects` plugin ([#4026](https://github.com/opensearch-project/OpenSearch-Dashboards/pull/4026))
- [Vis Colors] Replace color maps with OUI color palettes ([#4293](https://github.com/opensearch-project/OpenSearch-Dashboards/pull/4293))
- [Vis Colors] [Maps] Replace hardcoded color to OUI color in `maps_legacy` plugin ([#4294](https://github.com/opensearch-project/OpenSearch-Dashboards/pull/4294))
Expand Down
6 changes: 2 additions & 4 deletions packages/osd-opensearch-archiver/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
},
"dependencies": {
"@osd/dev-utils": "1.0.0",
"elasticsearch": "^16.7.0"
"@opensearch-project/opensearch": "^2.2.0"
},
"devDependencies": {
"@types/elasticsearch": "^5.0.33"
}
"devDependencies": {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* under the License.
*/

import { Client } from 'elasticsearch';
import { Client } from '@opensearch-project/opensearch';
import { ToolingLog, OsdClient } from '@osd/dev-utils';

import {
Expand Down
4 changes: 2 additions & 2 deletions packages/osd-opensearch-archiver/src/actions/load.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import { resolve } from 'path';
import { createReadStream } from 'fs';
import { Readable } from 'stream';
import { ToolingLog, OsdClient } from '@osd/dev-utils';
import { Client } from 'elasticsearch';
import { Client } from '@opensearch-project/opensearch';

import { createPromiseFromStreams, concatStreamProviders } from '../lib/streams';

Expand Down Expand Up @@ -114,7 +114,7 @@ export async function loadAction({

await client.indices.refresh({
index: '_all',
allowNoIndices: true,
allow_no_indices: true,
});

// If we affected the OpenSearch Dashboards index, we need to ensure it's migrated...
Expand Down
2 changes: 1 addition & 1 deletion packages/osd-opensearch-archiver/src/actions/save.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import { resolve } from 'path';
import { createWriteStream, mkdirSync } from 'fs';
import { Readable, Writable } from 'stream';
import { Client } from 'elasticsearch';
import { Client } from '@opensearch-project/opensearch';
import { ToolingLog } from '@osd/dev-utils';

import { createListStream, createPromiseFromStreams } from '../lib/streams';
Expand Down
2 changes: 1 addition & 1 deletion packages/osd-opensearch-archiver/src/actions/unload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import { resolve } from 'path';
import { createReadStream } from 'fs';
import { Readable, Writable } from 'stream';
import { Client } from 'elasticsearch';
import { Client } from '@opensearch-project/opensearch';
import { ToolingLog, OsdClient } from '@osd/dev-utils';

import { createPromiseFromStreams } from '../lib/streams';
Expand Down
13 changes: 7 additions & 6 deletions packages/osd-opensearch-archiver/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ import Path from 'path';
import readline from 'readline';

import { RunWithCommands, createFlagError } from '@osd/dev-utils';
import { Client, ClientOptions } from '@opensearch-project/opensearch';
import { readConfigFile } from '@osd/test';
import legacyElasticsearch from 'elasticsearch';

import { OpenSearchArchiver } from './opensearch_archiver';

Expand All @@ -56,7 +56,7 @@ export function runCli() {
default: ${defaultConfigPath}
--opensearch-url url for OpenSearch, prefer the --config flag
--opensearch-dashboards-url url for OpenSearch Dashboards, prefer the --config flag
--dir where arechives are stored, prefer the --config flag
--dir where archives are stored, prefer the --config flag
`,
},
async extendContext({ log, flags, addCleanupTask }) {
Expand Down Expand Up @@ -99,10 +99,11 @@ export function runCli() {
throw createFlagError('--dir or --config must be defined');
}

const client = new legacyElasticsearch.Client({
host: opensearchUrl,
log: flags.verbose ? 'trace' : [],
});
const clientOptions: ClientOptions = {
node: opensearchUrl.toString(),
};

const client = new Client(clientOptions);
addCleanupTask(() => client.close());

const opensearchArchiver = new OpenSearchArchiver({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@ describe('opensearchArchiver: createGenerateDocRecordsStream()', () => {
expect(params).to.have.property('index', 'logstash-*');
expect(params).to.have.property('size', 1000);
return {
hits: {
total: 0,
hits: [],
body: {
hits: {
total: 0,
hits: [],
},
},
};
},
Expand All @@ -74,9 +76,11 @@ describe('opensearchArchiver: createGenerateDocRecordsStream()', () => {
expect(params).to.have.property('scroll', '1m');
expect(params).to.have.property('rest_total_hits_as_int', true);
return {
hits: {
total: 0,
hits: [],
body: {
hits: {
total: 0,
hits: [],
},
},
};
},
Expand All @@ -101,25 +105,27 @@ describe('opensearchArchiver: createGenerateDocRecordsStream()', () => {
expect(params).to.have.property('index', 'index1');
await delay(200);
return {
_scroll_id: 'index1ScrollId',
hits: { total: 2, hits: [{ _id: 1, _index: '.opensearch_dashboards_1' }] },
body: {
_scroll_id: 'index1ScrollId',
hits: { total: 2, hits: [{ _id: 1, _index: '.opensearch_dashboards_1' }] },
},
};
},
async (name, params) => {
expect(name).to.be('scroll');
expect(params).to.have.property('scrollId', 'index1ScrollId');
expect(params).to.have.property('scroll_id', 'index1ScrollId');
expect(Date.now() - checkpoint).to.not.be.lessThan(200);
checkpoint = Date.now();
await delay(200);
return { hits: { total: 2, hits: [{ _id: 2, _index: 'foo' }] } };
return { body: { hits: { total: 2, hits: [{ _id: 2, _index: 'foo' }] } } };
},
async (name, params) => {
expect(name).to.be('search');
expect(params).to.have.property('index', 'index2');
expect(Date.now() - checkpoint).to.not.be.lessThan(200);
checkpoint = Date.now();
await delay(200);
return { hits: { total: 0, hits: [] } };
return { body: { hits: { total: 0, hits: [] } } };
},
]);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
*/

import { Transform } from 'stream';
import { Client, SearchParams, SearchResponse } from 'elasticsearch';
import { Client, ApiResponse } from '@opensearch-project/opensearch';
import { Stats } from '../stats';
import { Progress } from '../progress';

Expand All @@ -53,7 +53,7 @@ export function createGenerateDocRecordsStream({
async transform(index, enc, callback) {
try {
let remainingHits = 0;
let resp: SearchResponse<any> | null = null;
let resp: ApiResponse<any> | null = null;

while (!resp || remainingHits > 0) {
if (!resp) {
Expand All @@ -66,17 +66,17 @@ export function createGenerateDocRecordsStream({
query,
},
rest_total_hits_as_int: true, // not declared on SearchParams type
} as SearchParams);
remainingHits = resp.hits.total;
});
remainingHits = resp.body.hits.total;
progress.addToTotal(remainingHits);
} else {
resp = await client.scroll({
scrollId: resp._scroll_id!,
scroll_id: resp.body._scroll_id!,
scroll: SCROLL_TIMEOUT,
});
}

for (const hit of resp.hits.hits) {
for (const hit of resp.body?.hits.hits) {
remainingHits -= 1;
stats.archivedDoc(hit._index);
this.push({
Expand All @@ -94,7 +94,7 @@ export function createGenerateDocRecordsStream({
});
}

progress.addToComplete(resp.hits.hits.length);
progress.addToComplete(resp.body.hits.hits.length);
}

callback(undefined);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,12 @@ describe('opensearchArchiver: createIndexDocRecordsStream()', () => {
const client = createStubClient([
async (name, params) => {
expect(name).to.be('bulk');
expect(params).to.eql({
body: recordsToBulkBody(records),
requestTimeout: 120000,
});
return { ok: true };
expect(params).to.eql({ body: recordsToBulkBody(records) });
return {
body: {
ok: true,
},
};
},
]);
const stats = createStubStats();
Expand All @@ -88,19 +89,21 @@ describe('opensearchArchiver: createIndexDocRecordsStream()', () => {
const client = createStubClient([
async (name, params) => {
expect(name).to.be('bulk');
expect(params).to.eql({
body: recordsToBulkBody(records.slice(0, 1)),
requestTimeout: 120000,
});
return { ok: true };
expect(params).to.eql({ body: recordsToBulkBody(records.slice(0, 1)) });
return {
body: {
ok: true,
},
};
},
async (name, params) => {
expect(name).to.be('bulk');
expect(params).to.eql({
body: recordsToBulkBody(records.slice(1)),
requestTimeout: 120000,
});
return { ok: true };
expect(params).to.eql({ body: recordsToBulkBody(records.slice(1)) });
return {
body: {
ok: true,
},
};
},
]);
const stats = createStubStats();
Expand All @@ -124,21 +127,23 @@ describe('opensearchArchiver: createIndexDocRecordsStream()', () => {
const client = createStubClient([
async (name, params) => {
expect(name).to.be('bulk');
expect(params).to.eql({
body: recordsToBulkBody(records.slice(0, 1)),
requestTimeout: 120000,
});
expect(params).to.eql({ body: recordsToBulkBody(records.slice(0, 1)) });
await delay(delayMs);
return { ok: true };
return {
body: {
ok: true,
},
};
},
async (name, params) => {
expect(name).to.be('bulk');
expect(params).to.eql({
body: recordsToBulkBody(records.slice(1)),
requestTimeout: 120000,
});
expect(params).to.eql({ body: recordsToBulkBody(records.slice(1)) });
expect(Date.now() - start).to.not.be.lessThan(delayMs);
return { ok: true };
return {
body: {
ok: true,
},
};
},
]);
const progress = new Progress();
Expand All @@ -160,17 +165,29 @@ describe('opensearchArchiver: createIndexDocRecordsStream()', () => {
async (name, params) => {
expect(name).to.be('bulk');
expect(params.body.length).to.eql(1 * 2);
return { ok: true };
return {
body: {
ok: true,
},
};
},
async (name, params) => {
expect(name).to.be('bulk');
expect(params.body.length).to.eql(299 * 2);
return { ok: true };
return {
body: {
ok: true,
},
};
},
async (name, params) => {
expect(name).to.be('bulk');
expect(params.body.length).to.eql(1 * 2);
return { ok: true };
return {
body: {
ok: true,
},
};
},
]);
const progress = new Progress();
Expand All @@ -189,8 +206,8 @@ describe('opensearchArchiver: createIndexDocRecordsStream()', () => {
const records = createPersonDocRecords(2);
const stats = createStubStats();
const client = createStubClient([
async () => ({ ok: true }),
async () => ({ errors: true, forcedError: true }),
async () => ({ body: { ok: true } }),
async () => ({ body: { errors: true, forcedError: true } }),
]);
const progress = new Progress();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* under the License.
*/

import { Client } from 'elasticsearch';
import { Client } from '@opensearch-project/opensearch';
import { Writable } from 'stream';
import { Stats } from '../stats';
import { Progress } from '../progress';
Expand Down Expand Up @@ -58,8 +58,8 @@ export function createIndexDocRecordsStream(
);
});

const resp = await client.bulk({ requestTimeout: 2 * 60 * 1000, body });
if (resp.errors) {
const resp = await client.bulk({ body }, { requestTimeout: 2 * 60 * 1000 });
if (resp.body.errors) {
throw new Error(`Failed to index all documents: ${JSON.stringify(resp, null, 2)}`);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* under the License.
*/

import { Client } from 'elasticsearch';
import { Client } from '@opensearch-project/opensearch';
import sinon from 'sinon';
import Chance from 'chance';
import { times } from 'lodash';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ describe('opensearchArchiver: createCreateIndexStream()', () => {
expect((client.indices.getAlias as sinon.SinonSpy).calledOnce).to.be.ok();
expect((client.indices.getAlias as sinon.SinonSpy).args[0][0]).to.eql({
name: 'existing-index',
ignore: [404],
});
expect((client.indices.delete as sinon.SinonSpy).calledOnce).to.be.ok();
expect((client.indices.delete as sinon.SinonSpy).args[0][0]).to.eql({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import { Transform, Readable } from 'stream';
import { inspect } from 'util';

import { Client } from 'elasticsearch';
import { Client } from '@opensearch-project/opensearch';
import { ToolingLog } from '@osd/dev-utils';

import { Stats } from '../stats';
Expand Down
Loading

0 comments on commit a44b09f

Please sign in to comment.