Skip to content

Commit

Permalink
[plugin-kafka] Add step for setting headers to event
Browse files Browse the repository at this point in the history
  • Loading branch information
abudevich committed Jun 12, 2024
1 parent ad8380b commit 644454a
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 14 deletions.
24 changes: 24 additions & 0 deletions docs/modules/plugins/pages/plugin-kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,30 @@ Where `<producer-key>` is the key of the producer configuration which should be

=== Steps

==== *Set headers to Kafka event*

Adds headers to the next event, which will be sent to a Kafka topic.

NOTE: The added headers are scoped only to the next event to be sent and are not available afterwards.

[source,gherkin]
----
When I set Kafka event headers:$headers
----

* `headers` - The xref:ROOT:glossary.adoc#_examplestable[ExamplesTable] representing the list of the headers with columns `name` and `value`.

=== Examples

.Add event header with name `aggregate-type` and value `tenant`
[source,gherkin]
----
When I set Kafka event headers:
|name |value |
|aggregate-type|tenant|
When I send event with value `test-data` to `dev` Kafka topic `test-topic`
----

==== *Send event with value*

Sends the event with the value to the provided topic with no key or partition.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2023 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,9 +20,11 @@
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
import static org.apache.commons.lang3.StringUtils.substringAfter;
import static org.apache.commons.lang3.StringUtils.substringBefore;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -36,15 +38,20 @@
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.hamcrest.Matcher;
import org.jbehave.core.annotations.AfterStory;
import org.jbehave.core.annotations.When;
import org.jbehave.core.model.ExamplesTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
Expand All @@ -67,13 +74,16 @@
public class KafkaSteps
{
private static final String DOT = ".";
private static final String NAME = "name";
private static final String VALUE = "value";

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSteps.class);

private static final int WAIT_TIMEOUT_IN_MINUTES = 10;

private static final Class<?> LISTENER_KEY = GenericMessageListenerContainer.class;
private static final Class<?> EVENTS_KEY = ConsumerRecord.class;
private static final Class<?> HEADERS_KEY = Header.class;

private final Map<String, KafkaTemplate<String, String>> kafkaTemplates;
private final Map<String, DefaultKafkaConsumerFactory<Object, Object>> consumerFactories;
Expand Down Expand Up @@ -114,6 +124,25 @@ private <T> Map<String, T> convert(String propertiesPrefix, IPropertyParser prop
factoryCreator::apply))));
}

/**
 * Adds Kafka event headers which will be attached to the next event sent to a Kafka topic.
 * @param headers ExamplesTable representing the list of headers with columns "name" and "value"
 * specifying Kafka header names and values respectively
 */
@When("I set Kafka event headers:$headers")
public void addEventHeaders(ExamplesTable headers)
{
    // Build a mutable list of record headers, one per table row (same as collect(toList()))
    List<Header> eventHeaders = new ArrayList<>();
    for (var row : headers.getRowsAsParameters(true))
    {
        String headerName = row.valueAs(NAME, String.class);
        String headerValue = row.valueAs(VALUE, String.class);
        eventHeaders.add(new RecordHeader(headerName, headerValue.getBytes(StandardCharsets.UTF_8)));
    }
    // Store under the shared key so the next send step can pick the headers up
    testContext.put(HEADERS_KEY, eventHeaders);
}

