From 80ab55cc1f34e322a8e4dd55ed24af4d09f770ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcin=20Bobi=C5=84ski?= <49727204+MarcinBobinski@users.noreply.github.com> Date: Fri, 10 Jan 2025 11:44:41 +0100 Subject: [PATCH] SKYEDEN-3020 | consumers retransmission improvements (#1941) * SKYEDEN-3020 | commiting offsets change on retransmission * SKYEDEN-3020 | prevent overriding commited offset with lower offset * SKYEDEN-3020 | tests + new approach * SKYEDEN-3020 | add comment * SKYEDEN-3020 | resolve conflicts * SKYEDEN-3020 | refactor --- .../common/kafka/offset/PartitionOffsets.java | 4 ++ .../consumers/consumer/BatchConsumer.java | 8 +-- .../hermes/consumers/consumer/Consumer.java | 4 +- .../consumers/consumer/SerialConsumer.java | 6 +-- .../consumer/batch/MessageBatchReceiver.java | 6 +-- .../ConsumerPartitionAssignmentState.java | 2 +- .../broker/KafkaConsumerOffsetMover.java | 54 ++++++++++++++++++- .../consumer/receiver/MessageReceiver.java | 4 +- .../receiver/ThrottlingMessageReceiver.java | 6 +-- .../UninitializedMessageReceiver.java | 4 +- .../kafka/FilteringMessageReceiver.java | 6 +-- .../KafkaSingleThreadedMessageReceiver.java | 11 ++-- .../supervisor/process/Retransmitter.java | 40 +++++--------- .../supervisor/process/ConsumerStub.groovy | 6 +-- .../client/integration/HermesTestClient.java | 2 +- .../KafkaRetransmissionServiceTest.java | 20 ++++++- 16 files changed, 121 insertions(+), 62 deletions(-) diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/offset/PartitionOffsets.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/offset/PartitionOffsets.java index c0c5b1ef30..4a9e552d62 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/offset/PartitionOffsets.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/offset/PartitionOffsets.java @@ -22,4 +22,8 @@ public PartitionOffsets addAll(PartitionOffsets offsets) { public Iterator iterator() { return offsets.iterator(); } + + public boolean isEmpty() { + return offsets.isEmpty(); + } } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/BatchConsumer.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/BatchConsumer.java index 272ca6287a..04047ed47d 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/BatchConsumer.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/BatchConsumer.java @@ -22,7 +22,7 @@ import pl.allegro.tech.hermes.api.BatchSubscriptionPolicy; import pl.allegro.tech.hermes.api.Subscription; import pl.allegro.tech.hermes.api.Topic; -import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; import pl.allegro.tech.hermes.common.message.wrapper.CompositeMessageContentWrapper; import pl.allegro.tech.hermes.common.metric.MetricsFacade; import pl.allegro.tech.hermes.consumers.consumer.batch.MessageBatch; @@ -239,11 +239,11 @@ public void commit(Set offsetsToCommit) { } @Override - public boolean moveOffset(PartitionOffset partitionOffset) { + public PartitionOffsets moveOffset(PartitionOffsets partitionOffsets) { if (receiver != null) { - return receiver.moveOffset(partitionOffset); + return receiver.moveOffset(partitionOffsets); } - return false; + return new PartitionOffsets(); } @Override diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/Consumer.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/Consumer.java index 858811f891..1b95b33399 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/Consumer.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/Consumer.java @@ -3,7 +3,7 @@ import java.util.Set; import pl.allegro.tech.hermes.api.Subscription; import pl.allegro.tech.hermes.api.Topic; -import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset; public interface Consumer { @@ -26,7 +26,7 @@ public interface Consumer { void commit(Set offsets); - boolean moveOffset(PartitionOffset subscriptionPartitionOffset); + PartitionOffsets moveOffset(PartitionOffsets subscriptionPartitionOffsets); Subscription getSubscription(); diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/SerialConsumer.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/SerialConsumer.java index a8f1bd3212..6d07d74a14 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/SerialConsumer.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/SerialConsumer.java @@ -11,7 +11,7 @@ import org.slf4j.LoggerFactory; import pl.allegro.tech.hermes.api.Subscription; import pl.allegro.tech.hermes.api.Topic; -import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; import pl.allegro.tech.hermes.common.metric.MetricsFacade; import pl.allegro.tech.hermes.consumers.CommonConsumerParameters; import pl.allegro.tech.hermes.consumers.consumer.converter.MessageConverterResolver; @@ -262,8 +262,8 @@ public void commit(Set offsets) { } @Override - public boolean moveOffset(PartitionOffset offset) { - return messageReceiver.moveOffset(offset); + public PartitionOffsets moveOffset(PartitionOffsets offsets) { + return messageReceiver.moveOffset(offsets); } @Override diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/batch/MessageBatchReceiver.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/batch/MessageBatchReceiver.java index 03d76d0162..526e0f9977 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/batch/MessageBatchReceiver.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/batch/MessageBatchReceiver.java @@ -16,7 +16,7 @@ import org.slf4j.LoggerFactory; import pl.allegro.tech.hermes.api.Subscription; import pl.allegro.tech.hermes.api.Topic; -import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; import pl.allegro.tech.hermes.common.message.wrapper.CompositeMessageContentWrapper; import pl.allegro.tech.hermes.common.message.wrapper.UnsupportedContentTypeException; import pl.allegro.tech.hermes.consumers.consumer.Message; @@ -180,8 +180,8 @@ public void commit(Set offsets) { receiver.commit(offsets); } - public boolean moveOffset(PartitionOffset offset) { - return receiver.moveOffset(offset); + public PartitionOffsets moveOffset(PartitionOffsets offsets) { + return receiver.moveOffset(offsets); } public Set getAssignedPartitions() { diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/ConsumerPartitionAssignmentState.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/ConsumerPartitionAssignmentState.java index ffc41462d9..ca30da47bb 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/ConsumerPartitionAssignmentState.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/ConsumerPartitionAssignmentState.java @@ -35,7 +35,7 @@ public void assign(SubscriptionName name, Collection partitions) { })); } - private void incrementTerm(SubscriptionName name) { + public void incrementTerm(SubscriptionName name) { terms.compute(name, ((subscriptionName, term) -> term == null ? 0L : term + 1L)); } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/kafka/broker/KafkaConsumerOffsetMover.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/kafka/broker/KafkaConsumerOffsetMover.java index 7f26db83f2..ef3028ffe2 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/kafka/broker/KafkaConsumerOffsetMover.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/kafka/broker/KafkaConsumerOffsetMover.java @@ -1,11 +1,16 @@ package pl.allegro.tech.hermes.consumers.consumer.offset.kafka.broker; +import java.util.LinkedHashMap; +import java.util.Map; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import pl.allegro.tech.hermes.api.SubscriptionName; import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; +import pl.allegro.tech.hermes.consumers.consumer.offset.ConsumerPartitionAssignmentState; public class KafkaConsumerOffsetMover { @@ -13,13 +18,38 @@ public class KafkaConsumerOffsetMover { private final SubscriptionName subscriptionName; private KafkaConsumer consumer; + private ConsumerPartitionAssignmentState partitionAssignmentState; - public KafkaConsumerOffsetMover(SubscriptionName subscriptionName, KafkaConsumer consumer) { + public KafkaConsumerOffsetMover( + SubscriptionName subscriptionName, + KafkaConsumer consumer, + ConsumerPartitionAssignmentState partitionAssignmentState) { this.subscriptionName = subscriptionName; this.consumer = consumer; + this.partitionAssignmentState = partitionAssignmentState; } - public boolean move(PartitionOffset offset) { + public PartitionOffsets move(PartitionOffsets offsets) { + PartitionOffsets movedOffsets = new PartitionOffsets(); + + for (PartitionOffset offset : offsets) { + if (move(offset)) { + movedOffsets.add(offset); + } + } + + commit(movedOffsets); + + if (!movedOffsets.isEmpty()) { + // Incrementing assignment term ensures that currently committed offsets won't be overwritten + // by the events from the past which are concurrently processed by the consumer + partitionAssignmentState.incrementTerm(subscriptionName); + } + + return movedOffsets; + } + + private boolean move(PartitionOffset offset) { try { TopicPartition tp = new TopicPartition(offset.getTopic().asString(), offset.getPartition()); if (consumer.assignment().contains(tp)) { @@ -46,4 +76,24 @@ public boolean move(PartitionOffset offset) { return false; } } + + private void commit(PartitionOffsets partitionOffsets) { + try { + Map offsetsToCommit = new LinkedHashMap<>(); + for (PartitionOffset partitionOffset : partitionOffsets) { + offsetsToCommit.put( + new TopicPartition( + partitionOffset.getTopic().asString(), partitionOffset.getPartition()), + new OffsetAndMetadata(partitionOffset.getOffset())); + } + if (!offsetsToCommit.isEmpty()) { + consumer.commitSync(offsetsToCommit); + } + } catch (Exception e) { + logger.error( + "Failed to commit offsets while trying to move them for subscription {}", + subscriptionName, + e); + } + } } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/MessageReceiver.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/MessageReceiver.java index 30997eb2bc..69f3920880 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/MessageReceiver.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/MessageReceiver.java @@ -3,7 +3,7 @@ import java.util.Optional; import java.util.Set; import pl.allegro.tech.hermes.api.Subscription; -import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; import pl.allegro.tech.hermes.consumers.consumer.Message; import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset; @@ -33,7 +33,7 @@ default void update(Subscription newSubscription) {} void commit(Set offsets); - boolean moveOffset(PartitionOffset offset); + PartitionOffsets moveOffset(PartitionOffsets offsets); Set getAssignedPartitions(); } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/ThrottlingMessageReceiver.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/ThrottlingMessageReceiver.java index 7febb6fd21..4e3ff42bb0 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/ThrottlingMessageReceiver.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/ThrottlingMessageReceiver.java @@ -4,7 +4,7 @@ import java.util.Set; import pl.allegro.tech.hermes.api.Subscription; import pl.allegro.tech.hermes.api.SubscriptionName; -import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; import pl.allegro.tech.hermes.common.metric.MetricsFacade; import pl.allegro.tech.hermes.consumers.consumer.Message; import pl.allegro.tech.hermes.consumers.consumer.idletime.IdleTimeCalculator; @@ -53,8 +53,8 @@ public void commit(Set offsets) { } @Override - public boolean moveOffset(PartitionOffset offset) { - return receiver.moveOffset(offset); + public PartitionOffsets moveOffset(PartitionOffsets offsets) { + return receiver.moveOffset(offsets); } @Override diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/UninitializedMessageReceiver.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/UninitializedMessageReceiver.java index f890174b58..f06e0a8136 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/UninitializedMessageReceiver.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/UninitializedMessageReceiver.java @@ -2,7 +2,7 @@ import java.util.Optional; import java.util.Set; -import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; import pl.allegro.tech.hermes.consumers.consumer.Message; import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset; @@ -18,7 +18,7 @@ public void commit(Set offsets) { } @Override - public boolean moveOffset(PartitionOffset offset) { + public PartitionOffsets moveOffset(PartitionOffsets offsets) { throw new ConsumerNotInitializedException(); } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/FilteringMessageReceiver.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/FilteringMessageReceiver.java index b6addea90d..b3c1e7a69d 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/FilteringMessageReceiver.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/FilteringMessageReceiver.java @@ -4,7 +4,7 @@ import java.util.Optional; import java.util.Set; import pl.allegro.tech.hermes.api.Subscription; -import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; import pl.allegro.tech.hermes.consumers.consumer.Message; import pl.allegro.tech.hermes.consumers.consumer.filtering.FilteredMessageHandler; import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset; @@ -65,8 +65,8 @@ public void commit(Set offsets) { } @Override - public boolean moveOffset(PartitionOffset offset) { - return receiver.moveOffset(offset); + public PartitionOffsets moveOffset(PartitionOffsets offsets) { + return receiver.moveOffset(offsets); } @Override diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaSingleThreadedMessageReceiver.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaSingleThreadedMessageReceiver.java index b1f6f9b86d..6e45559c46 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaSingleThreadedMessageReceiver.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaSingleThreadedMessageReceiver.java @@ -25,7 +25,7 @@ import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper; import pl.allegro.tech.hermes.common.kafka.KafkaTopic; import pl.allegro.tech.hermes.common.kafka.KafkaTopics; -import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; import pl.allegro.tech.hermes.common.metric.MetricsFacade; import pl.allegro.tech.hermes.consumers.consumer.Message; import pl.allegro.tech.hermes.consumers.consumer.load.SubscriptionLoadRecorder; @@ -74,7 +74,9 @@ public KafkaSingleThreadedMessageReceiver( this.partitionAssignmentState = partitionAssignmentState; this.consumer = consumer; this.readQueue = new ArrayBlockingQueue<>(readQueueCapacity); - this.offsetMover = new KafkaConsumerOffsetMover(subscription.getQualifiedName(), consumer); + this.offsetMover = + new KafkaConsumerOffsetMover( + subscription.getQualifiedName(), consumer, partitionAssignmentState); Map topics = getKafkaTopics(topic, kafkaNamesMapper).stream() .collect(Collectors.toMap(t -> t.name().asString(), Function.identity())); @@ -195,6 +197,7 @@ public void commit(Set offsets) { private Map createOffset( Set partitionOffsets) { + Map offsetsData = new LinkedHashMap<>(); for (SubscriptionPartitionOffset partitionOffset : partitionOffsets) { TopicPartition topicAndPartition = @@ -223,8 +226,8 @@ private Map createOffset( } @Override - public boolean moveOffset(PartitionOffset offset) { - return offsetMover.move(offset); + public PartitionOffsets moveOffset(PartitionOffsets offsets) { + return offsetMover.move(offsets); } public Set getAssignedPartitions() { diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/Retransmitter.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/Retransmitter.java index 9ecde5980e..1df91a8b24 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/Retransmitter.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/Retransmitter.java @@ -33,36 +33,22 @@ public void reloadOffsets(SubscriptionName subscriptionName, Consumer consumer) brokersClusterName, consumer.getAssignedPartitions()); - for (PartitionOffset partitionOffset : offsets) { - if (moveOffset(subscriptionName, consumer, partitionOffset)) { - subscriptionOffsetChangeIndicator.removeOffset( - subscriptionName.getTopicName(), - subscriptionName.getName(), - brokersClusterName, - partitionOffset.getTopic(), - partitionOffset.getPartition()); - logger.info( - "Removed offset indicator for subscription={} and partition={}", - subscriptionName, - partitionOffset.getPartition()); - } + PartitionOffsets movedOffsets = consumer.moveOffset(offsets); + + for (PartitionOffset partitionOffset : movedOffsets) { + subscriptionOffsetChangeIndicator.removeOffset( + subscriptionName.getTopicName(), + subscriptionName.getName(), + brokersClusterName, + partitionOffset.getTopic(), + partitionOffset.getPartition()); + logger.info( + "Removed offset indicator for subscription={} and partition={}", + subscriptionName, + partitionOffset.getPartition()); } } catch (Exception ex) { throw new RetransmissionException(ex); } } - - private boolean moveOffset( - SubscriptionName subscriptionName, Consumer consumer, PartitionOffset partitionOffset) { - try { - return consumer.moveOffset(partitionOffset); - } catch (IllegalStateException ex) { - logger.warn( - "Cannot move offset for subscription={} and partition={} , possibly owned by different node", - subscriptionName, - partitionOffset.getPartition(), - ex); - return false; - } - } } diff --git a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerStub.groovy b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerStub.groovy index b503282b7b..e9445d016e 100644 --- a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerStub.groovy +++ b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerStub.groovy @@ -2,7 +2,7 @@ package pl.allegro.tech.hermes.consumers.supervisor.process import pl.allegro.tech.hermes.api.Subscription import pl.allegro.tech.hermes.api.Topic -import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets import pl.allegro.tech.hermes.consumers.consumer.Consumer import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset @@ -78,8 +78,8 @@ class ConsumerStub implements Consumer { } @Override - boolean moveOffset(PartitionOffset partitionOffset) { - return true + PartitionOffsets moveOffset(PartitionOffsets partitionOffset) { + return partitionOffset } boolean getInitialized() { diff --git a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/HermesTestClient.java b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/HermesTestClient.java index b4c3bb02b0..e75cc56ee3 100644 --- a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/HermesTestClient.java +++ b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/HermesTestClient.java @@ -189,7 +189,7 @@ public void waitUntilConsumerCommitsOffset(String topicQualifiedName, String sub }); } - private long calculateCommittedMessages(String topicQualifiedName, String subscription) { + public long calculateCommittedMessages(String topicQualifiedName, String subscription) { AtomicLong messagesCommittedCount = new AtomicLong(0); List consumerGroups = getConsumerGroupsDescription(topicQualifiedName, subscription) diff --git a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java index 77a2b9c93f..0daa5be05a 100644 --- a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java +++ b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java @@ -75,6 +75,9 @@ public void shouldMoveOffsetNearGivenTimestamp(boolean suspendedSubscription) publishAndConsumeMessages(messages2, topic, subscriber); hermes.api().waitUntilConsumerCommitsOffset(topic.getQualifiedName(), subscription.getName()); + long commitedMessages = + hermes.api().calculateCommittedMessages(topic.getQualifiedName(), subscription.getName()); + if (suspendedSubscription) { hermes.api().suspendSubscription(topic, subscription.getName()); hermes.api().waitUntilSubscriptionSuspended(topic.getQualifiedName(), subscription.getName()); @@ -87,14 +90,27 @@ public void shouldMoveOffsetNearGivenTimestamp(boolean suspendedSubscription) .retransmit( topic.getQualifiedName(), subscription.getName(), retransmissionDate, false); + // then + response.expectStatus().isOk(); + // Check if Kafka committed offsets were moved on retransmission + assertThat( + hermes + .api() + .calculateCommittedMessages(topic.getQualifiedName(), subscription.getName())) + .isLessThan(commitedMessages); + if (suspendedSubscription) { hermes.api().activateSubscription(topic, subscription.getName()); hermes.api().waitUntilSubscriptionActivated(topic.getQualifiedName(), subscription.getName()); } - // then - response.expectStatus().isOk(); messages2.forEach(subscriber::waitUntilReceived); + hermes.api().waitUntilConsumerCommitsOffset(topic.getQualifiedName(), subscription.getName()); + assertThat( + hermes + .api() + .calculateCommittedMessages(topic.getQualifiedName(), subscription.getName())) + .isEqualTo(commitedMessages); } @Test