From 9e6d5837a63a06201c1ed44c8554722976345f33 Mon Sep 17 00:00:00 2001 From: Souvik Bose Date: Thu, 5 Dec 2024 11:14:44 -0800 Subject: [PATCH] Address review comments. Signed-off-by: Souvik Bose --- .../converter/KinesisRecordConverter.java | 13 +- .../processor/KinesisInputOutputRecord.java | 26 ++++ .../processor/KinesisRecordProcessor.java | 19 ++- .../source/KinesisInputOutputRecordTest.java | 58 ++++++++ .../converter/KinesisRecordConverterTest.java | 13 +- .../processor/KinesisRecordProcessorTest.java | 135 ++++++++++++------ 6 files changed, 196 insertions(+), 68 deletions(-) create mode 100644 data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisInputOutputRecord.java create mode 100644 data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisInputOutputRecordTest.java diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverter.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverter.java index 7e1af1a41d..fdad523b63 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverter.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverter.java @@ -15,6 +15,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventMetadata; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisInputOutputRecord; import software.amazon.kinesis.retrieval.KinesisClientRecord; import java.io.ByteArrayInputStream; @@ -32,13 +33,12 @@ public KinesisRecordConverter(final InputCodec codec) { this.codec = codec; } - public List> convert(final DecompressionEngine decompressionEngine, - List kinesisClientRecords, - final String streamName) throws IOException { - List> records = new ArrayList<>(); + public List convert(final DecompressionEngine decompressionEngine, + List kinesisClientRecords, + final String streamName) throws IOException { + List records = new ArrayList<>(); for (KinesisClientRecord kinesisClientRecord : kinesisClientRecords) { processRecord(decompressionEngine, kinesisClientRecord, record -> { - records.add(record); Event event = record.getData(); EventMetadata eventMetadata = event.getMetadata(); eventMetadata.setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE, @@ -49,6 +49,9 @@ public List> convert(final DecompressionEngine decompressionEngine final Instant externalOriginationTime = kinesisClientRecord.approximateArrivalTimestamp(); event.getEventHandle().setExternalOriginationTime(externalOriginationTime); event.getMetadata().setExternalOriginationTime(externalOriginationTime); + records.add(KinesisInputOutputRecord.builder() + .withKinesisClientRecord(kinesisClientRecord) + .withDataPrepperRecord(record).build()); }); } return records; diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisInputOutputRecord.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisInputOutputRecord.java new file mode 100644 index 0000000000..9663dc54fe --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisInputOutputRecord.java @@ -0,0 +1,26 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.kinesis.source.processor; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import software.amazon.kinesis.retrieval.KinesisClientRecord; + +@Builder(setterPrefix = "with") +@Getter +@AllArgsConstructor +public class KinesisInputOutputRecord { + private Record dataPrepperRecord; + private KinesisClientRecord kinesisClientRecord; +} diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java index 1ec742e99c..b6b9c0d0fd 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessor.java @@ -39,6 +39,7 @@ import software.amazon.kinesis.retrieval.KinesisClientRecord; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import java.nio.ByteBuffer; import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -170,28 +171,26 @@ public void processRecords(ProcessRecordsInput processRecordsInput) { // Track the records for checkpoint purpose kinesisCheckpointerTracker.addRecordForCheckpoint(extendedSequenceNumber, processRecordsInput.checkpointer()); - // Get the size of bytes received from Kinesis stream - final List recordBytes = new ArrayList<>(); - processRecordsInput.records().forEach(kinesisClientRecord-> recordBytes.add(kinesisClientRecord.data().remaining())); - bytesReceivedSummary.record(recordBytes.stream().mapToLong(Integer::longValue).sum()); - - List> records = kinesisRecordConverter.convert( + List kinesisOutputRecords = kinesisRecordConverter.convert( kinesisStreamConfig.getCompression().getDecompressionEngine(), processRecordsInput.records(), streamIdentifier.streamName()); int eventCount = 0; - for (Record record: records) { - Event event = record.getData(); + for (KinesisInputOutputRecord kinesisInputOutputRecord: kinesisOutputRecords) { + Record dataPrepperRecord = kinesisInputOutputRecord.getDataPrepperRecord(); + int incomingRecordSizeBytes = kinesisInputOutputRecord.getKinesisClientRecord().data().position(); + bytesReceivedSummary.record(incomingRecordSizeBytes); + Event event = dataPrepperRecord.getData(); acknowledgementSetOpt.ifPresent(acknowledgementSet -> acknowledgementSet.add(event)); - bufferAccumulator.add(record); + bufferAccumulator.add(dataPrepperRecord); + bytesProcessedSummary.record(incomingRecordSizeBytes); eventCount++; } // Flush buffer at the end bufferAccumulator.flush(); recordsProcessed.increment(eventCount); - bytesProcessedSummary.record(recordBytes.stream().mapToLong(Integer::longValue).sum()); // If acks are not enabled, mark the sequence number for checkpoint if (!acknowledgementsEnabled) { diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisInputOutputRecordTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisInputOutputRecordTest.java new file mode 100644 index 0000000000..808124995a --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisInputOutputRecordTest.java @@ -0,0 +1,58 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.kinesis.source; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.kinesis.source.converter.MetadataKeyAttributes; +import org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisInputOutputRecord; +import software.amazon.kinesis.retrieval.KinesisClientRecord; + +import java.nio.ByteBuffer; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class KinesisInputOutputRecordTest { + + @Test + void builder_defaultCreatesObjectCorrectly() { + + KinesisInputOutputRecord kinesisInputOutputRecord = KinesisInputOutputRecord.builder().build(); + + assertNull(kinesisInputOutputRecord.getKinesisClientRecord()); + assertNull(kinesisInputOutputRecord.getDataPrepperRecord()); + } + + @Test + void builder_createsObjectCorrectly() { + Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); + event.getMetadata().setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE, UUID.randomUUID().toString()); + Record record = new Record<>(event); + KinesisClientRecord kinesisClientRecord = KinesisClientRecord.builder() + .data(ByteBuffer.wrap(event.toJsonString().getBytes())) + .sequenceNumber(Integer.toString(1000)).subSequenceNumber(1).build(); + + KinesisInputOutputRecord kinesisInputOutputRecord = KinesisInputOutputRecord.builder() + .withKinesisClientRecord(kinesisClientRecord) + .withDataPrepperRecord(record) + .build(); + + assertNotNull(kinesisInputOutputRecord.getKinesisClientRecord()); + assertNotNull(kinesisInputOutputRecord.getDataPrepperRecord()); + assertEquals(kinesisInputOutputRecord.getDataPrepperRecord(), record); + assertEquals(kinesisInputOutputRecord.getKinesisClientRecord(), kinesisClientRecord); + } +} diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverterTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverterTest.java index 95ecc10d41..5d1dbbf363 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverterTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/converter/KinesisRecordConverterTest.java @@ -20,6 +20,7 @@ import org.opensearch.dataprepper.plugins.codec.CompressionOption; import org.opensearch.dataprepper.plugins.codec.json.NdjsonInputCodec; import org.opensearch.dataprepper.plugins.codec.json.NdjsonInputConfig; +import org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisInputOutputRecord; import software.amazon.kinesis.retrieval.KinesisClientRecord; import java.io.ByteArrayInputStream; @@ -99,13 +100,13 @@ public void testRecordConverterWithNdJsonInputCodec() throws IOException { InputStream inputStream = new ByteArrayInputStream(writer.toString().getBytes()); when(decompressionEngine.createInputStream(any(InputStream.class))).thenReturn(inputStream); - List> events = kinesisRecordConverter.convert(decompressionEngine, List.of(kinesisClientRecord), streamId); + List kinesisOutputRecords = kinesisRecordConverter.convert(decompressionEngine, List.of(kinesisClientRecord), streamId); - assertEquals(events.size(), numRecords); - events.forEach(eventRecord -> { - assertEquals(eventRecord.getData().getMetadata().getAttribute(MetadataKeyAttributes.KINESIS_PARTITION_KEY_METADATA_ATTRIBUTE), partitionKey); - assertEquals(eventRecord.getData().getMetadata().getAttribute(MetadataKeyAttributes.KINESIS_SEQUENCE_NUMBER_METADATA_ATTRIBUTE), sequenceNumber); - assertEquals(eventRecord.getData().getMetadata().getAttribute(MetadataKeyAttributes.KINESIS_SUB_SEQUENCE_NUMBER_METADATA_ATTRIBUTE), subsequenceNumber); + assertEquals(kinesisOutputRecords.size(), numRecords); + kinesisOutputRecords.forEach(KinesisInputOutputRecord -> { + assertEquals(KinesisInputOutputRecord.getDataPrepperRecord().getData().getMetadata().getAttribute(MetadataKeyAttributes.KINESIS_PARTITION_KEY_METADATA_ATTRIBUTE), partitionKey); + assertEquals(KinesisInputOutputRecord.getDataPrepperRecord().getData().getMetadata().getAttribute(MetadataKeyAttributes.KINESIS_SEQUENCE_NUMBER_METADATA_ATTRIBUTE), sequenceNumber); + assertEquals(KinesisInputOutputRecord.getDataPrepperRecord().getData().getMetadata().getAttribute(MetadataKeyAttributes.KINESIS_SUB_SEQUENCE_NUMBER_METADATA_ATTRIBUTE), subsequenceNumber); }); } diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java index 204c52021a..97a3a167fc 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/processor/KinesisRecordProcessorTest.java @@ -211,22 +211,29 @@ public void setup() throws IOException { @Test void testProcessRecordsWithoutAcknowledgementsWithCheckpointApplied() throws Exception { - List kinesisClientRecords = createInputKinesisClientRecords(); - when(processRecordsInput.records()).thenReturn(kinesisClientRecords); - - final Long recordsSize = kinesisClientRecords.stream() - .map(kinesisClientRecord -> kinesisClientRecord.data().array().length) - .mapToLong(Integer::longValue).sum(); - when(kinesisSourceConfig.isAcknowledgments()).thenReturn(false); when(kinesisStreamConfig.getCheckPointInterval()).thenReturn(Duration.ofMillis(0)); when(acknowledgementSetManager.create(any(), any(Duration.class))).thenReturn(acknowledgementSet); - List> records = new ArrayList<>(); + List records = new ArrayList<>(); Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); event.getMetadata().setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE, streamId); Record record = new Record<>(event); - records.add(record); + KinesisClientRecord kinesisClientRecord = KinesisClientRecord.builder() + .data(ByteBuffer.wrap(event.toJsonString().getBytes())) + .sequenceNumber(Integer.toString(1000)).subSequenceNumber(1).build(); + List kinesisClientRecords = new ArrayList<>(); + kinesisClientRecords.add(kinesisClientRecord); + records.add(KinesisInputOutputRecord.builder() + .withDataPrepperRecord(record) + .withKinesisClientRecord(kinesisClientRecord).build() + ); + when(processRecordsInput.records()).thenReturn(kinesisClientRecords); + + final Long recordsSize = kinesisClientRecords.stream() + .map(kclRecord -> kclRecord.data().position()) + .mapToLong(Integer::longValue).sum(); + InputStream inputStream = mock(InputStream.class); when(decompressionEngine.createInputStream(inputStream)).thenReturn(inputStream); when(kinesisRecordConverter.convert(eq(decompressionEngine), eq(kinesisClientRecords), eq(streamId))).thenReturn(records); @@ -267,21 +274,29 @@ void testProcessRecordsWithoutAcknowledgementsWithCheckpointApplied() @Test public void testProcessRecordsWithoutAcknowledgementsEnabled() throws Exception { - List kinesisClientRecords = createInputKinesisClientRecords(); - when(processRecordsInput.records()).thenReturn(kinesisClientRecords); when(kinesisSourceConfig.isAcknowledgments()).thenReturn(false); when(kinesisStreamConfig.getCheckPointInterval()).thenReturn(Duration.ofMillis(0)); when(acknowledgementSetManager.create(any(), any(Duration.class))).thenReturn(acknowledgementSet); - final Long recordsSize = kinesisClientRecords.stream() - .map(kinesisClientRecord -> kinesisClientRecord.data().array().length) - .mapToLong(Integer::longValue).sum(); - - List> records = new ArrayList<>(); + List records = new ArrayList<>(); Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); event.getMetadata().setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE, streamId); Record record = new Record<>(event); - records.add(record); + KinesisClientRecord kinesisClientRecord = KinesisClientRecord.builder() + .data(ByteBuffer.wrap(event.toJsonString().getBytes())) + .sequenceNumber(Integer.toString(1000)).subSequenceNumber(1).build(); + List kinesisClientRecords = new ArrayList<>(); + kinesisClientRecords.add(kinesisClientRecord); + records.add(KinesisInputOutputRecord.builder() + .withDataPrepperRecord(record) + .withKinesisClientRecord(kinesisClientRecord).build() + ); + + final Long recordsSize = kinesisClientRecords.stream() + .map(kclRecord -> kclRecord.data().position()) + .mapToLong(Integer::longValue).sum(); + + when(processRecordsInput.records()).thenReturn(kinesisClientRecords); when(kinesisRecordConverter.convert(eq(decompressionEngine), eq(kinesisClientRecords), eq(streamId))).thenReturn(records); kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, @@ -314,8 +329,6 @@ public void testProcessRecordsWithoutAcknowledgementsEnabled() @Test void testProcessRecordsWithAcknowledgementsEnabled() throws Exception { - List kinesisClientRecords = createInputKinesisClientRecords(); - when(processRecordsInput.records()).thenReturn(kinesisClientRecords); when(kinesisSourceConfig.isAcknowledgments()).thenReturn(true); when(kinesisStreamConfig.getCheckPointInterval()).thenReturn(Duration.ofMillis(0)); AtomicReference numEventsAdded = new AtomicReference<>(0); @@ -330,15 +343,23 @@ void testProcessRecordsWithAcknowledgementsEnabled() return acknowledgementSet; }).when(acknowledgementSetManager).create(any(Consumer.class), any(Duration.class)); - final Long recordsSize = kinesisClientRecords.stream() - .map(kinesisClientRecord -> kinesisClientRecord.data().array().length) - .mapToLong(Integer::longValue).sum(); - - List> records = new ArrayList<>(); + List records = new ArrayList<>(); Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); event.getMetadata().setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE, streamId); Record record = new Record<>(event); - records.add(record); + KinesisClientRecord kinesisClientRecord = KinesisClientRecord.builder() + .data(ByteBuffer.wrap(event.toJsonString().getBytes())) + .sequenceNumber(Integer.toString(1000)).subSequenceNumber(1).build(); + List kinesisClientRecords = new ArrayList<>(); + when(processRecordsInput.records()).thenReturn(kinesisClientRecords); + kinesisClientRecords.add(kinesisClientRecord); + records.add(KinesisInputOutputRecord.builder() + .withDataPrepperRecord(record) + .withKinesisClientRecord(kinesisClientRecord).build() + ); + final Long recordsSize = kinesisClientRecords.stream() + .map(kclRecord -> kclRecord.data().position()) + .mapToLong(Integer::longValue).sum(); when(kinesisRecordConverter.convert(eq(decompressionEngine), eq(kinesisClientRecords), eq(streamId))).thenReturn(records); kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, @@ -376,15 +397,9 @@ void testProcessRecordsWithAcknowledgementsEnabled() @Test void testProcessRecordsWithNDJsonInputCodec() throws Exception { - List kinesisClientRecords = createInputKinesisClientRecords(); - when(processRecordsInput.records()).thenReturn(kinesisClientRecords); when(kinesisSourceConfig.isAcknowledgments()).thenReturn(false); when(kinesisStreamConfig.getCheckPointInterval()).thenReturn(Duration.ofMillis(0)); - final Long recordsSize = kinesisClientRecords.stream() - .map(kinesisClientRecord -> kinesisClientRecord.data().array().length) - .mapToLong(Integer::longValue).sum(); - PluginModel pluginModel = mock(PluginModel.class); when(pluginModel.getPluginName()).thenReturn("ndjson"); when(pluginModel.getPluginSettings()).thenReturn(Collections.emptyMap()); @@ -395,11 +410,23 @@ void testProcessRecordsWithNDJsonInputCodec() when(acknowledgementSetManager.create(any(), any(Duration.class))).thenReturn(acknowledgementSet); - List> records = new ArrayList<>(); + List records = new ArrayList<>(); Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); event.getMetadata().setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE, streamId); Record record = new Record<>(event); - records.add(record); + KinesisClientRecord kinesisClientRecord = KinesisClientRecord.builder() + .data(ByteBuffer.wrap(event.toJsonString().getBytes())) + .sequenceNumber(Integer.toString(1000)).subSequenceNumber(1).build(); + List kinesisClientRecords = new ArrayList<>(); + when(processRecordsInput.records()).thenReturn(kinesisClientRecords); + kinesisClientRecords.add(kinesisClientRecord); + records.add(KinesisInputOutputRecord.builder() + .withDataPrepperRecord(record) + .withKinesisClientRecord(kinesisClientRecord).build() + ); + final Long recordsSize = kinesisClientRecords.stream() + .map(kclRecord -> kclRecord.data().position()) + .mapToLong(Integer::longValue).sum(); when(kinesisRecordConverter.convert(eq(decompressionEngine), eq(kinesisClientRecords), eq(streamId))).thenReturn(records); kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, @@ -436,21 +463,28 @@ void testProcessRecordsWithNDJsonInputCodec() @Test void testProcessRecordsNoThrowException() throws Exception { - List kinesisClientRecords = createInputKinesisClientRecords(); - when(processRecordsInput.records()).thenReturn(kinesisClientRecords); when(kinesisSourceConfig.isAcknowledgments()).thenReturn(false); - List> records = new ArrayList<>(); + List records = new ArrayList<>(); Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); event.getMetadata().setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE, streamId); Record record = new Record<>(event); - records.add(record); + List kinesisClientRecords = new ArrayList<>(); + when(processRecordsInput.records()).thenReturn(kinesisClientRecords); + KinesisClientRecord kinesisClientRecord = KinesisClientRecord.builder() + .data(ByteBuffer.wrap(event.toJsonString().getBytes())) + .sequenceNumber(Integer.toString(1000)).subSequenceNumber(1).build(); + kinesisClientRecords.add(kinesisClientRecord); + records.add(KinesisInputOutputRecord.builder() + .withDataPrepperRecord(record) + .withKinesisClientRecord(kinesisClientRecord).build() + ); when(kinesisRecordConverter.convert(eq(decompressionEngine), eq(kinesisClientRecords), eq(streamId))).thenReturn(records); final Throwable exception = mock(RuntimeException.class); doThrow(exception).when(bufferAccumulator).add(any(Record.class)); final Long recordsSize = kinesisClientRecords.stream() - .map(kinesisClientRecord -> kinesisClientRecord.data().array().length) + .map(kclRecord -> kclRecord.data().position()) .mapToLong(Integer::longValue).sum(); kinesisRecordProcessor = new KinesisRecordProcessor(bufferAccumulator, kinesisSourceConfig, @@ -467,19 +501,26 @@ void testProcessRecordsNoThrowException() @Test void testProcessRecordsBufferFlushNoThrowException() throws Exception { - List kinesisClientRecords = createInputKinesisClientRecords(); - when(processRecordsInput.records()).thenReturn(kinesisClientRecords); when(kinesisSourceConfig.isAcknowledgments()).thenReturn(false); - final Long recordsSize = kinesisClientRecords.stream() - .map(kinesisClientRecord -> kinesisClientRecord.data().array().length) - .mapToLong(Integer::longValue).sum(); - - List> records = new ArrayList<>(); + List records = new ArrayList<>(); Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString()); event.getMetadata().setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE, streamId); Record record = new Record<>(event); - records.add(record); + KinesisClientRecord kinesisClientRecord = KinesisClientRecord.builder() + .data(ByteBuffer.wrap(event.toJsonString().getBytes())) + .sequenceNumber(Integer.toString(1000)).subSequenceNumber(1).build(); + List kinesisClientRecords = new ArrayList<>(); + when(processRecordsInput.records()).thenReturn(kinesisClientRecords); + kinesisClientRecords.add(kinesisClientRecord); + records.add(KinesisInputOutputRecord.builder() + .withDataPrepperRecord(record) + .withKinesisClientRecord(kinesisClientRecord).build() + ); + final Long recordsSize = kinesisClientRecords.stream() + .map(kclRecord -> kclRecord.data().position()) + .mapToLong(Integer::longValue).sum(); + when(kinesisRecordConverter.convert(eq(decompressionEngine), eq(kinesisClientRecords), eq(streamId))).thenReturn(records); final Throwable exception = mock(RuntimeException.class); doThrow(exception).when(bufferAccumulator).flush(); @@ -492,7 +533,7 @@ void testProcessRecordsBufferFlushNoThrowException() verify(recordProcessingErrors, times(1)).increment(); verify(recordProcessed, times(0)).increment(anyDouble()); verify(bytesReceivedSummary, times(1)).record(eq(recordsSize.doubleValue())); - verify(bytesProcessedSummary, times(0)).record(eq(recordsSize.doubleValue())); + verify(bytesProcessedSummary, times(1)).record(eq(recordsSize.doubleValue())); } @Test