Skip to content

Commit

Permalink
Rename os source rate/job_count to interval/count, acquire UNASSIGNED…
Browse files Browse the repository at this point in the history
… partitions before CLOSED partitions (#3327)

* Rename the OpenSearch source scheduling options rate/job_count to interval/count, and acquire UNASSIGNED partitions before CLOSED partitions
Signed-off-by: Taylor Gray <tylgry@amazon.com>

* Rename the scheduling option count to index_read_count

Signed-off-by: Taylor Gray <tylgry@amazon.com>

---------

Signed-off-by: Taylor Gray <tylgry@amazon.com>
  • Loading branch information
graytaylor0 authored Sep 16, 2023
1 parent c95eb92 commit dca14b4
Show file tree
Hide file tree
Showing 13 changed files with 67 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,17 +91,17 @@ public Optional<SourcePartitionStoreItem> tryAcquireAvailablePartition(final Str
return acquiredAssignedItem;
}

final Optional<SourcePartitionStoreItem> acquiredClosedItem = dynamoDbClientWrapper.getAvailablePartition(
ownerId, ownershipTimeout, SourcePartitionStatus.CLOSED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED), 1);
final Optional<SourcePartitionStoreItem> acquiredUnassignedItem = dynamoDbClientWrapper.getAvailablePartition(
ownerId, ownershipTimeout, SourcePartitionStatus.UNASSIGNED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED), 5);

if (acquiredClosedItem.isPresent()) {
return acquiredClosedItem;
if (acquiredUnassignedItem.isPresent()) {
return acquiredUnassignedItem;
}

return dynamoDbClientWrapper.getAvailablePartition(
ownerId, ownershipTimeout, SourcePartitionStatus.UNASSIGNED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED), 5);
ownerId, ownershipTimeout, SourcePartitionStatus.CLOSED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED), 1);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,11 @@ void getAvailablePartition_with_acquired_CLOSED_partition_returns_the_partition(
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED),
1))
.willReturn(Optional.empty());
given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout,
SourcePartitionStatus.UNASSIGNED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED),
5))
.willReturn(Optional.empty());
given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout,
SourcePartitionStatus.CLOSED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED),
Expand Down Expand Up @@ -278,11 +283,6 @@ void getAvailablePartition_with_acquired_UNASSIGNED_partition_returns_the_partit
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED),
1))
.willReturn(Optional.empty());
given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout,
SourcePartitionStatus.CLOSED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED),
1))
.willReturn(Optional.empty());
given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout,
SourcePartitionStatus.UNASSIGNED,
String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,6 @@ public void queuePartition(final InMemorySourcePartitionStoreItem inMemorySource
}

