[release-1.12] Fix binary events with no data and ce overrides not being delivered (#3475) (#3595)

* Fix binary events with no data and ce overrides not being delivered (#3475)

* Added regression test for binary events with extensions

Signed-off-by: Calum Murray <cmurray@redhat.com>

* deserialize the event from the headers if the record value is null

Signed-off-by: Calum Murray <cmurray@redhat.com>

* update codegen

Signed-off-by: Calum Murray <cmurray@redhat.com>

* use an interceptor instead of handling it in the mutatorchain

Signed-off-by: Calum Murray <cmurray@redhat.com>

* added unit test for interceptor chain

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fixed order of interceptors

Signed-off-by: Calum Murray <cmurray@redhat.com>

---------

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix InterceptorChainTest build issues

Signed-off-by: Calum Murray <cmurray@redhat.com>

---------

Signed-off-by: Calum Murray <cmurray@redhat.com>
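
For context on the failure mode: in the CloudEvents Kafka binary protocol binding, an event with no data is carried entirely in record headers (ce_specversion, ce_id, ce_source, ce_type, plus any extension attributes such as CE overrides), so the Kafka record value is null — and KafkaConsumer never invokes the value deserializer for null values. A minimal sketch (illustrative only, not part of this commit; topic and header values are invented) showing that the upstream io.cloudevents.kafka.CloudEventDeserializer can rebuild such an event from headers alone, mirroring the deserialize(topic, headers, null) call this commit adds:

import io.cloudevents.CloudEvent;
import io.cloudevents.kafka.CloudEventDeserializer;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class BinaryEventFromHeadersSketch {
    public static void main(String[] args) {
        // A binary-mode CloudEvent with no data: all attributes live in ce_* headers,
        // while the record value itself is null.
        var headers = new RecordHeaders();
        headers.add("ce_specversion", "1.0".getBytes(StandardCharsets.UTF_8));
        headers.add("ce_id", "1".getBytes(StandardCharsets.UTF_8));
        headers.add("ce_source", "/example/source".getBytes(StandardCharsets.UTF_8));
        headers.add("ce_type", "example.event.type".getBytes(StandardCharsets.UTF_8));
        headers.add("ce_myoverride", "hello".getBytes(StandardCharsets.UTF_8)); // e.g. a CE override

        // Deserializing from the headers alone recovers the full event; the bug was
        // that KafkaConsumer skips the value deserializer when value == null.
        CloudEvent event = new CloudEventDeserializer().deserialize("my-topic", headers, null);
        System.out.println(event.getId() + " / " + event.getExtension("myoverride"));
    }
}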
Cali0707 committed Jan 22, 2024
1 parent d2f4aee commit 2e99add
Showing 7 changed files with 411 additions and 3 deletions.

CloudEventMutator.java
@@ -19,7 +19,7 @@
 import java.util.function.Function;

 /**
- * A CloudEventMutator mutates a given CloudEvent.
+ * A CloudEventMutator mutates a given CloudEvent
  */
 @FunctionalInterface
 public interface CloudEventMutator extends Function<CloudEvent, CloudEvent> {}
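
For readers unfamiliar with the type: a mutator is just a CloudEvent -> CloudEvent function. A hypothetical example (extension name and value are invented, not from this commit):

// Hypothetical CloudEventMutator: copies the event and stamps an extension onto it.
CloudEventMutator addTenant = event -> io.cloudevents.core.builder.CloudEventBuilder.v1(event)
        .withExtension("tenant", "team-a")
        .build();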

RecordDispatcherMutatorChain.java
@@ -23,7 +23,7 @@

 /**
  * {@link RecordDispatcherMutatorChain} chains {@link RecordDispatcher}s and applies mutations using a provided
- * {@link CloudEventMutator} before passing the {@link KafkaConsumerRecord} to the next {@link RecordDispatcher}.
+ * {@link CloudEventMutator} before passing the {@link ConsumerRecord} to the next {@link RecordDispatcher}.
  */
 public class RecordDispatcherMutatorChain implements RecordDispatcher {

NullCloudEventInterceptor.java (new file)
@@ -0,0 +1,87 @@
/*
 * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package dev.knative.eventing.kafka.broker.dispatcher.impl.consumer;

import io.cloudevents.CloudEvent;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NullCloudEventInterceptor implements ConsumerInterceptor<Object, CloudEvent> {

    private static final CloudEventDeserializer cloudEventDeserializer = new CloudEventDeserializer();

    private static final Logger logger = LoggerFactory.getLogger(NullCloudEventInterceptor.class);

    @Override
    public ConsumerRecords<Object, CloudEvent> onConsume(ConsumerRecords<Object, CloudEvent> records) {
        if (records.isEmpty()) {
            return records;
        }
        final Map<TopicPartition, List<ConsumerRecord<Object, CloudEvent>>> validRecords =
                new HashMap<>(records.count());
        for (final var record : records) {
            final var tp = new TopicPartition(record.topic(), record.partition());
            if (!validRecords.containsKey(tp)) {
                validRecords.put(tp, new ArrayList<>());
            }
            validRecords.get(tp).add(maybeDeserializeRecord(record));
        }

        return new ConsumerRecords<>(validRecords);
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
        // Intentionally left blank
    }

    @Override
    public void close() {
        // Intentionally left blank
    }

    @Override
    public void configure(Map<String, ?> map) {
        logger.info("NullCloudEventInterceptor configured");
    }

    private ConsumerRecord<Object, CloudEvent> maybeDeserializeRecord(ConsumerRecord<Object, CloudEvent> record) {
        if (record.value() != null) {
            return record;
        }
        // A valid CloudEvent in the CE binary protocol binding of Kafka
        // might be composed by only Headers.
        //
        // KafkaConsumer doesn't call the deserializer if the value
        // is null.
        //
        // That means that we get a record with a null value and some CE
        // headers even though the record is a valid CloudEvent.
        logger.debug("deserializing null record");
        return KafkaConsumerRecordUtils.copyRecordAssigningValue(
                record, cloudEventDeserializer.deserialize(record.topic(), record.headers(), null));
    }
}
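
A rough usage sketch for the new interceptor (illustrative only, not part of this commit; topic, key, and header values are invented): feed it a record whose value is null but whose headers carry a binary-mode CloudEvent, and it returns the same record with a deserialized CloudEvent as its value.

import io.cloudevents.CloudEvent;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

public class NullCloudEventInterceptorSketch {
    public static void main(String[] args) {
        // A record that arrived with CE headers but a null value.
        var record = new ConsumerRecord<Object, CloudEvent>("t1", 0, 0L, "key", null);
        record.headers().add("ce_specversion", "1.0".getBytes(StandardCharsets.UTF_8));
        record.headers().add("ce_id", "1".getBytes(StandardCharsets.UTF_8));
        record.headers().add("ce_source", "/example/source".getBytes(StandardCharsets.UTF_8));
        record.headers().add("ce_type", "example.event.type".getBytes(StandardCharsets.UTF_8));

        var interceptor = new NullCloudEventInterceptor();
        var out = interceptor.onConsume(
                new ConsumerRecords<>(Map.of(new TopicPartition("t1", 0), List.of(record))));

        // Each returned record now carries a non-null CloudEvent rebuilt from the headers.
        out.forEach(r -> System.out.println(r.value().getId()));
    }
}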

@@ -31,6 +31,7 @@
 import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.CloudEventDeserializer;
 import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.InvalidCloudEventInterceptor;
 import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.KeyDeserializer;
+import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.NullCloudEventInterceptor;
 import io.cloudevents.kafka.CloudEventSerializer;
 import io.cloudevents.kafka.PartitionKeyExtensionInterceptor;
 import io.opentelemetry.sdk.OpenTelemetrySdk;
@@ -81,7 +82,9 @@ public static void start(
         Properties consumerConfig = Configurations.readPropertiesSync(env.getConsumerConfigFilePath());
         consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KeyDeserializer.class.getName());
         consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CloudEventDeserializer.class.getName());
-        consumerConfig.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, InvalidCloudEventInterceptor.class.getName());
+        consumerConfig.put(
+                ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
+                NullCloudEventInterceptor.class.getName() + "," + InvalidCloudEventInterceptor.class.getName());

         // Read WebClient config
         JsonObject webClientConfig = Configurations.readPropertiesAsJsonSync(env.getWebClientConfigFilePath());
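
Note on ordering, matching the "fixed order of interceptors" entry in the commit message above: Kafka instantiates consumer interceptors in the order they appear in interceptor.classes and calls onConsume in that same order, so NullCloudEventInterceptor must come first to materialize header-only events before InvalidCloudEventInterceptor decides what still needs wrapping.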

InterceptorChainTest.java (new file)
@@ -0,0 +1,268 @@
/*
 * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package dev.knative.eventing.kafka.broker.dispatcher.main;

import static org.assertj.core.api.Assertions.assertThat;

import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.CloudEventDeserializer;
import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.InvalidCloudEvent;
import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.InvalidCloudEventInterceptor;
import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.NullCloudEventInterceptor;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.data.BytesCloudEventData;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;

public class InterceptorChainTest {

    public static class InterceptorChain {

        InvalidCloudEventInterceptor invalidCloudEventInterceptor;
        NullCloudEventInterceptor nullCloudEventInterceptor;

        Map<String, String> configs;

        public InterceptorChain() {
            this.invalidCloudEventInterceptor = new InvalidCloudEventInterceptor();
            this.nullCloudEventInterceptor = new NullCloudEventInterceptor();

            this.configs = new HashMap<>();
            withKindPlural(configs);
            withSourceName(configs);
            withSourceNamespace(configs);
            withEnabled(configs);

            this.invalidCloudEventInterceptor.configure(this.configs);
        }

        public Map<String, String> getConfigs() {
            return this.configs;
        }

        public ConsumerRecords<Object, CloudEvent> onConsume(final ConsumerRecords<Object, CloudEvent> records) {
            return this.invalidCloudEventInterceptor.onConsume(this.nullCloudEventInterceptor.onConsume(records));
        }

        private static void withKindPlural(final Map<String, String> configs) {
            with(configs, InvalidCloudEventInterceptor.KIND_PLURAL_CONFIG, "kafkasources");
        }

        private static void withSourceName(final Map<String, String> configs) {
            with(configs, InvalidCloudEventInterceptor.SOURCE_NAME_CONFIG, "ks");
        }

        private static void withSourceNamespace(final Map<String, String> configs) {
            with(configs, InvalidCloudEventInterceptor.SOURCE_NAMESPACE_CONFIG, "knative-ns");
        }

        private static void withEnabled(final Map<String, String> configs) {
            with(configs, CloudEventDeserializer.INVALID_CE_WRAPPER_ENABLED, "true");
        }

        private static <T> void with(final Map<String, T> configs, final String key, final T p) {
            configs.put(key, p);
        }
    }

    @Test
    public void shouldTransformEventsToValidEventsHandlingNullValues() {
        var interceptor = new InterceptorChain();

        var input = mockInvalidRecords();

        var configs = interceptor.getConfigs();

        var got = interceptor.onConsume(input);

        for (final var r : input) {
            var tp = new TopicPartition(r.topic(), r.partition());
            var inputRecords = input.records(tp);
            var expected = new ArrayList<ConsumerRecord<Object, CloudEvent>>();
            for (var i : inputRecords) {
                var value = CloudEventBuilder.v1()
                        .withId(String.format("partition:%d/offset:%d", i.partition(), i.offset()))
                        .withTime(OffsetDateTime.ofInstant(Instant.ofEpochMilli(i.timestamp()), ZoneId.of("UTC")))
                        .withSource(URI.create(String.format(
                                "/apis/v1/namespaces/%s/%s/%s#%s",
                                configs.get(InvalidCloudEventInterceptor.SOURCE_NAMESPACE_CONFIG),
                                configs.get(InvalidCloudEventInterceptor.KIND_PLURAL_CONFIG),
                                configs.get(InvalidCloudEventInterceptor.SOURCE_NAME_CONFIG),
                                i.topic())))
                        .withSubject(String.format("partition:%d#%d", i.partition(), i.offset()))
                        .withType("dev.knative.kafka.event");

                if (i.value() != null) {
                    value.withData(BytesCloudEventData.wrap(new byte[] {1}));
                }

                i.headers()
                        .forEach(h -> value.withExtension(
                                "kafkaheader" + h.key(), new String(h.value(), StandardCharsets.UTF_8)));

                setKeys(value, i);

                expected.add(new ConsumerRecord<>(
                        i.topic(),
                        i.partition(),
                        i.offset(),
                        i.timestamp(),
                        i.timestampType(),
                        i.serializedKeySize(),
                        i.serializedValueSize(),
                        i.key(),
                        value.build(),
                        i.headers(),
                        i.leaderEpoch()));
            }

            var pairs = zip(expected, got.records(tp));
            for (var p : pairs) {
                assertConsumerRecordEquals(p.getKey(), p.getValue());
            }
        }
    }

    @Test
    public void shouldTransformNullEventsToValidEvents() {
        var interceptor = new InterceptorChain();

        var input = mockValidRecords();

        var configs = interceptor.getConfigs();

        var got = interceptor.onConsume(input);

        for (final var r : input) {
            var tp = new TopicPartition(r.topic(), r.partition());
            var inputRecords = input.records(tp);
            var expected = new ArrayList<ConsumerRecord<Object, CloudEvent>>();
            for (var i : inputRecords) {
                var value = i.value();

                if (value == null) {
                    var builder = CloudEventBuilder.v1()
                            .withId(String.format("partition:%d/offset:%d", i.partition(), i.offset()))
                            .withTime(OffsetDateTime.ofInstant(Instant.ofEpochMilli(i.timestamp()), ZoneId.of("UTC")))
                            .withSource(URI.create(String.format(
                                    "/apis/v1/namespaces/%s/%s/%s#%s",
                                    configs.get(InvalidCloudEventInterceptor.SOURCE_NAMESPACE_CONFIG),
                                    configs.get(InvalidCloudEventInterceptor.KIND_PLURAL_CONFIG),
                                    configs.get(InvalidCloudEventInterceptor.SOURCE_NAME_CONFIG),
                                    i.topic())))
                            .withSubject(String.format("partition:%d#%d", i.partition(), i.offset()))
                            .withType("dev.knative.kafka.event");

                    setKeys(builder, i);

                    value = builder.build();
                }

                expected.add(new ConsumerRecord<>(
                        i.topic(),
                        i.partition(),
                        i.offset(),
                        i.timestamp(),
                        i.timestampType(),
                        i.serializedKeySize(),
                        i.serializedValueSize(),
                        i.key(),
                        value,
                        i.headers(),
                        i.leaderEpoch()));
            }

            var pairs = zip(expected, got.records(tp));
            for (var p : pairs) {
                assertConsumerRecordEquals(p.getKey(), p.getValue());
            }
        }
    }

    private void assertConsumerRecordEquals(
            final ConsumerRecord<Object, CloudEvent> actual, final ConsumerRecord<Object, CloudEvent> expected) {
        assertThat(actual.topic()).isEqualTo(expected.topic());
        assertThat(actual.partition()).isEqualTo(expected.partition());
        assertThat(actual.offset()).isEqualTo(expected.offset());
        assertThat(actual.key()).isEqualTo(expected.key());
        assertThat(actual.value()).isEqualTo(expected.value());
        assertThat(actual.serializedKeySize()).isEqualTo(expected.serializedKeySize());
        assertThat(actual.timestamp()).isEqualTo(expected.timestamp());
        assertThat(actual.timestampType()).isEqualTo(expected.timestampType());
        assertThat(actual.headers()).isEqualTo(expected.headers());
    }

    private void setKeys(io.cloudevents.core.v1.CloudEventBuilder value, ConsumerRecord<Object, CloudEvent> i) {
        if (i.key() instanceof Number) {
            value.withExtension("partitionkey", (Number) i.key());
            value.withExtension("key", (Number) i.key());
        } else if (i.key() instanceof String) {
            value.withExtension("partitionkey", i.key().toString());
            value.withExtension("key", i.key().toString());
        } else if (i.key() instanceof byte[]) {
            value.withExtension("partitionkey", (byte[]) i.key());
            value.withExtension("key", (byte[]) i.key());
        } else if (i.key() != null) {
            throw new IllegalArgumentException("unknown type for key: " + i.key());
        }
    }

    public static <A, B> List<Map.Entry<A, B>> zip(List<A> as, List<B> bs) {
        if (as.size() != bs.size()) {
            throw new IllegalArgumentException("List must have the same length");
        }
        return IntStream.range(0, as.size())
                .mapToObj(i -> Map.entry(as.get(i), bs.get(i)))
                .collect(Collectors.toList());
    }

    private static ConsumerRecords<Object, CloudEvent> mockInvalidRecords() {
        return new ConsumerRecords<>(Map.of(
                new TopicPartition("t1", 0),
                List.of(
                        new ConsumerRecord<>("t1", 0, 0, "a", new InvalidCloudEvent(new byte[] {1})),
                        new ConsumerRecord<>("t1", 0, 1, "a", null))));
    }

    private static ConsumerRecords<Object, CloudEvent> mockValidRecords() {
        return new ConsumerRecords<>(Map.of(
                new TopicPartition("t1", 0),
                List.of(
                        new ConsumerRecord<>(
                                "t1",
                                0,
                                0,
                                "a",
                                CloudEventBuilder.v1()
                                        .withId("1")
                                        .withType("example.event.type")
                                        .withSource(URI.create("localhost"))
                                        .build()),
                        new ConsumerRecord<>("t1", 0, 1, "a", null))));
    }
}

(Diffs for the remaining two changed files are not shown.)