Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix binary events with no data and ce overrides not being delivered #3475

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@

import io.cloudevents.CloudEvent;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
* A CloudEventMutator mutates a given CloudEvent.
* A CloudEventMutator mutates the CloudEvent in a given ConsumerRecord, returning a new CloudEvent.
*/
@FunctionalInterface
public interface CloudEventMutator extends Function<CloudEvent, CloudEvent> {}
public interface CloudEventMutator extends Function<ConsumerRecord<Object, CloudEvent>, CloudEvent> {}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

/**
* {@link RecordDispatcherMutatorChain} chains {@link RecordDispatcher}s and applies mutations using a provided
* {@link CloudEventMutator} before passing the {@link KafkaConsumerRecord} to the next {@link RecordDispatcher}.
* {@link CloudEventMutator} before passing the {@link ConsumerRecord} to the next {@link RecordDispatcher}.
*/
public class RecordDispatcherMutatorChain implements RecordDispatcher {

Expand All @@ -46,7 +46,7 @@ public Future<Void> dispatch(ConsumerRecord<Object, CloudEvent> record) {
record.serializedKeySize(),
record.serializedValueSize(),
record.key(),
cloudEventMutator.apply(record.value()),
cloudEventMutator.apply(record),
record.headers(),
record.leaderEpoch());
return next.dispatch(newRecord);
Expand Down
Copy link
Member

@pierDipi pierDipi Nov 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a few unit test as well?

Copy link
Member

@pierDipi pierDipi Nov 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A good scenario to test is a chain RecordDispatcherMutatorChain similar to the one we use in reality

        final var recordDispatcher = new RecordDispatcherMutatorChain(
                new RecordDispatcherImpl(
                        consumerVerticleContext,
                        getFilter(),
                        egressSubscriberSender,
                        egressDeadLetterSender,
                        responseHandler,
                        offsetManager,
                        ConsumerTracer.create(
                                ((VertxInternal) vertx).tracer(),
                                consumerVerticleContext.getConsumerConfigs(),
                                TracingPolicy.PROPAGATE),
                        Metrics.getRegistry()),
                new CloudEventOverridesMutator(
                        consumerVerticleContext.getResource().getCloudEventOverrides()));

passing an InvalidCloudEvent object

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import dev.knative.eventing.kafka.broker.dispatcher.CloudEventMutator;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
* CloudEventOverridesMutator is a {@link CloudEventMutator} that applies a given set of
Expand All @@ -28,12 +29,15 @@ public class CloudEventOverridesMutator implements CloudEventMutator {

private final DataPlaneContract.CloudEventOverrides cloudEventOverrides;

private static final CloudEventDeserializer cloudEventDeserializer = new CloudEventDeserializer();

public CloudEventOverridesMutator(final DataPlaneContract.CloudEventOverrides cloudEventOverrides) {
this.cloudEventOverrides = cloudEventOverrides;
}

@Override
public CloudEvent apply(CloudEvent cloudEvent) {
public CloudEvent apply(ConsumerRecord<Object, CloudEvent> record) {
final var cloudEvent = maybeDeserializeFromHeaders(record);
if (cloudEventOverrides.getExtensionsMap().isEmpty()) {
return cloudEvent;
}
Expand All @@ -42,6 +46,21 @@ public CloudEvent apply(CloudEvent cloudEvent) {
return builder.build();
}

private CloudEvent maybeDeserializeFromHeaders(ConsumerRecord<Object, CloudEvent> record) {
if (record.value() != null) {
return record.value();
}
// A valid CloudEvent in the CE binary protocol binding of Kafka
// might be composed by only Headers.
//
// KafkaConsumer doesn't call the deserializer if the value
// is null.
//
// That means that we get a record with a null value and some CE
// headers even though the record is a valid CloudEvent.
return cloudEventDeserializer.deserialize(record.topic(), record.headers(), null);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is duplicated from RecordDispatcherImpl, can we extract a common method, usable by both CloudEventOverridesMutator and RecordDispatcherImpl


private void applyCloudEventOverrides(CloudEventBuilder builder) {
cloudEventOverrides.getExtensionsMap().forEach(builder::withExtension);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.time.OffsetDateTime;
import java.util.Map;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.Test;

public class CloudEventOverridesMutatorTest {
Expand All @@ -48,7 +49,7 @@ public void shouldAddExtensions() {
final var expected = CloudEventBuilder.from(given);
extensions.forEach(expected::withExtension);

final var got = mutator.apply(given);
final var got = mutator.apply(new ConsumerRecord<>("test-topic", 1, 1, "key", given));

assertThat(got).isEqualTo(expected.build());
}
Expand All @@ -68,7 +69,7 @@ public void shouldNotMutateRecordWhenNoOverrides() {
.withType("foo")
.build();

final var got = mutator.apply(given);
final var got = mutator.apply(new ConsumerRecord<>("test-topic", 1, 1, "key", given));

assertThat(got).isSameAs(given);
}
Expand Down
15 changes: 15 additions & 0 deletions test/e2e_new/kafka_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,21 @@ func TestKafkaSourceBinaryEvent(t *testing.T) {
env.Test(ctx, t, features.KafkaSourceBinaryEvent())
}

func TestKafkaSourceBinaryEventWithExtensions(t *testing.T) {
t.Parallel()

ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.WithPollTimings(PollInterval, PollTimeout),
environment.Managed(t),
)

env.Test(ctx, t, features.KafkaSourceBinaryEventWithExtensions())
}

func TestKafkaSourceStructuredEvent(t *testing.T) {
t.Parallel()

Expand Down
35 changes: 35 additions & 0 deletions test/rekt/features/kafka_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,41 @@ func KafkaSourceBinaryEvent() *feature.Feature {
)
}

func KafkaSourceBinaryEventWithExtensions() *feature.Feature {
senderOptions := []eventshub.EventsHubOption{
eventshub.InputHeader("ce-specversion", cloudevents.VersionV1),
eventshub.InputHeader("ce-type", "com.github.pull.create"),
eventshub.InputHeader("ce-source", "github.com/cloudevents/spec/pull"),
eventshub.InputHeader("ce-subject", "123"),
eventshub.InputHeader("ce-id", "A234-1234-1234"),
eventshub.InputHeader("content-type", "application/json"),
}
matcher := AllOf(
HasSpecVersion(cloudevents.VersionV1),
HasType("com.github.pull.create"),
HasSource("github.com/cloudevents/spec/pull"),
HasSubject("123"),
HasId("A234-1234-1234"),
HasDataContentType("application/json"),
HasExtension("comexampleextension1", "value"),
HasExtension("comexampleothervalue", "5"),
)

return kafkaSourceFeature("KafkaSourceBinaryEvent",
kafkaSourceConfig{
authMech: PlainMech,
opts: []manifest.CfgFn{
kafkasource.WithExtensions(map[string]string{
"comexampleextension1": "value",
"comexampleothervalue": "5",
})},
},
kafkaSinkConfig{},
senderOptions,
matcher,
)
}

func KafkaSourceStructuredEvent() *feature.Feature {
eventTime, _ := cetypes.ParseTime("2018-04-05T17:31:00Z")
senderOptions := []eventshub.EventsHubOption{
Expand Down