Skip to content

Commit

Permalink
Move ddb source coordinator config to the data-prepper-config.yaml (#…
Browse files Browse the repository at this point in the history
…3466)

Signed-off-by: Taylor Gray <tylgry@amazon.com>
  • Loading branch information
graytaylor0 authored Oct 11, 2023
1 parent 400b2a8 commit 633401a
Show file tree
Hide file tree
Showing 42 changed files with 675 additions and 196 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.dynamodb.coordination;
package org.opensearch.dataprepper.model.source.coordinator.enhanced;

import java.util.Optional;

/**
* A Partition Interface represents an item in the coordination store.
*/
public interface Partition<T> {
public interface EnhancedPartition<T> {


String getPartitionType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.dynamodb.coordination;
package org.opensearch.dataprepper.model.source.coordinator.enhanced;

import java.time.Duration;
import java.util.Optional;
Expand All @@ -25,20 +25,20 @@ public interface EnhancedSourceCoordinator {
/**
* This method is used to create the partition item in the coordination store.
*
* @param partition A specific partition that extends {@link SourcePartition}
* @param partition A specific partition that extends {@link EnhancedSourcePartition}
* @param <T> The progress state class
* @return True if partition is created successfully otherwise false.
*/
<T> boolean createPartition(SourcePartition<T> partition);
<T> boolean createPartition(EnhancedSourcePartition<T> partition);


/**
* This method is used to acquire a lease on the partition item in the coordination store.
*
* @param partitionType The partition type identifier
* @return A {@link SourcePartition} instance
* @return A {@link EnhancedSourcePartition} instance
*/
Optional<SourcePartition> acquireAvailablePartition(String partitionType);
Optional<EnhancedSourcePartition> acquireAvailablePartition(String partitionType);

/**
* This method is used to update progress state for a partition in the coordination store.
Expand All @@ -48,7 +48,7 @@ public interface EnhancedSourceCoordinator {
* @param <T> The progress state class
* @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException when the partition was not updated successfully
*/
<T> void saveProgressStateForPartition(SourcePartition<T> partition);
<T> void saveProgressStateForPartition(EnhancedSourcePartition<T> partition);

/**
* This method is used to release the lease of a partition in the coordination store.
Expand All @@ -58,7 +58,7 @@ public interface EnhancedSourceCoordinator {
* @param <T> The progress state class
* @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException when the partition was not updated successfully
*/
<T> void giveUpPartition(SourcePartition<T> partition);
<T> void giveUpPartition(EnhancedSourcePartition<T> partition);

/**
* This method is used to mark a partition as COMPLETED in the coordination store.
Expand All @@ -68,7 +68,7 @@ public interface EnhancedSourceCoordinator {
* @param <T> The progress state class
* @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException when the partition was not updated successfully
*/
<T> void completePartition(SourcePartition<T> partition);
<T> void completePartition(EnhancedSourcePartition<T> partition);

/**
* This method is used to mark a partition as CLOSED in the coordination store.
Expand All @@ -82,7 +82,7 @@ public interface EnhancedSourceCoordinator {
* @param <T> The progress state class
* @throws org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException when the partition was not updated successfully
*/
<T> void closePartition(SourcePartition<T> partition, final Duration reopenAfter, final int maxClosedCount);
<T> void closePartition(EnhancedSourcePartition<T> partition, final Duration reopenAfter, final int maxClosedCount);


/**
Expand All @@ -91,9 +91,9 @@ public interface EnhancedSourceCoordinator {
* Hence, it's designed to be used as a "Global State" which can be read whenever needed.
*
* @param partitionKey A unique key for that partition
* @return A {@link SourcePartition} instance
* @return A {@link EnhancedSourcePartition} instance
*/
Optional<SourcePartition> getPartition(String partitionKey);
Optional<EnhancedSourcePartition> getPartition(String partitionKey);

/**
* This method is to perform initialization for the coordinator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.dynamodb.coordination;
package org.opensearch.dataprepper.model.source.coordinator.enhanced;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
Expand All @@ -17,7 +17,7 @@

/**
* <p>
* A base definition of a {@link Partition} in the coordination store.
* A base definition of a {@link EnhancedPartition} in the coordination store.
* All partitions must extend this.
* </p>
* We store the {SourcePartitionStoreItem} in the partition.
Expand All @@ -30,40 +30,38 @@
*
* @param <T> The progress state class
*/
public abstract class SourcePartition<T> implements Partition<T> {
public abstract class EnhancedSourcePartition<T> implements EnhancedPartition<T> {

private static final Logger LOG = LoggerFactory.getLogger(DefaultEnhancedSourceCoordinator.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final Logger LOG = LoggerFactory.getLogger(EnhancedSourcePartition.class);
private static final ObjectMapper objectMapper = new ObjectMapper();

private SourcePartitionStoreItem sourcePartitionStoreItem;

protected SourcePartitionStoreItem getSourcePartitionStoreItem() {
public SourcePartitionStoreItem getSourcePartitionStoreItem() {
return sourcePartitionStoreItem;
}


protected void setSourcePartitionStoreItem(SourcePartitionStoreItem sourcePartitionStoreItem) {
public void setSourcePartitionStoreItem(SourcePartitionStoreItem sourcePartitionStoreItem) {
this.sourcePartitionStoreItem = sourcePartitionStoreItem;
}


/**
* Helper method to convert progress state.
* This is because the state is currently stored as a String in the coordination store.
*/
protected T convertStringToPartitionProgressState(Class<T> progressStateClass, final String serializedPartitionProgressState) {
public T convertStringToPartitionProgressState(Class<T> progressStateClass, final String serializedPartitionProgressState) {
if (Objects.isNull(serializedPartitionProgressState)) {
return null;
}

try {
if (progressStateClass != null) {
return MAPPER.readValue(serializedPartitionProgressState, progressStateClass);
return objectMapper.readValue(serializedPartitionProgressState, progressStateClass);
}
return MAPPER.readValue(serializedPartitionProgressState, new TypeReference<>() {
return objectMapper.readValue(serializedPartitionProgressState, new TypeReference<>() {
});
} catch (final JsonProcessingException e) {
LOG.error("Unable to convert string to partition progress state class {}: {}", progressStateClass.getName(), e);
LOG.error("Unable to convert string to partition progress state class {}: ", progressStateClass != null ? progressStateClass.getName() : null, e);
return null;
}
}
Expand All @@ -72,14 +70,14 @@ protected T convertStringToPartitionProgressState(Class<T> progressStateClass, f
* Helper method to convert progress state to String
* This is because the state is currently stored as a String in the coordination store.
*/
protected String convertPartitionProgressStatetoString(Optional<T> partitionProgressState) {
public String convertPartitionProgressStatetoString(Optional<T> partitionProgressState) {
if (partitionProgressState.isEmpty()) {
return null;
}
try {
return MAPPER.writeValueAsString(partitionProgressState.get());
return objectMapper.writeValueAsString(partitionProgressState.get());
} catch (final JsonProcessingException e) {
LOG.error("Unable to convert partition progress state class to string: {}", e);
LOG.error("Unable to convert partition progress state class to string: ", e);
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.source.coordinator.enhanced;

import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem;

import java.util.function.Function;

public interface UsesEnhancedSourceCoordination {

/**
*
* @param sourceCoordinator - The {@link EnhancedSourceCoordinator} to be used by the
* {@link org.opensearch.dataprepper.model.source.Source} as needed
*/
void setEnhancedSourceCoordinator(final EnhancedSourceCoordinator sourceCoordinator);

Function<SourcePartitionStoreItem, EnhancedSourcePartition> getPartitionFactory();
}
Loading

0 comments on commit 633401a

Please sign in to comment.