Skip to content

Commit

Permalink
Update the rename_keys and delete_entries processors to use the Event…
Browse files Browse the repository at this point in the history
…Key. (opensearch-project#4636)

Signed-off-by: David Venable <dlv@amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
  • Loading branch information
dlvenable authored and Krishna Kondaka committed Jul 23, 2024
1 parent c3957b7 commit ae3470e
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 19 deletions.
2 changes: 2 additions & 0 deletions data-prepper-plugins/mutate-event-processors/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:common')
implementation 'com.fasterxml.jackson.core:jackson-databind'
testImplementation project(':data-prepper-test-event')
testImplementation testLibs.slf4j.simple
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventKey;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.List;
import java.util.Objects;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT;
Expand All @@ -25,7 +27,7 @@
public class DeleteEntryProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {

private static final Logger LOG = LoggerFactory.getLogger(DeleteEntryProcessor.class);
private final String[] entries;
private final List<EventKey> entries;
private final String deleteWhen;

private final ExpressionEvaluator expressionEvaluator;
Expand All @@ -49,7 +51,7 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
}


for (String entry : entries) {
for (final EventKey entry : entries) {
recordEvent.delete(entry);
}
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,23 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.model.event.EventKey;
import org.opensearch.dataprepper.model.event.EventKeyConfiguration;
import org.opensearch.dataprepper.model.event.EventKeyFactory;

import java.util.List;

public class DeleteEntryProcessorConfig {
@NotEmpty
@NotNull
@JsonProperty("with_keys")
private String[] withKeys;
@EventKeyConfiguration(EventKeyFactory.EventAction.DELETE)
private List<@NotNull @NotEmpty EventKey> withKeys;

@JsonProperty("delete_when")
private String deleteWhen;

public String[] getWithKeys() {
public List<EventKey> getWithKeys() {
return withKeys;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.model.event.EventKey;
import org.opensearch.dataprepper.model.event.EventKeyConfiguration;
import org.opensearch.dataprepper.model.event.EventKeyFactory;

import java.util.List;

Expand All @@ -17,24 +20,26 @@ public static class Entry {
@NotEmpty
@NotNull
@JsonProperty("from_key")
private String fromKey;
@EventKeyConfiguration({EventKeyFactory.EventAction.GET, EventKeyFactory.EventAction.DELETE})
private EventKey fromKey;

@NotEmpty
@NotNull
@JsonProperty("to_key")
private String toKey;
@EventKeyConfiguration(EventKeyFactory.EventAction.PUT)
private EventKey toKey;

@JsonProperty("overwrite_if_to_key_exists")
private boolean overwriteIfToKeyExists = false;

@JsonProperty("rename_when")
private String renameWhen;

public String getFromKey() {
public EventKey getFromKey() {
return fromKey;
}

public String getToKey() {
public EventKey getToKey() {
return toKey;
}

Expand All @@ -44,7 +49,7 @@ public boolean getOverwriteIfToKeyExists() {

public String getRenameWhen() { return renameWhen; }

public Entry(final String fromKey, final String toKey, final boolean overwriteIfKeyExists, final String renameWhen) {
public Entry(final EventKey fromKey, final EventKey toKey, final boolean overwriteIfKeyExists, final String renameWhen) {
this.fromKey = fromKey;
this.toKey = toKey;
this.overwriteIfToKeyExists = overwriteIfKeyExists;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@

package org.opensearch.dataprepper.plugins.processor.mutateevent;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.event.TestEventKeyFactory;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventKeyFactory;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.Collections;
import java.util.HashMap;
Expand All @@ -36,9 +38,11 @@ public class DeleteEntryProcessorTests {
@Mock
private ExpressionEvaluator expressionEvaluator;

private final EventKeyFactory eventKeyFactory = TestEventKeyFactory.getTestEventFactory();

@Test
public void testSingleDeleteProcessorTest() {
when(mockConfig.getWithKeys()).thenReturn(new String[] { "message" });
when(mockConfig.getWithKeys()).thenReturn(List.of(eventKeyFactory.createEventKey("message", EventKeyFactory.EventAction.DELETE)));
when(mockConfig.getDeleteWhen()).thenReturn(null);

final DeleteEntryProcessor processor = createObjectUnderTest();
Expand All @@ -52,7 +56,7 @@ public void testSingleDeleteProcessorTest() {

@Test
public void testWithKeyDneDeleteProcessorTest() {
when(mockConfig.getWithKeys()).thenReturn(new String[] { "message2" });
when(mockConfig.getWithKeys()).thenReturn(List.of(eventKeyFactory.createEventKey("message2", EventKeyFactory.EventAction.DELETE)));
when(mockConfig.getDeleteWhen()).thenReturn(null);

final DeleteEntryProcessor processor = createObjectUnderTest();
Expand All @@ -67,7 +71,9 @@ public void testWithKeyDneDeleteProcessorTest() {

@Test
public void testMultiDeleteProcessorTest() {
when(mockConfig.getWithKeys()).thenReturn(new String[] { "message", "message2" });
when(mockConfig.getWithKeys()).thenReturn(List.of(
eventKeyFactory.createEventKey("message", EventKeyFactory.EventAction.DELETE),
eventKeyFactory.createEventKey("message2", EventKeyFactory.EventAction.DELETE)));
when(mockConfig.getDeleteWhen()).thenReturn(null);

final DeleteEntryProcessor processor = createObjectUnderTest();
Expand All @@ -83,7 +89,7 @@ public void testMultiDeleteProcessorTest() {

@Test
public void testKeyIsNotDeleted_when_deleteWhen_returns_false() {
when(mockConfig.getWithKeys()).thenReturn(new String[] { "message" });
when(mockConfig.getWithKeys()).thenReturn(List.of(eventKeyFactory.createEventKey("message", EventKeyFactory.EventAction.DELETE)));
final String deleteWhen = UUID.randomUUID().toString();
when(mockConfig.getDeleteWhen()).thenReturn(deleteWhen);

Expand All @@ -98,8 +104,9 @@ public void testKeyIsNotDeleted_when_deleteWhen_returns_false() {
assertThat(editedRecords.get(0).getData().containsKey("newMessage"), is(true));
}

@Test
public void testNestedDeleteProcessorTest() {
when(mockConfig.getWithKeys()).thenReturn(new String[]{"nested/foo"});
when(mockConfig.getWithKeys()).thenReturn(List.of(eventKeyFactory.createEventKey("nested/foo", EventKeyFactory.EventAction.DELETE)));

Map<String, Object> nested = Map.of("foo", "bar", "fizz", 42);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@

package org.opensearch.dataprepper.plugins.processor.mutateevent;

import org.opensearch.dataprepper.event.TestEventKeyFactory;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventKey;
import org.opensearch.dataprepper.model.event.EventKeyFactory;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -39,6 +42,8 @@ public class RenameKeyProcessorTests {
@Mock
private ExpressionEvaluator expressionEvaluator;

private final EventKeyFactory eventKeyFactory = TestEventKeyFactory.getTestEventFactory();

@Test
public void testSingleOverwriteRenameProcessorTests() {
when(mockConfig.getEntries()).thenReturn(createListOfEntries(createEntry("message", "newMessage", true, null)));
Expand Down Expand Up @@ -136,7 +141,9 @@ private RenameKeyProcessor createObjectUnderTest() {
}

private RenameKeyProcessorConfig.Entry createEntry(final String fromKey, final String toKey, final boolean overwriteIfToKeyExists, final String renameWhen) {
return new RenameKeyProcessorConfig.Entry(fromKey, toKey, overwriteIfToKeyExists, renameWhen);
final EventKey fromEventKey = eventKeyFactory.createEventKey(fromKey);
final EventKey toEventKey = eventKeyFactory.createEventKey(toKey);
return new RenameKeyProcessorConfig.Entry(fromEventKey, toEventKey, overwriteIfToKeyExists, renameWhen);
}

private List<RenameKeyProcessorConfig.Entry> createListOfEntries(final RenameKeyProcessorConfig.Entry... entries) {
Expand Down

0 comments on commit ae3470e

Please sign in to comment.