Skip to content

Commit

Permalink
Rename the class.
Browse files Browse the repository at this point in the history
Signed-off-by: Souvik Bose <souvbose@amazon.com>
  • Loading branch information
sbose2k21 committed Dec 2, 2024
1 parent fae1f9a commit cb0f08c
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@
import java.time.Duration;

@Slf4j
public class KinesisStreamBackoffStrategy {
public class KinesisClientAPIHandler {
private static final String COLON = ":";

private final Backoff backoff;
private final KinesisAsyncClient kinesisClient;
private int failedAttemptCount;
private int maxRetryCount;

public KinesisStreamBackoffStrategy(final KinesisAsyncClient kinesisClient, final Backoff backoff, final int maxRetryCount) {
public KinesisClientAPIHandler(final KinesisAsyncClient kinesisClient, final Backoff backoff, final int maxRetryCount) {
this.kinesisClient = kinesisClient;
this.backoff = backoff;
this.failedAttemptCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

package org.opensearch.dataprepper.plugins.kinesis.source;

import lombok.extern.slf4j.Slf4j;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
Expand All @@ -23,16 +22,15 @@
import java.util.ArrayList;
import java.util.List;

@Slf4j
public class KinesisMultiStreamTracker implements MultiStreamTracker {
private final KinesisSourceConfig sourceConfig;
private final String applicationName;
private final KinesisStreamBackoffStrategy kinesisStreamBackoffStrategy;
private final KinesisClientAPIHandler kinesisClientAPIHandler;

public KinesisMultiStreamTracker(final KinesisSourceConfig sourceConfig, final String applicationName, final KinesisStreamBackoffStrategy kinesisStreamBackoffStrategy) {
public KinesisMultiStreamTracker(final KinesisSourceConfig sourceConfig, final String applicationName, final KinesisClientAPIHandler kinesisClientAPIHandler) {
this.sourceConfig = sourceConfig;
this.applicationName = applicationName;
this.kinesisStreamBackoffStrategy = kinesisStreamBackoffStrategy;
this.kinesisClientAPIHandler = kinesisClientAPIHandler;
}

@Override
Expand All @@ -46,7 +44,7 @@ public List<StreamConfig> streamConfigList() {
}

private StreamConfig getStreamConfig(KinesisStreamConfig kinesisStreamConfig) {
StreamIdentifier sourceStreamIdentifier = kinesisStreamBackoffStrategy.getStreamIdentifier(kinesisStreamConfig.getName());
StreamIdentifier sourceStreamIdentifier = kinesisClientAPIHandler.getStreamIdentifier(kinesisStreamConfig.getName());
return new StreamConfig(sourceStreamIdentifier,
InitialPositionInStreamExtended.newInitialPosition(kinesisStreamConfig.getInitialPosition()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public Scheduler createScheduler(final Buffer<Record<Event>> buffer) {

ConfigsBuilder configsBuilder =
new ConfigsBuilder(
new KinesisMultiStreamTracker(kinesisSourceConfig, applicationName, new KinesisStreamBackoffStrategy(kinesisClient, Backoff.exponential(INITIAL_DELAY, MAXIMUM_DELAY).withJitter(JITTER_RATE)
new KinesisMultiStreamTracker(kinesisSourceConfig, applicationName, new KinesisClientAPIHandler(kinesisClient, Backoff.exponential(INITIAL_DELAY, MAXIMUM_DELAY).withJitter(JITTER_RATE)
.withMaxAttempts(NUM_OF_RETRIES), NUM_OF_RETRIES)),
applicationName, kinesisClient, dynamoDbClient, cloudWatchClient,
workerIdentifierGenerator.generate(), processorFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class KinesisStreamBackoffStrategyTest {
public class KinesisClientAPIHandlerTest {
private static final List<String> STREAMS_LIST = ImmutableList.of("stream-1", "stream-2", "stream-3");
private static final String awsAccountId = "1234";
private static final String streamArnFormat = "arn:aws:kinesis:us-east-1:%s:stream/%s";
Expand All @@ -55,8 +55,8 @@ public void setUp() {
streamName = UUID.randomUUID().toString();
}

private KinesisStreamBackoffStrategy createObjectUnderTest() {
return new KinesisStreamBackoffStrategy(kinesisClient, backoff, NUM_OF_RETRIES);
private KinesisClientAPIHandler createObjectUnderTest() {
return new KinesisClientAPIHandler(kinesisClient, backoff, NUM_OF_RETRIES);
}

@Test
Expand All @@ -67,9 +67,9 @@ public void testGetStreamIdentifierThrowsException() {

when(kinesisClient.describeStreamSummary(describeStreamSummaryRequest)).thenThrow(new KinesisRetriesExhaustedException("exception"));

KinesisStreamBackoffStrategy kinesisStreamBackoffStrategy = createObjectUnderTest();
KinesisClientAPIHandler kinesisClientAPIHandler = createObjectUnderTest();

assertThrows(KinesisRetriesExhaustedException.class, () -> kinesisStreamBackoffStrategy.getStreamIdentifier(streamName));
assertThrows(KinesisRetriesExhaustedException.class, () -> kinesisClientAPIHandler.getStreamIdentifier(streamName));
}

@Test
Expand All @@ -90,9 +90,9 @@ public void testGetStreamIdentifierSuccess() {

given(kinesisClient.describeStreamSummary(describeStreamSummaryRequest)).willReturn(successFuture);

KinesisStreamBackoffStrategy kinesisStreamBackoffStrategy = createObjectUnderTest();
KinesisClientAPIHandler kinesisClientAPIHandler = createObjectUnderTest();

StreamIdentifier streamIdentifier = kinesisStreamBackoffStrategy.getStreamIdentifier(streamName);
StreamIdentifier streamIdentifier = kinesisClientAPIHandler.getStreamIdentifier(streamName);
assertEquals(streamIdentifier, getStreamIdentifier(streamName));
}

Expand Down Expand Up @@ -122,9 +122,9 @@ public void testGetStreamIdentifierSuccessWithRetries() {
.willReturn(failedFuture2)
.willReturn(successFuture);

KinesisStreamBackoffStrategy kinesisStreamBackoffStrategy = createObjectUnderTest();
KinesisClientAPIHandler kinesisClientAPIHandler = createObjectUnderTest();

StreamIdentifier streamIdentifier = kinesisStreamBackoffStrategy.getStreamIdentifier(streamName);
StreamIdentifier streamIdentifier = kinesisClientAPIHandler.getStreamIdentifier(streamName);
assertEquals(streamIdentifier, getStreamIdentifier(streamName));
}

Expand Down Expand Up @@ -154,9 +154,9 @@ public void testGetStreamIdentifierSuccessWithMultipleRetries() {
.willReturn(failedFuture2)
.willReturn(successFuture);

KinesisStreamBackoffStrategy kinesisStreamBackoffStrategy = createObjectUnderTest();
KinesisClientAPIHandler kinesisClientAPIHandler = createObjectUnderTest();

StreamIdentifier streamIdentifier = kinesisStreamBackoffStrategy.getStreamIdentifier(streamName);
StreamIdentifier streamIdentifier = kinesisClientAPIHandler.getStreamIdentifier(streamName);
assertEquals(streamIdentifier, getStreamIdentifier(streamName));
}

Expand Down Expand Up @@ -190,9 +190,14 @@ public void testGetStreamIdentifierFailureWithMultipleRetries() {

when(backoff.nextDelayMillis(eq(2))).thenReturn(-10L);

KinesisStreamBackoffStrategy kinesisStreamBackoffStrategy = createObjectUnderTest();
KinesisClientAPIHandler kinesisClientAPIHandler = createObjectUnderTest();

assertThrows(KinesisRetriesExhaustedException.class, ()->kinesisStreamBackoffStrategy.getStreamIdentifier(streamName));
assertThrows(KinesisRetriesExhaustedException.class, ()->kinesisClientAPIHandler.getStreamIdentifier(streamName));
}

@Test
public void testCreateFailureInvalidMaxRetry() {
assertThrows(IllegalArgumentException.class, () -> new KinesisClientAPIHandler(kinesisClient, backoff, -1));
}

private StreamIdentifier getStreamIdentifier(final String streamName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class KinesisMultiStreamTrackerTest {
KinesisSourceConfig kinesisSourceConfig;

@Mock
private KinesisStreamBackoffStrategy kinesisStreamBackoffStrategy;
private KinesisClientAPIHandler kinesisClientAPIHandler;

@BeforeEach
public void setUp() {
Expand All @@ -61,7 +61,7 @@ public void setUp() {
KinesisStreamConfig kinesisStreamConfig = mock(KinesisStreamConfig.class);
when(kinesisStreamConfig.getName()).thenReturn(stream);
when(kinesisStreamConfig.getInitialPosition()).thenReturn(InitialPositionInStream.LATEST);
when(kinesisStreamBackoffStrategy.getStreamIdentifier(stream)).thenReturn(getStreamIdentifier(stream));
when(kinesisClientAPIHandler.getStreamIdentifier(stream)).thenReturn(getStreamIdentifier(stream));
kinesisStreamConfigs.add(kinesisStreamConfig);
streamConfigMap.put(stream, kinesisStreamConfig);
});
Expand All @@ -70,7 +70,7 @@ public void setUp() {
}

private KinesisMultiStreamTracker createObjectUnderTest() {
return new KinesisMultiStreamTracker(kinesisSourceConfig, APPLICATION_NAME, kinesisStreamBackoffStrategy);
return new KinesisMultiStreamTracker(kinesisSourceConfig, APPLICATION_NAME, kinesisClientAPIHandler);
}

@Test
Expand Down Expand Up @@ -99,7 +99,7 @@ public void testStreamConfigListWithException() {
when(kinesisStreamConfig.getName()).thenReturn(stream);
when(kinesisStreamConfig.getInitialPosition()).thenReturn(InitialPositionInStream.LATEST);

when(kinesisStreamBackoffStrategy.getStreamIdentifier(stream)).thenThrow(new RuntimeException());
when(kinesisClientAPIHandler.getStreamIdentifier(stream)).thenThrow(new RuntimeException());
kinesisStreamConfigs.add(kinesisStreamConfig);
streamConfigMap.put(stream, kinesisStreamConfig);
});
Expand All @@ -121,7 +121,7 @@ public void testStreamConfigWithRetries() {
when(kinesisStreamConfig.getInitialPosition()).thenReturn(InitialPositionInStream.LATEST);

StreamIdentifier streamIdentifier = getStreamIdentifier(stream);
when(kinesisStreamBackoffStrategy.getStreamIdentifier(stream)).thenReturn(streamIdentifier);
when(kinesisClientAPIHandler.getStreamIdentifier(stream)).thenReturn(streamIdentifier);
});

when(kinesisSourceConfig.getStreams()).thenReturn(kinesisStreamConfigs);
Expand Down
Empty file.

0 comments on commit cb0f08c

Please sign in to comment.