Fix binary events with no data and CE overrides not being delivered #3475

Merged
CloudEventMutator.java
@@ -19,7 +19,7 @@
import java.util.function.Function;

/**
- * A CloudEventMutator mutates a given CloudEvent.
+ * A CloudEventMutator mutates a given CloudEvent
*/
@FunctionalInterface
public interface CloudEventMutator extends Function<CloudEvent, CloudEvent> {}
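
For context (not part of the diff): because CloudEventMutator is just a Function<CloudEvent, CloudEvent>, a mutator can be written as a plain lambda. A minimal sketch, assuming io.cloudevents.core.builder.CloudEventBuilder; the extension name and value are invented for illustration:

// Sketch only: apply an override by copying the event and setting an
// extension attribute on the copy (CloudEvents are immutable).
CloudEventMutator addOverride = event -> io.cloudevents.core.builder.CloudEventBuilder.v1(event)
        .withExtension("myoverride", "value")
        .build();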
RecordDispatcherMutatorChain.java
@@ -23,7 +23,7 @@

/**
* {@link RecordDispatcherMutatorChain} chains {@link RecordDispatcher}s and applies mutations using a provided
- * {@link CloudEventMutator} before passing the {@link KafkaConsumerRecord} to the next {@link RecordDispatcher}.
+ * {@link CloudEventMutator} before passing the {@link ConsumerRecord} to the next {@link RecordDispatcher}.
*/
public class RecordDispatcherMutatorChain implements RecordDispatcher {

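A rough wiring sketch (not from this PR): the constructor shape is assumed from the Javadoc above, taking the next RecordDispatcher and the CloudEventMutator; addOverride is the mutator sketched earlier.

// Assumed constructor shape: each record's CloudEvent is passed through
// the mutator before the wrapped (next) dispatcher sees it.
RecordDispatcher withOverrides(RecordDispatcher next, CloudEventMutator addOverride) {
    return new RecordDispatcherMutatorChain(next, addOverride);
}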
data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/NullCloudEventInterceptor.java
@@ -0,0 +1,87 @@
/*
 * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package dev.knative.eventing.kafka.broker.dispatcher.impl.consumer;

import io.cloudevents.CloudEvent;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NullCloudEventInterceptor implements ConsumerInterceptor<Object, CloudEvent> {

    private static final CloudEventDeserializer cloudEventDeserializer = new CloudEventDeserializer();

    private static final Logger logger = LoggerFactory.getLogger(NullCloudEventInterceptor.class);

    @Override
    public ConsumerRecords<Object, CloudEvent> onConsume(ConsumerRecords<Object, CloudEvent> records) {
        if (records.isEmpty()) {
            return records;
        }
        final Map<TopicPartition, List<ConsumerRecord<Object, CloudEvent>>> validRecords =
                new HashMap<>(records.count());
        for (final var record : records) {
            final var tp = new TopicPartition(record.topic(), record.partition());
            if (!validRecords.containsKey(tp)) {
                validRecords.put(tp, new ArrayList<>());
            }
            validRecords.get(tp).add(maybeDeserializeRecord(record));
        }

        return new ConsumerRecords<>(validRecords);
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
        // Intentionally left blank
    }

    @Override
    public void close() {
        // Intentionally left blank
    }

    @Override
    public void configure(Map<String, ?> map) {
        logger.info("NullCloudEventInterceptor configured");
    }

    private ConsumerRecord<Object, CloudEvent> maybeDeserializeRecord(ConsumerRecord<Object, CloudEvent> record) {
        if (record.value() != null) {
            return record;
        }
        // A valid CloudEvent in the CE binary protocol binding for Kafka
        // may consist of headers only.
        //
        // KafkaConsumer doesn't call the deserializer if the value
        // is null.
        //
        // That means we get a record with a null value and some CE
        // headers even though the record is a valid CloudEvent.
        logger.debug("deserializing null record");
        return KafkaConsumerRecordUtils.copyRecordAssigningValue(
                record, cloudEventDeserializer.deserialize(record.topic(), record.headers(), null));
    }
}
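
To make the case above concrete, here is a hedged sketch (not part of the PR) of a header-only, binary-mode record passing through the interceptor. The topic, key, and attribute values are invented, and it assumes the usual Kafka client imports (Headers, RecordHeaders, TimestampType, Optional, StandardCharsets) on top of those in the file:

// A binary-mode CloudEvent with no data arrives as a record whose value is
// null; all CE attributes travel as ce_-prefixed Kafka headers.
Headers headers = new RecordHeaders()
        .add("ce_specversion", "1.0".getBytes(StandardCharsets.UTF_8))
        .add("ce_id", "1".getBytes(StandardCharsets.UTF_8))
        .add("ce_type", "example.event.type".getBytes(StandardCharsets.UTF_8))
        .add("ce_source", "/example/source".getBytes(StandardCharsets.UTF_8));

ConsumerRecord<Object, CloudEvent> headerOnly = new ConsumerRecord<>(
        "t1", 0, 0L, ConsumerRecord.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
        ConsumerRecord.NULL_SIZE, ConsumerRecord.NULL_SIZE, "a", null, headers, Optional.empty());

// The interceptor deserializes the null value from the headers, so the
// returned record carries a materialized CloudEvent instead of null.
var fixed = new NullCloudEventInterceptor()
        .onConsume(new ConsumerRecords<>(Map.of(new TopicPartition("t1", 0), List.of(headerOnly))));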
data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/Main.java
@@ -31,6 +31,7 @@
import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.CloudEventDeserializer;
import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.InvalidCloudEventInterceptor;
import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.KeyDeserializer;
+import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.NullCloudEventInterceptor;
import io.cloudevents.kafka.CloudEventSerializer;
import io.cloudevents.kafka.PartitionKeyExtensionInterceptor;
import io.opentelemetry.sdk.OpenTelemetrySdk;
@@ -81,7 +82,9 @@
        Properties consumerConfig = Configurations.readPropertiesSync(env.getConsumerConfigFilePath());
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KeyDeserializer.class.getName());
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CloudEventDeserializer.class.getName());
-       consumerConfig.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, InvalidCloudEventInterceptor.class.getName());
+       consumerConfig.put(
+               ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
+               NullCloudEventInterceptor.class.getName() + "," + InvalidCloudEventInterceptor.class.getName());

        // Read WebClient config
        JsonObject webClientConfig = Configurations.readPropertiesAsJsonSync(env.getWebClientConfigFilePath());
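For illustration (not a file in this PR), the resulting consumer configuration is equivalent to the following consumer.properties entry. Kafka invokes consumer interceptors in the order they are listed, so header-only records are materialized into CloudEvents before the invalid-event wrapper inspects them:

# Order matters: NullCloudEventInterceptor runs first and deserializes
# binary-mode records whose value is null; InvalidCloudEventInterceptor then
# wraps whatever is still not a valid CloudEvent.
interceptor.classes=dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.NullCloudEventInterceptor,dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.InvalidCloudEventInterceptor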
data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/main/InterceptorChainTest.java
@@ -0,0 +1,268 @@
/*
 * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package dev.knative.eventing.kafka.broker.dispatcher.main;

import static org.assertj.core.api.Assertions.assertThat;

import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.CloudEventDeserializer;
import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.InvalidCloudEvent;
import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.InvalidCloudEventInterceptor;
import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.NullCloudEventInterceptor;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.data.BytesCloudEventData;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;

public class InterceptorChainTest {
    public class InterceptorChain {
        InvalidCloudEventInterceptor invalidCloudEventInterceptor;
        NullCloudEventInterceptor nullCloudEventInterceptor;

        Map<String, String> configs;

        public InterceptorChain() {
            this.invalidCloudEventInterceptor = new InvalidCloudEventInterceptor();
            this.nullCloudEventInterceptor = new NullCloudEventInterceptor();

            this.configs = new HashMap<>();
            withKindPlural(configs);
            withSourceName(configs);
            withSourceNamespace(configs);
            withEnabled(configs);

            this.invalidCloudEventInterceptor.configure(this.configs);
        }

        public Map<String, String> getConfigs() {
            return this.configs;
        }

        public ConsumerRecords<Object, CloudEvent> onConsume(final ConsumerRecords<Object, CloudEvent> records) {
            return this.invalidCloudEventInterceptor.onConsume(this.nullCloudEventInterceptor.onConsume(records));
        }

        private static void withKindPlural(final Map<String, String> configs) {
            with(configs, InvalidCloudEventInterceptor.KIND_PLURAL_CONFIG, "kafkasources");
        }

        private static void withSourceName(final Map<String, String> configs) {
            with(configs, InvalidCloudEventInterceptor.SOURCE_NAME_CONFIG, "ks");
        }

        private static void withSourceNamespace(final Map<String, String> configs) {
            with(configs, InvalidCloudEventInterceptor.SOURCE_NAMESPACE_CONFIG, "knative-ns");
        }

        private static void withEnabled(final Map<String, String> configs) {
            with(configs, CloudEventDeserializer.INVALID_CE_WRAPPER_ENABLED, "true");
        }

        private static <T> void with(final Map<String, T> configs, final String key, final T p) {
            configs.put(key, p);
        }
    }

    @Test
    public void shouldTransformEventsToValidEventsHandlingNullValues() {
        var interceptor = new InterceptorChain();

        var input = mockInvalidRecords();

        var configs = interceptor.getConfigs();

        var got = interceptor.onConsume(input);

        for (final var r : input) {
            var tp = new TopicPartition(r.topic(), r.partition());
            var inputRecords = input.records(tp);
            var expected = new ArrayList<ConsumerRecord<Object, CloudEvent>>();
            for (var i : inputRecords) {
                var value = CloudEventBuilder.v1()
                        .withId(String.format("partition:%d/offset:%d", i.partition(), i.offset()))
                        .withTime(OffsetDateTime.ofInstant(Instant.ofEpochMilli(i.timestamp()), ZoneId.of("UTC")))
                        .withSource(URI.create(String.format(
                                "/apis/v1/namespaces/%s/%s/%s#%s",
                                configs.get(InvalidCloudEventInterceptor.SOURCE_NAMESPACE_CONFIG),
                                configs.get(InvalidCloudEventInterceptor.KIND_PLURAL_CONFIG),
                                configs.get(InvalidCloudEventInterceptor.SOURCE_NAME_CONFIG),
                                i.topic())))
                        .withSubject(String.format("partition:%d#%d", i.partition(), i.offset()))
                        .withType("dev.knative.kafka.event");

                if (i.value() != null) {
                    value.withData(BytesCloudEventData.wrap(new byte[] {1}));
                }

                i.headers()
                        .forEach(h -> value.withExtension(
                                "kafkaheader" + h.key(), new String(h.value(), StandardCharsets.UTF_8)));

                setKeys(value, i);

                expected.add(new ConsumerRecord<>(
                        i.topic(),
                        i.partition(),
                        i.offset(),
                        i.timestamp(),
                        i.timestampType(),
                        i.serializedKeySize(),
                        i.serializedValueSize(),
                        i.key(),
                        value.build(),
                        i.headers(),
                        i.leaderEpoch()));
            }

            var pairs = zip(expected, got.records(tp));
            for (var p : pairs) {
                assertConsumerRecordEquals(p.getValue(), p.getKey());
            }
        }
    }

    @Test
    public void shouldTransformNullEventsToValidEvents() {
        var interceptor = new InterceptorChain();

        var input = mockValidRecords();

        var configs = interceptor.getConfigs();

        var got = interceptor.onConsume(input);

        for (final var r : input) {
            var tp = new TopicPartition(r.topic(), r.partition());
            var inputRecords = input.records(tp);
            var expected = new ArrayList<ConsumerRecord<Object, CloudEvent>>();
            for (var i : inputRecords) {
                var value = i.value();

                if (value == null) {
                    var builder = CloudEventBuilder.v1()
                            .withId(String.format("partition:%d/offset:%d", i.partition(), i.offset()))
                            .withTime(OffsetDateTime.ofInstant(Instant.ofEpochMilli(i.timestamp()), ZoneId.of("UTC")))
                            .withSource(URI.create(String.format(
                                    "/apis/v1/namespaces/%s/%s/%s#%s",
                                    configs.get(InvalidCloudEventInterceptor.SOURCE_NAMESPACE_CONFIG),
                                    configs.get(InvalidCloudEventInterceptor.KIND_PLURAL_CONFIG),
                                    configs.get(InvalidCloudEventInterceptor.SOURCE_NAME_CONFIG),
                                    i.topic())))
                            .withSubject(String.format("partition:%d#%d", i.partition(), i.offset()))
                            .withType("dev.knative.kafka.event");

                    setKeys(builder, i);

                    value = builder.build();
                }

                expected.add(new ConsumerRecord<>(
                        i.topic(),
                        i.partition(),
                        i.offset(),
                        i.timestamp(),
                        i.timestampType(),
                        i.serializedKeySize(),
                        i.serializedValueSize(),
                        i.key(),
                        value,
                        i.headers(),
                        i.leaderEpoch()));
            }

            var pairs = zip(expected, got.records(tp));
            for (var p : pairs) {
                assertConsumerRecordEquals(p.getValue(), p.getKey());
            }
        }
    }

    private void assertConsumerRecordEquals(
            final ConsumerRecord<Object, CloudEvent> actual, final ConsumerRecord<Object, CloudEvent> expected) {
        assertThat(actual.topic()).isEqualTo(expected.topic());
        assertThat(actual.partition()).isEqualTo(expected.partition());
        assertThat(actual.offset()).isEqualTo(expected.offset());
        assertThat(actual.key()).isEqualTo(expected.key());
        assertThat(actual.value()).isEqualTo(expected.value());
        assertThat(actual.serializedKeySize()).isEqualTo(expected.serializedKeySize());
        assertThat(actual.timestamp()).isEqualTo(expected.timestamp());
        assertThat(actual.timestampType()).isEqualTo(expected.timestampType());
        assertThat(actual.headers()).isEqualTo(expected.headers());
    }

    private void setKeys(io.cloudevents.core.v1.CloudEventBuilder value, ConsumerRecord<Object, CloudEvent> i) {
        if (i.key() instanceof Number) {
            value.withExtension("partitionkey", (Number) i.key());
            value.withExtension("key", (Number) i.key());
        } else if (i.key() instanceof String) {
            value.withExtension("partitionkey", i.key().toString());
            value.withExtension("key", i.key().toString());
        } else if (i.key() instanceof byte[]) {
            value.withExtension("partitionkey", (byte[]) i.key());
            value.withExtension("key", (byte[]) i.key());
        } else if (i.key() != null) {
            throw new IllegalArgumentException("unknown type for key: " + i.key());
        }
    }

    public static <A, B> List<Map.Entry<A, B>> zip(List<A> as, List<B> bs) {
        if (as.size() != bs.size()) {
            throw new IllegalArgumentException("Lists must have the same length");
        }
        return IntStream.range(0, as.size())
                .mapToObj(i -> Map.entry(as.get(i), bs.get(i)))
                .collect(Collectors.toList());
    }

    private static ConsumerRecords<Object, CloudEvent> mockInvalidRecords() {
        return new ConsumerRecords<>(Map.of(
                new TopicPartition("t1", 0),
                List.of(
                        new ConsumerRecord<>("t1", 0, 0, "a", new InvalidCloudEvent(new byte[] {1})),
                        new ConsumerRecord<>("t1", 0, 1, "a", null))));
    }

    private static ConsumerRecords<Object, CloudEvent> mockValidRecords() {
        return new ConsumerRecords<>(Map.of(
                new TopicPartition("t1", 0),
                List.of(
                        new ConsumerRecord<>(
                                "t1",
                                0,
                                0,
                                "a",
                                CloudEventBuilder.v1()
                                        .withId("1")
                                        .withType("example.event.type")
                                        .withSource(URI.create("localhost"))
                                        .build()),
                        new ConsumerRecord<>("t1", 0, 1, "a", null))));
    }
}