Skip to content

Commit

Permalink
Add new metrics for tracking bytes received and processed by KDS
Browse files Browse the repository at this point in the history
Signed-off-by: Souvik Bose <souvbose@amazon.com>
  • Loading branch information
sbose2k21 committed Dec 2, 2024
1 parent 539d599 commit f79a4f6
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package org.opensearch.dataprepper.plugins.kinesis.source;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -26,6 +27,7 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.plugins.codec.json.NdjsonInputCodec;
import org.opensearch.dataprepper.plugins.codec.json.NdjsonInputConfig;
import org.opensearch.dataprepper.plugins.kinesis.extension.KinesisLeaseConfig;
Expand Down Expand Up @@ -70,9 +72,11 @@
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME;
import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME;
import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.KINESIS_RECORD_PROCESSED;
import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.KINESIS_RECORD_PROCESSING_ERRORS;
import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.KINESIS_RECORD_BYTES_PROCESSED_METRIC_NAME;
import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.KINESIS_RECORD_BYTES_RECEIVED_METRIC_NAME;
import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.KINESIS_RECORD_PROCESSED_METRIC_NAME;
import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.KINESIS_CHECKPOINT_FAILURES;
import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.KINESIS_RECORD_PROCESSING_ERRORS_METRIC_NAME;
import static org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisRecordProcessor.KINESIS_STREAM_TAG_KEY;

