From dca14b43b20055ffafde776e34e7f6aa57b807e0 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Sat, 16 Sep 2023 14:06:18 -0500 Subject: [PATCH] Rename os source rate/job_count to interval/count, acquire UNASSIGNED partitions before CLOSED partitions (#3327) * Rename os source rate/job_count to interval/count, acquire UNASSIGNED partitions before CLOSED partitions Signed-off-by: Taylor Gray * Rename count to index_read_count Signed-off-by: Taylor Gray --------- Signed-off-by: Taylor Gray --- .../DynamoDbSourceCoordinationStore.java | 14 ++++----- .../DynamoDbSourceCoordinationStoreTest.java | 10 +++---- .../inmemory/InMemoryPartitionAccessor.java | 18 +++++------ .../InMemoryPartitionAccessorTest.java | 4 +-- .../SchedulingParameterConfiguration.java | 16 +++++----- .../configuration/SearchConfiguration.java | 26 ---------------- .../opensearch/worker/WorkerCommonUtils.java | 8 ++--- .../OpenSearchSourceConfigurationTest.java | 30 ++++++++----------- .../SchedulingParameterConfigurationTest.java | 12 ++++---- .../SearchConfigurationTest.java | 8 ----- .../worker/NoSearchContextWorkerTest.java | 8 ++--- .../opensearch/worker/PitWorkerTest.java | 12 ++++---- .../opensearch/worker/ScrollWorkerTest.java | 8 ++--- 13 files changed, 67 insertions(+), 107 deletions(-) diff --git a/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStore.java b/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStore.java index 474c01b6c6..ad75a3df8d 100644 --- a/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStore.java +++ b/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStore.java @@ -91,17 +91,17 @@ public Optional tryAcquireAvailablePartition(final Str return acquiredAssignedItem; } - final Optional acquiredClosedItem = dynamoDbClientWrapper.getAvailablePartition( - ownerId, ownershipTimeout, SourcePartitionStatus.CLOSED, - String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED), 1); + final Optional acquiredUnassignedItem = dynamoDbClientWrapper.getAvailablePartition( + ownerId, ownershipTimeout, SourcePartitionStatus.UNASSIGNED, + String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED), 5); - if (acquiredClosedItem.isPresent()) { - return acquiredClosedItem; + if (acquiredUnassignedItem.isPresent()) { + return acquiredUnassignedItem; } return dynamoDbClientWrapper.getAvailablePartition( - ownerId, ownershipTimeout, SourcePartitionStatus.UNASSIGNED, - String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED), 5); + ownerId, ownershipTimeout, SourcePartitionStatus.CLOSED, + String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED), 1); } @Override diff --git a/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStoreTest.java b/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStoreTest.java index a75bdf6799..5003b17150 100644 --- a/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStoreTest.java +++ b/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStoreTest.java @@ -251,6 +251,11 @@ void getAvailablePartition_with_acquired_CLOSED_partition_returns_the_partition( String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED), 1)) .willReturn(Optional.empty()); + given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout, + SourcePartitionStatus.UNASSIGNED, + String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED), + 5)) + .willReturn(Optional.empty()); given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout, SourcePartitionStatus.CLOSED, String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED), @@ -278,11 +283,6 @@ void getAvailablePartition_with_acquired_UNASSIGNED_partition_returns_the_partit String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED), 1)) .willReturn(Optional.empty()); - given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout, - SourcePartitionStatus.CLOSED, - String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED), - 1)) - .willReturn(Optional.empty()); given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout, SourcePartitionStatus.UNASSIGNED, String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED), diff --git a/data-prepper-plugins/in-memory-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/inmemory/InMemoryPartitionAccessor.java b/data-prepper-plugins/in-memory-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/inmemory/InMemoryPartitionAccessor.java index 5b1a1ffedf..ebffe3fb78 100644 --- a/data-prepper-plugins/in-memory-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/inmemory/InMemoryPartitionAccessor.java +++ b/data-prepper-plugins/in-memory-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/inmemory/InMemoryPartitionAccessor.java @@ -85,15 +85,6 @@ public void queuePartition(final InMemorySourcePartitionStoreItem inMemorySource } public Optional getNextItem() { - final QueuedPartitionsItem nextClosedPartitionItem = closedPartitions.peek(); - - if (Objects.nonNull(nextClosedPartitionItem)) { - if (nextClosedPartitionItem.sortedTimestamp.isBefore(Instant.now()) && partitionLookup.containsKey(nextClosedPartitionItem.sourceIdentifier)) { - closedPartitions.remove(); - return Optional.ofNullable(partitionLookup.get(nextClosedPartitionItem.sourceIdentifier).get(nextClosedPartitionItem.partitionKey)); - } - } - final QueuedPartitionsItem nextUnassignedPartitionItem = unassignedPartitions.peek(); if (Objects.nonNull(nextUnassignedPartitionItem)) { @@ -103,6 +94,15 @@ public Optional getNextItem() { } } + final QueuedPartitionsItem nextClosedPartitionItem = closedPartitions.peek(); + + if (Objects.nonNull(nextClosedPartitionItem)) { + if (nextClosedPartitionItem.sortedTimestamp.isBefore(Instant.now()) && partitionLookup.containsKey(nextClosedPartitionItem.sourceIdentifier)) { + closedPartitions.remove(); + return Optional.ofNullable(partitionLookup.get(nextClosedPartitionItem.sourceIdentifier).get(nextClosedPartitionItem.partitionKey)); + } + } + return Optional.empty(); } diff --git a/data-prepper-plugins/in-memory-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/inmemory/InMemoryPartitionAccessorTest.java b/data-prepper-plugins/in-memory-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/inmemory/InMemoryPartitionAccessorTest.java index b348c23373..66cadba5c6 100644 --- a/data-prepper-plugins/in-memory-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/inmemory/InMemoryPartitionAccessorTest.java +++ b/data-prepper-plugins/in-memory-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/inmemory/InMemoryPartitionAccessorTest.java @@ -171,11 +171,11 @@ void queue_closed_then_unassigned_partitions_followed_by_get_returns_expected_pa final Optional acquiredItem = objectUnderTest.getNextItem(); assertThat(acquiredItem.isPresent(), equalTo(true)); - assertThat(acquiredItem.get(), equalTo(item)); + assertThat(acquiredItem.get(), equalTo(thirdItem)); final Optional secondAcquiredItem = objectUnderTest.getNextItem(); assertThat(secondAcquiredItem.isPresent(), equalTo(true)); - assertThat(secondAcquiredItem.get(), equalTo(thirdItem)); + assertThat(secondAcquiredItem.get(), equalTo(item)); } @Test diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SchedulingParameterConfiguration.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SchedulingParameterConfiguration.java index c5b9385ce1..04cc0f1450 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SchedulingParameterConfiguration.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SchedulingParameterConfiguration.java @@ -15,12 +15,12 @@ public class SchedulingParameterConfiguration { - @JsonProperty("rate") - private Duration rate = Duration.ofHours(8); + @JsonProperty("interval") + private Duration interval = Duration.ofHours(8); @Min(1) - @JsonProperty("job_count") - private int jobCount = 1; + @JsonProperty("index_read_count") + private int indexReadCount = 1; @JsonProperty("start_time") private String startTime = Instant.now().toString(); @@ -28,12 +28,12 @@ public class SchedulingParameterConfiguration { @JsonIgnore private Instant startTimeInstant; - public Duration getRate() { - return rate; + public Duration getInterval() { + return interval; } - public int getJobCount() { - return jobCount; + public int getIndexReadCount() { + return indexReadCount; } public Instant getStartTime() { diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SearchConfiguration.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SearchConfiguration.java index f9a4f3d702..c5b6025d32 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SearchConfiguration.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SearchConfiguration.java @@ -5,17 +5,12 @@ package org.opensearch.dataprepper.plugins.source.opensearch.configuration; -import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import jakarta.validation.constraints.AssertTrue; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; - public class SearchConfiguration { private static final ObjectMapper objectMapper = new ObjectMapper(); @@ -27,12 +22,6 @@ public class SearchConfiguration { @JsonProperty("batch_size") private Integer batchSize = 1000; - @JsonProperty("query") - private String queryString = "{ \"query\": { \"match_all\": {} }}"; - - @JsonIgnore - private Map queryMap; - public SearchContextType getSearchContextType() { return searchContextType; } @@ -40,19 +29,4 @@ public SearchContextType getSearchContextType() { public Integer getBatchSize() { return batchSize; } - - public Map getQuery() { - return queryMap; - } - - @AssertTrue(message = "query is not a valid json string") - boolean isQueryValid() { - try { - queryMap = objectMapper.readValue(queryString, new TypeReference<>() {}); - return true; - } catch (final Exception e) { - LOG.error("Invalid query json string: ", e); - return false; - } - } } diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtils.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtils.java index 0d025cde61..2069fcb023 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtils.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/WorkerCommonUtils.java @@ -48,8 +48,8 @@ static Pair> createAcknowledgment if (result == true) { sourceCoordinator.closePartition( indexPartition.getPartitionKey(), - openSearchSourceConfiguration.getSchedulingParameterConfiguration().getRate(), - openSearchSourceConfiguration.getSchedulingParameterConfiguration().getJobCount()); + openSearchSourceConfiguration.getSchedulingParameterConfiguration().getInterval(), + openSearchSourceConfiguration.getSchedulingParameterConfiguration().getIndexReadCount()); } completableFuture.complete(result); }, Duration.ofSeconds(ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS)); @@ -69,8 +69,8 @@ static void completeIndexPartition(final OpenSearchSourceConfiguration openSearc } else { sourceCoordinator.closePartition( indexPartition.getPartitionKey(), - openSearchSourceConfiguration.getSchedulingParameterConfiguration().getRate(), - openSearchSourceConfiguration.getSchedulingParameterConfiguration().getJobCount()); + openSearchSourceConfiguration.getSchedulingParameterConfiguration().getInterval(), + openSearchSourceConfiguration.getSchedulingParameterConfiguration().getIndexReadCount()); } } diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceConfigurationTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceConfigurationTest.java index 950533c145..23f47908f3 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceConfigurationTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceConfigurationTest.java @@ -35,10 +35,9 @@ void open_search_source_username_password_only() throws JsonProcessingException " - index_name_regex: \"regex\"\n" + " - index_name_regex: \"regex-two\"\n" + "scheduling:\n" + - " job_count: 3\n" + + " index_read_count: 3\n" + "search_options:\n" + - " batch_size: 1000\n" + - " query: \"test\"\n"; + " batch_size: 1000\n"; final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class); assertThat(sourceConfiguration.getSearchConfiguration(), notNullValue()); @@ -67,10 +66,9 @@ void open_search_disabled_authentication() throws JsonProcessingException { " - index_name_regex: \"regex\"\n" + " - index_name_regex: \"regex-two\"\n" + "scheduling:\n" + - " job_count: 3\n" + + " index_read_count: 3\n" + "search_options:\n" + - " batch_size: 1000\n" + - " query: \"test\"\n"; + " batch_size: 1000\n"; final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class); assertThat(sourceConfiguration.getSearchConfiguration(), notNullValue()); @@ -100,10 +98,9 @@ void opensearch_source_aws_only() throws JsonProcessingException { " region: \"us-east-1\"\n" + " sts_role_arn: \"arn:aws:iam::123456789012:role/aos-role\"\n" + "scheduling:\n" + - " job_count: 3\n" + + " index_read_count: 3\n" + "search_options:\n" + - " batch_size: 1000\n" + - " query: \"test\"\n"; + " batch_size: 1000\n"; final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class); @@ -131,10 +128,9 @@ void opensearch_source_aws_sts_external_id() throws JsonProcessingException { " sts_role_arn: \"arn:aws:iam::123456789012:role/aos-role\"\n" + " sts_external_id: \"some-random-id\"\n" + "scheduling:\n" + - " job_count: 3\n" + + " index_read_count: 3\n" + "search_options:\n" + - " batch_size: 1000\n" + - " query: \"test\"\n"; + " batch_size: 1000\n"; final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class); @@ -166,10 +162,9 @@ void using_both_aws_config_and_username_password_is_invalid() throws JsonProcess " region: \"us-east-1\"\n" + " sts_role_arn: \"arn:aws:iam::123456789012:role/aos-role\"\n" + "scheduling:\n" + - " job_count: 3\n" + + " index_read_count: 3\n" + "search_options:\n" + - " batch_size: 1000\n" + - " query: \"test\"\n"; + " batch_size: 1000\n"; final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class); @@ -188,10 +183,9 @@ void one_of_username_password_or_aws_config_or_authDisabled_is_required() throws " - index_name_regex: \"regex\"\n" + " - index_name_regex: \"regex-two\"\n" + "scheduling:\n" + - " job_count: 3\n" + + " index_read_count: 3\n" + "search_options:\n" + - " batch_size: 1000\n" + - " query: \"test\"\n"; + " batch_size: 1000\n"; final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class); diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SchedulingParameterConfigurationTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SchedulingParameterConfigurationTest.java index dd049d7691..81e0629bef 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SchedulingParameterConfigurationTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SchedulingParameterConfigurationTest.java @@ -23,21 +23,21 @@ public class SchedulingParameterConfigurationTest { @Test void default_scheduling_configuration() { final SchedulingParameterConfiguration schedulingParameterConfiguration = new SchedulingParameterConfiguration(); - assertThat(schedulingParameterConfiguration.getJobCount(), equalTo(1)); + assertThat(schedulingParameterConfiguration.getIndexReadCount(), equalTo(1)); assertThat(schedulingParameterConfiguration.isStartTimeValid(), equalTo(true)); assertThat(schedulingParameterConfiguration.getStartTime().isBefore(Instant.now()), equalTo(true)); - assertThat(schedulingParameterConfiguration.getRate(), equalTo(Duration.ofHours(8))); + assertThat(schedulingParameterConfiguration.getInterval(), equalTo(Duration.ofHours(8))); } @Test void non_default_scheduling_configuration() throws JsonProcessingException { final String schedulingConfigurationYaml = - " job_count: 3\n" + + " index_read_count: 3\n" + " start_time: \"2007-12-03T10:15:30.00Z\"\n"; final SchedulingParameterConfiguration schedulingParameterConfiguration = objectMapper.readValue(schedulingConfigurationYaml, SchedulingParameterConfiguration.class); - assertThat(schedulingParameterConfiguration.getJobCount(), equalTo(3)); + assertThat(schedulingParameterConfiguration.getIndexReadCount(), equalTo(3)); assertThat(schedulingParameterConfiguration.isStartTimeValid(), equalTo(true)); assertThat(schedulingParameterConfiguration.getStartTime(), equalTo(Instant.parse("2007-12-03T10:15:30.00Z"))); } @@ -45,12 +45,12 @@ void non_default_scheduling_configuration() throws JsonProcessingException { @Test void invalid_start_time_configuration_test() throws JsonProcessingException { final String schedulingConfigurationYaml = - " job_count: 3\n" + + " index_read_count: 3\n" + " start_time: \"2007-12-03T10:15:30\"\n"; final SchedulingParameterConfiguration schedulingParameterConfiguration = objectMapper.readValue(schedulingConfigurationYaml, SchedulingParameterConfiguration.class); - assertThat(schedulingParameterConfiguration.getJobCount(), equalTo(3)); + assertThat(schedulingParameterConfiguration.getIndexReadCount(), equalTo(3)); assertThat(schedulingParameterConfiguration.isStartTimeValid(), equalTo(false)); } } diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SearchConfigurationTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SearchConfigurationTest.java index 1c9d4deb7c..4f68b09671 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SearchConfigurationTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/configuration/SearchConfigurationTest.java @@ -13,7 +13,6 @@ import java.util.Map; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; public class SearchConfigurationTest { @@ -25,7 +24,6 @@ public class SearchConfigurationTest { void default_search_configuration() { final SearchConfiguration searchConfiguration = new SearchConfiguration(); - assertThat(searchConfiguration.getQuery(), equalTo(null)); assertThat(searchConfiguration.getBatchSize(), equalTo(1000)); } @@ -33,13 +31,9 @@ void default_search_configuration() { void non_default_search_configuration() { final Map pluginSettings = new HashMap<>(); pluginSettings.put("batch_size", 2000); - pluginSettings.put("query", "{\"query\": {\"match_all\": {} }}"); final SearchConfiguration searchConfiguration = objectMapper.convertValue(pluginSettings, SearchConfiguration.class); assertThat(searchConfiguration.getBatchSize(),equalTo(2000)); - assertThat(searchConfiguration.isQueryValid(), equalTo(true)); - assertThat(searchConfiguration.getQuery(), notNullValue()); - assertThat(searchConfiguration.getQuery().containsKey("query"), equalTo(true)); } @Test @@ -47,10 +41,8 @@ void query_is_not_valid_json_string() { final Map pluginSettings = new HashMap<>(); pluginSettings.put("batch_size", 1000); - pluginSettings.put("query", "\\{query: \"my_query\"}"); final SearchConfiguration searchConfiguration = objectMapper.convertValue(pluginSettings, SearchConfiguration.class); assertThat(searchConfiguration.getBatchSize(),equalTo(1000)); - assertThat(searchConfiguration.isQueryValid(), equalTo(false)); } } diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java index 12eb0223d2..24af77aeac 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java @@ -188,8 +188,8 @@ void run_with_getNextPartition_with_non_empty_partition_processes_and_closes_tha when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)).thenReturn(Optional.empty()); final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class); - when(schedulingParameterConfiguration.getJobCount()).thenReturn(1); - when(schedulingParameterConfiguration.getRate()).thenReturn(Duration.ZERO); + when(schedulingParameterConfiguration.getIndexReadCount()).thenReturn(1); + when(schedulingParameterConfiguration.getInterval()).thenReturn(Duration.ZERO); when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration); doNothing().when(sourceCoordinator).closePartition(partitionKey, @@ -265,8 +265,8 @@ void run_with_getNextPartition_with_acknowledgments_processes_and_closes_that_pa when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)).thenReturn(Optional.empty()); final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class); - when(schedulingParameterConfiguration.getJobCount()).thenReturn(1); - when(schedulingParameterConfiguration.getRate()).thenReturn(Duration.ZERO); + when(schedulingParameterConfiguration.getIndexReadCount()).thenReturn(1); + when(schedulingParameterConfiguration.getInterval()).thenReturn(Duration.ZERO); when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration); doNothing().when(sourceCoordinator).closePartition(partitionKey, diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java index 718dec6bb5..69a5ca6991 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java @@ -168,8 +168,8 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_pit_ when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)).thenReturn(Optional.empty()); final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class); - when(schedulingParameterConfiguration.getJobCount()).thenReturn(1); - when(schedulingParameterConfiguration.getRate()).thenReturn(Duration.ZERO); + when(schedulingParameterConfiguration.getIndexReadCount()).thenReturn(1); + when(schedulingParameterConfiguration.getInterval()).thenReturn(Duration.ZERO); when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration); doNothing().when(sourceCoordinator).closePartition(partitionKey, @@ -268,8 +268,8 @@ void run_with_acknowledgments_enabled_creates_and_deletes_pit_and_closes_that_pa when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)).thenReturn(Optional.empty()); final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class); - when(schedulingParameterConfiguration.getJobCount()).thenReturn(1); - when(schedulingParameterConfiguration.getRate()).thenReturn(Duration.ZERO); + when(schedulingParameterConfiguration.getIndexReadCount()).thenReturn(1); + when(schedulingParameterConfiguration.getInterval()).thenReturn(Duration.ZERO); when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration); doNothing().when(sourceCoordinator).closePartition(partitionKey, @@ -354,8 +354,8 @@ void run_with_getNextPartition_with_valid_existing_point_in_time_does_not_create when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)).thenReturn(Optional.empty()); final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class); - when(schedulingParameterConfiguration.getJobCount()).thenReturn(1); - when(schedulingParameterConfiguration.getRate()).thenReturn(Duration.ZERO); + when(schedulingParameterConfiguration.getIndexReadCount()).thenReturn(1); + when(schedulingParameterConfiguration.getInterval()).thenReturn(Duration.ZERO); when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration); doNothing().when(sourceCoordinator).closePartition(partitionKey, diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java index f97838b187..b458a26d1b 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java @@ -167,8 +167,8 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_scro when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)).thenReturn(Optional.empty()); final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class); - when(schedulingParameterConfiguration.getJobCount()).thenReturn(1); - when(schedulingParameterConfiguration.getRate()).thenReturn(Duration.ZERO); + when(schedulingParameterConfiguration.getIndexReadCount()).thenReturn(1); + when(schedulingParameterConfiguration.getInterval()).thenReturn(Duration.ZERO); when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration); doNothing().when(sourceCoordinator).closePartition(partitionKey, @@ -262,8 +262,8 @@ void run_with_getNextPartition_with_acknowledgments_creates_and_deletes_scroll_a when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)).thenReturn(Optional.empty()); final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class); - when(schedulingParameterConfiguration.getJobCount()).thenReturn(1); - when(schedulingParameterConfiguration.getRate()).thenReturn(Duration.ZERO); + when(schedulingParameterConfiguration.getIndexReadCount()).thenReturn(1); + when(schedulingParameterConfiguration.getInterval()).thenReturn(Duration.ZERO); when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration); doNothing().when(sourceCoordinator).closePartition(partitionKey,