From 6e02c886e7825ddc0db4032f46e88722dea25eb2 Mon Sep 17 00:00:00 2001 From: Sergei Egorov Date: Thu, 27 Jun 2019 21:28:06 +0200 Subject: [PATCH] support `null` keys --- .../service/ReactorLiiklusServiceImpl.java | 32 +++++++++------- .../com/github/bsideup/liiklus/SmokeTest.java | 37 +++++++++++++++++++ .../inmemory/InMemoryRecordsStorage.java | 7 +++- .../liiklus/records/tests/SubscribeTest.java | 22 +++++++++++ 4 files changed, 82 insertions(+), 16 deletions(-) diff --git a/app/src/main/java/com/github/bsideup/liiklus/service/ReactorLiiklusServiceImpl.java b/app/src/main/java/com/github/bsideup/liiklus/service/ReactorLiiklusServiceImpl.java index eb411b4f..e9e4e6d6 100644 --- a/app/src/main/java/com/github/bsideup/liiklus/service/ReactorLiiklusServiceImpl.java +++ b/app/src/main/java/com/github/bsideup/liiklus/service/ReactorLiiklusServiceImpl.java @@ -214,20 +214,24 @@ public Flux receive(Mono requestMono) { records = records.transform(processor::postProcess); } return records - .map(consumerRecord -> ReceiveReply.newBuilder() - .setRecord( - ReceiveReply.Record.newBuilder() - .setOffset(consumerRecord.getOffset()) - .setReplay(consumerRecord.getOffset() <= latestAckedOffsets.getOrDefault(partition, Optional.empty()).orElse(-1L)) - .setKey(ByteString.copyFrom(consumerRecord.getEnvelope().getKey())) - .setValue(ByteString.copyFrom(consumerRecord.getEnvelope().getValue())) - .setTimestamp(Timestamp.newBuilder() - .setSeconds(consumerRecord.getTimestamp().getEpochSecond()) - .setNanos(consumerRecord.getTimestamp().getNano()) - ) - ) - .build() - ); + .map(consumerRecord -> { + var envelope = consumerRecord.getEnvelope(); + + var replyBuilder = ReceiveReply.Record.newBuilder() + .setOffset(consumerRecord.getOffset()) + .setReplay(consumerRecord.getOffset() <= latestAckedOffsets.getOrDefault(partition, Optional.empty()).orElse(-1L)) + .setValue(ByteString.copyFrom(envelope.getValue())) + .setTimestamp(Timestamp.newBuilder() + .setSeconds(consumerRecord.getTimestamp().getEpochSecond()) + .setNanos(consumerRecord.getTimestamp().getNano()) + ); + + if (envelope.getKey() != null) { + replyBuilder.setKey(ByteString.copyFrom(envelope.getKey())); + } + + return ReceiveReply.newBuilder().setRecord(replyBuilder).build(); + }); }); }) .log("receive", Level.SEVERE, SignalType.ON_ERROR) diff --git a/app/src/test/java/com/github/bsideup/liiklus/SmokeTest.java b/app/src/test/java/com/github/bsideup/liiklus/SmokeTest.java index d1353e45..e457c003 100644 --- a/app/src/test/java/com/github/bsideup/liiklus/SmokeTest.java +++ b/app/src/test/java/com/github/bsideup/liiklus/SmokeTest.java @@ -4,6 +4,7 @@ import com.github.bsideup.liiklus.protocol.ReceiveReply; import com.github.bsideup.liiklus.protocol.ReceiveRequest; import com.github.bsideup.liiklus.protocol.SubscribeRequest; +import com.github.bsideup.liiklus.records.RecordsStorage; import com.github.bsideup.liiklus.test.AbstractIntegrationTest; import com.google.protobuf.ByteString; import org.assertj.core.api.Condition; @@ -12,8 +13,11 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.SignalType; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.List; +import java.util.UUID; import java.util.logging.Level; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -86,4 +90,37 @@ public boolean matches(ReceiveReply value) { .extracting(it -> it.getRecord().getValue().toStringUtf8()) .containsSubsequence(values.toArray(new String[values.size()])); } + + @Test + public void testNullKey() throws Exception { + var subscribeAction = SubscribeRequest.newBuilder() + .setTopic(testName.getMethodName()) + .setGroup(testName.getMethodName()) + .setAutoOffsetReset(SubscribeRequest.AutoOffsetReset.EARLIEST) + .build(); + + var value = UUID.randomUUID().toString(); + var recordsStorage = applicationContext.getBean(RecordsStorage.class); + recordsStorage.publish(new RecordsStorage.Envelope( + subscribeAction.getTopic(), + null, // intentionally + ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8)) + )).toCompletableFuture().join(); + + var record = stub + .subscribe(subscribeAction) + .flatMap(it -> stub.receive( + ReceiveRequest.newBuilder() + .setAssignment(it.getAssignment()) + .build() + )) + .map(ReceiveReply::getRecord) + .blockFirst(Duration.ofSeconds(10)); + + assertThat(record) + .isNotNull() + .satisfies(it -> { + assertThat(it.getValue().toStringUtf8()).as("value").isEqualTo(value); + }); + } } diff --git a/plugins/inmemory-records-storage/src/main/java/com/github/bsideup/liiklus/records/inmemory/InMemoryRecordsStorage.java b/plugins/inmemory-records-storage/src/main/java/com/github/bsideup/liiklus/records/inmemory/InMemoryRecordsStorage.java index cf6c9f40..76919d60 100644 --- a/plugins/inmemory-records-storage/src/main/java/com/github/bsideup/liiklus/records/inmemory/InMemoryRecordsStorage.java +++ b/plugins/inmemory-records-storage/src/main/java/com/github/bsideup/liiklus/records/inmemory/InMemoryRecordsStorage.java @@ -21,6 +21,7 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -51,7 +52,9 @@ public CompletionStage publish(Envelope envelope) { var topic = envelope.getTopic(); var storedTopic = state.computeIfAbsent(topic, __ -> new StoredTopic(numberOfPartitions)); - var partition = partitionByKey(envelope.getKey(), numberOfPartitions); + var partition = envelope.getKey() != null + ? partitionByKey(envelope.getKey(), numberOfPartitions) + : ThreadLocalRandom.current().nextInt(0, numberOfPartitions); var storedPartition = storedTopic.getPartitions().computeIfAbsent( partition, __ -> new StoredTopic.StoredPartition() @@ -126,7 +129,7 @@ public Publisher getPublisher() { .map(it -> new Record( new Envelope( topic, - it.getKey().asReadOnlyBuffer(), + it.getKey() != null ? it.getKey().asReadOnlyBuffer() : null, it.getValue().asReadOnlyBuffer() ), it.getTimestamp(), diff --git a/tck/src/main/java/com/github/bsideup/liiklus/records/tests/SubscribeTest.java b/tck/src/main/java/com/github/bsideup/liiklus/records/tests/SubscribeTest.java index e087a5af..bb00326c 100644 --- a/tck/src/main/java/com/github/bsideup/liiklus/records/tests/SubscribeTest.java +++ b/tck/src/main/java/com/github/bsideup/liiklus/records/tests/SubscribeTest.java @@ -5,6 +5,7 @@ import org.junit.jupiter.api.Test; import reactor.core.publisher.DirectProcessor; +import java.nio.ByteBuffer; import java.time.Duration; import java.util.*; import java.util.concurrent.CompletableFuture; @@ -122,4 +123,25 @@ default void testInitialOffsets() throws Exception { offsetInfos.get(9).getOffset() ); } + + @Test + default void testNullKey() throws Exception { + var topic = getTopic(); + var offsetInfo = publish(new RecordsStorage.Envelope( + topic, + null, + ByteBuffer.wrap("hello".getBytes()) + )); + int partition = offsetInfo.getPartition(); + + var record = subscribeToPartition(partition) + .flatMap(RecordsStorage.PartitionSource::getPublisher) + .blockFirst(Duration.ofSeconds(10)); + + assertThat(record) + .isNotNull() + .satisfies(it -> { + assertThat(it.getOffset()).isEqualTo(offsetInfo.getOffset()); + }); + } }