public class KinesisSourceIT {
Expand All @@ -83,6 +87,8 @@ public class KinesisSourceIT {
private static final String codec_plugin_name = "ndjson";
private static final String LEASE_TABLE_PREFIX = "kinesis-lease-table";
private static final int NUMBER_OF_RECORDS_TO_ACCUMULATE = 10;
private static final int MAX_INITIALIZATION_ATTEMPTS = 3;
private static final Duration BUFFER_TIMEOUT = Duration.ofMillis(1);

@Mock
private AcknowledgementSetManager acknowledgementSetManager;
Expand Down Expand Up @@ -141,6 +147,12 @@ public class KinesisSourceIT {
@Mock
private Counter checkpointFailures;

@Mock
private DistributionSummary bytesReceivedSummary;

@Mock
private DistributionSummary bytesProcessedSummary;

private KinesisClient kinesisClient;

private DynamoDbClient dynamoDbClient;
Expand All @@ -165,6 +177,7 @@ void setup() {
when(kinesisStreamConfig.getName()).thenReturn(streamName);
when(kinesisStreamConfig.getCheckPointInterval()).thenReturn(CHECKPOINT_INTERVAL);
when(kinesisStreamConfig.getInitialPosition()).thenReturn(InitialPositionInStream.TRIM_HORIZON);
when(kinesisStreamConfig.getCompression()).thenReturn(CompressionOption.NONE);
when(kinesisSourceConfig.getConsumerStrategy()).thenReturn(ConsumerStrategy.ENHANCED_FAN_OUT);
when(kinesisSourceConfig.getStreams()).thenReturn(List.of(kinesisStreamConfig));
when(kinesisLeaseConfig.getLeaseCoordinationTable()).thenReturn(kinesisLeaseCoordinationTableConfig);
Expand All @@ -179,7 +192,8 @@ void setup() {
when(kinesisSourceConfig.getCodec()).thenReturn(pluginModel);
when(kinesisSourceConfig.isAcknowledgments()).thenReturn(false);
when(kinesisSourceConfig.getNumberOfRecordsToAccumulate()).thenReturn(NUMBER_OF_RECORDS_TO_ACCUMULATE);
when(kinesisSourceConfig.getBufferTimeout()).thenReturn(Duration.ofMillis(1));
when(kinesisSourceConfig.getBufferTimeout()).thenReturn(BUFFER_TIMEOUT);
when(kinesisSourceConfig.getMaxInitializationAttempts()).thenReturn(MAX_INITIALIZATION_ATTEMPTS);

kinesisClientFactory = mock(KinesisClientFactory.class);
when(kinesisClientFactory.buildDynamoDBClient(kinesisLeaseCoordinationTableConfig.getAwsRegion())).thenReturn(DynamoDbAsyncClient.builder()
Expand All @@ -204,17 +218,20 @@ void setup() {
when(pluginMetrics.counterWithTags(ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME, KINESIS_STREAM_TAG_KEY, streamName))
.thenReturn(acknowledgementSetFailures);

when(pluginMetrics.counterWithTags(KINESIS_RECORD_PROCESSED, KINESIS_STREAM_TAG_KEY, streamName))
when(pluginMetrics.counterWithTags(KINESIS_RECORD_PROCESSED_METRIC_NAME, KINESIS_STREAM_TAG_KEY, streamName))
.thenReturn(recordsProcessed);

when(pluginMetrics.counterWithTags(KINESIS_RECORD_PROCESSING_ERRORS, KINESIS_STREAM_TAG_KEY, streamName))
when(pluginMetrics.counterWithTags(KINESIS_RECORD_PROCESSING_ERRORS_METRIC_NAME, KINESIS_STREAM_TAG_KEY, streamName))
.thenReturn(recordProcessingErrors);

when(pluginMetrics.counterWithTags(KINESIS_CHECKPOINT_FAILURES, KINESIS_STREAM_TAG_KEY, streamName))
.thenReturn(checkpointFailures);

kinesisClient = KinesisClient.builder().region(Region.of(System.getProperty(AWS_REGION))).build();
dynamoDbClient = DynamoDbClient.builder().region(Region.of(System.getProperty(AWS_REGION))).build();
when(pluginMetrics.summary(KINESIS_RECORD_BYTES_RECEIVED_METRIC_NAME)).thenReturn(bytesReceivedSummary);

when(pluginMetrics.summary(KINESIS_RECORD_BYTES_PROCESSED_METRIC_NAME)).thenReturn(bytesProcessedSummary);
kinesisIngester = new KinesisIngester(kinesisClient, streamName, dynamoDbClient, leaseTableName);

kinesisIngester.createStream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory;
import org.opensearch.dataprepper.metrics.PluginMetrics;
Expand Down Expand Up @@ -39,6 +40,7 @@
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.Optional;
Expand Down Expand Up @@ -70,10 +72,14 @@ public class KinesisRecordProcessor implements ShardRecordProcessor {
private final Counter recordsProcessed;
private final Counter recordProcessingErrors;
private final Counter checkpointFailures;
private final DistributionSummary bytesReceivedSummary;
private final DistributionSummary bytesProcessedSummary;
public static final String ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME = "acknowledgementSetSuccesses";
public static final String ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME = "acknowledgementSetFailures";
public static final String KINESIS_RECORD_PROCESSED = "recordProcessed";
public static final String KINESIS_RECORD_PROCESSING_ERRORS = "recordProcessingErrors";
public static final String KINESIS_RECORD_PROCESSED_METRIC_NAME = "recordProcessed";
public static final String KINESIS_RECORD_PROCESSING_ERRORS_METRIC_NAME = "recordProcessingErrors";
public static final String KINESIS_RECORD_BYTES_RECEIVED_METRIC_NAME = "bytesReceived";
public static final String KINESIS_RECORD_BYTES_PROCESSED_METRIC_NAME = "bytesProcessed";
public static final String KINESIS_CHECKPOINT_FAILURES = "checkpointFailures";
public static final String KINESIS_STREAM_TAG_KEY = "stream";
private AtomicBoolean isStopRequested;
Expand All @@ -93,9 +99,11 @@ public KinesisRecordProcessor(final BufferAccumulator<Record<Event>> bufferAccum
this.acknowledgementSetManager = acknowledgementSetManager;
this.acknowledgementSetSuccesses = pluginMetrics.counterWithTags(ACKNOWLEDGEMENT_SET_SUCCESS_METRIC_NAME, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName());
this.acknowledgementSetFailures = pluginMetrics.counterWithTags(ACKNOWLEDGEMENT_SET_FAILURES_METRIC_NAME, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName());
this.recordsProcessed = pluginMetrics.counterWithTags(KINESIS_RECORD_PROCESSED, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName());
this.recordProcessingErrors = pluginMetrics.counterWithTags(KINESIS_RECORD_PROCESSING_ERRORS, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName());
this.recordsProcessed = pluginMetrics.counterWithTags(KINESIS_RECORD_PROCESSED_METRIC_NAME, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName());
this.recordProcessingErrors = pluginMetrics.counterWithTags(KINESIS_RECORD_PROCESSING_ERRORS_METRIC_NAME, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName());
this.checkpointFailures = pluginMetrics.counterWithTags(KINESIS_CHECKPOINT_FAILURES, KINESIS_STREAM_TAG_KEY, streamIdentifier.streamName());
this.bytesReceivedSummary = pluginMetrics.summary(KINESIS_RECORD_BYTES_RECEIVED_METRIC_NAME);
this.bytesProcessedSummary = pluginMetrics.summary(KINESIS_RECORD_BYTES_PROCESSED_METRIC_NAME);
this.checkpointInterval = kinesisStreamConfig.getCheckPointInterval();
this.bufferAccumulator = bufferAccumulator;
this.kinesisCheckpointerTracker = kinesisCheckpointerTracker;
Expand Down Expand Up @@ -161,6 +169,12 @@ public void processRecords(ProcessRecordsInput processRecordsInput) {

// Track the records for checkpoint purpose
kinesisCheckpointerTracker.addRecordForCheckpoint(extendedSequenceNumber, processRecordsInput.checkpointer());

// Get the size of bytes received from Kinesis stream
final List<Integer> recordBytes = new ArrayList<>();
processRecordsInput.records().forEach(kinesisClientRecord-> recordBytes.add(kinesisClientRecord.data().remaining()));
bytesReceivedSummary.record(recordBytes.stream().mapToLong(Integer::longValue).sum());

List<Record<Event>> records = kinesisRecordConverter.convert(
kinesisStreamConfig.getCompression().getDecompressionEngine(),
processRecordsInput.records(), streamIdentifier.streamName());
Expand All @@ -177,6 +191,7 @@ public void processRecords(ProcessRecordsInput processRecordsInput) {
// Flush buffer at the end
bufferAccumulator.flush();
recordsProcessed.increment(eventCount);
bytesProcessedSummary.record(recordBytes.stream().mapToLong(Integer::longValue).sum());

// If acks are not enabled, mark the sequence number for checkpoint
if (!acknowledgementsEnabled) {
Expand Down
Loading

0 comments on commit f79a4f6

Please sign in to comment.