[plugin-kafka] Add step for setting headers to event #5122

Merged: 1 commit, Jun 13, 2024
22 changes: 22 additions & 0 deletions docs/modules/plugins/pages/plugin-kafka.adoc
@@ -29,6 +29,28 @@ Where `<producer-key>` is the key of the producer configuration which should be

=== Steps

==== *Set headers to Kafka event*

Sets headers on the event that will be sent to a Kafka topic.

NOTE: The headers are scoped to the next event to be sent only; they are not available for subsequent events.

[source,gherkin]
----
When I set Kafka event headers:$headers
----

* `headers` - The xref:ROOT:glossary.adoc#_examplestable[ExamplesTable] representing the list of headers with columns `name` and `value`.

.Set event header with name `aggregate-type` and value `tenant`
[source,gherkin]
----
When I set Kafka event headers:
|name |value |
|aggregate-type|tenant|
When I send event with value `test-data` to `dev` Kafka topic `test-topic`
----

==== *Send event with value*

Sends the event with the value to the provided topic with no key or partition.
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2023 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,9 +20,11 @@
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
import static org.apache.commons.lang3.StringUtils.substringAfter;
import static org.apache.commons.lang3.StringUtils.substringBefore;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
@@ -36,15 +38,20 @@
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.hamcrest.Matcher;
import org.jbehave.core.annotations.AfterStory;
import org.jbehave.core.annotations.When;
import org.jbehave.core.model.ExamplesTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
@@ -67,13 +74,16 @@
public class KafkaSteps
{
private static final String DOT = ".";
private static final String NAME = "name";
private static final String VALUE = "value";

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSteps.class);

private static final int WAIT_TIMEOUT_IN_MINUTES = 10;

private static final Class<?> LISTENER_KEY = GenericMessageListenerContainer.class;
private static final Class<?> EVENTS_KEY = ConsumerRecord.class;
private static final Class<?> HEADERS_KEY = Header.class;

private final Map<String, KafkaTemplate<String, String>> kafkaTemplates;
private final Map<String, DefaultKafkaConsumerFactory<Object, Object>> consumerFactories;
@@ -114,6 +124,25 @@ private <T> Map<String, T> convert(String propertiesPrefix, IPropertyParser prop
factoryCreator::apply))));
}

/**
 * Sets Kafka event headers.
 * @param headers ExamplesTable representing the list of headers with columns "name" and "value" specifying Kafka
 * header names and values respectively
*/
@When("I set Kafka event headers:$headers")
public void setEventHeaders(ExamplesTable headers)
{
List<Header> kafkaHeaders = headers.getRowsAsParameters(true).stream()
.map(row ->
{
String name = row.valueAs(NAME, String.class);
String value = row.valueAs(VALUE, String.class);
return new RecordHeader(name, value.getBytes(StandardCharsets.UTF_8));
})
.collect(toList());
testContext.put(HEADERS_KEY, kafkaHeaders);
}
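The row-to-header conversion above can be exercised in isolation; the sketch below mirrors the mapping with a plain `Map.Entry` list standing in for JBehave's `ExamplesTable` and a local record standing in for Kafka's `RecordHeader` (both stand-ins are assumptions of this sketch).

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class HeaderConversionSketch
{
    // Stand-in for org.apache.kafka.common.header.internals.RecordHeader
    record SimpleHeader(String key, byte[] value) { }

    // Mirrors setEventHeaders: each row's name/value pair becomes a header
    // whose value is the UTF-8 bytes of the string value
    static List<SimpleHeader> toHeaders(List<Map.Entry<String, String>> rows)
    {
        return rows.stream()
                .map(row -> new SimpleHeader(row.getKey(),
                        row.getValue().getBytes(StandardCharsets.UTF_8)))
                .toList();
    }

    public static void main(String[] args)
    {
        List<SimpleHeader> headers = toHeaders(List.of(Map.entry("aggregate-type", "tenant")));
        System.out.println(headers.get(0).key());                                       // aggregate-type
        System.out.println(new String(headers.get(0).value(), StandardCharsets.UTF_8)); // tenant
    }
}
```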

/**
* Send the event with the specified value to the provided topic with no key or partition.
* @param value The event value
@@ -150,7 +179,9 @@ public void sendEvent(String value, String producerKey, String topic)
public void sendEventWithKey(String key, String value, String producerKey, String topic)
throws InterruptedException, ExecutionException, TimeoutException
{
kafkaTemplates.get(producerKey).send(topic, key, value).get(WAIT_TIMEOUT_IN_MINUTES, TimeUnit.MINUTES);
ProducerRecord record = new ProducerRecord(topic, 0, 0L, key, value,
testContext.remove(HEADERS_KEY));
kafkaTemplates.get(producerKey).send(record).get(WAIT_TIMEOUT_IN_MINUTES, TimeUnit.MINUTES);
}
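Because `sendEventWithKey` retrieves the stored headers with `testContext.remove(HEADERS_KEY)` rather than a plain get, the headers ride on exactly one record. Here is a minimal consume-once sketch with a `HashMap` standing in for the VIVIDUS `TestContext` (an assumption of this sketch).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConsumeOnceSketch
{
    private final Map<Object, Object> context = new HashMap<>();

    void setHeaders(List<String> headerNames)
    {
        context.put("headers", headerNames);
    }

    @SuppressWarnings("unchecked")
    List<String> headersForNextSend()
    {
        // remove(...) both returns the stored value and clears it, so only the
        // first send after setHeaders sees the headers; later sends get null
        return (List<String>) context.remove("headers");
    }

    public static void main(String[] args)
    {
        ConsumeOnceSketch sketch = new ConsumeOnceSketch();
        sketch.setHeaders(List.of("aggregate-type"));
        System.out.println(sketch.headersForNextSend()); // [aggregate-type]
        System.out.println(sketch.headersForNextSend()); // null
    }
}
```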

/**
@@ -250,10 +281,13 @@ public void processKafkaEvents(QueueOperation queueOperation, String consumerKey
String variableName)
{
List<ConsumerRecord<String, String>> events = queueOperation.performOn(getEventsBy(consumerKey));
LOGGER.atInfo().addArgument(() -> events.stream().map(ConsumerRecord::key)
.map(key -> key == null ? "<no key>" : key)
.collect(Collectors.joining(", ")))
.log("Saving events with the keys: {}");
LOGGER.atInfo().addArgument(() -> events.stream().map(e -> {
String key = e.key() == null ? "<no key>" : e.key();
String headerNames = StreamSupport.stream(e.headers().spliterator(), false)
.map(Header::key)
.collect(Collectors.joining(", "));
return headerNames.isEmpty() ? key : "{" + key + "; " + headerNames + "}";
}).toList()).log("Saving events with the keys and headers: {}");
List<String> eventValues = events.stream().map(ConsumerRecord::value).toList();
variableContext.putVariable(scopes, variableName, eventValues);
}
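The log-label logic above can be checked standalone: an event's key prints bare when it has no headers and as `{key; header names}` otherwise, with `<no key>` substituted for a null key (plain strings stand in for `ConsumerRecord` and `Header` in this sketch).

```java
import java.util.List;

public class LogLabelSketch
{
    static String label(String key, List<String> headerNames)
    {
        String shownKey = key == null ? "<no key>" : key;
        String names = String.join(", ", headerNames);
        // Wrap in braces only when the event carries headers
        return names.isEmpty() ? shownKey : "{" + shownKey + "; " + names + "}";
    }

    public static void main(String[] args)
    {
        System.out.println(label("key1", List.of()));         // key1
        System.out.println(label(null, List.of("h1", "h2"))); // {<no key>; h1, h2}
    }
}
```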
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2023 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -43,6 +43,7 @@
import com.github.valfirst.slf4jtest.TestLoggerFactoryExtension;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.jbehave.core.model.ExamplesTable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -162,14 +163,17 @@ void shouldProduceEventsToAndConsumeEventsFromKafka(BiConsumer<KafkaSteps, Varia
@Test
void shouldProduceEventWithKey() throws InterruptedException, ExecutionException, TimeoutException
{
String anyDataWithHeader = ANY_DATA + "-with-headers";
kafkaSteps.startKafkaListener(CONSUMER, Set.of(TOPIC));

kafkaSteps.sendEventWithKey(KEY + 1, ANY_DATA, PRODUCER, TOPIC);
kafkaSteps.sendEvent(ANY_DATA, PRODUCER, TOPIC);
kafkaSteps.sendEvent(ANY_DATA, PRODUCER, TOPIC);
kafkaSteps.sendEventWithKey(KEY + 2, ANY_DATA, PRODUCER, TOPIC);
kafkaSteps.sendEventWithKey(KEY + 3, ANY_DATA, PRODUCER, TOPIC);
kafkaSteps.sendEvent(ANY_DATA, PRODUCER, TOPIC);
var headers = new ExamplesTable("|name|value|\n|headerName1|headerValue1|\n|headerName2|headerValue2|");
kafkaSteps.setEventHeaders(headers);
kafkaSteps.sendEvent(anyDataWithHeader, PRODUCER, TOPIC);

kafkaSteps.waitForKafkaEvents(Duration.ofSeconds(10), CONSUMER, ComparisonRule.EQUAL_TO, 6);
kafkaSteps.stopKafkaListener(CONSUMER);
@@ -181,14 +185,13 @@ void shouldProduceEventWithKey() throws InterruptedException, ExecutionException
assertEquals(info(LISTENER_STOPPED), events.get(1));
LoggingEvent keysLogEvent = events.get(2);
assertEquals(Level.INFO, keysLogEvent.getLevel());
assertEquals("Saving events with the keys: {}", keysLogEvent.getMessage());
assertEquals("Saving events with the keys and headers: {}", keysLogEvent.getMessage());
List<Object> arguments = keysLogEvent.getArguments();
assertThat(arguments, hasSize(1));
List<String> keys = Stream.of(arguments.get(0).toString().split(",")).map(String::strip).sorted().toList();
String noKey = "<no key>";
assertEquals(List.of(noKey, noKey, noKey, KEY + 1, KEY + 2, KEY + 3), keys);
assertEquals("[key1, <no key>, <no key>, key2, key3, {<no key>; headerName1, headerName2}]",
arguments.get(0).toString());

verify(variableContext).putVariable(SCOPES, VARIABLE_NAME,
List.of(ANY_DATA, ANY_DATA, ANY_DATA, ANY_DATA, ANY_DATA, ANY_DATA));
List.of(ANY_DATA, ANY_DATA, ANY_DATA, ANY_DATA, ANY_DATA, anyDataWithHeader));
}
}
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2023 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,6 +31,7 @@
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -42,6 +43,9 @@
import com.github.valfirst.slf4jtest.TestLoggerFactoryExtension;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.jbehave.core.model.ExamplesTable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -62,6 +66,7 @@ class KafkaStepsTests
private static final String KEY2 = "key2";

private static final Class<?> LISTENER_KEY = GenericMessageListenerContainer.class;
private static final Class<?> HEADERS_KEY = Header.class;

private static final String LISTENER_IS_STOPPED = "Kafka event listener is stopped";

@@ -158,4 +163,13 @@ void shouldStopStartedKafkaListenerIfNewKafkaListenerIsCreated()
info(listenerIsStarted))));
}
}

@Test
void testAddEventHeaders()
{
var headers = new ExamplesTable("|name|value|\n|headerName|headerValue|");
kafkaSteps.setEventHeaders(headers);
RecordHeader header = new RecordHeader("headerName", "headerValue".getBytes(StandardCharsets.UTF_8));
verify(testContext).put(HEADERS_KEY, List.of(header));
}
}
3 changes: 3 additions & 0 deletions vividus-tests/src/main/resources/story/system/Kafka.story
@@ -11,6 +11,9 @@ Meta:
@requirementId 2915
Given I initialize scenario variable `event-value` with value `{"event-from-system-vividus-test": "#{generate(regexify '[a-z]{8}')}"}`
When I start consuming events from `vividus` Kafka topics `${topic}`
When I set Kafka event headers:
|name |value |
|test_header|test_value|
abudevich marked this conversation as resolved.
When I execute steps:
|step |
|<eventPublisher>|