Skip to content

Commit

Permalink
fix(api-elasticsearch): create cursor from PrimitiveValue type
Browse files Browse the repository at this point in the history
  • Loading branch information
brunozoric committed Oct 17, 2024
1 parent b9cc85b commit 45b6e76
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ jest.mock("~/tasks/dataSynchronization/createFactories", () => {
return {
createFactories: (): IFactories => {
return {
createElasticsearch: ({ manager }) => {
createElasticsearchToDynamoDb: ({ manager }) => {
return {
run: async input => {
return manager.response.continue({
...input,
elasticsearch: {
elasticsearchToDynamoDb: {
finished: true
}
});
Expand Down Expand Up @@ -72,7 +72,7 @@ describe("data synchronization - elasticsearch", () => {
const task = await context.tasks.createTask<IDataSynchronizationInput>({
definitionId: DATA_SYNCHRONIZATION_TASK,
input: {
elasticsearch: {
elasticsearchToDynamoDb: {
finished: true
},
dynamoDbElasticsearch: {
Expand Down Expand Up @@ -110,7 +110,7 @@ describe("data synchronization - elasticsearch", () => {
dynamoDbElasticsearch: {
finished: true
},
elasticsearch: {
elasticsearchToDynamoDb: {
finished: true
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ export class DataSynchronizationTaskRunner {
* First we check if we need to sync Elasticsearch.
*/
//
if (!input.elasticsearch?.finished) {
const sync = this.factories.createElasticsearch({
if (!input.elasticsearchToDynamoDb?.finished) {
const sync = this.factories.createElasticsearchToDynamoDb({
manager: this.manager,
indexManager: this.indexManager
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { IFactories } from "./types";
import { DynamoDbElasticsearchSynchronization } from "./dynamoDbElasticsearch/DynamoDbElasticsearchSynchronization";
import { ElasticsearchSynchronization } from "./elasticsearch/ElasticsearchSynchronization";
import { ElasticsearchToDynamoDbSynchronization } from "./elasticsearch/ElasticsearchToDynamoDbSynchronization";
import { DynamoDbSynchronization } from "./dynamoDb/DynamoDbSynchronization";

export const createFactories = (): IFactories => {
return {
createElasticsearch: params => {
return new ElasticsearchSynchronization(params);
createElasticsearchToDynamoDb: params => {
return new ElasticsearchToDynamoDbSynchronization(params);
},
createDynamoDbElasticsearch: params => {
return new DynamoDbElasticsearchSynchronization(params);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ interface ISearchResultHitsItemSource {
id: string;
}

export class ElasticsearchSynchronization implements ISynchronization {
export class ElasticsearchToDynamoDbSynchronization implements ISynchronization {
private readonly manager: IDataSynchronizationManager;
private readonly indexManager: IIndexManager;

Expand All @@ -41,8 +41,8 @@ export class ElasticsearchSynchronization implements ISynchronization {
}

public async run(input: IDataSynchronizationInput): Promise<ITaskResponseResult> {
const lastIndex = input.elasticsearch?.index;
let cursor = input.elasticsearch?.cursor;
const lastIndex = input.elasticsearchToDynamoDb?.index;
let cursor = input.elasticsearchToDynamoDb?.cursor;
const indexes = await this.fetchAllIndexes();
const next = lastIndex ? indexes.findIndex(index => index === lastIndex) + 1 : 0;

Expand All @@ -54,8 +54,8 @@ export class ElasticsearchSynchronization implements ISynchronization {
} else if (this.manager.isCloseToTimeout()) {
return this.manager.response.continue({
...input,
elasticsearch: {
...input.elasticsearch,
elasticsearchToDynamoDb: {
...input.elasticsearchToDynamoDb,
index: currentIndex,
cursor
}
Expand Down Expand Up @@ -85,8 +85,8 @@ export class ElasticsearchSynchronization implements ISynchronization {
});
return this.manager.response.continue({
...input,
elasticsearch: {
...input.elasticsearch,
elasticsearchToDynamoDb: {
...input.elasticsearchToDynamoDb,
index: currentIndex,
cursor: done ? undefined : newCursor
}
Expand All @@ -98,8 +98,8 @@ export class ElasticsearchSynchronization implements ISynchronization {

return this.manager.response.continue({
...input,
elasticsearch: {
...input.elasticsearch,
elasticsearchToDynamoDb: {
...input.elasticsearchToDynamoDb,
finished: true
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ export interface IDataSynchronizationInputValue {
finished?: boolean;
}

export interface IDataSynchronizationInputElasticsearchValue
export interface IDataSynchronizationInputElasticsearchToDynamoDbValue
extends IDataSynchronizationInputValue {
index?: string;
cursor?: PrimitiveValue[];
}

export interface IDataSynchronizationInput {
elasticsearch?: IDataSynchronizationInputElasticsearchValue;
elasticsearchToDynamoDb?: IDataSynchronizationInputElasticsearchToDynamoDbValue;
dynamoDbElasticsearch?: IDataSynchronizationInputValue;
dynamoDb?: IDataSynchronizationInputValue;
}
Expand Down Expand Up @@ -49,7 +49,7 @@ export interface IDynamoDbSyncFactory {
}

export interface IFactories {
createElasticsearch: IElasticsearchSyncFactory;
createElasticsearchToDynamoDb: IElasticsearchSyncFactory;
createDynamoDbElasticsearch: IDynamoDbElasticsearchSyncFactory;
createDynamoDb: IDynamoDbSyncFactory;
}
Expand Down
12 changes: 8 additions & 4 deletions packages/api-elasticsearch/src/cursors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@ import { PrimitiveValue } from "~/types";
/**
* Encode a received cursor value into something that can be passed on to the user.
*/
export const encodeCursor = (cursor?: string | string[] | null): string | undefined => {
if (!cursor) {
export const encodeCursor = (input?: PrimitiveValue[]): string | undefined => {
if (!input) {
return undefined;
}

cursor = Array.isArray(cursor) ? cursor.map(encodeURIComponent) : encodeURIComponent(cursor);
const cursor = Array.isArray(input)
? input
.filter((item: PrimitiveValue): item is string | number | boolean => item !== null)
.map(item => encodeURIComponent(item))
: encodeURIComponent(input);

try {
return Buffer.from(JSON.stringify(cursor)).toString("base64");
Expand All @@ -28,7 +32,7 @@ export const decodeCursor = (cursor?: string | null): PrimitiveValue[] | undefin
try {
const value = JSON.parse(Buffer.from(cursor, "base64").toString("ascii"));
if (Array.isArray(value)) {
return value.map(decodeURIComponent);
return value.filter(item => item !== null).map(decodeURIComponent);
}
const decoded = decodeURIComponent(value);
return decoded ? [decoded] : undefined;
Expand Down
4 changes: 2 additions & 2 deletions packages/api-elasticsearch/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ApiResponse, Client } from "@elastic/elasticsearch";
import { BoolQueryConfig as esBoolQueryConfig, PrimitiveValue, Query as esQuery } from "elastic-ts";
import { BoolQueryConfig, PrimitiveValue, Query as esQuery } from "elastic-ts";
import { Context, GenericRecord } from "@webiny/api/types";
/**
* Re-export some dep lib types.
Expand All @@ -15,7 +15,7 @@ export interface ElasticsearchContext extends Context {
* To simplify our plugins, we say that query contains arrays of objects, not single objects.
* And that they all are defined as empty arrays at the start.
*/
export interface ElasticsearchBoolQueryConfig extends esBoolQueryConfig {
export interface ElasticsearchBoolQueryConfig extends BoolQueryConfig {
must: esQuery[];
filter: esQuery[];
should: esQuery[];
Expand Down

0 comments on commit 45b6e76

Please sign in to comment.