public Optional<SourcePartitionStoreItem> getNextItem() {
final QueuedPartitionsItem nextClosedPartitionItem = closedPartitions.peek();

if (Objects.nonNull(nextClosedPartitionItem)) {
if (nextClosedPartitionItem.sortedTimestamp.isBefore(Instant.now()) && partitionLookup.containsKey(nextClosedPartitionItem.sourceIdentifier)) {
closedPartitions.remove();
return Optional.ofNullable(partitionLookup.get(nextClosedPartitionItem.sourceIdentifier).get(nextClosedPartitionItem.partitionKey));
}
}

final QueuedPartitionsItem nextUnassignedPartitionItem = unassignedPartitions.peek();

if (Objects.nonNull(nextUnassignedPartitionItem)) {
Expand All @@ -103,6 +94,15 @@ public Optional<SourcePartitionStoreItem> getNextItem() {
}
}

final QueuedPartitionsItem nextClosedPartitionItem = closedPartitions.peek();

if (Objects.nonNull(nextClosedPartitionItem)) {
if (nextClosedPartitionItem.sortedTimestamp.isBefore(Instant.now()) && partitionLookup.containsKey(nextClosedPartitionItem.sourceIdentifier)) {
closedPartitions.remove();
return Optional.ofNullable(partitionLookup.get(nextClosedPartitionItem.sourceIdentifier).get(nextClosedPartitionItem.partitionKey));
}
}

return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,11 @@ void queue_closed_then_unassigned_partitions_followed_by_get_returns_expected_pa

final Optional<SourcePartitionStoreItem> acquiredItem = objectUnderTest.getNextItem();
assertThat(acquiredItem.isPresent(), equalTo(true));
assertThat(acquiredItem.get(), equalTo(item));
assertThat(acquiredItem.get(), equalTo(thirdItem));

final Optional<SourcePartitionStoreItem> secondAcquiredItem = objectUnderTest.getNextItem();
assertThat(secondAcquiredItem.isPresent(), equalTo(true));
assertThat(secondAcquiredItem.get(), equalTo(thirdItem));
assertThat(secondAcquiredItem.get(), equalTo(item));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,25 @@

public class SchedulingParameterConfiguration {

@JsonProperty("rate")
private Duration rate = Duration.ofHours(8);
@JsonProperty("interval")
private Duration interval = Duration.ofHours(8);

@Min(1)
@JsonProperty("job_count")
private int jobCount = 1;
@JsonProperty("index_read_count")
private int indexReadCount = 1;

@JsonProperty("start_time")
private String startTime = Instant.now().toString();

@JsonIgnore
private Instant startTimeInstant;

public Duration getRate() {
return rate;
public Duration getInterval() {
return interval;
}

public int getJobCount() {
return jobCount;
public int getIndexReadCount() {
return indexReadCount;
}

public Instant getStartTime() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,12 @@

package org.opensearch.dataprepper.plugins.source.opensearch.configuration;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.validation.constraints.AssertTrue;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.SearchContextType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class SearchConfiguration {

private static final ObjectMapper objectMapper = new ObjectMapper();
Expand All @@ -27,32 +22,11 @@ public class SearchConfiguration {
@JsonProperty("batch_size")
private Integer batchSize = 1000;

@JsonProperty("query")
private String queryString = "{ \"query\": { \"match_all\": {} }}";

@JsonIgnore
private Map<String, Object> queryMap;

public SearchContextType getSearchContextType() {
return searchContextType;
}

public Integer getBatchSize() {
return batchSize;
}

public Map<String, Object> getQuery() {
return queryMap;
}

@AssertTrue(message = "query is not a valid json string")
boolean isQueryValid() {
try {
queryMap = objectMapper.readValue(queryString, new TypeReference<>() {});
return true;
} catch (final Exception e) {
LOG.error("Invalid query json string: ", e);
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ static Pair<AcknowledgementSet, CompletableFuture<Boolean>> createAcknowledgment
if (result == true) {
sourceCoordinator.closePartition(
indexPartition.getPartitionKey(),
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getRate(),
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getJobCount());
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getInterval(),
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getIndexReadCount());
}
completableFuture.complete(result);
}, Duration.ofSeconds(ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS));
Expand All @@ -69,8 +69,8 @@ static void completeIndexPartition(final OpenSearchSourceConfiguration openSearc
} else {
sourceCoordinator.closePartition(
indexPartition.getPartitionKey(),
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getRate(),
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getJobCount());
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getInterval(),
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getIndexReadCount());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,9 @@ void open_search_source_username_password_only() throws JsonProcessingException
" - index_name_regex: \"regex\"\n" +
" - index_name_regex: \"regex-two\"\n" +
"scheduling:\n" +
" job_count: 3\n" +
" index_read_count: 3\n" +
"search_options:\n" +
" batch_size: 1000\n" +
" query: \"test\"\n";
" batch_size: 1000\n";
final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class);

assertThat(sourceConfiguration.getSearchConfiguration(), notNullValue());
Expand Down Expand Up @@ -67,10 +66,9 @@ void open_search_disabled_authentication() throws JsonProcessingException {
" - index_name_regex: \"regex\"\n" +
" - index_name_regex: \"regex-two\"\n" +
"scheduling:\n" +
" job_count: 3\n" +
" index_read_count: 3\n" +
"search_options:\n" +
" batch_size: 1000\n" +
" query: \"test\"\n";
" batch_size: 1000\n";
final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class);

assertThat(sourceConfiguration.getSearchConfiguration(), notNullValue());
Expand Down Expand Up @@ -100,10 +98,9 @@ void opensearch_source_aws_only() throws JsonProcessingException {
" region: \"us-east-1\"\n" +
" sts_role_arn: \"arn:aws:iam::123456789012:role/aos-role\"\n" +
"scheduling:\n" +
" job_count: 3\n" +
" index_read_count: 3\n" +
"search_options:\n" +
" batch_size: 1000\n" +
" query: \"test\"\n";
" batch_size: 1000\n";

final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class);

Expand Down Expand Up @@ -131,10 +128,9 @@ void opensearch_source_aws_sts_external_id() throws JsonProcessingException {
" sts_role_arn: \"arn:aws:iam::123456789012:role/aos-role\"\n" +
" sts_external_id: \"some-random-id\"\n" +
"scheduling:\n" +
" job_count: 3\n" +
" index_read_count: 3\n" +
"search_options:\n" +
" batch_size: 1000\n" +
" query: \"test\"\n";
" batch_size: 1000\n";

final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class);

Expand Down Expand Up @@ -166,10 +162,9 @@ void using_both_aws_config_and_username_password_is_invalid() throws JsonProcess
" region: \"us-east-1\"\n" +
" sts_role_arn: \"arn:aws:iam::123456789012:role/aos-role\"\n" +
"scheduling:\n" +
" job_count: 3\n" +
" index_read_count: 3\n" +
"search_options:\n" +
" batch_size: 1000\n" +
" query: \"test\"\n";
" batch_size: 1000\n";

final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class);

Expand All @@ -188,10 +183,9 @@ void one_of_username_password_or_aws_config_or_authDisabled_is_required() throws
" - index_name_regex: \"regex\"\n" +
" - index_name_regex: \"regex-two\"\n" +
"scheduling:\n" +
" job_count: 3\n" +
" index_read_count: 3\n" +
"search_options:\n" +
" batch_size: 1000\n" +
" query: \"test\"\n";
" batch_size: 1000\n";

final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,34 +23,34 @@ public class SchedulingParameterConfigurationTest {
@Test
void default_scheduling_configuration() {
final SchedulingParameterConfiguration schedulingParameterConfiguration = new SchedulingParameterConfiguration();
assertThat(schedulingParameterConfiguration.getJobCount(), equalTo(1));
assertThat(schedulingParameterConfiguration.getIndexReadCount(), equalTo(1));
assertThat(schedulingParameterConfiguration.isStartTimeValid(), equalTo(true));
assertThat(schedulingParameterConfiguration.getStartTime().isBefore(Instant.now()), equalTo(true));
assertThat(schedulingParameterConfiguration.getRate(), equalTo(Duration.ofHours(8)));
assertThat(schedulingParameterConfiguration.getInterval(), equalTo(Duration.ofHours(8)));
}

@Test
void non_default_scheduling_configuration() throws JsonProcessingException {
final String schedulingConfigurationYaml =
" job_count: 3\n" +
" index_read_count: 3\n" +
" start_time: \"2007-12-03T10:15:30.00Z\"\n";

final SchedulingParameterConfiguration schedulingParameterConfiguration = objectMapper.readValue(schedulingConfigurationYaml, SchedulingParameterConfiguration.class);

assertThat(schedulingParameterConfiguration.getJobCount(), equalTo(3));
assertThat(schedulingParameterConfiguration.getIndexReadCount(), equalTo(3));
assertThat(schedulingParameterConfiguration.isStartTimeValid(), equalTo(true));
assertThat(schedulingParameterConfiguration.getStartTime(), equalTo(Instant.parse("2007-12-03T10:15:30.00Z")));
}

@Test
void invalid_start_time_configuration_test() throws JsonProcessingException {
final String schedulingConfigurationYaml =
" job_count: 3\n" +
" index_read_count: 3\n" +
" start_time: \"2007-12-03T10:15:30\"\n";

final SchedulingParameterConfiguration schedulingParameterConfiguration = objectMapper.readValue(schedulingConfigurationYaml, SchedulingParameterConfiguration.class);

assertThat(schedulingParameterConfiguration.getJobCount(), equalTo(3));
assertThat(schedulingParameterConfiguration.getIndexReadCount(), equalTo(3));
assertThat(schedulingParameterConfiguration.isStartTimeValid(), equalTo(false));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.util.Map;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;

public class SearchConfigurationTest {
Expand All @@ -25,32 +24,25 @@ public class SearchConfigurationTest {
void default_search_configuration() {
final SearchConfiguration searchConfiguration = new SearchConfiguration();

assertThat(searchConfiguration.getQuery(), equalTo(null));
assertThat(searchConfiguration.getBatchSize(), equalTo(1000));
}

@Test
void non_default_search_configuration() {
final Map<String, Object> pluginSettings = new HashMap<>();
pluginSettings.put("batch_size", 2000);
pluginSettings.put("query", "{\"query\": {\"match_all\": {} }}");

final SearchConfiguration searchConfiguration = objectMapper.convertValue(pluginSettings, SearchConfiguration.class);
assertThat(searchConfiguration.getBatchSize(),equalTo(2000));
assertThat(searchConfiguration.isQueryValid(), equalTo(true));
assertThat(searchConfiguration.getQuery(), notNullValue());
assertThat(searchConfiguration.getQuery().containsKey("query"), equalTo(true));
}

@Test
void query_is_not_valid_json_string() {

final Map<String, Object> pluginSettings = new HashMap<>();
pluginSettings.put("batch_size", 1000);
pluginSettings.put("query", "\\{query: \"my_query\"}");

final SearchConfiguration searchConfiguration = objectMapper.convertValue(pluginSettings, SearchConfiguration.class);
assertThat(searchConfiguration.getBatchSize(),equalTo(1000));
assertThat(searchConfiguration.isQueryValid(), equalTo(false));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ void run_with_getNextPartition_with_non_empty_partition_processes_and_closes_tha
when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)).thenReturn(Optional.empty());

final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class);
when(schedulingParameterConfiguration.getJobCount()).thenReturn(1);
when(schedulingParameterConfiguration.getRate()).thenReturn(Duration.ZERO);
when(schedulingParameterConfiguration.getIndexReadCount()).thenReturn(1);
when(schedulingParameterConfiguration.getInterval()).thenReturn(Duration.ZERO);
when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration);

doNothing().when(sourceCoordinator).closePartition(partitionKey,
Expand Down Expand Up @@ -265,8 +265,8 @@ void run_with_getNextPartition_with_acknowledgments_processes_and_closes_that_pa
when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)).thenReturn(Optional.empty());

final SchedulingParameterConfiguration schedulingParameterConfiguration = mock(SchedulingParameterConfiguration.class);
when(schedulingParameterConfiguration.getJobCount()).thenReturn(1);
when(schedulingParameterConfiguration.getRate()).thenReturn(Duration.ZERO);
when(schedulingParameterConfiguration.getIndexReadCount()).thenReturn(1);
when(schedulingParameterConfiguration.getInterval()).thenReturn(Duration.ZERO);
when(openSearchSourceConfiguration.getSchedulingParameterConfiguration()).thenReturn(schedulingParameterConfiguration);

doNothing().when(sourceCoordinator).closePartition(partitionKey,
Expand Down
Loading

0 comments on commit dca14b4

Please sign in to comment.