diff --git a/docs/modules/plugins/pages/plugin-kafka.adoc b/docs/modules/plugins/pages/plugin-kafka.adoc index 140de02d87..7b63571569 100644 --- a/docs/modules/plugins/pages/plugin-kafka.adoc +++ b/docs/modules/plugins/pages/plugin-kafka.adoc @@ -29,6 +29,28 @@ Where `` is the key of the producer configuration which should be === Steps +==== *Set headers to Kafka event* + +Sets headers to the event, which will be sent to Kafka topic. + +NOTE: The set headers are scoped to the first event, which will be sent, and not available afterwards. + +[source,gherkin] +---- +When I set Kafka event headers:$headers +---- + +* `headers` - The xref:ROOT:glossary.adoc#_examplestable[ExamplesTable] representing the list of the headers with columns `name` and `value`. + +.Set event header with name `aggregate-type` and value `tenant` +[source,gherkin] +---- +When I set Kafka event headers: +|name |value | +|aggregate-type|tenant| +When I send event with value `test-data` to `dev` Kafka topic `test-topic` +---- + ==== *Send event with value* Sends the event with the value to the provided topic with no key or partition. diff --git a/vividus-plugin-kafka/src/main/java/org/vividus/steps/kafka/KafkaSteps.java b/vividus-plugin-kafka/src/main/java/org/vividus/steps/kafka/KafkaSteps.java index 1ba0c925ca..51a4219525 100644 --- a/vividus-plugin-kafka/src/main/java/org/vividus/steps/kafka/KafkaSteps.java +++ b/vividus-plugin-kafka/src/main/java/org/vividus/steps/kafka/KafkaSteps.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2023 the original author or authors. + * Copyright 2019-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,9 +20,11 @@ import static java.util.stream.Collectors.collectingAndThen; import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.mapping; +import static java.util.stream.Collectors.toList; import static org.apache.commons.lang3.StringUtils.substringAfter; import static org.apache.commons.lang3.StringUtils.substringBefore; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; @@ -36,15 +38,20 @@ import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.hamcrest.Matcher; import org.jbehave.core.annotations.AfterStory; import org.jbehave.core.annotations.When; +import org.jbehave.core.model.ExamplesTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; @@ -67,6 +74,8 @@ public class KafkaSteps { private static final String DOT = "."; + private static final String NAME = "name"; + private static final String VALUE = "value"; private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSteps.class); @@ -74,6 +83,7 @@ public class KafkaSteps private static final Class LISTENER_KEY = GenericMessageListenerContainer.class; private static final Class EVENTS_KEY = ConsumerRecord.class; + private static final Class HEADERS_KEY = Header.class; private final Map> kafkaTemplates; private final Map> consumerFactories; @@ -114,6 +124,25 @@ private Map convert(String propertiesPrefix, IPropertyParser prop factoryCreator::apply)))); } + /** + * Set Kafka event headers + * @param headers ExamplesTable representing list of headers with columns "name" and "value" specifying kafka header + * names and values respectively + */ + @When("I set Kafka event headers:$headers") + public void setEventHeaders(ExamplesTable headers) + { + List
kafkaHeaders = headers.getRowsAsParameters(true).stream() + .map(row -> + { + String name = row.valueAs(NAME, String.class); + String value = row.valueAs(VALUE, String.class); + return new RecordHeader(name, value.getBytes(StandardCharsets.UTF_8)); + }) + .collect(toList()); + testContext.put(HEADERS_KEY, kafkaHeaders); + } + /** * Send the event with the specified value to the provided topic with no key or partition. * @param value The event value @@ -150,7 +179,9 @@ public void sendEvent(String value, String producerKey, String topic) public void sendEventWithKey(String key, String value, String producerKey, String topic) throws InterruptedException, ExecutionException, TimeoutException { - kafkaTemplates.get(producerKey).send(topic, key, value).get(WAIT_TIMEOUT_IN_MINUTES, TimeUnit.MINUTES); + ProducerRecord record = new ProducerRecord(topic, 0, 0L, key, value, + testContext.remove(HEADERS_KEY)); + kafkaTemplates.get(producerKey).send(record).get(WAIT_TIMEOUT_IN_MINUTES, TimeUnit.MINUTES); } /** @@ -250,10 +281,13 @@ public void processKafkaEvents(QueueOperation queueOperation, String consumerKey String variableName) { List> events = queueOperation.performOn(getEventsBy(consumerKey)); - LOGGER.atInfo().addArgument(() -> events.stream().map(ConsumerRecord::key) - .map(key -> key == null ? "" : key) - .collect(Collectors.joining(", "))) - .log("Saving events with the keys: {}"); + LOGGER.atInfo().addArgument(() -> events.stream().map(e -> { + String key = e.key() == null ? "" : e.key(); + String headerNames = StreamSupport.stream(e.headers().spliterator(), false) + .map(Header::key) + .collect(Collectors.joining(", ")); + return headerNames.isEmpty() ? key : "{" + key + "; " + headerNames + "}"; + }).toList()).log("Saving events with the keys and headers: {}"); List eventValues = events.stream().map(ConsumerRecord::value).toList(); variableContext.putVariable(scopes, variableName, eventValues); } diff --git a/vividus-plugin-kafka/src/test/java/org/vividus/steps/kafka/KafkaStepsIntegrationTests.java b/vividus-plugin-kafka/src/test/java/org/vividus/steps/kafka/KafkaStepsIntegrationTests.java index 9a50d0d8a4..6cdc233c3e 100644 --- a/vividus-plugin-kafka/src/test/java/org/vividus/steps/kafka/KafkaStepsIntegrationTests.java +++ b/vividus-plugin-kafka/src/test/java/org/vividus/steps/kafka/KafkaStepsIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2023 the original author or authors. + * Copyright 2019-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -43,6 +43,7 @@ import com.github.valfirst.slf4jtest.TestLoggerFactoryExtension; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.jbehave.core.model.ExamplesTable; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -162,6 +163,7 @@ void shouldProduceEventsToAndConsumeEventsFromKafka(BiConsumer arguments = keysLogEvent.getArguments(); assertThat(arguments, hasSize(1)); - List keys = Stream.of(arguments.get(0).toString().split(",")).map(String::strip).sorted().toList(); - String noKey = ""; - assertEquals(List.of(noKey, noKey, noKey, KEY + 1, KEY + 2, KEY + 3), keys); + assertEquals("[key1, , , key2, key3, {; headerName1, headerName2}]", + arguments.get(0).toString()); verify(variableContext).putVariable(SCOPES, VARIABLE_NAME, - List.of(ANY_DATA, ANY_DATA, ANY_DATA, ANY_DATA, ANY_DATA, ANY_DATA)); + List.of(ANY_DATA, ANY_DATA, ANY_DATA, ANY_DATA, ANY_DATA, anyDataWithHeader)); } } diff --git a/vividus-plugin-kafka/src/test/java/org/vividus/steps/kafka/KafkaStepsTests.java b/vividus-plugin-kafka/src/test/java/org/vividus/steps/kafka/KafkaStepsTests.java index 81f3d87826..69422fa428 100644 --- a/vividus-plugin-kafka/src/test/java/org/vividus/steps/kafka/KafkaStepsTests.java +++ b/vividus-plugin-kafka/src/test/java/org/vividus/steps/kafka/KafkaStepsTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2023 the original author or authors. + * Copyright 2019-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,6 +31,7 @@ import static org.mockito.Mockito.when; import static org.mockito.Mockito.withSettings; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -42,6 +43,9 @@ import com.github.valfirst.slf4jtest.TestLoggerFactoryExtension; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.jbehave.core.model.ExamplesTable; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -62,6 +66,7 @@ class KafkaStepsTests private static final String KEY2 = "key2"; private static final Class LISTENER_KEY = GenericMessageListenerContainer.class; + private static final Class HEADERS_KEY = Header.class; private static final String LISTENER_IS_STOPPED = "Kafka event listener is stopped"; @@ -158,4 +163,13 @@ void shouldStopStartedKafkaListenerIfNewKafkaListenerIsCreated() info(listenerIsStarted)))); } } + + @Test + void testAddEventHeaders() + { + var headers = new ExamplesTable("|name|value|\n|headerName|headerValue|"); + kafkaSteps.setEventHeaders(headers); + RecordHeader header = new RecordHeader("headerName", "headerValue".getBytes(StandardCharsets.UTF_8)); + verify(testContext).put(HEADERS_KEY, List.of(header)); + } } diff --git a/vividus-tests/src/main/resources/story/system/Kafka.story b/vividus-tests/src/main/resources/story/system/Kafka.story index da0e665dda..16be7b92ef 100644 --- a/vividus-tests/src/main/resources/story/system/Kafka.story +++ b/vividus-tests/src/main/resources/story/system/Kafka.story @@ -11,6 +11,9 @@ Meta: @requirementId 2915 Given I initialize scenario variable `event-value` with value `{"event-from-system-vividus-test": "#{generate(regexify '[a-z]{8}')}"}` When I start consuming events from `vividus` Kafka topics `${topic}` +When I set Kafka event headers: +|name |value | +|test_header|test_value| When I execute steps: |step | ||