From 633401a0727019f86e0d7c475fead5b3bef98889 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Wed, 11 Oct 2023 14:40:16 -0500 Subject: [PATCH] Move ddb source coordinator config to the data-prepper-config.yaml (#3466) Signed-off-by: Taylor Gray --- .../enhanced/EnhancedPartition.java | 4 +- .../enhanced}/EnhancedSourceCoordinator.java | 22 +- .../enhanced/EnhancedSourcePartition.java | 30 ++- .../UsesEnhancedSourceCoordination.java | 22 ++ .../enhanced/EnhancedSourcePartitionTest.java | 197 ++++++++++++++++++ .../TestInvalidPartitionProgressState.java | 17 ++ .../enhanced/TestPartitionProgressState.java | 24 +++ .../dataprepper/pipeline/Pipeline.java | 9 + .../SourceCoordinatorFactory.java | 24 +++ .../EnhancedLeaseBasedSourceCoordinator.java | 44 ++-- .../dataprepper/pipeline/PipelineTests.java | 43 +++- .../TestSourceWithEnhancedCoordination.java | 74 +++++++ .../SourceCoordinatorFactoryTest.java | 76 ++++++- ...hancedLeaseBasedSourceCoordinatorTest.java | 55 +++-- .../dynamodb-source/build.gradle | 2 - .../source/dynamodb/ClientFactory.java | 4 + .../source/dynamodb/DynamoDBService.java | 6 +- .../source/dynamodb/DynamoDBSource.java | 49 +++-- .../source/dynamodb/DynamoDBSourceConfig.java | 11 - .../coordination/PartitionFactory.java | 5 +- .../partition/DataFilePartition.java | 5 +- .../partition/ExportPartition.java | 4 +- .../coordination/partition/GlobalState.java | 5 +- .../coordination/partition/InitPartition.java | 4 +- .../partition/StreamPartition.java | 4 +- .../dynamodb/export/DataFileCheckpointer.java | 2 +- .../export/DataFileLoaderFactory.java | 2 +- .../dynamodb/export/DataFileScheduler.java | 8 +- .../dynamodb/export/ExportScheduler.java | 6 +- .../dynamodb/stream/ShardConsumerFactory.java | 6 +- .../dynamodb/stream/StreamCheckpointer.java | 6 +- .../dynamodb/stream/StreamScheduler.java | 8 +- .../source/dynamodb/DynamoDBServiceTest.java | 8 +- .../dynamodb/DynamoDBSourceConfigTest.java | 7 +- .../coordination/PartitionFactoryTest.java | 11 +- .../export/DataFileLoaderFactoryTest.java | 2 +- .../dynamodb/export/DataFileLoaderTest.java | 12 +- .../export/DataFileSchedulerTest.java | 11 +- .../dynamodb/export/ExportSchedulerTest.java | 15 +- .../stream/ShardConsumerFactoryTest.java | 3 +- .../dynamodb/stream/ShardConsumerTest.java | 12 +- .../dynamodb/stream/StreamSchedulerTest.java | 12 +- 42 files changed, 675 insertions(+), 196 deletions(-) rename data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/Partition.java => data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedPartition.java (72%) rename {data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination => data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/enhanced}/EnhancedSourceCoordinator.java (82%) rename data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/SourcePartition.java => data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedSourcePartition.java (62%) create mode 100644 data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/UsesEnhancedSourceCoordination.java create mode 100644 data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedSourcePartitionTest.java create mode 100644 data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/TestInvalidPartitionProgressState.java create mode 100644 data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/TestPartitionProgressState.java rename data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/DefaultEnhancedSourceCoordinator.java => data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinator.java (78%) create mode 100644 data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestSourceWithEnhancedCoordination.java rename data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/DefaultCoordinatorTest.java => data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinatorTest.java (74%) diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/Partition.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedPartition.java similarity index 72% rename from data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/Partition.java rename to data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedPartition.java index 2f6f51c8af..d26f79c1c9 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/Partition.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedPartition.java @@ -3,14 +3,14 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.source.dynamodb.coordination; +package org.opensearch.dataprepper.model.source.coordinator.enhanced; import java.util.Optional; /** * A Partition Interface represents an item in the coordination store. */ -public interface Partition { +public interface EnhancedPartition { String getPartitionType(); diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/EnhancedSourceCoordinator.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedSourceCoordinator.java similarity index 82% rename from data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/EnhancedSourceCoordinator.java rename to data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedSourceCoordinator.java index 3372586a43..9341a13e59 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/EnhancedSourceCoordinator.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedSourceCoordinator.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.source.dynamodb.coordination; +package org.opensearch.dataprepper.model.source.coordinator.enhanced; import java.time.Duration; import java.util.Optional; @@ -25,20 +25,20 @@ public interface EnhancedSourceCoordinator { /** * This method is used to create the partition item in the coordination store. * - * @param partition A specific partition that extends {@link SourcePartition} + * @param partition A specific partition that extends {@link EnhancedSourcePartition} * @param The progress state class * @return True if partition is created successfully otherwise false. */ - boolean createPartition(SourcePartition partition); + boolean createPartition(EnhancedSourcePartition partition); /** * This method is used to acquire a lease on the partition item in the coordination store. * * @param partitionType The partition type identifier - * @return A {@link SourcePartition} instance + * @return A {@link EnhancedSourcePartition} instance */ - Optional acquireAvailablePartition(String partitionType); + Optional acquireAvailablePartition(String partitionType); /** * This method is used to update progress state for a partition in the coordination store. @@ -48,7 +48,7 @@ public interface EnhancedSourceCoordinator { * @param The progress state class * @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException when the partition was not updated successfully */ - void saveProgressStateForPartition(SourcePartition partition); + void saveProgressStateForPartition(EnhancedSourcePartition partition); /** * This method is used to release the lease of a partition in the coordination store. @@ -58,7 +58,7 @@ public interface EnhancedSourceCoordinator { * @param The progress state class * @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException when the partition was not updated successfully */ - void giveUpPartition(SourcePartition partition); + void giveUpPartition(EnhancedSourcePartition partition); /** * This method is used to mark a partition as COMPLETED in the coordination store. @@ -68,7 +68,7 @@ public interface EnhancedSourceCoordinator { * @param The progress state class * @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException when the partition was not updated successfully */ - void completePartition(SourcePartition partition); + void completePartition(EnhancedSourcePartition partition); /** * This method is used to mark a partition as CLOSED in the coordination store. @@ -82,7 +82,7 @@ public interface EnhancedSourceCoordinator { * @param The progress state class * @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException when the partition was not updated successfully */ - void closePartition(SourcePartition partition, final Duration reopenAfter, final int maxClosedCount); + void closePartition(EnhancedSourcePartition partition, final Duration reopenAfter, final int maxClosedCount); /** @@ -91,9 +91,9 @@ public interface EnhancedSourceCoordinator { * Hence, it's designed to be used as a "Global State" which can be read whenever needed. * * @param partitionKey A unique key for that partition - * @return A {@link SourcePartition} instance + * @return A {@link EnhancedSourcePartition} instance */ - Optional getPartition(String partitionKey); + Optional getPartition(String partitionKey); /** * This method is to perform initialization for the coordinator diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/SourcePartition.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedSourcePartition.java similarity index 62% rename from data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/SourcePartition.java rename to data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedSourcePartition.java index d26320666e..ac52ace478 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/SourcePartition.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedSourcePartition.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.source.dynamodb.coordination; +package org.opensearch.dataprepper.model.source.coordinator.enhanced; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; @@ -17,7 +17,7 @@ /** *

- * A base definition of a {@link Partition} in the coordination store. + * A base definition of a {@link EnhancedPartition} in the coordination store. * All partitions must extend this. *

* We store the {SourcePartitionStoreItem} in the partition. @@ -30,40 +30,38 @@ * * @param The progress state class */ -public abstract class SourcePartition implements Partition { +public abstract class EnhancedSourcePartition implements EnhancedPartition { - private static final Logger LOG = LoggerFactory.getLogger(DefaultEnhancedSourceCoordinator.class); - private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final Logger LOG = LoggerFactory.getLogger(EnhancedSourcePartition.class); + private static final ObjectMapper objectMapper = new ObjectMapper(); private SourcePartitionStoreItem sourcePartitionStoreItem; - protected SourcePartitionStoreItem getSourcePartitionStoreItem() { + public SourcePartitionStoreItem getSourcePartitionStoreItem() { return sourcePartitionStoreItem; } - - protected void setSourcePartitionStoreItem(SourcePartitionStoreItem sourcePartitionStoreItem) { + public void setSourcePartitionStoreItem(SourcePartitionStoreItem sourcePartitionStoreItem) { this.sourcePartitionStoreItem = sourcePartitionStoreItem; } - /** * Helper method to convert progress state. * This is because the state is currently stored as a String in the coordination store. */ - protected T convertStringToPartitionProgressState(Class progressStateClass, final String serializedPartitionProgressState) { + public T convertStringToPartitionProgressState(Class progressStateClass, final String serializedPartitionProgressState) { if (Objects.isNull(serializedPartitionProgressState)) { return null; } try { if (progressStateClass != null) { - return MAPPER.readValue(serializedPartitionProgressState, progressStateClass); + return objectMapper.readValue(serializedPartitionProgressState, progressStateClass); } - return MAPPER.readValue(serializedPartitionProgressState, new TypeReference<>() { + return objectMapper.readValue(serializedPartitionProgressState, new TypeReference<>() { }); } catch (final JsonProcessingException e) { - LOG.error("Unable to convert string to partition progress state class {}: {}", progressStateClass.getName(), e); + LOG.error("Unable to convert string to partition progress state class {}: ", progressStateClass != null ? progressStateClass.getName() : null, e); return null; } } @@ -72,14 +70,14 @@ protected T convertStringToPartitionProgressState(Class progressStateClass, f * Helper method to convert progress state to String * This is because the state is currently stored as a String in the coordination store. */ - protected String convertPartitionProgressStatetoString(Optional partitionProgressState) { + public String convertPartitionProgressStatetoString(Optional partitionProgressState) { if (partitionProgressState.isEmpty()) { return null; } try { - return MAPPER.writeValueAsString(partitionProgressState.get()); + return objectMapper.writeValueAsString(partitionProgressState.get()); } catch (final JsonProcessingException e) { - LOG.error("Unable to convert partition progress state class to string: {}", e); + LOG.error("Unable to convert partition progress state class to string: ", e); return null; } } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/UsesEnhancedSourceCoordination.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/UsesEnhancedSourceCoordination.java new file mode 100644 index 0000000000..46e2309ec3 --- /dev/null +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/UsesEnhancedSourceCoordination.java @@ -0,0 +1,22 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.source.coordinator.enhanced; + +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; + +import java.util.function.Function; + +public interface UsesEnhancedSourceCoordination { + + /** + * + * @param sourceCoordinator - The {@link EnhancedSourceCoordinator} to be used by the + * {@link org.opensearch.dataprepper.model.source.Source} as needed + */ + void setEnhancedSourceCoordinator(final EnhancedSourceCoordinator sourceCoordinator); + + Function getPartitionFactory(); +} diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedSourcePartitionTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedSourcePartitionTest.java new file mode 100644 index 0000000000..36ba2ba63b --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/EnhancedSourcePartitionTest.java @@ -0,0 +1,197 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.source.coordinator.enhanced; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; + +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.Mockito.mock; + +public class EnhancedSourcePartitionTest { + + private String partitionKey; + private TestPartitionProgressState testPartitionProgressState; + private final ObjectMapper objectMapper = new ObjectMapper(); + + @BeforeEach + void setup() { + partitionKey = UUID.randomUUID().toString(); + testPartitionProgressState = new TestPartitionProgressState(UUID.randomUUID().toString()); + } + + private EnhancedSourcePartition createObjectUnderTest() { + return new TestEnhancedSourcePartition(partitionKey, testPartitionProgressState); + } + + @Test + void set_SourcePartitionStoreItem_sets_item_correctly() { + final EnhancedSourcePartition objectUnderTest = createObjectUnderTest(); + + assertThat(objectUnderTest.getSourcePartitionStoreItem(), equalTo(null)); + + final SourcePartitionStoreItem item = mock(SourcePartitionStoreItem.class); + objectUnderTest.setSourcePartitionStoreItem(item); + assertThat(objectUnderTest.getSourcePartitionStoreItem(), equalTo(item)); + } + + @Test + void convertStringToPartitionState_with_null_state_returns_null() { + final EnhancedSourcePartition objectUnderTest = createObjectUnderTest(); + + assertThat(objectUnderTest.convertStringToPartitionProgressState(TestPartitionProgressState.class, null), equalTo(null)); + } + + @Test + void convertStringToPartitionState_returns_null_when_JsonProcessingException_is_thrown() { + final EnhancedSourcePartition objectUnderTest = createObjectUnderTest(); + + assertThat(objectUnderTest.convertStringToPartitionProgressState(TestPartitionProgressState.class, UUID.randomUUID().toString()), equalTo(null)); + } + + @Test + void convertStringToPartitionState_is_null_when_ClassCastException_is_thrown_with_null_class() { + final EnhancedSourcePartition objectUnderTest = createObjectUnderTest(); + + assertThat(objectUnderTest.convertStringToPartitionProgressState(null, UUID.randomUUID().toString()), equalTo(null)); + } + + @Test + void convertFromStringToPartitionState_converts_as_expected() { + final EnhancedSourcePartition objectUnderTest = createObjectUnderTest(); + + final String serializedString = objectUnderTest.convertPartitionProgressStatetoString(Optional.of(testPartitionProgressState)); + + final TestPartitionProgressState result = objectUnderTest.convertStringToPartitionProgressState(TestPartitionProgressState.class, serializedString); + assertThat(result, notNullValue()); + assertThat(result.getTestValue(), equalTo(testPartitionProgressState.getTestValue())); + } + + @Test + void convertPartitionStateToStringWithEmptyState_returns_null() { + final String result = createObjectUnderTest().convertPartitionProgressStatetoString(Optional.empty()); + assertThat(result, equalTo(null)); + } + + @Test + void convertFromPartitionStateToString_converts() { + final EnhancedSourcePartition objectUnderTest = createObjectUnderTest(); + + final String result = objectUnderTest.convertPartitionProgressStatetoString(Optional.of(testPartitionProgressState)); + + assertThat(result, notNullValue()); + assertThat(result, equalTo("{\"testValue\":\"" + testPartitionProgressState.getTestValue() + "\"}")); + } + + @Test + void convertFromPartitionStateToStringReturns_null_when_JsonProcessingException_is_thrown() { + + final TestInvalidPartitionProgressState invalidPartitionProgressState = new TestInvalidPartitionProgressState(); + final EnhancedSourcePartition objectUnderTest = new TestInvalidEnhancedSourcePartition(UUID.randomUUID().toString(), + invalidPartitionProgressState); + + assertThat(objectUnderTest.convertPartitionProgressStatetoString(Optional.of(invalidPartitionProgressState)), equalTo(null)); + } + + @Test + void convertFromStringToPartitionStateWithPrimitiveType_returns_expected_result() throws JsonProcessingException { + final Map stateMap = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + final EnhancedSourcePartition> objectUnderTest = + new TestMapStringObjectEnhancedSourcePartition(UUID.randomUUID().toString(), stateMap); + + final Map resultWithNullClass = objectUnderTest.convertStringToPartitionProgressState(null, objectMapper.writeValueAsString(stateMap)); + assertThat(resultWithNullClass, notNullValue()); + assertThat(resultWithNullClass, equalTo(stateMap)); + } + + + public class TestEnhancedSourcePartition extends EnhancedSourcePartition { + + private final String partitionKey; + private final TestPartitionProgressState testPartitionProgressState; + + public TestEnhancedSourcePartition(final String partitionKey, final TestPartitionProgressState partitionProgressState) { + this.partitionKey = partitionKey; + this.testPartitionProgressState = partitionProgressState; + } + + @Override + public String getPartitionType() { + return "TEST"; + } + + @Override + public String getPartitionKey() { + return partitionKey; + } + + @Override + public Optional getProgressState() { + return Optional.of(testPartitionProgressState); + } + } + + public class TestInvalidEnhancedSourcePartition extends EnhancedSourcePartition { + + private final String partitionKey; + private final TestInvalidPartitionProgressState testPartitionProgressState; + + public TestInvalidEnhancedSourcePartition(final String partitionKey, final TestInvalidPartitionProgressState partitionProgressState) { + this.partitionKey = partitionKey; + this.testPartitionProgressState = partitionProgressState; + } + + @Override + public String getPartitionType() { + return "TEST"; + } + + @Override + public String getPartitionKey() { + return partitionKey; + } + + @Override + public Optional getProgressState() { + return Optional.of(testPartitionProgressState); + } + } + + public class TestMapStringObjectEnhancedSourcePartition extends EnhancedSourcePartition> { + + private final String partitionKey; + private final Map testPartitionProgressState; + + public TestMapStringObjectEnhancedSourcePartition(final String partitionKey, final Map partitionProgressState) { + this.partitionKey = partitionKey; + this.testPartitionProgressState = partitionProgressState; + } + + @Override + public String getPartitionType() { + return "TEST"; + } + + @Override + public String getPartitionKey() { + return partitionKey; + } + + @Override + public Optional> getProgressState() { + return Optional.of(testPartitionProgressState); + } + } +} diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/TestInvalidPartitionProgressState.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/TestInvalidPartitionProgressState.java new file mode 100644 index 0000000000..85e27f2d47 --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/TestInvalidPartitionProgressState.java @@ -0,0 +1,17 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.source.coordinator.enhanced; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class TestInvalidPartitionProgressState { + + @JsonProperty("invalid") + private String x; + + @JsonProperty("invalid") + private String y; +} diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/TestPartitionProgressState.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/TestPartitionProgressState.java new file mode 100644 index 0000000000..ebfb091649 --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/source/coordinator/enhanced/TestPartitionProgressState.java @@ -0,0 +1,24 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.source.coordinator.enhanced; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class TestPartitionProgressState { + + @JsonProperty("testValue") + private String testValue; + + public TestPartitionProgressState(@JsonProperty("testValue") final String testValue) { + this.testValue = testValue; + } + + public String getTestValue() { + return testValue; + } + + public void setTestValue(final String testValue) { this.testValue = testValue; } +} \ No newline at end of file diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java index 11553c89fa..0bd5d469ca 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java @@ -15,7 +15,11 @@ import org.opensearch.dataprepper.model.sink.Sink; import org.opensearch.dataprepper.model.source.Source; import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; import org.opensearch.dataprepper.model.source.coordinator.UsesSourceCoordination; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.UsesEnhancedSourceCoordination; import org.opensearch.dataprepper.parser.DataFlowComponent; import org.opensearch.dataprepper.pipeline.common.PipelineThreadFactory; import org.opensearch.dataprepper.pipeline.common.PipelineThreadPoolExecutor; @@ -38,6 +42,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import java.util.stream.Collectors; import static java.lang.String.format; @@ -236,6 +241,10 @@ public void execute() { final Class partionProgressModelClass = ((UsesSourceCoordination) source).getPartitionProgressStateClass(); final SourceCoordinator sourceCoordinator = sourceCoordinatorFactory.provideSourceCoordinator(partionProgressModelClass, name); ((UsesSourceCoordination) source).setSourceCoordinator(sourceCoordinator); + } else if (source instanceof UsesEnhancedSourceCoordination) { + final Function partitionFactory = ((UsesEnhancedSourceCoordination) source).getPartitionFactory(); + final EnhancedSourceCoordinator enhancedSourceCoordinator = sourceCoordinatorFactory.provideEnhancedSourceCoordinator(partitionFactory, name); + ((UsesEnhancedSourceCoordination) source).setEnhancedSourceCoordinator(enhancedSourceCoordinator); } sinkExecutorService.submit(() -> { diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/SourceCoordinatorFactory.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/SourceCoordinatorFactory.java index 02722da9c1..ad9eddf64a 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/SourceCoordinatorFactory.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/SourceCoordinatorFactory.java @@ -9,10 +9,16 @@ import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; import org.opensearch.dataprepper.model.source.SourceCoordinationStore; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.parser.model.SourceCoordinationConfig; +import org.opensearch.dataprepper.sourcecoordination.enhanced.EnhancedLeaseBasedSourceCoordinator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.function.Function; + /** * A factory class that will create the {@link SourceCoordinator} implementation based on the * source_coordination configuration @@ -49,4 +55,22 @@ public SourceCoordinator provideSourceCoordinator(final Class clazz, f sourceCoordinationConfig.getSourceCoordinationStoreConfig().getName(), subPipelineName); return new LeaseBasedSourceCoordinator(clazz, sourceCoordinationStore, sourceCoordinationConfig, new PartitionManager<>(), subPipelineName, sourceCoordinatorMetrics); } + + public EnhancedSourceCoordinator provideEnhancedSourceCoordinator(final Function partitionFactory, + final String subPipelineName) { + if (sourceCoordinationConfig == null + || sourceCoordinationConfig.getSourceCoordinationStoreConfig() == null + || sourceCoordinationConfig.getSourceCoordinationStoreConfig().getName() == null) { + return null; + } + + final SourceCoordinationStore sourceCoordinationStore = + pluginFactory.loadPlugin(SourceCoordinationStore.class, sourceCoordinationConfig.getSourceCoordinationStoreConfig()); + + final PluginMetrics sourceCoordinatorMetrics = PluginMetrics.fromNames(SOURCE_COORDINATOR_PLUGIN_NAME_FOR_METRICS, subPipelineName); + + LOG.info("Creating EnhancedLeaseBasedSourceCoordinator with coordination store {} for sub-pipeline {}", + sourceCoordinationConfig.getSourceCoordinationStoreConfig().getName(), subPipelineName); + return new EnhancedLeaseBasedSourceCoordinator(sourceCoordinationStore, sourceCoordinationConfig, sourceCoordinatorMetrics, subPipelineName, partitionFactory); + } } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/DefaultEnhancedSourceCoordinator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinator.java similarity index 78% rename from data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/DefaultEnhancedSourceCoordinator.java rename to data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinator.java index 743a9c1a81..686ec4e2ef 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/DefaultEnhancedSourceCoordinator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinator.java @@ -3,12 +3,15 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.source.dynamodb.coordination; +package org.opensearch.dataprepper.sourcecoordination.enhanced; +import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.source.SourceCoordinationStore; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStatus; import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.InitPartition; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.parser.model.SourceCoordinationConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,9 +27,9 @@ /** * An implemetation of {@link EnhancedSourceCoordinator} backend by {@link SourceCoordinationStore} */ -public class DefaultEnhancedSourceCoordinator implements EnhancedSourceCoordinator { +public class EnhancedLeaseBasedSourceCoordinator implements EnhancedSourceCoordinator { - private static final Logger LOG = LoggerFactory.getLogger(DefaultEnhancedSourceCoordinator.class); + private static final Logger LOG = LoggerFactory.getLogger(EnhancedLeaseBasedSourceCoordinator.class); /** * Default time out duration for lease. @@ -51,9 +54,11 @@ public class DefaultEnhancedSourceCoordinator implements EnhancedSourceCoordinat /** * In order to support different types of partitions. - * A custom factory is required to map a {@link SourcePartitionStoreItem} to a {@link SourcePartition} + * A custom factory is required to map a {@link SourcePartitionStoreItem} to a {@link EnhancedSourcePartition} */ - private final Function partitionFactory; + private final Function partitionFactory; + + private final PluginMetrics pluginMetrics; /** * Use host name of the node as the default ownerId @@ -70,21 +75,26 @@ public class DefaultEnhancedSourceCoordinator implements EnhancedSourceCoordinat } - public DefaultEnhancedSourceCoordinator(final SourceCoordinationStore coordinationStore, String sourceIdentifier, Function partitionFactory) { + public EnhancedLeaseBasedSourceCoordinator(final SourceCoordinationStore coordinationStore, + final SourceCoordinationConfig sourceCoordinationConfig, + final PluginMetrics pluginMetrics, + final String sourceIdentifier, + final Function partitionFactory) { this.coordinationStore = coordinationStore; - this.sourceIdentifier = sourceIdentifier; + this.sourceIdentifier = Objects.nonNull(sourceCoordinationConfig.getPartitionPrefix()) ? + sourceCoordinationConfig.getPartitionPrefix() + "|" + sourceIdentifier : + sourceIdentifier; + this.pluginMetrics = pluginMetrics; this.partitionFactory = partitionFactory; - } @Override public void initialize() { coordinationStore.initializeStore(); - createPartition(new InitPartition()); } @Override - public boolean createPartition(SourcePartition partition) { + public boolean createPartition(EnhancedSourcePartition partition) { String partitionType = partition.getPartitionType() == null ? DEFAULT_GLOBAL_STATE_PARTITION_TYPE : partition.getPartitionType(); // Don't need the status for Global state which is not for lease. SourcePartitionStatus status = partition.getPartitionType() == null ? null : SourcePartitionStatus.UNASSIGNED; @@ -102,7 +112,7 @@ public boolean createPartition(SourcePartition partition) { @Override - public Optional acquireAvailablePartition(String partitionType) { + public Optional acquireAvailablePartition(String partitionType) { // Not available for global state. Objects.nonNull(partitionType); LOG.debug("Try to acquire an available {} partition", partitionType); @@ -117,7 +127,7 @@ public Optional acquireAvailablePartition(String partitionType) @Override - public void saveProgressStateForPartition(SourcePartition partition) { + public void saveProgressStateForPartition(EnhancedSourcePartition partition) { String partitionType = partition.getPartitionType() == null ? DEFAULT_GLOBAL_STATE_PARTITION_TYPE : partition.getPartitionType(); LOG.debug("Try to save progress for partition {} (Type {})", partition.getPartitionKey(), partitionType); @@ -139,7 +149,7 @@ public void saveProgressStateForPartition(SourcePartition partition) { } @Override - public void giveUpPartition(SourcePartition partition) { + public void giveUpPartition(EnhancedSourcePartition partition) { Objects.nonNull(partition.getPartitionType()); LOG.debug("Try to give up the ownership for partition {} (Type {})", partition.getPartitionKey(), partition.getPartitionType()); @@ -162,7 +172,7 @@ public void giveUpPartition(SourcePartition partition) { } @Override - public void completePartition(SourcePartition partition) { + public void completePartition(EnhancedSourcePartition partition) { Objects.nonNull(partition.getPartitionType()); LOG.debug("Try to complete partition {} (Type {})", partition.getPartitionKey(), partition.getPartitionType()); @@ -185,7 +195,7 @@ public void completePartition(SourcePartition partition) { } @Override - public void closePartition(SourcePartition partition, final Duration reopenAfter, final int maxClosedCount) { + public void closePartition(EnhancedSourcePartition partition, final Duration reopenAfter, final int maxClosedCount) { Objects.nonNull(partition.getPartitionType()); @@ -217,7 +227,7 @@ public void closePartition(SourcePartition partition, final Duration reop @Override - public Optional getPartition(String partitionKey) { + public Optional getPartition(String partitionKey) { // Default to Global State only. final Optional sourceItem = coordinationStore.getSourcePartitionItem(this.sourceIdentifier + "|" + DEFAULT_GLOBAL_STATE_PARTITION_TYPE, partitionKey); if (!sourceItem.isPresent()) { diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java index 1e56de1d4b..2fc86b80eb 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/PipelineTests.java @@ -9,28 +9,31 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.configuration.PluginSetting; -import org.opensearch.dataprepper.model.processor.Processor; -import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.EventFactory; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.processor.Processor; +import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.Sink; -import org.opensearch.dataprepper.model.source.coordinator.UsesSourceCoordination; import org.opensearch.dataprepper.model.source.Source; import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.UsesSourceCoordination; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.UsesEnhancedSourceCoordination; import org.opensearch.dataprepper.parser.DataFlowComponent; import org.opensearch.dataprepper.pipeline.common.FutureHelper; import org.opensearch.dataprepper.pipeline.common.TestProcessor; import org.opensearch.dataprepper.pipeline.router.Router; +import org.opensearch.dataprepper.pipeline.router.RouterCopyRecordStrategy; import org.opensearch.dataprepper.pipeline.router.RouterGetRecordStrategy; import org.opensearch.dataprepper.plugins.TestSink; import org.opensearch.dataprepper.plugins.TestSource; import org.opensearch.dataprepper.plugins.TestSourceWithCoordination; +import org.opensearch.dataprepper.plugins.TestSourceWithEnhancedCoordination; import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; -import org.opensearch.dataprepper.pipeline.router.RouterCopyRecordStrategy; -import org.opensearch.dataprepper.model.event.EventFactory; -import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactory; import java.time.Duration; @@ -48,13 +51,13 @@ import static org.awaitility.Awaitility.await; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyCollection; import static org.mockito.ArgumentMatchers.eq; @@ -328,6 +331,30 @@ void testExecuteOnSourceWithRequiredSourceCoordination_sets_source_coordinator() } } + @Test + void testExecuteOnSourceWithRequiredEnhancedSourceCoordination_sets_enhanced_source_coordinator() { + final Source> testSource = new TestSourceWithEnhancedCoordination(); + final Source> sourceSpy = spy(testSource); + + final Sink> testSink = new TestSink(); + final DataFlowComponent sinkDataFlowComponent = mock(DataFlowComponent.class); + + final EnhancedSourceCoordinator sourceCoordinator = mock(EnhancedSourceCoordinator.class); + given(sourceCoordinatorFactory.provideEnhancedSourceCoordinator(((TestSourceWithEnhancedCoordination)testSource).getPartitionFactory(), UUID.randomUUID().toString())).willReturn(sourceCoordinator); + when(sinkDataFlowComponent.getComponent()).thenReturn(testSink); + try { + testPipeline = new Pipeline(TEST_PIPELINE_NAME, testSource, new BlockingBuffer(TEST_PIPELINE_NAME), + Collections.emptyList(), Collections.singletonList(sinkDataFlowComponent), router, eventFactory, + acknowledgementSetManager, sourceCoordinatorFactory, TEST_PROCESSOR_THREADS, TEST_READ_BATCH_TIMEOUT, + processorShutdownTimeout, sinkShutdownTimeout, peerForwarderDrainTimeout); + testPipeline.execute(); + Thread.sleep(TEST_READ_BATCH_TIMEOUT); + } catch (final InterruptedException e) { + verify((UsesEnhancedSourceCoordination)sourceSpy).getPartitionFactory(); + verify((UsesEnhancedSourceCoordination)sourceSpy).setEnhancedSourceCoordinator(sourceCoordinator); + } + } + @Test void testGetSource() { final Source> testSource = new TestSource(); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestSourceWithEnhancedCoordination.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestSourceWithEnhancedCoordination.java new file mode 100644 index 0000000000..1dd4625274 --- /dev/null +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/TestSourceWithEnhancedCoordination.java @@ -0,0 +1,74 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins; + +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.Source; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.UsesEnhancedSourceCoordination; +import org.opensearch.dataprepper.sourcecoordination.SourceCoordinatorFactoryTest; + +import java.util.Iterator; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@DataPrepperPlugin(name = "test_source", pluginType = Source.class) +public class TestSourceWithEnhancedCoordination implements Source>, UsesEnhancedSourceCoordination { + public static final List> TEST_DATA = Stream.of("TEST") + .map(Record::new).collect(Collectors.toList()); + private boolean isStopRequested; + private boolean failSourceForTest; + + private EnhancedSourceCoordinator sourceCoordinator; + + public TestSourceWithEnhancedCoordination() { + this.isStopRequested = false; + this.failSourceForTest = false; + } + + public TestSourceWithEnhancedCoordination(final boolean failSourceForTest) { + this.isStopRequested = false; + this.failSourceForTest = failSourceForTest; + } + + @Override + public void start(Buffer> buffer) { + if(failSourceForTest) { + throw new RuntimeException("Source is expected to fail"); + } + final Iterator> iterator = TEST_DATA.iterator(); + while (iterator.hasNext() && !isStopRequested) { + try { + buffer.write(iterator.next(), 1_000); + } catch (TimeoutException e) { + throw new RuntimeException("Timed out writing to buffer"); + } + } + } + + @Override + public void stop() { + isStopRequested = true; + } + + @Override + public void setEnhancedSourceCoordinator(EnhancedSourceCoordinator sourceCoordinator) { + this.sourceCoordinator = sourceCoordinator; + } + + @Override + public Function getPartitionFactory() { + return (item) -> new SourceCoordinatorFactoryTest.TestEnhancedSourcePartition(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + } +} diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/SourceCoordinatorFactoryTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/SourceCoordinatorFactoryTest.java index 8bca669f93..2e6d39dfd1 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/SourceCoordinatorFactoryTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/SourceCoordinatorFactoryTest.java @@ -13,8 +13,11 @@ import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.source.SourceCoordinationStore; import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.parser.model.SourceCoordinationConfig; +import java.util.Optional; import java.util.UUID; import static org.hamcrest.MatcherAssert.assertThat; @@ -37,13 +40,19 @@ private SourceCoordinatorFactory createObjectUnderTest() { @Test void provideSourceCoordinatorWithNullSourceCoordinationConfig_returns_null() { - - final SourceCoordinator sourceCoordinator = createObjectUnderTest().provideSourceCoordinator(String.class, UUID.randomUUID().toString()); assertThat(sourceCoordinator, nullValue()); } + @Test + void provideEnhancedSourceCoordinatorWithNullSourceCoordinationConfig_returns_null() { + final EnhancedSourceCoordinator enhancedSourceCoordinator = createObjectUnderTest().provideEnhancedSourceCoordinator( + (item) -> new TestEnhancedSourcePartition(UUID.randomUUID().toString(), UUID.randomUUID().toString()), UUID.randomUUID().toString()); + + assertThat(enhancedSourceCoordinator, nullValue()); + } + @Test void provideSourceCoordinatorWithNullSourceCoordinationStoreConfig_returns_null() { given(sourceCoordinationConfig.getSourceCoordinationStoreConfig()).willReturn(null); @@ -53,6 +62,16 @@ void provideSourceCoordinatorWithNullSourceCoordinationStoreConfig_returns_null( assertThat(sourceCoordinator, nullValue()); } + @Test + void provideEnhancedSourceCoordinatorWithNullSourceCoordinationStoreConfig_returns_null() { + given(sourceCoordinationConfig.getSourceCoordinationStoreConfig()).willReturn(null); + + final EnhancedSourceCoordinator sourceCoordinator = createObjectUnderTest().provideEnhancedSourceCoordinator( + (item) -> new TestEnhancedSourcePartition(UUID.randomUUID().toString(), UUID.randomUUID().toString()), UUID.randomUUID().toString()); + + assertThat(sourceCoordinator, nullValue()); + } + @Test @@ -66,6 +85,17 @@ void provideSourceCoordinatorWith_no_name_no_store_name_returns_null() { assertThat(sourceCoordinator, nullValue()); } + @Test + void provideEnhancedSourceCoordinatorWith_no_name_no_store_name_returns_null() { + final PluginSetting pluginSetting = mock(PluginSetting.class); + given(sourceCoordinationConfig.getSourceCoordinationStoreConfig()).willReturn(pluginSetting); + given(pluginSetting.getName()).willReturn(null); + + final EnhancedSourceCoordinator sourceCoordinator = createObjectUnderTest().provideEnhancedSourceCoordinator( + (item) -> new TestEnhancedSourcePartition(UUID.randomUUID().toString(), UUID.randomUUID().toString()), UUID.randomUUID().toString()); + assertThat(sourceCoordinator, nullValue()); + } + @Test void provideSourceCoordinator_loads_expected_plugin_from_plugin_factory() { final String pluginName = UUID.randomUUID().toString(); @@ -81,4 +111,46 @@ void provideSourceCoordinator_loads_expected_plugin_from_plugin_factory() { assertThat(actualSourceCoordinator, notNullValue()); } + + @Test + void provideEnhancedSourceCoordinator_loads_expected_plugin_from_plugin_factory() { + final String pluginName = UUID.randomUUID().toString(); + final PluginSetting pluginSetting = mock(PluginSetting.class); + given(sourceCoordinationConfig.getSourceCoordinationStoreConfig()).willReturn(pluginSetting); + given(pluginSetting.getName()).willReturn(pluginName); + + final SourceCoordinationStore expectedSourceCoordinationStore = mock(SourceCoordinationStore.class); + + given(pluginFactory.loadPlugin(SourceCoordinationStore.class, pluginSetting)).willReturn(expectedSourceCoordinationStore); + + final EnhancedSourceCoordinator actualSourceCoordinator = createObjectUnderTest().provideEnhancedSourceCoordinator( + (item) -> new TestEnhancedSourcePartition(UUID.randomUUID().toString(), UUID.randomUUID().toString()), UUID.randomUUID().toString()); + assertThat(actualSourceCoordinator, notNullValue()); + } + + public static class TestEnhancedSourcePartition extends EnhancedSourcePartition { + + private final String partitionKey; + private final String testPartitionProgressState; + + public TestEnhancedSourcePartition(final String partitionKey, final String partitionProgressState) { + this.partitionKey = partitionKey; + this.testPartitionProgressState = partitionProgressState; + } + + @Override + public String getPartitionType() { + return "TEST"; + } + + @Override + public String getPartitionKey() { + return partitionKey; + } + + @Override + public Optional getProgressState() { + return Optional.of(testPartitionProgressState); + } + } } diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/DefaultCoordinatorTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinatorTest.java similarity index 74% rename from data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/DefaultCoordinatorTest.java rename to data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinatorTest.java index 1883e16252..4e6e40c57a 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/DefaultCoordinatorTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinatorTest.java @@ -3,16 +3,19 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.source.dynamodb.coordination; +package org.opensearch.dataprepper.sourcecoordination.enhanced; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.source.SourceCoordinationStore; import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStatus; import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.parser.model.SourceCoordinationConfig; import java.time.Duration; import java.util.Optional; @@ -30,19 +33,25 @@ import static org.mockito.Mockito.verify; @ExtendWith(MockitoExtension.class) -class DefaultCoordinatorTest { +class EnhancedLeaseBasedSourceCoordinatorTest { @Mock private SourceCoordinationStore sourceCoordinationStore; + @Mock + private SourceCoordinationConfig sourceCoordinationConfig; + @Mock private SourcePartitionStoreItem sourcePartitionStoreItem; + @Mock + private PluginMetrics pluginMetrics; + private String sourceIdentifier; - private DefaultEnhancedSourceCoordinator coordinator; + private EnhancedLeaseBasedSourceCoordinator coordinator; - private final String DEFAULT_PARTITION_TYPE = "TEST"; + private static final String DEFAULT_PARTITION_TYPE = "TEST"; @BeforeEach @@ -52,18 +61,18 @@ void setup() { lenient().when(sourcePartitionStoreItem.getSourceIdentifier()).thenReturn(sourceIdentifier + "|" + DEFAULT_PARTITION_TYPE); } - private DefaultEnhancedSourceCoordinator createObjectUnderTest() { - DefaultEnhancedSourceCoordinator coordinator = new DefaultEnhancedSourceCoordinator(sourceCoordinationStore, sourceIdentifier, sourcePartitionStoreItem -> new TestPartition(sourcePartitionStoreItem)); + private EnhancedLeaseBasedSourceCoordinator createObjectUnderTest() { + EnhancedLeaseBasedSourceCoordinator coordinator = new EnhancedLeaseBasedSourceCoordinator(sourceCoordinationStore, sourceCoordinationConfig, pluginMetrics, sourceIdentifier, sourcePartitionStoreItem -> new TestEnhancedSourcePartition(sourcePartitionStoreItem)); return coordinator; } - class TestPartition extends SourcePartition { + static class TestEnhancedSourcePartition extends EnhancedSourcePartition { private final String partitionType; private final String partitionKey; - public TestPartition(SourcePartitionStoreItem sourcePartitionStoreItem) { + public TestEnhancedSourcePartition(SourcePartitionStoreItem sourcePartitionStoreItem) { setSourcePartitionStoreItem(sourcePartitionStoreItem); String[] split = sourcePartitionStoreItem.getSourceIdentifier().split("\\|"); @@ -75,7 +84,7 @@ public TestPartition(SourcePartitionStoreItem sourcePartitionStoreItem) { this.partitionKey = sourcePartitionStoreItem.getSourcePartitionKey(); } - public TestPartition(boolean isGlobal) { + public TestEnhancedSourcePartition(boolean isGlobal) { this.partitionType = isGlobal ? null : DEFAULT_PARTITION_TYPE; partitionKey = UUID.randomUUID().toString(); } @@ -110,12 +119,12 @@ void test_initialize_should_run_correctly() { void test_createPartition() { coordinator = createObjectUnderTest(); // A normal type. - TestPartition partition = new TestPartition(false); + TestEnhancedSourcePartition partition = new TestEnhancedSourcePartition(false); coordinator.createPartition(partition); verify(sourceCoordinationStore).tryCreatePartitionItem(eq(sourceIdentifier + "|" + DEFAULT_PARTITION_TYPE), anyString(), eq(SourcePartitionStatus.UNASSIGNED), anyLong(), eq(null)); // GlobalState. - TestPartition globalState = new TestPartition(true); + TestEnhancedSourcePartition globalState = new TestEnhancedSourcePartition(true); coordinator.createPartition(globalState); verify(sourceCoordinationStore).tryCreatePartitionItem(eq(sourceIdentifier + "|GLOBAL"), anyString(), eq(null), anyLong(), eq(null)); @@ -129,13 +138,13 @@ void test_acquireAvailablePartition_should_run_correctly() { .willReturn(Optional.empty()); coordinator = createObjectUnderTest(); - Optional sourcePartition = coordinator.acquireAvailablePartition(DEFAULT_PARTITION_TYPE); + Optional sourcePartition = coordinator.acquireAvailablePartition(DEFAULT_PARTITION_TYPE); assertThat(sourcePartition.isPresent(), equalTo(true)); - Optional sourcePartition2 = coordinator.acquireAvailablePartition(DEFAULT_PARTITION_TYPE); + Optional sourcePartition2 = coordinator.acquireAvailablePartition(DEFAULT_PARTITION_TYPE); assertThat(sourcePartition2.isPresent(), equalTo(true)); - Optional sourcePartition3 = coordinator.acquireAvailablePartition(DEFAULT_PARTITION_TYPE); + Optional sourcePartition3 = coordinator.acquireAvailablePartition(DEFAULT_PARTITION_TYPE); assertThat(sourcePartition3.isPresent(), equalTo(false)); verify(sourceCoordinationStore, times(3)).tryAcquireAvailablePartition(anyString(), anyString(), any(Duration.class)); @@ -151,9 +160,9 @@ void test_saveProgressStateForPartition() { .willReturn(Optional.empty()); coordinator = createObjectUnderTest(); - Optional sourcePartition = coordinator.acquireAvailablePartition(DEFAULT_PARTITION_TYPE); + Optional sourcePartition = coordinator.acquireAvailablePartition(DEFAULT_PARTITION_TYPE); assertThat(sourcePartition.isPresent(), equalTo(true)); - TestPartition partition = (TestPartition) sourcePartition.get(); + TestEnhancedSourcePartition partition = (TestEnhancedSourcePartition) sourcePartition.get(); coordinator.saveProgressStateForPartition(partition); verify(sourceCoordinationStore).tryAcquireAvailablePartition(anyString(), anyString(), any(Duration.class)); @@ -167,9 +176,9 @@ void test_giveUpPartition() { coordinator = createObjectUnderTest(); - Optional sourcePartition = coordinator.acquireAvailablePartition(DEFAULT_PARTITION_TYPE); + Optional sourcePartition = coordinator.acquireAvailablePartition(DEFAULT_PARTITION_TYPE); assertThat(sourcePartition.isPresent(), equalTo(true)); - TestPartition partition = (TestPartition) sourcePartition.get(); + TestEnhancedSourcePartition partition = (TestEnhancedSourcePartition) sourcePartition.get(); coordinator.giveUpPartition(partition); @@ -186,9 +195,9 @@ void test_completePartition() { given(sourceCoordinationStore.tryAcquireAvailablePartition(anyString(), anyString(), any())).willReturn(Optional.of(sourcePartitionStoreItem)); coordinator = createObjectUnderTest(); - Optional sourcePartition = coordinator.acquireAvailablePartition(DEFAULT_PARTITION_TYPE); + Optional sourcePartition = coordinator.acquireAvailablePartition(DEFAULT_PARTITION_TYPE); assertThat(sourcePartition.isPresent(), equalTo(true)); - TestPartition partition = (TestPartition) sourcePartition.get(); + TestEnhancedSourcePartition partition = (TestEnhancedSourcePartition) sourcePartition.get(); coordinator.completePartition(partition); @@ -206,9 +215,9 @@ void test_closePartition() { given(sourceCoordinationStore.tryAcquireAvailablePartition(anyString(), anyString(), any())).willReturn(Optional.of(sourcePartitionStoreItem)); coordinator = createObjectUnderTest(); - Optional sourcePartition = coordinator.acquireAvailablePartition(DEFAULT_PARTITION_TYPE); + Optional sourcePartition = coordinator.acquireAvailablePartition(DEFAULT_PARTITION_TYPE); assertThat(sourcePartition.isPresent(), equalTo(true)); - TestPartition partition = (TestPartition) sourcePartition.get(); + TestEnhancedSourcePartition partition = (TestEnhancedSourcePartition) sourcePartition.get(); coordinator.closePartition(partition, Duration.ofMinutes(10), 1); verify(sourcePartitionStoreItem).setSourcePartitionStatus(SourcePartitionStatus.CLOSED); @@ -225,7 +234,7 @@ void getPartition() { String partitionKey = UUID.randomUUID().toString(); given(sourceCoordinationStore.getSourcePartitionItem(eq(sourceIdentifier + "|GLOBAL"), eq(partitionKey))).willReturn(Optional.of(sourcePartitionStoreItem)); coordinator = createObjectUnderTest(); - Optional sourcePartition = coordinator.getPartition(partitionKey); + Optional sourcePartition = coordinator.getPartition(partitionKey); assertThat(sourcePartition.isPresent(), equalTo(true)); } } \ No newline at end of file diff --git a/data-prepper-plugins/dynamodb-source/build.gradle b/data-prepper-plugins/dynamodb-source/build.gradle index c6f53b9da0..71b8dc2afc 100644 --- a/data-prepper-plugins/dynamodb-source/build.gradle +++ b/data-prepper-plugins/dynamodb-source/build.gradle @@ -9,8 +9,6 @@ repositories { dependencies { implementation project(path: ':data-prepper-api') - implementation project(path: ':data-prepper-core') - implementation project(path: ':data-prepper-plugins:dynamodb-source-coordination-store') implementation libs.armeria.core implementation 'io.micrometer:micrometer-core' diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/ClientFactory.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/ClientFactory.java index 867903e8e9..7d72049a6a 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/ClientFactory.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/ClientFactory.java @@ -16,6 +16,7 @@ public class ClientFactory { private final AwsCredentialsProvider awsCredentialsProvider; + private final AwsAuthenticationConfig awsAuthenticationConfig; public ClientFactory(AwsCredentialsSupplier awsCredentialsSupplier, AwsAuthenticationConfig awsAuthenticationConfig) { awsCredentialsProvider = awsCredentialsSupplier.getProvider(AwsCredentialsOptions.builder() @@ -24,18 +25,21 @@ public ClientFactory(AwsCredentialsSupplier awsCredentialsSupplier, AwsAuthentic .withStsExternalId(awsAuthenticationConfig.getAwsStsExternalId()) .withStsHeaderOverrides(awsAuthenticationConfig.getAwsStsHeaderOverrides()) .build()); + this.awsAuthenticationConfig = awsAuthenticationConfig; } public DynamoDbStreamsClient buildDynamoDbStreamClient() { return DynamoDbStreamsClient.builder() .credentialsProvider(awsCredentialsProvider) + .region(awsAuthenticationConfig.getAwsRegion()) .build(); } public DynamoDbClient buildDynamoDBClient() { return DynamoDbClient.builder() + .region(awsAuthenticationConfig.getAwsRegion()) .credentialsProvider(awsCredentialsProvider) .build(); } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java index e27d0ded70..0e41403e31 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java @@ -10,9 +10,9 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.TableConfig; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.EnhancedSourceCoordinator; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.SourcePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.ExportPartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.GlobalState; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.InitPartition; @@ -131,7 +131,7 @@ public void shutdown() { public void init() { LOG.info("Start initialize DynamoDB service"); - final Optional initPartition = coordinator.acquireAvailablePartition(InitPartition.PARTITION_TYPE); + final Optional initPartition = coordinator.acquireAvailablePartition(InitPartition.PARTITION_TYPE); if (initPartition.isEmpty()) { // Already initialized. Do nothing. return; diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSource.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSource.java index 16ca5153f9..ef35c82825 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSource.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSource.java @@ -15,33 +15,34 @@ import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.Source; -import org.opensearch.dataprepper.model.source.SourceCoordinationStore; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.DefaultEnhancedSourceCoordinator; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.UsesEnhancedSourceCoordination; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.PartitionFactory; +import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.InitPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Objects; +import java.util.function.Function; @DataPrepperPlugin(name = "dynamodb", pluginType = Source.class, pluginConfigurationType = DynamoDBSourceConfig.class) -public class DynamoDBSource implements Source> { +public class DynamoDBSource implements Source>, UsesEnhancedSourceCoordination { private static final Logger LOG = LoggerFactory.getLogger(DynamoDBSource.class); - private static final String SOURCE_COORDINATOR_METRIC_PREFIX = "source-coordinator"; - private final PluginMetrics pluginMetrics; private final DynamoDBSourceConfig sourceConfig; private final PluginFactory pluginFactory; - private final SourceCoordinationStore coordinationStore; + private final ClientFactory clientFactory; - private final EnhancedSourceCoordinator coordinator; + private EnhancedSourceCoordinator coordinator; - private final DynamoDBService dynamoDBService; + private DynamoDBService dynamoDBService; @DataPrepperPluginConstructor @@ -51,27 +52,19 @@ public DynamoDBSource(PluginMetrics pluginMetrics, final DynamoDBSourceConfig so this.sourceConfig = sourceConfig; this.pluginFactory = pluginFactory; + clientFactory = new ClientFactory(awsCredentialsSupplier, sourceConfig.getAwsAuthenticationConfig()); + } - // Load Coordination Store via PluginFactory - // This part will be updated. - PluginSetting sourceCoordinationStoreSetting = new PluginSetting(sourceConfig.getCoordinationStoreConfig().getPluginName(), sourceConfig.getCoordinationStoreConfig().getPluginSettings()); - sourceCoordinationStoreSetting.setPipelineName(SOURCE_COORDINATOR_METRIC_PREFIX); - coordinationStore = pluginFactory.loadPlugin(SourceCoordinationStore.class, sourceCoordinationStoreSetting); - String pipelineName = pluginSetting.getPipelineName(); - - // Create and initialize coordinator - coordinator = new DefaultEnhancedSourceCoordinator(coordinationStore, pipelineName, new PartitionFactory()); - coordinator.initialize(); + @Override + public void start(Buffer> buffer) { + Objects.requireNonNull(coordinator); - ClientFactory clientFactory = new ClientFactory(awsCredentialsSupplier, sourceConfig.getAwsAuthenticationConfig()); + coordinator.createPartition(new InitPartition()); // Create DynamoDB Service dynamoDBService = new DynamoDBService(coordinator, clientFactory, sourceConfig, pluginMetrics); dynamoDBService.init(); - } - @Override - public void start(Buffer> buffer) { LOG.info("Start DynamoDB service"); dynamoDBService.start(buffer); } @@ -86,4 +79,14 @@ public void stop() { } + @Override + public void setEnhancedSourceCoordinator(final EnhancedSourceCoordinator sourceCoordinator) { + coordinator = sourceCoordinator; + coordinator.initialize(); + } + + @Override + public Function getPartitionFactory() { + return new PartitionFactory(); + } } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceConfig.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceConfig.java index 379f837767..6e4a8fe7ae 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceConfig.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceConfig.java @@ -8,7 +8,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.Valid; import jakarta.validation.constraints.NotNull; -import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.AwsAuthenticationConfig; import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.TableConfig; @@ -28,11 +27,6 @@ public class DynamoDBSourceConfig { @Valid private AwsAuthenticationConfig awsAuthenticationConfig; - - @JsonProperty("coordinator") - private PluginModel coordinationStoreConfig; - - public DynamoDBSourceConfig() { } @@ -44,9 +38,4 @@ public List getTableConfigs() { public AwsAuthenticationConfig getAwsAuthenticationConfig() { return awsAuthenticationConfig; } - - public PluginModel getCoordinationStoreConfig() { - return coordinationStoreConfig; - } - } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/PartitionFactory.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/PartitionFactory.java index 1d154eb17e..74ca186004 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/PartitionFactory.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/PartitionFactory.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.coordination; import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.DataFilePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.ExportPartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.GlobalState; @@ -17,11 +18,11 @@ /** * Special partition factory just for this DynamoDB source. */ -public class PartitionFactory implements Function { +public class PartitionFactory implements Function { @Override - public SourcePartition apply(SourcePartitionStoreItem partitionStoreItem) { + public EnhancedSourcePartition apply(SourcePartitionStoreItem partitionStoreItem) { String sourceIdentifier = partitionStoreItem.getSourceIdentifier(); String partitionType = sourceIdentifier.substring(sourceIdentifier.lastIndexOf('|') + 1); diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/partition/DataFilePartition.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/partition/DataFilePartition.java index bbf35374f4..63222b0d22 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/partition/DataFilePartition.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/partition/DataFilePartition.java @@ -6,7 +6,7 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition; import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.SourcePartition; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.DataFileProgressState; import java.util.Optional; @@ -15,7 +15,7 @@ * An DataFilePartition represents an export data file needs to be loaded. * The source identifier contains keyword 'DATAFILE' */ -public class DataFilePartition extends SourcePartition { +public class DataFilePartition extends EnhancedSourcePartition { public static final String PARTITION_TYPE = "DATAFILE"; @@ -26,6 +26,7 @@ public class DataFilePartition extends SourcePartition { private final DataFileProgressState state; public DataFilePartition(SourcePartitionStoreItem sourcePartitionStoreItem) { + setSourcePartitionStoreItem(sourcePartitionStoreItem); String[] keySplits = sourcePartitionStoreItem.getSourcePartitionKey().split("\\|"); exportArn = keySplits[0]; diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/partition/ExportPartition.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/partition/ExportPartition.java index a7da8ac225..c41cfcb728 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/partition/ExportPartition.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/partition/ExportPartition.java @@ -6,7 +6,7 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition; import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.SourcePartition; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.ExportProgressState; import java.time.Instant; @@ -18,7 +18,7 @@ * Each job maintains the state such as total files/records etc. independently. * The source identifier contains keyword 'EXPORT' */ -public class ExportPartition extends SourcePartition { +public class ExportPartition extends EnhancedSourcePartition { public static final String PARTITION_TYPE = "EXPORT"; private final String tableArn; diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/partition/GlobalState.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/partition/GlobalState.java index 4b45dd10f4..ade0fc8314 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/partition/GlobalState.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/partition/GlobalState.java @@ -6,7 +6,7 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition; import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.SourcePartition; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import java.util.Map; import java.util.Optional; @@ -17,7 +17,7 @@ * However, you can read and update Global State whenever required. * The progress state is a Map object. */ -public class GlobalState extends SourcePartition> { +public class GlobalState extends EnhancedSourcePartition> { private final String stateName; @@ -27,7 +27,6 @@ public GlobalState(SourcePartitionStoreItem sourcePartitionStoreItem) { setSourcePartitionStoreItem(sourcePartitionStoreItem); this.stateName = sourcePartitionStoreItem.getSourcePartitionKey(); this.state = convertStringToPartitionProgressState(null, sourcePartitionStoreItem.getPartitionProgressState()); - } public GlobalState(String stateName, Optional> state) { diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/partition/InitPartition.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/partition/InitPartition.java index c2c7008bee..2df0e59fe5 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/partition/InitPartition.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/partition/InitPartition.java @@ -6,7 +6,7 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition; import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.SourcePartition; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.EmptyProgressState; import java.util.Optional; @@ -17,7 +17,7 @@ * The downside is that after initialization, changes to configuration will be ignored. * The source identifier contains keyword 'INIT' */ -public class InitPartition extends SourcePartition { +public class InitPartition extends EnhancedSourcePartition { public static final String PARTITION_TYPE = "INIT"; private static final String DEFAULT_PARTITION_KEY = "GLOBAL"; diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/partition/StreamPartition.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/partition/StreamPartition.java index 846b33d573..2c071d4f2e 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/partition/StreamPartition.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/partition/StreamPartition.java @@ -6,12 +6,12 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition; import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.SourcePartition; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState; import java.util.Optional; -public class StreamPartition extends SourcePartition { +public class StreamPartition extends EnhancedSourcePartition { public static final String PARTITION_TYPE = "STREAM"; diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileCheckpointer.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileCheckpointer.java index bb681c8a20..c3221d62ec 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileCheckpointer.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileCheckpointer.java @@ -5,7 +5,7 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.export; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.DataFilePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.DataFileProgressState; import org.slf4j.Logger; diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactory.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactory.java index 07d5a6efa7..75dccf8218 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactory.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactory.java @@ -9,8 +9,8 @@ import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.EnhancedSourceCoordinator; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.DataFilePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo; import software.amazon.awssdk.services.s3.S3Client; diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java index 181300c9c3..be1d1fc175 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java @@ -7,8 +7,8 @@ import io.micrometer.core.instrument.Counter; import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.EnhancedSourceCoordinator; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.SourcePartition; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.DataFilePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.GlobalState; import org.opensearch.dataprepper.plugins.source.dynamodb.model.LoadStatus; @@ -78,7 +78,7 @@ public void run() { while (!Thread.interrupted()) { if (numOfWorkers.get() < MAX_JOB_COUNT) { - final Optional sourcePartition = coordinator.acquireAvailablePartition(DataFilePartition.PARTITION_TYPE); + final Optional sourcePartition = coordinator.acquireAvailablePartition(DataFilePartition.PARTITION_TYPE); if (sourcePartition.isPresent()) { DataFilePartition dataFilePartition = (DataFilePartition) sourcePartition.get(); @@ -151,7 +151,7 @@ private void updateState(String exportArn, int loaded) { // Unlimited retries // The state be out of dated when updating. while (true) { - Optional globalPartition = coordinator.getPartition(exportArn); + Optional globalPartition = coordinator.getPartition(exportArn); if (globalPartition.isEmpty()) { LOG.error("Failed to get load status for " + exportArn); return; diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java index 34fb702cc1..6c9d9108c2 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java @@ -7,8 +7,8 @@ import io.micrometer.core.instrument.Counter; import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.EnhancedSourceCoordinator; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.SourcePartition; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.DataFilePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.ExportPartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.GlobalState; @@ -96,7 +96,7 @@ public void run() { while (!Thread.interrupted()) { // Does not have limit on max leases // As most of the time it's just to wait - final Optional sourcePartition = enhancedSourceCoordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE); + final Optional sourcePartition = enhancedSourceCoordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE); if (sourcePartition.isPresent()) { diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java index 6e141c6e27..c6e1fea6ce 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java @@ -9,8 +9,8 @@ import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.EnhancedSourceCoordinator; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.GlobalState; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState; @@ -39,7 +39,9 @@ public class ShardConsumerFactory { private final Buffer> buffer; - public ShardConsumerFactory(EnhancedSourceCoordinator enhancedSourceCoordinator, DynamoDbStreamsClient streamsClient, PluginMetrics pluginMetrics, ShardManager shardManager, Buffer> buffer) { + public ShardConsumerFactory(final EnhancedSourceCoordinator enhancedSourceCoordinator, + final DynamoDbStreamsClient streamsClient, final PluginMetrics pluginMetrics, + final ShardManager shardManager, final Buffer> buffer) { this.streamsClient = streamsClient; this.enhancedSourceCoordinator = enhancedSourceCoordinator; this.pluginMetrics = pluginMetrics; diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamCheckpointer.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamCheckpointer.java index 9407576b83..ccbfd268f3 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamCheckpointer.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamCheckpointer.java @@ -5,8 +5,8 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.stream; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.EnhancedSourceCoordinator; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.SourcePartition; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState; import org.slf4j.Logger; @@ -87,7 +87,7 @@ public void release(String sequenceNumber) { } public boolean isExportDone() { - Optional globalPartition = coordinator.getPartition(streamPartition.getStreamArn()); + Optional globalPartition = coordinator.getPartition(streamPartition.getStreamArn()); return globalPartition.isPresent(); } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java index cd7775c5ba..0e9f2ef910 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java @@ -5,8 +5,8 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.stream; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.EnhancedSourceCoordinator; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.SourcePartition; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState; import org.slf4j.Logger; @@ -36,7 +36,7 @@ public class StreamScheduler implements Runnable { private final ShardManager shardManager; - public StreamScheduler(EnhancedSourceCoordinator coordinator, ShardConsumerFactory consumerFactory, ShardManager shardManager) { + public StreamScheduler(final EnhancedSourceCoordinator coordinator, final ShardConsumerFactory consumerFactory, final ShardManager shardManager) { this.coordinator = coordinator; this.shardManager = shardManager; this.consumerFactory = consumerFactory; @@ -62,7 +62,7 @@ public void run() { LOG.debug("Stream Scheduler start to run..."); while (!Thread.interrupted()) { if (numOfWorkers.get() < MAX_JOB_COUNT) { - final Optional sourcePartition = coordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE); + final Optional sourcePartition = coordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE); if (sourcePartition.isPresent()) { StreamPartition streamPartition = (StreamPartition) sourcePartition.get(); processStreamPartition(streamPartition); diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBServiceTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBServiceTest.java index 7b6a9b7531..7858358a13 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBServiceTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBServiceTest.java @@ -15,11 +15,11 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.ExportConfig; import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamConfig; import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.TableConfig; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.EnhancedSourceCoordinator; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.SourcePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.InitPartition; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.model.ContinuousBackupsDescription; @@ -196,10 +196,10 @@ void test_should_init() { // Acquire the init partition verify(coordinator).acquireAvailablePartition(eq(InitPartition.PARTITION_TYPE)); // Complete the init partition - verify(coordinator).completePartition(any(SourcePartition.class)); + verify(coordinator).completePartition(any(EnhancedSourcePartition.class)); // Should create 1 export partition + 1 stream partition + 1 global table state - verify(coordinator, times(3)).createPartition(any(SourcePartition.class)); + verify(coordinator, times(3)).createPartition(any(EnhancedSourcePartition.class)); } @Test diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceConfigTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceConfigTest.java index 0e11d82e57..581d17884a 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceConfigTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceConfigTest.java @@ -42,15 +42,10 @@ void test_general_config() throws JsonProcessingException { " start_position: \"BEGINNING\" \n" + "aws:\n" + " region: \"us-west-2\"\n" + - " sts_role_arn: \"arn:aws:iam::123456789012:role/DataPrepperRole\"\n" + - "coordinator:\n" + - " dynamodb:\n" + - " table_name: \"coordinator-table\"\n" + - " region: \"us-west-2\""; + " sts_role_arn: \"arn:aws:iam::123456789012:role/DataPrepperRole\""; final DynamoDBSourceConfig sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, DynamoDBSourceConfig.class); assertThat(sourceConfiguration.getAwsAuthenticationConfig(), notNullValue()); - assertThat(sourceConfiguration.getCoordinationStoreConfig(), notNullValue()); assertThat(sourceConfiguration.getTableConfigs(), notNullValue()); assertThat(sourceConfiguration.getTableConfigs().size(), equalTo(3)); diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/PartitionFactoryTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/PartitionFactoryTest.java index 749213ae86..37e269dcd8 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/PartitionFactoryTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/coordination/PartitionFactoryTest.java @@ -10,6 +10,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.DataFilePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.ExportPartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.GlobalState; @@ -68,7 +69,7 @@ void testCreateExportPartition() { when(sourcePartitionStoreItem.getPartitionProgressState()).thenReturn(state); PartitionFactory factory = new PartitionFactory(); - SourcePartition sourcePartition = factory.apply(sourcePartitionStoreItem); + EnhancedSourcePartition sourcePartition = factory.apply(sourcePartitionStoreItem); assertThat(sourcePartition, notNullValue()); ExportPartition exportPartition = (ExportPartition) sourcePartition; assertThat(exportPartition.getTableArn(), equalTo(tableArn)); @@ -95,7 +96,7 @@ void testCreateStreamPartition() { PartitionFactory factory = new PartitionFactory(); - SourcePartition sourcePartition = factory.apply(sourcePartitionStoreItem); + EnhancedSourcePartition sourcePartition = factory.apply(sourcePartitionStoreItem); assertThat(sourcePartition, notNullValue()); StreamPartition streamPartition = (StreamPartition) sourcePartition; assertThat(streamPartition.getStreamArn(), equalTo(streamArn)); @@ -122,7 +123,7 @@ void testCreateDataFilePartition() { PartitionFactory factory = new PartitionFactory(); - SourcePartition sourcePartition = factory.apply(sourcePartitionStoreItem); + EnhancedSourcePartition sourcePartition = factory.apply(sourcePartitionStoreItem); assertThat(sourcePartition, notNullValue()); DataFilePartition dataFilePartition = (DataFilePartition) sourcePartition; assertThat(dataFilePartition.getExportArn(), equalTo(exportArn)); @@ -150,7 +151,7 @@ void testCreateGlobalState() { PartitionFactory factory = new PartitionFactory(); - SourcePartition sourcePartition = factory.apply(sourcePartitionStoreItem); + EnhancedSourcePartition sourcePartition = factory.apply(sourcePartitionStoreItem); assertThat(sourcePartition, notNullValue()); GlobalState globalState = (GlobalState) sourcePartition; assertThat(globalState.getPartitionKey(), equalTo(partitionKey)); @@ -169,7 +170,7 @@ void testCreateInitPartition() { when(sourcePartitionStoreItem.getSourceIdentifier()).thenReturn(sourceId); PartitionFactory factory = new PartitionFactory(); - SourcePartition sourcePartition = factory.apply(sourcePartitionStoreItem); + EnhancedSourcePartition sourcePartition = factory.apply(sourcePartitionStoreItem); assertThat(sourcePartition, notNullValue()); InitPartition exportPartition = (InitPartition) sourcePartition; assertThat(exportPartition.getPartitionKey(), equalTo("GLOBAL")); diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactoryTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactoryTest.java index efcb7be076..d1f7426ad7 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactoryTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderFactoryTest.java @@ -14,7 +14,7 @@ import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.DataFilePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.DataFileProgressState; import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo; diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderTest.java index 5118b700c7..fc25226f78 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoaderTest.java @@ -10,9 +10,9 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.EnhancedSourceCoordinator; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.SourcePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.DataFilePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.DataFileProgressState; import software.amazon.awssdk.core.ResponseInputStream; @@ -81,10 +81,10 @@ void setup() throws IOException { when(s3Client.getObject(any(GetObjectRequest.class))).thenReturn(generateGzipInputStream(total)); s3ObjectReader = new S3ObjectReader(s3Client); - lenient().when(coordinator.createPartition(any(SourcePartition.class))).thenReturn(true); - lenient().doNothing().when(coordinator).completePartition(any(SourcePartition.class)); - lenient().doNothing().when(coordinator).saveProgressStateForPartition(any(SourcePartition.class)); - lenient().doNothing().when(coordinator).giveUpPartition(any(SourcePartition.class)); + lenient().when(coordinator.createPartition(any(EnhancedSourcePartition.class))).thenReturn(true); + lenient().doNothing().when(coordinator).completePartition(any(EnhancedSourcePartition.class)); + lenient().doNothing().when(coordinator).saveProgressStateForPartition(any(EnhancedSourcePartition.class)); + lenient().doNothing().when(coordinator).giveUpPartition(any(EnhancedSourcePartition.class)); lenient().doNothing().when(recordConverter).writeToBuffer(any(List.class)); diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileSchedulerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileSchedulerTest.java index f3ae4b8098..33acaf9003 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileSchedulerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileSchedulerTest.java @@ -12,8 +12,8 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.EnhancedSourceCoordinator; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.SourcePartition; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.DataFilePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.GlobalState; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.DataFileProgressState; @@ -39,6 +39,7 @@ class DataFileSchedulerTest { @Mock private EnhancedSourceCoordinator coordinator; + @Mock private PluginMetrics pluginMetrics; @@ -100,9 +101,9 @@ void setup() { given(pluginMetrics.counter(EXPORT_FILE_SUCCESS_COUNT)).willReturn(exportFileSuccess); - lenient().when(coordinator.createPartition(any(SourcePartition.class))).thenReturn(true); - lenient().doNothing().when(coordinator).completePartition(any(SourcePartition.class)); - lenient().doNothing().when(coordinator).giveUpPartition(any(SourcePartition.class)); + lenient().when(coordinator.createPartition(any(EnhancedSourcePartition.class))).thenReturn(true); + lenient().doNothing().when(coordinator).completePartition(any(EnhancedSourcePartition.class)); + lenient().doNothing().when(coordinator).giveUpPartition(any(EnhancedSourcePartition.class)); lenient().when(loaderFactory.createDataFileLoader(any(DataFilePartition.class), any(TableInfo.class))).thenReturn(() -> System.out.println("Hello")); diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportSchedulerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportSchedulerTest.java index 6f78f17809..dcd44a26aa 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportSchedulerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportSchedulerTest.java @@ -13,8 +13,8 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.EnhancedSourceCoordinator; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.SourcePartition; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.ExportPartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.ExportProgressState; import org.opensearch.dataprepper.plugins.source.dynamodb.model.ExportSummary; @@ -55,6 +55,7 @@ class ExportSchedulerTest { @Mock private EnhancedSourceCoordinator coordinator; + @Mock private DynamoDbClient dynamoDBClient; @@ -117,9 +118,9 @@ void setup() { lenient().when(summary.getManifestFilesS3Key()).thenReturn(manifestKey); lenient().when(manifestFileReader.parseDataFile(anyString(), anyString())).thenReturn(Map.of("Key1", 100, "Key2", 200)); - lenient().when(coordinator.createPartition(any(SourcePartition.class))).thenReturn(true); - lenient().doNothing().when(coordinator).completePartition(any(SourcePartition.class)); - lenient().doNothing().when(coordinator).giveUpPartition(any(SourcePartition.class)); + lenient().when(coordinator.createPartition(any(EnhancedSourcePartition.class))).thenReturn(true); + lenient().doNothing().when(coordinator).completePartition(any(EnhancedSourcePartition.class)); + lenient().doNothing().when(coordinator).giveUpPartition(any(EnhancedSourcePartition.class)); } @@ -156,9 +157,9 @@ public void test_run_exportJob_correctly() throws InterruptedException { verify(dynamoDBClient, times(2)).describeExport(any(DescribeExportRequest.class)); // Create 2 data file partitions + 1 global state - verify(coordinator, times(3)).createPartition(any(SourcePartition.class)); + verify(coordinator, times(3)).createPartition(any(EnhancedSourcePartition.class)); // Complete the export partition - verify(coordinator).completePartition(any(SourcePartition.class)); + verify(coordinator).completePartition(any(EnhancedSourcePartition.class)); verify(exportJobSuccess).increment(); verify(exportFilesTotal).increment(2); verify(exportRecordsTotal).increment(300); diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java index 52a0716180..c2f3bd1cf1 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java @@ -14,7 +14,7 @@ import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.GlobalState; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState; @@ -34,6 +34,7 @@ class ShardConsumerFactoryTest { @Mock private EnhancedSourceCoordinator coordinator; + @Mock private DynamoDbStreamsClient dynamoDbStreamsClient; @Mock diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java index 8fb1120aa0..87bc68ce7d 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java @@ -10,9 +10,9 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.EnhancedSourceCoordinator; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.SourcePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.GlobalState; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState; @@ -89,10 +89,10 @@ void setup() { .build(); lenient().when(tableInfoGlobalState.getProgressState()).thenReturn(Optional.of(metadata.toMap())); - lenient().when(coordinator.createPartition(any(SourcePartition.class))).thenReturn(true); - lenient().doNothing().when(coordinator).completePartition(any(SourcePartition.class)); - lenient().doNothing().when(coordinator).saveProgressStateForPartition(any(SourcePartition.class)); - lenient().doNothing().when(coordinator).giveUpPartition(any(SourcePartition.class)); + lenient().when(coordinator.createPartition(any(EnhancedSourcePartition.class))).thenReturn(true); + lenient().doNothing().when(coordinator).completePartition(any(EnhancedSourcePartition.class)); + lenient().doNothing().when(coordinator).saveProgressStateForPartition(any(EnhancedSourcePartition.class)); + lenient().doNothing().when(coordinator).giveUpPartition(any(EnhancedSourcePartition.class)); checkpointer = new StreamCheckpointer(coordinator, streamPartition); diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java index 30ced8d0f7..60ce8dd254 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java @@ -10,8 +10,8 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.EnhancedSourceCoordinator; -import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.SourcePartition; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState; import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient; @@ -73,10 +73,10 @@ void setup() { streamPartition = new StreamPartition(streamArn, shardId, Optional.of(state)); // Mock Coordinator methods - lenient().when(coordinator.createPartition(any(SourcePartition.class))).thenReturn(true); - lenient().doNothing().when(coordinator).completePartition(any(SourcePartition.class)); - lenient().doNothing().when(coordinator).saveProgressStateForPartition(any(SourcePartition.class)); - lenient().doNothing().when(coordinator).giveUpPartition(any(SourcePartition.class)); + lenient().when(coordinator.createPartition(any(EnhancedSourcePartition.class))).thenReturn(true); + lenient().doNothing().when(coordinator).completePartition(any(EnhancedSourcePartition.class)); + lenient().doNothing().when(coordinator).saveProgressStateForPartition(any(EnhancedSourcePartition.class)); + lenient().doNothing().when(coordinator).giveUpPartition(any(EnhancedSourcePartition.class)); lenient().when(consumerFactory.createConsumer(any(StreamPartition.class))).thenReturn(() -> System.out.println("Hello")); lenient().when(shardManager.getChildShardIds(anyString(), anyString())).thenReturn(List.of(shardId));