Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add spotless support #3171

Merged
merged 14 commits into from
Jul 11, 2023
50 changes: 27 additions & 23 deletions control-plane/pkg/contract/contract.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions data-plane/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
The data-plane uses [Vertx](https://vertx.io/) and is composed of two components:

- [**Receiver**](#receiver), it's responsible for accepting incoming events and send them to the appropriate Kafka
topic. It acts as Kafka producers and broker ingress.
topic. It acts as Kafka producers and broker ingress.
- [**Dispatcher**](#dispatcher), it's responsible for consuming events and send them to Triggers' subscribers. It acts
as Kafka consumer.
as Kafka consumer.

## Receiver

Expand Down Expand Up @@ -39,5 +39,5 @@ update another one will be created. This allows to not block or use locks.
- `core` directory contains the core module, in particular, it contains classes for representing Eventing objects
- `dispatcher` directory contains the [_Dispatcher_](#dispatcher) application.
- `contract` directory contains a module in which the protobuf compiler (`protoc`) generates code. Git ignores the
generated code.
generated code.
- `receiver` directory contains the [_Receiver_](#receiver) application.
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@
*/
package dev.knative.eventing.kafka.broker.dispatcher.impl.consumer;

import dev.knative.eventing.kafka.broker.dispatcher.ReactiveKafkaConsumer;
import io.cloudevents.CloudEvent;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand All @@ -32,156 +36,140 @@
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.infra.Blackhole;

import dev.knative.eventing.kafka.broker.dispatcher.ReactiveKafkaConsumer;
public class UnorderedOffsetManagerBenchmark {

import java.time.Duration;
import java.util.Collection;
import java.util.Map;
@State(Scope.Thread)
public static class RecordsState {

public class UnorderedOffsetManagerBenchmark {
private ConsumerRecord<String, CloudEvent>[][] records;

@State(Scope.Thread)
public static class RecordsState {

private ConsumerRecord<String, CloudEvent>[][] records;

@Setup(Level.Trial)
@SuppressWarnings("unchecked")
public void doSetup() {
this.records = new ConsumerRecord[100][10_000];
for (int p = 0; p < 100; p++) {
for (int o = 0; o < 10_000; o++) {
this.records[p][o] = new ConsumerRecord<>(
"abc",
p,
o,
null,
null
);
}
}
@Setup(Level.Trial)
@SuppressWarnings("unchecked")
public void doSetup() {
this.records = new ConsumerRecord[100][10_000];
for (int p = 0; p < 100; p++) {
for (int o = 0; o < 10_000; o++) {
this.records[p][o] = new ConsumerRecord<>("abc", p, o, null, null);
}
}
}
}

}
@Benchmark
public void benchmarkReverseOrder(RecordsState recordsState, Blackhole blackhole) {
final OffsetManager offsetManager = new OffsetManager(Vertx.vertx(), new MockKafkaConsumer(), null, 10000L);

@Benchmark
public void benchmarkReverseOrder(RecordsState recordsState, Blackhole blackhole) {
final OffsetManager offsetManager = new OffsetManager(Vertx.vertx(), new MockKafkaConsumer(), null, 10000L);
int partitions = 100;
for (int partition = 0; partition < partitions; partition++) {
offsetManager.recordReceived(recordsState.records[partition][0]);
}

int partitions = 100;
for (int partition = 0; partition < partitions; partition++) {
offsetManager.recordReceived(recordsState.records[partition][0]);
}
for (int offset = 9_999; offset > 0; offset--) {
for (int partition = 0; partition < partitions; partition++) {
offsetManager.recordReceived(recordsState.records[partition][offset]);
offsetManager.successfullySentToSubscriber(recordsState.records[partition][offset]);
}
}

for (int offset = 9_999; offset > 0; offset--) {
for (int partition = 0; partition < partitions; partition++) {
offsetManager.recordReceived(recordsState.records[partition][offset]);
offsetManager.successfullySentToSubscriber(recordsState.records[partition][offset]);
}
for (int partition = 0; partition < partitions; partition++) {
offsetManager.successfullySentToSubscriber(recordsState.records[partition][0]);
}
}

for (int partition = 0; partition < partitions; partition++) {
offsetManager.successfullySentToSubscriber(recordsState.records[partition][0]);
}
}

@Benchmark
public void benchmarkOrdered(RecordsState recordsState, Blackhole blackhole) {
OffsetManager offsetManager = new OffsetManager(Vertx.vertx(), new MockKafkaConsumer(), null, 10000L);
int partitions = 100;

for (int offset = 0; offset < 10_000; offset++) {
for (int partition = 0; partition < partitions; partition++) {
offsetManager.recordReceived(recordsState.records[partition][offset]);
offsetManager.successfullySentToSubscriber(recordsState.records[partition][offset]);
}
@Benchmark
public void benchmarkOrdered(RecordsState recordsState, Blackhole blackhole) {
OffsetManager offsetManager = new OffsetManager(Vertx.vertx(), new MockKafkaConsumer(), null, 10000L);
int partitions = 100;

for (int offset = 0; offset < 10_000; offset++) {
for (int partition = 0; partition < partitions; partition++) {
offsetManager.recordReceived(recordsState.records[partition][offset]);
offsetManager.successfullySentToSubscriber(recordsState.records[partition][offset]);
}
}
}
}

@Benchmark
public void benchmarkRealisticCase(RecordsState recordsState, Blackhole blackhole) {
OffsetManager offsetManager = new OffsetManager(Vertx.vertx(), new MockKafkaConsumer(), null, 10000L);
int partitions = 10;
@Benchmark
public void benchmarkRealisticCase(RecordsState recordsState, Blackhole blackhole) {
OffsetManager offsetManager = new OffsetManager(Vertx.vertx(), new MockKafkaConsumer(), null, 10000L);
int partitions = 10;

for (int partition = 0; partition < partitions; partition++) {
offsetManager.recordReceived(recordsState.records[partition][0]);
}
for (int partition = 0; partition < partitions; partition++) {
offsetManager.recordReceived(recordsState.records[partition][0]);
}

for (int partition = 0; partition < partitions; partition++) {
for (int offset : new int[]{5, 2, 0, 7, 1, 3, 4, 6}) {
offsetManager.successfullySentToSubscriber(recordsState.records[partition][offset]);
}
for (int partition = 0; partition < partitions; partition++) {
for (int offset : new int[] {5, 2, 0, 7, 1, 3, 4, 6}) {
offsetManager.successfullySentToSubscriber(recordsState.records[partition][offset]);
}
}
}
}

@Benchmark
public void benchmarkMixedABit(RecordsState recordsState, Blackhole blackhole) {
OffsetManager offsetManager = new OffsetManager(Vertx.vertx(), new MockKafkaConsumer(), null, 10000L);
int partitions = 4;
@Benchmark
public void benchmarkMixedABit(RecordsState recordsState, Blackhole blackhole) {
OffsetManager offsetManager = new OffsetManager(Vertx.vertx(), new MockKafkaConsumer(), null, 10000L);
int partitions = 4;

for (int partition = 0; partition < partitions; partition++) {
offsetManager.recordReceived(recordsState.records[partition][0]);
}
for (int partition = 0; partition < partitions; partition++) {
offsetManager.recordReceived(recordsState.records[partition][0]);
}

for (int i = 0; i < 120; i++) {
// This will commit in the following order:
// 1 0 3 2 5 4 ...
offsetManager.successfullySentToSubscriber(recordsState.records[2][i % 2 == 0 ? i + 1 : i - 1]);
offsetManager.successfullySentToSubscriber(recordsState.records[1][i % 2 == 0 ? i + 1 : i - 1]);
offsetManager.successfullySentToSubscriber(recordsState.records[0][i % 2 == 0 ? i + 1 : i - 1]);
offsetManager.successfullySentToSubscriber(recordsState.records[3][i % 2 == 0 ? i + 1 : i - 1]);
for (int i = 0; i < 120; i++) {
// This will commit in the following order:
// 1 0 3 2 5 4 ...
offsetManager.successfullySentToSubscriber(recordsState.records[2][i % 2 == 0 ? i + 1 : i - 1]);
offsetManager.successfullySentToSubscriber(recordsState.records[1][i % 2 == 0 ? i + 1 : i - 1]);
offsetManager.successfullySentToSubscriber(recordsState.records[0][i % 2 == 0 ? i + 1 : i - 1]);
offsetManager.successfullySentToSubscriber(recordsState.records[3][i % 2 == 0 ? i + 1 : i - 1]);
}
}
}


static class MockKafkaConsumer implements ReactiveKafkaConsumer<String, CloudEvent> {
static class MockKafkaConsumer implements ReactiveKafkaConsumer<String, CloudEvent> {

@Override
public Future<Map<TopicPartition, OffsetAndMetadata>> commit(
Map<TopicPartition, OffsetAndMetadata> offset) {
@Override
public Future<Map<TopicPartition, OffsetAndMetadata>> commit(Map<TopicPartition, OffsetAndMetadata> offset) {

return null;
}
return null;
}

@Override
public Future<Void> close() {
return null;
}
@Override
public Future<Void> close() {
return null;
}

@Override
public Future<Void> pause(Collection<TopicPartition> partitions) {
return null;
}
@Override
public Future<Void> pause(Collection<TopicPartition> partitions) {
return null;
}

@Override
public Future<ConsumerRecords<String, CloudEvent>> poll(Duration timeout) {
return null;
}
@Override
public Future<ConsumerRecords<String, CloudEvent>> poll(Duration timeout) {
return null;
}

@Override
public Future<Void> resume(Collection<TopicPartition> partitions) {
return null;
}
@Override
public Future<Void> resume(Collection<TopicPartition> partitions) {
return null;
}

@Override
public Future<Void> subscribe(Collection<String> topics) {
return null;
}
@Override
public Future<Void> subscribe(Collection<String> topics) {
return null;
}

@Override
public Future<Void> subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
return null;
}
@Override
public Future<Void> subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
return null;
}

@Override
public Consumer<String, CloudEvent> unwrap() {
return null;
}
@Override
public Consumer<String, CloudEvent> unwrap() {
return null;
}

@Override
public ReactiveKafkaConsumer<String, CloudEvent> exceptionHandler(Handler<Throwable> handler) {
return null;
@Override
public ReactiveKafkaConsumer<String, CloudEvent> exceptionHandler(Handler<Throwable> handler) {
return null;
}
}

}
}
Loading