Adding ttl_delete parameter to metadata for DynamoDB (opensearch-project#4982)

Signed-off-by: Lee Hannigan <lhnng@amazon.com>
LeeroyHannigan authored Oct 10, 2024
1 parent 76d76fc commit cc5a39e
Showing 4 changed files with 112 additions and 9 deletions.
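
Before the diff: the change marks a stream record as a TTL delete only when it is a REMOVE event whose userIdentity is the DynamoDB TTL service. Below is a minimal standalone sketch of that rule using the AWS SDK v2 types the diff imports; the class and method names here are illustrative and not part of the commit.

import software.amazon.awssdk.services.dynamodb.model.Identity;
import software.amazon.awssdk.services.dynamodb.model.OperationType;

final class TtlDeleteCheck {
    // Values DynamoDB reports for TTL-expired deletions (the same constants this commit adds).
    private static final String TTL_USER_PRINCIPAL = "dynamodb.amazonaws.com";
    private static final String TTL_USER_TYPE = "Service";

    // True only for REMOVE events whose userIdentity is the DynamoDB TTL service.
    static boolean isTtlDelete(final String eventName, final Identity userIdentity) {
        return OperationType.REMOVE.toString().equals(eventName)
                && userIdentity != null
                && TTL_USER_PRINCIPAL.equals(userIdentity.principalId())
                && TTL_USER_TYPE.equals(userIdentity.type());
    }
}
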
@@ -21,4 +21,5 @@ public class MetadataKeyAttributes {
static final String DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE = "dynamodb_event_name";

static final String EVENT_TABLE_NAME_METADATA_ATTRIBUTE = "table_name";
static final String DDB_STREAM_EVENT_IS_TTL_DELETE = "ttl_delete";
}
@@ -13,6 +13,8 @@
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
import software.amazon.awssdk.services.dynamodb.model.Identity;
import software.amazon.awssdk.services.dynamodb.model.OperationType;

import java.math.BigDecimal;
import java.time.Instant;
@@ -26,6 +28,7 @@
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PARTITION_KEY_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.SORT_KEY_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.DDB_STREAM_EVENT_IS_TTL_DELETE;

/**
* Base Record Processor definition.
@@ -39,6 +42,8 @@ public abstract class RecordConverter {
private final BufferAccumulator<Record<Event>> bufferAccumulator;

private final TableInfo tableInfo;
static final String TTL_USER_PRINCIPAL = "dynamodb.amazonaws.com";
static final String TTL_USER_TYPE = "Service";

public RecordConverter(final BufferAccumulator<Record<Event>> bufferAccumulator, TableInfo tableInfo) {
this.bufferAccumulator = bufferAccumulator;
@@ -76,14 +81,16 @@ void flushBuffer() throws Exception {
* @param keys A map to hold the keys (partition key and sort key)
* @param eventCreationTimeMillis Creation timestamp of the event
* @param eventName Event name
* @param userIdentity UserIdentity for TTL-based deletes
* @throws Exception Exception if failed to write to buffer.
*/
public void addToBuffer(final AcknowledgementSet acknowledgementSet,
final Map<String, Object> data,
final Map<String, Object> keys,
final long eventCreationTimeMillis,
final long eventVersionNumber,
final String eventName) throws Exception {
final String eventName,
final Identity userIdentity) throws Exception {
Event event = JacksonEvent.builder()
.withEventType(getEventType())
.withData(data)
@@ -95,6 +102,7 @@ public void addToBuffer(final AcknowledgementSet acknowledgementSet,
event.getEventHandle().setExternalOriginationTime(externalOriginationTime);
event.getMetadata().setExternalOriginationTime(externalOriginationTime);
}

EventMetadata eventMetadata = event.getMetadata();

eventMetadata.setAttribute(EVENT_TABLE_NAME_METADATA_ATTRIBUTE, tableInfo.getTableName());
@@ -103,6 +111,13 @@ public void addToBuffer(final AcknowledgementSet acknowledgementSet,
eventMetadata.setAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE, mapStreamEventNameToBulkAction(eventName));
eventMetadata.setAttribute(EVENT_VERSION_FROM_TIMESTAMP, eventVersionNumber);

// Flag ttl_delete only for REMOVE stream events whose userIdentity identifies the DynamoDB TTL service
final boolean isTtlDelete = OperationType.REMOVE.toString().equals(eventName) &&
userIdentity != null &&
TTL_USER_PRINCIPAL.equals(userIdentity.principalId()) &&
TTL_USER_TYPE.equals(userIdentity.type());
eventMetadata.setAttribute(DDB_STREAM_EVENT_IS_TTL_DELETE, isTtlDelete);

String partitionKey = getAttributeValue(keys, tableInfo.getMetadata().getPartitionKeyAttributeName());
eventMetadata.setAttribute(PARTITION_KEY_METADATA_ATTRIBUTE, partitionKey);

@@ -123,7 +138,7 @@ public void addToBuffer(final AcknowledgementSet acknowledgementSet,
final Map<String, Object> data,
final long timestamp,
final long eventVersionNumber) throws Exception {
addToBuffer(acknowledgementSet, data, data, timestamp, eventVersionNumber, null);
addToBuffer(acknowledgementSet, data, data, timestamp, eventVersionNumber, null, null);
}

private String mapStreamEventNameToBulkAction(final String streamEventName) {
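
The new metadata attribute can then be read back downstream. The following is an illustrative check, not part of the commit, assuming the Data Prepper Event API used in the tests below (org.opensearch.dataprepper.model.event.Event), where getAttribute returns the stored Boolean.

import org.opensearch.dataprepper.model.event.Event;

final class TtlDeleteFilter {
    // Hypothetical helper: true when the source marked this record as a TTL-expired delete.
    static boolean isTtlExpiredDelete(final Event event) {
        final Object flag = event.getMetadata().getAttribute("ttl_delete");
        return Boolean.TRUE.equals(flag);
    }
}
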
@@ -98,7 +98,7 @@ public void writeToBuffer(final AcknowledgementSet acknowledgementSet, List<Reco
try {
bytesReceivedSummary.record(bytes);
final long eventCreationTimeMillis = calculateTieBreakingVersionFromTimestamp(record.dynamodb().approximateCreationDateTime());
addToBuffer(acknowledgementSet, data, keys, record.dynamodb().approximateCreationDateTime().toEpochMilli(), eventCreationTimeMillis, record.eventNameAsString());
addToBuffer(acknowledgementSet, data, keys, record.dynamodb().approximateCreationDateTime().toEpochMilli(), eventCreationTimeMillis, record.eventNameAsString(), record.userIdentity());
bytesProcessedSummary.record(bytes);
eventCount++;
} catch (Exception e) {
@@ -28,6 +28,8 @@
import software.amazon.awssdk.services.dynamodb.model.OperationType;
import software.amazon.awssdk.services.dynamodb.model.StreamRecord;
import software.amazon.awssdk.services.dynamodb.model.StreamViewType;
import software.amazon.awssdk.services.dynamodb.model.Identity;


import java.time.Instant;
import java.util.ArrayList;
@@ -57,6 +59,7 @@
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PARTITION_KEY_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.SORT_KEY_METADATA_ATTRIBUTE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.DDB_STREAM_EVENT_IS_TTL_DELETE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter.BYTES_PROCESSED;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter.BYTES_RECEIVED;
import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter.CHANGE_EVENTS_PROCESSED_COUNT;
@@ -95,6 +98,8 @@ class StreamRecordConverterTest {

private final String partitionKeyAttrName = "PK";
private final String sortKeyAttrName = "SK";
private final String principalId = "dynamodb.amazonaws.com";
private final String userIdentityType = "Service";


@BeforeEach
@@ -211,6 +216,7 @@ void test_writeSingleRecordToBuffer_with_other_data(final String additionalStrin
assertThat(event.getMetadata().getAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE), equalTo(OpenSearchBulkActions.INDEX.toString()));
assertThat(event.getMetadata().getAttribute(DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE), equalTo("INSERT"));
assertThat(event.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), equalTo(record.dynamodb().approximateCreationDateTime().toEpochMilli()));
assertThat(event.getMetadata().getAttribute(DDB_STREAM_EVENT_IS_TTL_DELETE), equalTo(false));

assertThat(event.get(partitionKeyAttrName, String.class), notNullValue());
assertThat(event.get(sortKeyAttrName, String.class), notNullValue());
@@ -355,7 +361,7 @@ void remove_record_with_use_old_image_on_delete_uses_old_image() throws Exceptio

final Map<String, AttributeValue> newImage = Map.of(newImageKey, AttributeValue.builder().s(newImageValue).build());
final Map<String, AttributeValue> oldImage = Map.of(oldImageKey, AttributeValue.builder().s(oldImageValue).build());
List<software.amazon.awssdk.services.dynamodb.model.Record> records = Collections.singletonList(buildRecord(Instant.now(), newImage, oldImage, OperationType.REMOVE));
List<software.amazon.awssdk.services.dynamodb.model.Record> records = Collections.singletonList(buildRecord(Instant.now(), newImage, oldImage, OperationType.REMOVE, null));
final ArgumentCaptor<Record> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class);
software.amazon.awssdk.services.dynamodb.model.Record record = records.get(0);
final StreamRecordConverter objectUnderTest = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics, streamConfig);
@@ -397,7 +403,7 @@ void remove_record_with_use_old_image_on_delete_with_no_new_image_found_uses_new
final String newImageValue = UUID.randomUUID().toString();

final Map<String, AttributeValue> newImage = Map.of(newImageKey, AttributeValue.builder().s(newImageValue).build());
List<software.amazon.awssdk.services.dynamodb.model.Record> records = Collections.singletonList(buildRecord(Instant.now(), newImage, null, OperationType.REMOVE));
List<software.amazon.awssdk.services.dynamodb.model.Record> records = Collections.singletonList(buildRecord(Instant.now(), newImage, null, OperationType.REMOVE, null));
final ArgumentCaptor<Record> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class);
software.amazon.awssdk.services.dynamodb.model.Record record = records.get(0);
final StreamRecordConverter objectUnderTest = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics, streamConfig);
@@ -442,7 +448,7 @@ void remove_record_without_use_old_image_on_delete_uses_new_image() throws Excep

final Map<String, AttributeValue> newImage = Map.of(newImageKey, AttributeValue.builder().s(newImageValue).build());
final Map<String, AttributeValue> oldImage = Map.of(oldImageKey, AttributeValue.builder().s(oldImageValue).build());
List<software.amazon.awssdk.services.dynamodb.model.Record> records = Collections.singletonList(buildRecord(Instant.now(), newImage, oldImage, OperationType.REMOVE));
List<software.amazon.awssdk.services.dynamodb.model.Record> records = Collections.singletonList(buildRecord(Instant.now(), newImage, oldImage, OperationType.REMOVE, null));
final ArgumentCaptor<Record> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class);
software.amazon.awssdk.services.dynamodb.model.Record record = records.get(0);
final StreamRecordConverter objectUnderTest = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics, streamConfig);
@@ -476,6 +482,72 @@ void remove_record_without_use_old_image_on_delete_uses_new_image() throws Excep
verify(bytesProcessedSummary).record(record.dynamodb().sizeBytes());
}

@Test
void test_writeSingleRecordToBuffer_with_userIdentity() throws Exception {

when(streamConfig.getStreamViewForRemoves()).thenReturn(StreamViewType.OLD_IMAGE);

final Identity userIdentity = Identity.builder()
.principalId(principalId)
.type(userIdentityType)
.build();

final String newImageKey = UUID.randomUUID().toString();
final String newImageValue = UUID.randomUUID().toString();

final Map<String, AttributeValue> newImage = Map.of(newImageKey, AttributeValue.builder().s(newImageValue).build());
List<software.amazon.awssdk.services.dynamodb.model.Record> records = Collections.singletonList(buildRecord(Instant.now(), newImage, null, OperationType.REMOVE, userIdentity));
final ArgumentCaptor<Record> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class);
software.amazon.awssdk.services.dynamodb.model.Record record = records.get(0);
final StreamRecordConverter objectUnderTest = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics, streamConfig);
doNothing().when(bufferAccumulator).add(recordArgumentCaptor.capture());

objectUnderTest.writeToBuffer(null, records);

verify(bufferAccumulator).add(any(Record.class));
verify(bufferAccumulator).flush();
verify(changeEventSuccessCounter).increment(anyDouble());
assertThat(recordArgumentCaptor.getValue().getData(), notNullValue());
JacksonEvent event = (JacksonEvent) recordArgumentCaptor.getValue().getData();

assertThat(event.getMetadata(), notNullValue());
assertThat(event.getMetadata().getAttribute(DDB_STREAM_EVENT_IS_TTL_DELETE), equalTo(Boolean.TRUE));

}

@Test
void test_writeSingleRecordToBuffer_with_wrong_userIdentity() throws Exception {

when(streamConfig.getStreamViewForRemoves()).thenReturn(StreamViewType.OLD_IMAGE);

final Identity userIdentity = Identity.builder()
.principalId("lambda.amazonaws.com")
.type(userIdentityType)
.build();

final String newImageKey = UUID.randomUUID().toString();
final String newImageValue = UUID.randomUUID().toString();

final Map<String, AttributeValue> newImage = Map.of(newImageKey, AttributeValue.builder().s(newImageValue).build());
List<software.amazon.awssdk.services.dynamodb.model.Record> records = Collections.singletonList(buildRecord(Instant.now(), newImage, null, OperationType.REMOVE, userIdentity));
final ArgumentCaptor<Record> recordArgumentCaptor = ArgumentCaptor.forClass(Record.class);
software.amazon.awssdk.services.dynamodb.model.Record record = records.get(0);
final StreamRecordConverter objectUnderTest = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics, streamConfig);
doNothing().when(bufferAccumulator).add(recordArgumentCaptor.capture());

objectUnderTest.writeToBuffer(null, records);

verify(bufferAccumulator).add(any(Record.class));
verify(bufferAccumulator).flush();
verify(changeEventSuccessCounter).increment(anyDouble());
assertThat(recordArgumentCaptor.getValue().getData(), notNullValue());
JacksonEvent event = (JacksonEvent) recordArgumentCaptor.getValue().getData();

assertThat(event.getMetadata(), notNullValue());
assertThat(event.getMetadata().getAttribute(DDB_STREAM_EVENT_IS_TTL_DELETE), equalTo(Boolean.FALSE));

}

private List<software.amazon.awssdk.services.dynamodb.model.Record> buildRecords(int count, final Instant creationTime) {
return buildRecords(count, creationTime, Collections.emptyMap());
}
@@ -486,20 +558,34 @@ private List<software.amazon.awssdk.services.dynamodb.model.Record> buildRecords(
final Map<String, AttributeValue> additionalData) {
List<software.amazon.awssdk.services.dynamodb.model.Record> records = new ArrayList<>();
for (int i = 0; i < count; i++) {
records.add(buildRecord(creationTime, additionalData, null, OperationType.INSERT));
records.add(buildRecord(creationTime, additionalData, null, OperationType.INSERT, null));
}

return records;
}

private List<software.amazon.awssdk.services.dynamodb.model.Record> buildRecords(
int count,
final Instant creationTime,
final Map<String, AttributeValue> additionalData,
final Identity userIdentity) {
List<software.amazon.awssdk.services.dynamodb.model.Record> records = new ArrayList<>();
for (int i = 0; i < count; i++) {
records.add(buildRecord(creationTime, additionalData, null, OperationType.INSERT, userIdentity));
}

return records;
}

private software.amazon.awssdk.services.dynamodb.model.Record buildRecord(final Instant creationTime) {
return buildRecord(creationTime, Collections.emptyMap(), null, OperationType.INSERT);
return buildRecord(creationTime, Collections.emptyMap(), null, OperationType.INSERT, null);
}

private software.amazon.awssdk.services.dynamodb.model.Record buildRecord(final Instant creationTime,
Map<String, AttributeValue> additionalData,
Map<String, AttributeValue> oldImage,
final OperationType operationType) {
final OperationType operationType,
final Identity userIdentity) {
Map<String, AttributeValue> keysData = Map.of(
partitionKeyAttrName, AttributeValue.builder().s(UUID.randomUUID().toString()).build(),
sortKeyAttrName, AttributeValue.builder().s(UUID.randomUUID().toString()).build());
Expand All @@ -526,6 +612,7 @@ private software.amazon.awssdk.services.dynamodb.model.Record buildRecord(final
software.amazon.awssdk.services.dynamodb.model.Record record = software.amazon.awssdk.services.dynamodb.model.Record.builder()
.dynamodb(streamRecord)
.eventName(operationType)
.userIdentity(userIdentity)
.build();
return record;
}