/**
* Send the event with the specified value to the provided topic with no key or partition.
* @param value The event value
Expand Down Expand Up @@ -150,7 +179,10 @@ public void sendEvent(String value, String producerKey, String topic)
public void sendEventWithKey(String key, String value, String producerKey, String topic)
        throws InterruptedException, ExecutionException, TimeoutException
{
    // Headers set via the "I set Kafka event headers:" step are scoped to a single event:
    // read them once and clear the context so the next event is sent without them
    List<Header> headers = testContext.get(HEADERS_KEY);
    testContext.remove(HEADERS_KEY);
    // Parameterized record type (no raw ProducerRecord): partition and timestamp are
    // intentionally null so the broker/partitioner chooses them
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, null, key, value, headers);
    kafkaTemplates.get(producerKey).send(record).get(WAIT_TIMEOUT_IN_MINUTES, TimeUnit.MINUTES);
}

/**
Expand Down Expand Up @@ -250,10 +282,13 @@ public void processKafkaEvents(QueueOperation queueOperation, String consumerKey
String variableName)
{
List<ConsumerRecord<String, String>> events = queueOperation.performOn(getEventsBy(consumerKey));
LOGGER.atInfo().addArgument(() -> events.stream().map(ConsumerRecord::key)
.map(key -> key == null ? "<no key>" : key)
.collect(Collectors.joining(", ")))
.log("Saving events with the keys: {}");
LOGGER.atInfo().addArgument(() -> events.stream().map(e -> {
String key = e.key() == null ? "<no key>" : e.key();
String headerNames = Stream.of(e.headers().toArray()).map(Header::key)
.collect(Collectors.joining(", "));
String headersNamesForLog = headerNames.isEmpty() ? "<no headers>" : headerNames;
return "{" + key + "; " + headersNamesForLog + "}";
}).toList()).log("Saving events with the keys-headers: {}");
List<String> eventValues = events.stream().map(ConsumerRecord::value).toList();
variableContext.putVariable(scopes, variableName, eventValues);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2023 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -43,6 +43,7 @@
import com.github.valfirst.slf4jtest.TestLoggerFactoryExtension;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.jbehave.core.model.ExamplesTable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -162,14 +163,17 @@ void shouldProduceEventsToAndConsumeEventsFromKafka(BiConsumer<KafkaSteps, Varia
@Test
void shouldProduceEventWithKey() throws InterruptedException, ExecutionException, TimeoutException
{
String anyDataWithHeader = ANY_DATA + "-with-headers";
kafkaSteps.startKafkaListener(CONSUMER, Set.of(TOPIC));

kafkaSteps.sendEventWithKey(KEY + 1, ANY_DATA, PRODUCER, TOPIC);
kafkaSteps.sendEvent(ANY_DATA, PRODUCER, TOPIC);
kafkaSteps.sendEvent(ANY_DATA, PRODUCER, TOPIC);
kafkaSteps.sendEventWithKey(KEY + 2, ANY_DATA, PRODUCER, TOPIC);
kafkaSteps.sendEventWithKey(KEY + 3, ANY_DATA, PRODUCER, TOPIC);
kafkaSteps.sendEvent(ANY_DATA, PRODUCER, TOPIC);
var headers = new ExamplesTable("|name|value|\n|headerName1|headerValue1|\n|headerName2|headerValue2|");
kafkaSteps.addEventHeaders(headers);
kafkaSteps.sendEvent(anyDataWithHeader, PRODUCER, TOPIC);

kafkaSteps.waitForKafkaEvents(Duration.ofSeconds(10), CONSUMER, ComparisonRule.EQUAL_TO, 6);
kafkaSteps.stopKafkaListener(CONSUMER);
Expand All @@ -181,14 +185,14 @@ void shouldProduceEventWithKey() throws InterruptedException, ExecutionException
assertEquals(info(LISTENER_STOPPED), events.get(1));
LoggingEvent keysLogEvent = events.get(2);
assertEquals(Level.INFO, keysLogEvent.getLevel());
assertEquals("Saving events with the keys: {}", keysLogEvent.getMessage());
assertEquals("Saving events with the keys-headers: {}", keysLogEvent.getMessage());
List<Object> arguments = keysLogEvent.getArguments();
assertThat(arguments, hasSize(1));
List<String> keys = Stream.of(arguments.get(0).toString().split(",")).map(String::strip).sorted().toList();
String noKey = "<no key>";
assertEquals(List.of(noKey, noKey, noKey, KEY + 1, KEY + 2, KEY + 3), keys);
String expectedKeyHeaders = "[{key1; <no headers>}, {<no key>; <no headers>}, {<no key>; <no headers>}, "
+ "{key2; <no headers>}, {key3; <no headers>}, {<no key>; headerName1, headerName2}]";
assertEquals(expectedKeyHeaders, arguments.get(0).toString());

verify(variableContext).putVariable(SCOPES, VARIABLE_NAME,
List.of(ANY_DATA, ANY_DATA, ANY_DATA, ANY_DATA, ANY_DATA, ANY_DATA));
List.of(ANY_DATA, ANY_DATA, ANY_DATA, ANY_DATA, ANY_DATA, anyDataWithHeader));
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2023 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,6 +31,7 @@
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -42,6 +43,9 @@
import com.github.valfirst.slf4jtest.TestLoggerFactoryExtension;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.jbehave.core.model.ExamplesTable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -62,6 +66,7 @@ class KafkaStepsTests
private static final String KEY2 = "key2";

private static final Class<?> LISTENER_KEY = GenericMessageListenerContainer.class;
private static final Class<?> HEADERS_KEY = Header.class;

private static final String LISTENER_IS_STOPPED = "Kafka event listener is stopped";

Expand Down Expand Up @@ -158,4 +163,13 @@ void shouldStopStartedKafkaListenerIfNewKafkaListenerIsCreated()
info(listenerIsStarted))));
}
}

@Test
void testAddEventHeaders()
{
    // A single-row headers table must be converted into one RecordHeader and stored in the context
    kafkaSteps.addEventHeaders(new ExamplesTable("|name|value|\n|headerName|headerValue|"));
    byte[] expectedHeaderValue = "headerValue".getBytes(StandardCharsets.UTF_8);
    verify(testContext).put(HEADERS_KEY, List.of(new RecordHeader("headerName", expectedHeaderValue)));
}
}
3 changes: 3 additions & 0 deletions vividus-tests/src/main/resources/story/system/Kafka.story
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ Meta:
@requirementId 2915
Given I initialize scenario variable `event-value` with value `{"event-from-system-vividus-test": "#{generate(regexify '[a-z]{8}')}"}`
When I start consuming events from `vividus` Kafka topics `${topic}`
When I set Kafka event headers:
|name |value |
|test_header|test_value|
When I execute steps:
|step |
|<eventPublisher>|
Expand Down

0 comments on commit 644454a

Please sign in to comment.