Skip to content

Commit

Permalink
Kds stream api changes (#5239)
Browse files Browse the repository at this point in the history
* Change API to use DescribeStreamSummary

Signed-off-by: Souvik Bose <souvbose@amazon.com>

* Code changes

Signed-off-by: Souvik Bose <souvbose@amazon.com>

* Rename the class.

Signed-off-by: Souvik Bose <souvbose@amazon.com>

* Address review comments

Signed-off-by: Souvik Bose <souvbose@amazon.com>

---------

Signed-off-by: Souvik Bose <souvbose@amazon.com>
Co-authored-by: Souvik Bose <souvbose@amazon.com>
  • Loading branch information
sb2k16 and sbose2k21 authored Dec 10, 2024
1 parent 4aa6106 commit 094dcbe
Show file tree
Hide file tree
Showing 8 changed files with 413 additions and 79 deletions.
1 change: 1 addition & 0 deletions data-prepper-plugins/kinesis-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies {
implementation project(':data-prepper-plugins:common')
implementation project(path: ':data-prepper-plugins:buffer-common')
implementation project(path: ':data-prepper-plugins:aws-plugin-api')
implementation libs.armeria.core
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'io.micrometer:micrometer-core'
implementation 'software.amazon.kinesis:amazon-kinesis-client:2.6.0'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.source;

import com.amazonaws.arn.Arn;
import com.linecorp.armeria.client.retry.Backoff;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.dataprepper.plugins.kinesis.source.exceptions.KinesisRetriesExhaustedException;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.StreamDescriptionSummary;
import software.amazon.kinesis.common.StreamIdentifier;

import java.time.Duration;
import java.util.concurrent.CompletionException;

@Slf4j
public class KinesisClientApiHandler {
private static final String COLON = ":";

private final Backoff backoff;
private final KinesisAsyncClient kinesisClient;
private int failedAttemptCount;
private int maxRetryCount;

public KinesisClientApiHandler(final KinesisAsyncClient kinesisClient, final Backoff backoff, final int maxRetryCount) {
this.kinesisClient = kinesisClient;
this.backoff = backoff;
this.failedAttemptCount = 0;
if (maxRetryCount <= 0) {
throw new IllegalArgumentException("Maximum Retry count should be strictly greater than zero.");
}
this.maxRetryCount = maxRetryCount;
}

public StreamIdentifier getStreamIdentifier(final String streamName) {
failedAttemptCount = 0;
DescribeStreamSummaryRequest describeStreamSummaryRequest = DescribeStreamSummaryRequest.builder()
.streamName(streamName).build();
while (failedAttemptCount < maxRetryCount) {
try {
DescribeStreamSummaryResponse response = kinesisClient.describeStreamSummary(describeStreamSummaryRequest).join();
String streamIdentifierString = getStreamIdentifierString(response.streamDescriptionSummary());
return StreamIdentifier.multiStreamInstance(streamIdentifierString);
} catch (CompletionException ex) {
Throwable cause = ex.getCause();
if (cause instanceof KinesisException || cause instanceof com.amazonaws.SdkClientException) {
log.error("Failed to describe stream summary for stream {} with error {}. The kinesis source will retry.", streamName, ex.getMessage());
} else {
log.error("Failed to describe stream summary for stream {} with error {}. The kinesis source will retry.", streamName, ex);
}
}
applyBackoff();
++failedAttemptCount;
}
throw new KinesisRetriesExhaustedException(String.format("Failed to get Kinesis stream summary for stream %s after %d retries", streamName, maxRetryCount));
}

private void applyBackoff() {
final long delayMillis = backoff.nextDelayMillis(failedAttemptCount);
if (delayMillis < 0) {
throw new KinesisRetriesExhaustedException("Kinesis DescribeStreamSummary request retries exhausted. Make sure that Kinesis configuration is valid, Kinesis stream exists, and IAM role has required permissions.");
}
final Duration delayDuration = Duration.ofMillis(delayMillis);
log.info("Pausing Kinesis DescribeStreamSummary request for {}.{} seconds due to an error in processing.",
delayDuration.getSeconds(), delayDuration.toMillisPart());
try {
Thread.sleep(delayMillis);
} catch (final InterruptedException e){
log.error("Thread is interrupted while polling Kinesis with retry.", e);
}
}

private String getStreamIdentifierString(StreamDescriptionSummary streamDescriptionSummary) {
String accountId = Arn.fromString(streamDescriptionSummary.streamARN()).getAccountId();
long creationEpochSecond = streamDescriptionSummary.streamCreationTimestamp().getEpochSecond();
return String.join(COLON, accountId, streamDescriptionSummary.streamName(), String.valueOf(creationEpochSecond));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,8 @@

package org.opensearch.dataprepper.plugins.kinesis.source;

import com.amazonaws.arn.Arn;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.StreamDescription;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
Expand All @@ -27,18 +22,15 @@
import java.util.ArrayList;
import java.util.List;


public class KinesisMultiStreamTracker implements MultiStreamTracker {
private static final String COLON = ":";

private final KinesisAsyncClient kinesisClient;
private final KinesisSourceConfig sourceConfig;
private final String applicationName;
private final KinesisClientApiHandler kinesisClientAPIHandler;

public KinesisMultiStreamTracker(KinesisAsyncClient kinesisClient, final KinesisSourceConfig sourceConfig, final String applicationName) {
this.kinesisClient = kinesisClient;
public KinesisMultiStreamTracker(final KinesisSourceConfig sourceConfig, final String applicationName, final KinesisClientApiHandler kinesisClientAPIHandler) {
this.sourceConfig = sourceConfig;
this.applicationName = applicationName;
this.kinesisClientAPIHandler = kinesisClientAPIHandler;
}

@Override
Expand All @@ -52,26 +44,11 @@ public List<StreamConfig> streamConfigList() {
}

private StreamConfig getStreamConfig(KinesisStreamConfig kinesisStreamConfig) {
StreamIdentifier sourceStreamIdentifier = getStreamIdentifier(kinesisStreamConfig);
StreamIdentifier sourceStreamIdentifier = kinesisClientAPIHandler.getStreamIdentifier(kinesisStreamConfig.getName());
return new StreamConfig(sourceStreamIdentifier,
InitialPositionInStreamExtended.newInitialPosition(kinesisStreamConfig.getInitialPosition()));
}

private StreamIdentifier getStreamIdentifier(KinesisStreamConfig kinesisStreamConfig) {
DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
.streamName(kinesisStreamConfig.getName())
.build();
DescribeStreamResponse describeStreamResponse = kinesisClient.describeStream(describeStreamRequest).join();
String streamIdentifierString = getStreamIdentifierString(describeStreamResponse.streamDescription());
return StreamIdentifier.multiStreamInstance(streamIdentifierString);
}

private String getStreamIdentifierString(StreamDescription streamDescription) {
String accountId = Arn.fromString(streamDescription.streamARN()).getAccountId();
long creationEpochSecond = streamDescription.streamCreationTimestamp().getEpochSecond();
return String.join(COLON, accountId, streamDescription.streamName(), String.valueOf(creationEpochSecond));
}

/**
* Setting the deletion policy as autodetect and release shard lease with a wait time of 10 sec
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package org.opensearch.dataprepper.plugins.kinesis.source;

import com.amazonaws.SdkClientException;
import com.linecorp.armeria.client.retry.Backoff;
import lombok.Getter;
import lombok.Setter;
import org.opensearch.dataprepper.metrics.PluginMetrics;
Expand Down Expand Up @@ -42,6 +43,7 @@
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand All @@ -55,6 +57,10 @@
public class KinesisService {
private static final Logger LOG = LoggerFactory.getLogger(KinesisService.class);
private static final int GRACEFUL_SHUTDOWN_WAIT_INTERVAL_SECONDS = 20;
private static final long INITIAL_DELAY = Duration.ofSeconds(20).toMillis();
private static final long MAXIMUM_DELAY = Duration.ofMinutes(5).toMillis();
private static final double JITTER_RATE = 0.20;
private static final int NUM_OF_RETRIES = 3;

private final PluginMetrics pluginMetrics;
private final PluginFactory pluginFactory;
Expand Down Expand Up @@ -171,7 +177,8 @@ public Scheduler createScheduler(final Buffer<Record<Event>> buffer) {

ConfigsBuilder configsBuilder =
new ConfigsBuilder(
new KinesisMultiStreamTracker(kinesisClient, kinesisSourceConfig, applicationName),
new KinesisMultiStreamTracker(kinesisSourceConfig, applicationName, new KinesisClientApiHandler(kinesisClient, Backoff.exponential(INITIAL_DELAY, MAXIMUM_DELAY).withJitter(JITTER_RATE)
.withMaxAttempts(NUM_OF_RETRIES), NUM_OF_RETRIES)),
applicationName, kinesisClient, dynamoDbClient, cloudWatchClient,
workerIdentifierGenerator.generate(), processorFactory
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.source.exceptions;

/**
 * Unchecked exception thrown when a Kinesis API call still fails after its retries
 * have been used up.
 */
public class KinesisRetriesExhaustedException extends RuntimeException {
    /**
     * @param errorMessage description of the request whose retries were exhausted
     */
    public KinesisRetriesExhaustedException(final String errorMessage) {
        super(errorMessage);
    }

    /**
     * @param errorMessage description of the request whose retries were exhausted
     * @param cause        the last underlying failure, kept so the root cause appears in stack traces
     */
    public KinesisRetriesExhaustedException(final String errorMessage, final Throwable cause) {
        super(errorMessage, cause);
    }
}
Loading

0 comments on commit 094dcbe

Please sign in to comment.