Skip to content

Commit

Permalink
Address review comments.
Browse files Browse the repository at this point in the history
Signed-off-by: Souvik Bose <souvbose@amazon.com>
  • Loading branch information
sbose2k21 committed Dec 5, 2024
1 parent f79a4f6 commit 9e6d583
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisInputOutputRecord;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

import java.io.ByteArrayInputStream;
Expand All @@ -32,13 +33,12 @@ public KinesisRecordConverter(final InputCodec codec) {
this.codec = codec;
}

public List<Record<Event>> convert(final DecompressionEngine decompressionEngine,
List<KinesisClientRecord> kinesisClientRecords,
final String streamName) throws IOException {
List<Record<Event>> records = new ArrayList<>();
public List<KinesisInputOutputRecord> convert(final DecompressionEngine decompressionEngine,
List<KinesisClientRecord> kinesisClientRecords,
final String streamName) throws IOException {
List<KinesisInputOutputRecord> records = new ArrayList<>();
for (KinesisClientRecord kinesisClientRecord : kinesisClientRecords) {
processRecord(decompressionEngine, kinesisClientRecord, record -> {
records.add(record);
Event event = record.getData();
EventMetadata eventMetadata = event.getMetadata();
eventMetadata.setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE,
Expand All @@ -49,6 +49,9 @@ public List<Record<Event>> convert(final DecompressionEngine decompressionEngine
final Instant externalOriginationTime = kinesisClientRecord.approximateArrivalTimestamp();
event.getEventHandle().setExternalOriginationTime(externalOriginationTime);
event.getMetadata().setExternalOriginationTime(externalOriginationTime);
records.add(KinesisInputOutputRecord.builder()
.withKinesisClientRecord(kinesisClientRecord)
.withDataPrepperRecord(record).build());
});
}
return records;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.source.processor;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

@Builder(setterPrefix = "with")
@Getter
@AllArgsConstructor
public class KinesisInputOutputRecord {
private Record<Event> dataPrepperRecord;
private KinesisClientRecord kinesisClientRecord;
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -170,28 +171,26 @@ public void processRecords(ProcessRecordsInput processRecordsInput) {
// Track the records for checkpoint purpose
kinesisCheckpointerTracker.addRecordForCheckpoint(extendedSequenceNumber, processRecordsInput.checkpointer());

// Get the size of bytes received from Kinesis stream
final List<Integer> recordBytes = new ArrayList<>();
processRecordsInput.records().forEach(kinesisClientRecord-> recordBytes.add(kinesisClientRecord.data().remaining()));
bytesReceivedSummary.record(recordBytes.stream().mapToLong(Integer::longValue).sum());

List<Record<Event>> records = kinesisRecordConverter.convert(
List<KinesisInputOutputRecord> kinesisOutputRecords = kinesisRecordConverter.convert(
kinesisStreamConfig.getCompression().getDecompressionEngine(),
processRecordsInput.records(), streamIdentifier.streamName());

int eventCount = 0;
for (Record<Event> record: records) {
Event event = record.getData();
for (KinesisInputOutputRecord kinesisInputOutputRecord: kinesisOutputRecords) {
Record<Event> dataPrepperRecord = kinesisInputOutputRecord.getDataPrepperRecord();
int incomingRecordSizeBytes = kinesisInputOutputRecord.getKinesisClientRecord().data().position();
bytesReceivedSummary.record(incomingRecordSizeBytes);
Event event = dataPrepperRecord.getData();
acknowledgementSetOpt.ifPresent(acknowledgementSet -> acknowledgementSet.add(event));

bufferAccumulator.add(record);
bufferAccumulator.add(dataPrepperRecord);
bytesProcessedSummary.record(incomingRecordSizeBytes);
eventCount++;
}

// Flush buffer at the end
bufferAccumulator.flush();
recordsProcessed.increment(eventCount);
bytesProcessedSummary.record(recordBytes.stream().mapToLong(Integer::longValue).sum());

// If acks are not enabled, mark the sequence number for checkpoint
if (!acknowledgementsEnabled) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.kinesis.source;

import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.kinesis.source.converter.MetadataKeyAttributes;
import org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisInputOutputRecord;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

import java.nio.ByteBuffer;
import java.util.UUID;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;

public class KinesisInputOutputRecordTest {

@Test
void builder_defaultCreatesObjectCorrectly() {

KinesisInputOutputRecord kinesisInputOutputRecord = KinesisInputOutputRecord.builder().build();

assertNull(kinesisInputOutputRecord.getKinesisClientRecord());
assertNull(kinesisInputOutputRecord.getDataPrepperRecord());
}

@Test
void builder_createsObjectCorrectly() {
Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString());
event.getMetadata().setAttribute(MetadataKeyAttributes.KINESIS_STREAM_NAME_METADATA_ATTRIBUTE, UUID.randomUUID().toString());
Record<Event> record = new Record<>(event);
KinesisClientRecord kinesisClientRecord = KinesisClientRecord.builder()
.data(ByteBuffer.wrap(event.toJsonString().getBytes()))
.sequenceNumber(Integer.toString(1000)).subSequenceNumber(1).build();

KinesisInputOutputRecord kinesisInputOutputRecord = KinesisInputOutputRecord.builder()
.withKinesisClientRecord(kinesisClientRecord)
.withDataPrepperRecord(record)
.build();

assertNotNull(kinesisInputOutputRecord.getKinesisClientRecord());
assertNotNull(kinesisInputOutputRecord.getDataPrepperRecord());
assertEquals(kinesisInputOutputRecord.getDataPrepperRecord(), record);
assertEquals(kinesisInputOutputRecord.getKinesisClientRecord(), kinesisClientRecord);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.plugins.codec.json.NdjsonInputCodec;
import org.opensearch.dataprepper.plugins.codec.json.NdjsonInputConfig;
import org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisInputOutputRecord;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -99,13 +100,13 @@ public void testRecordConverterWithNdJsonInputCodec() throws IOException {
InputStream inputStream = new ByteArrayInputStream(writer.toString().getBytes());
when(decompressionEngine.createInputStream(any(InputStream.class))).thenReturn(inputStream);

List<Record<Event>> events = kinesisRecordConverter.convert(decompressionEngine, List.of(kinesisClientRecord), streamId);
List<KinesisInputOutputRecord> kinesisOutputRecords = kinesisRecordConverter.convert(decompressionEngine, List.of(kinesisClientRecord), streamId);

assertEquals(events.size(), numRecords);
events.forEach(eventRecord -> {
assertEquals(eventRecord.getData().getMetadata().getAttribute(MetadataKeyAttributes.KINESIS_PARTITION_KEY_METADATA_ATTRIBUTE), partitionKey);
assertEquals(eventRecord.getData().getMetadata().getAttribute(MetadataKeyAttributes.KINESIS_SEQUENCE_NUMBER_METADATA_ATTRIBUTE), sequenceNumber);
assertEquals(eventRecord.getData().getMetadata().getAttribute(MetadataKeyAttributes.KINESIS_SUB_SEQUENCE_NUMBER_METADATA_ATTRIBUTE), subsequenceNumber);
assertEquals(kinesisOutputRecords.size(), numRecords);
kinesisOutputRecords.forEach(KinesisInputOutputRecord -> {
assertEquals(KinesisInputOutputRecord.getDataPrepperRecord().getData().getMetadata().getAttribute(MetadataKeyAttributes.KINESIS_PARTITION_KEY_METADATA_ATTRIBUTE), partitionKey);
assertEquals(KinesisInputOutputRecord.getDataPrepperRecord().getData().getMetadata().getAttribute(MetadataKeyAttributes.KINESIS_SEQUENCE_NUMBER_METADATA_ATTRIBUTE), sequenceNumber);
assertEquals(KinesisInputOutputRecord.getDataPrepperRecord().getData().getMetadata().getAttribute(MetadataKeyAttributes.KINESIS_SUB_SEQUENCE_NUMBER_METADATA_ATTRIBUTE), subsequenceNumber);
});
}

Expand Down
Loading

0 comments on commit 9e6d583

Please sign in to comment.