diff --git a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/ConsumerGroup.java b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/ConsumerGroup.java index 61528237df..223c7132be 100644 --- a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/ConsumerGroup.java +++ b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/ConsumerGroup.java @@ -41,6 +41,14 @@ public Set getMembers() { return members; } + public boolean isStable() { + return state.equals("Stable"); + } + + public boolean isEmpty() { + return state.equals("Empty"); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/offset/PartitionOffset.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/offset/PartitionOffset.java index 459770e508..10aa1f3caf 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/offset/PartitionOffset.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/offset/PartitionOffset.java @@ -52,4 +52,16 @@ public boolean equals(Object obj) { public int hashCode() { return Objects.hash(topic, partition, offset); } + + @Override + public String toString() { + return "PartitionOffset{" + + "topic=" + + topic + + ", partition=" + + partition + + ", offset=" + + offset + + '}'; + } } diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/offset/PartitionOffsets.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/offset/PartitionOffsets.java index c0c5b1ef30..4a9e552d62 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/offset/PartitionOffsets.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/offset/PartitionOffsets.java @@ -22,4 +22,8 @@ public PartitionOffsets addAll(PartitionOffsets offsets) { public Iterator iterator() { return offsets.iterator(); } + + public boolean isEmpty() { + return offsets.isEmpty(); + } } diff --git a/hermes-console/json-server/server.ts b/hermes-console/json-server/server.ts index 0a47cb6361..14790ef81f 100644 --- a/hermes-console/json-server/server.ts +++ b/hermes-console/json-server/server.ts @@ -20,10 +20,6 @@ server.post('/query/subscriptions', (req, res) => { res.jsonp(subscriptions); }); -server.post('/topics/*/subscriptions/*/moveOffsetsToTheEnd', (req, res) => { - res.sendStatus(200); -}); - server.post('/topicSubscriptions', (req, res) => { res.sendStatus(200); }); @@ -83,7 +79,9 @@ server.post('/offline-retransmission/tasks', (req, res) => { server.put( '/topics/:topic/subscriptions/:subscroption/retransmission', (req, res) => { - res.sendStatus(200); + setTimeout(() => { + res.sendStatus(200); + }, 2000); }, ); diff --git a/hermes-console/src/api/hermes-client/index.ts b/hermes-console/src/api/hermes-client/index.ts index c98cefc70f..ecd99622fc 100644 --- a/hermes-console/src/api/hermes-client/index.ts +++ b/hermes-console/src/api/hermes-client/index.ts @@ -310,15 +310,6 @@ export function fetchDashboardUrl(path: string): ResponsePromise { return axios.get(path); } -export function moveSubscriptionOffsets( - topicName: string, - subscription: string, -): ResponsePromise { - return axios.post( - `/topics/${topicName}/subscriptions/${subscription}/moveOffsetsToTheEnd`, - ); -} - export function removeTopic(topic: String): ResponsePromise { return axios.delete(`/topics/${topic}`); } diff --git a/hermes-console/src/composables/consumer-groups/use-consumer-groups/useConsumerGroups.spec.ts b/hermes-console/src/composables/consumer-groups/use-consumer-groups/useConsumerGroups.spec.ts index 0da6ef7da9..f6a2dc05c4 100644 --- a/hermes-console/src/composables/consumer-groups/use-consumer-groups/useConsumerGroups.spec.ts +++ b/hermes-console/src/composables/consumer-groups/use-consumer-groups/useConsumerGroups.spec.ts @@ -3,14 +3,9 @@ import { createTestingPinia } from '@pinia/testing'; import { dummyConsumerGroups } from '@/dummy/consumerGroups'; import { dummySubscription } from '@/dummy/subscription'; import { dummyTopic } from '@/dummy/topic'; -import { - expectNotificationDispatched, - notificationStoreSpy, -} from '@/utils/test-utils'; import { fetchConsumerGroupsErrorHandler, fetchConsumerGroupsHandler, - moveSubscriptionOffsetsHandler, } from '@/mocks/handlers'; import { setActivePinia } from 'pinia'; import { setupServer } from 'msw/node'; @@ -81,56 +76,4 @@ describe('useConsumerGroups', () => { expect(error.value.fetchConsumerGroups).not.toBeNull(); }); }); - - it('should show message that moving offsets was successful', async () => { - // given - server.use( - moveSubscriptionOffsetsHandler({ - topicName, - subscriptionName, - statusCode: 200, - }), - ); - server.listen(); - const notificationStore = notificationStoreSpy(); - - const { moveOffsets } = useConsumerGroups(topicName, subscriptionName); - - // when - moveOffsets(); - - // then - await waitFor(() => { - expectNotificationDispatched(notificationStore, { - type: 'success', - title: 'notifications.subscriptionOffsets.move.success', - }); - }); - }); - - it('should show message that moving offsets was unsuccessful', async () => { - // given - server.use( - moveSubscriptionOffsetsHandler({ - topicName, - subscriptionName, - statusCode: 500, - }), - ); - server.listen(); - - const notificationStore = notificationStoreSpy(); - const { moveOffsets } = useConsumerGroups(topicName, subscriptionName); - - // when - moveOffsets(); - - // then - await waitFor(() => { - expectNotificationDispatched(notificationStore, { - type: 'error', - title: 'notifications.subscriptionOffsets.move.failure', - }); - }); - }); }); diff --git a/hermes-console/src/composables/consumer-groups/use-consumer-groups/useConsumerGroups.ts b/hermes-console/src/composables/consumer-groups/use-consumer-groups/useConsumerGroups.ts index 0d5557c4a3..8956243cef 100644 --- a/hermes-console/src/composables/consumer-groups/use-consumer-groups/useConsumerGroups.ts +++ b/hermes-console/src/composables/consumer-groups/use-consumer-groups/useConsumerGroups.ts @@ -1,17 +1,10 @@ -import { dispatchErrorNotification } from '@/utils/notification-utils'; -import { - fetchConsumerGroups as getConsumerGroups, - moveSubscriptionOffsets, -} from '@/api/hermes-client'; +import { fetchConsumerGroups as getConsumerGroups } from '@/api/hermes-client'; import { ref } from 'vue'; -import { useGlobalI18n } from '@/i18n'; -import { useNotificationsStore } from '@/store/app-notifications/useAppNotifications'; import type { ConsumerGroup } from '@/api/consumer-group'; import type { Ref } from 'vue'; export interface UseConsumerGroups { consumerGroups: Ref; - moveOffsets: () => void; loading: Ref; error: Ref; } @@ -43,36 +36,10 @@ export function useConsumerGroups( } }; - const moveOffsets = async () => { - const notificationsStore = useNotificationsStore(); - try { - await moveSubscriptionOffsets(topicName, subscriptionName); - await notificationsStore.dispatchNotification({ - title: useGlobalI18n().t( - 'notifications.subscriptionOffsets.move.success', - { - subscriptionName, - }, - ), - text: '', - type: 'success', - }); - } catch (e: any) { - await dispatchErrorNotification( - e, - notificationsStore, - useGlobalI18n().t('notifications.subscriptionOffsets.move.failure', { - subscriptionName, - }), - ); - } - }; - fetchConsumerGroups(); return { consumerGroups, - moveOffsets, loading, error, }; diff --git a/hermes-console/src/composables/subscription/use-subscription/useSubscription.spec.ts b/hermes-console/src/composables/subscription/use-subscription/useSubscription.spec.ts index d4bfdc6f82..6a615eace7 100644 --- a/hermes-console/src/composables/subscription/use-subscription/useSubscription.spec.ts +++ b/hermes-console/src/composables/subscription/use-subscription/useSubscription.spec.ts @@ -1,4 +1,4 @@ -import { afterEach } from 'vitest'; +import { afterEach, expect } from 'vitest'; import { createRetransmissionHandler, fetchSubscriptionErrorHandler, @@ -18,7 +18,6 @@ import { dummySubscriptionHealth, dummySubscriptionMetrics, } from '@/dummy/subscription'; -import { expect } from 'vitest'; import { expectNotificationDispatched, notificationStoreSpy, @@ -391,6 +390,39 @@ describe('useSubscription', () => { }); }); + [200, 500].forEach((statusCode) => { + it(`should correctly manage the state of retransmission with status code ${statusCode}`, async () => { + // given + server.use( + createRetransmissionHandler({ + topicName: dummySubscription.topicName, + subscriptionName: dummySubscription.name, + statusCode, + delayMs: 100, + }), + ); + server.listen(); + + const { retransmitMessages, retransmitting } = useSubscription( + dummySubscription.topicName, + dummySubscription.name, + ); + + expect(retransmitting.value).toBeFalsy(); + + // when + retransmitMessages(new Date().toISOString()); + + // then + await waitFor(() => { + expect(retransmitting.value).toBeTruthy(); + }); + await waitFor(() => { + expect(retransmitting.value).toBeFalsy(); + }); + }); + }); + it('should show message that skipping all messages was successful', async () => { // given server.use( @@ -448,6 +480,39 @@ describe('useSubscription', () => { }); }); }); + + [200, 500].forEach((statusCode) => { + it(`should correctly manage the state of skipping all messages with status code ${statusCode}`, async () => { + // given + server.use( + createRetransmissionHandler({ + topicName: dummySubscription.topicName, + subscriptionName: dummySubscription.name, + statusCode, + delayMs: 100, + }), + ); + server.listen(); + + const { skipAllMessages, skippingAllMessages } = useSubscription( + dummySubscription.topicName, + dummySubscription.name, + ); + + expect(skippingAllMessages.value).toBeFalsy(); + + // when + skipAllMessages(); + + // then + await waitFor(() => { + expect(skippingAllMessages.value).toBeTruthy(); + }); + await waitFor(() => { + expect(skippingAllMessages.value).toBeFalsy(); + }); + }); + }); }); function expectErrors( diff --git a/hermes-console/src/composables/subscription/use-subscription/useSubscription.ts b/hermes-console/src/composables/subscription/use-subscription/useSubscription.ts index 7b4581bda1..149ac21a1c 100644 --- a/hermes-console/src/composables/subscription/use-subscription/useSubscription.ts +++ b/hermes-console/src/composables/subscription/use-subscription/useSubscription.ts @@ -32,6 +32,8 @@ export interface UseSubscription { subscriptionLastUndeliveredMessage: Ref; trackingUrls: Ref; loading: Ref; + retransmitting: Ref; + skippingAllMessages: Ref; error: Ref; removeSubscription: () => Promise; suspendSubscription: () => Promise; @@ -73,7 +75,8 @@ export function useSubscription( fetchSubscriptionLastUndeliveredMessage: null, getSubscriptionTrackingUrls: null, }); - + const retransmitting = ref(false); + const skippingAllMessages = ref(false); const fetchSubscription = async () => { try { loading.value = true; @@ -233,6 +236,7 @@ export function useSubscription( }; const retransmitMessages = async (from: string): Promise => { + retransmitting.value = true; try { await retransmitSubscriptionMessages(topicName, subscriptionName, { retransmissionDate: from, @@ -257,10 +261,13 @@ export function useSubscription( }), ); return false; + } finally { + retransmitting.value = false; } }; const skipAllMessages = async (): Promise => { + skippingAllMessages.value = true; const tomorrowDate = new Date(); tomorrowDate.setDate(tomorrowDate.getDate() + 1); try { @@ -290,6 +297,8 @@ export function useSubscription( ), ); return false; + } finally { + skippingAllMessages.value = false; } }; @@ -305,6 +314,8 @@ export function useSubscription( subscriptionLastUndeliveredMessage, trackingUrls, loading, + retransmitting, + skippingAllMessages, error, removeSubscription, suspendSubscription, diff --git a/hermes-console/src/i18n/en-US/index.ts b/hermes-console/src/i18n/en-US/index.ts index e189bac9df..2af47cb606 100644 --- a/hermes-console/src/i18n/en-US/index.ts +++ b/hermes-console/src/i18n/en-US/index.ts @@ -625,10 +625,6 @@ const en_US = { reason: 'Reason', timestamp: 'Timestamp', }, - moveOffsets: { - tooltip: 'Move subscription offsets to the end', - button: 'MOVE OFFSETS', - }, }, search: { collection: { diff --git a/hermes-console/src/mocks/handlers.ts b/hermes-console/src/mocks/handlers.ts index 335b4d152f..02927fad47 100644 --- a/hermes-console/src/mocks/handlers.ts +++ b/hermes-console/src/mocks/handlers.ts @@ -817,24 +817,6 @@ export const switchReadinessErrorHandler = ({ }); }); -export const moveSubscriptionOffsetsHandler = ({ - topicName, - subscriptionName, - statusCode, -}: { - topicName: string; - subscriptionName: string; - statusCode: number; -}) => - http.post( - `${url}/topics/${topicName}/subscriptions/${subscriptionName}/moveOffsetsToTheEnd`, - () => { - return new HttpResponse(undefined, { - status: statusCode, - }); - }, - ); - export const upsertTopicConstraintHandler = ({ statusCode, }: { @@ -974,14 +956,19 @@ export const createRetransmissionHandler = ({ statusCode, topicName, subscriptionName, + delayMs, }: { statusCode: number; topicName: string; subscriptionName: string; + delayMs?: number; }) => http.put( `${url}/topics/${topicName}/subscriptions/${subscriptionName}/retransmission`, - () => { + async () => { + if (delayMs && delayMs > 0) { + await new Promise((resolve) => setTimeout(resolve, delayMs)); + } return new HttpResponse(undefined, { status: statusCode, }); diff --git a/hermes-console/src/views/admin/consumer-groups/ConsumerGroupsView.vue b/hermes-console/src/views/admin/consumer-groups/ConsumerGroupsView.vue index e6149517dc..1da2a768ce 100644 --- a/hermes-console/src/views/admin/consumer-groups/ConsumerGroupsView.vue +++ b/hermes-console/src/views/admin/consumer-groups/ConsumerGroupsView.vue @@ -12,7 +12,7 @@ const params = route.params as Record; const { subscriptionId, topicId, groupId } = params; - const { consumerGroups, moveOffsets, loading, error } = useConsumerGroups( + const { consumerGroups, loading, error } = useConsumerGroups( topicId, subscriptionId, ); @@ -64,14 +64,6 @@ {{ $t('consumerGroups.title') }}

- - - {{ $t('subscription.moveOffsets.button') }} - {{ - $t('subscription.moveOffsets.tooltip') - }} - - diff --git a/hermes-console/src/views/subscription/SubscriptionView.spec.ts b/hermes-console/src/views/subscription/SubscriptionView.spec.ts index 5cf315fa29..06b5372ab4 100644 --- a/hermes-console/src/views/subscription/SubscriptionView.spec.ts +++ b/hermes-console/src/views/subscription/SubscriptionView.spec.ts @@ -37,6 +37,8 @@ const useSubscriptionStub: ReturnType = { subscriptionUndeliveredMessages: ref(dummyUndeliveredMessages), subscriptionLastUndeliveredMessage: ref(dummyUndeliveredMessage), trackingUrls: ref(dummyTrackingUrls), + retransmitting: computed(() => false), + skippingAllMessages: computed(() => false), error: ref({ fetchSubscription: null, fetchOwner: null, diff --git a/hermes-console/src/views/subscription/SubscriptionView.vue b/hermes-console/src/views/subscription/SubscriptionView.vue index 728ae5e0d0..e314757030 100644 --- a/hermes-console/src/views/subscription/SubscriptionView.vue +++ b/hermes-console/src/views/subscription/SubscriptionView.vue @@ -36,6 +36,8 @@ subscriptionUndeliveredMessages, subscriptionLastUndeliveredMessage, trackingUrls, + retransmitting, + skippingAllMessages, error, loading, removeSubscription, @@ -108,6 +110,10 @@ await retransmitMessages(fromDate); }; + const onSkipAllMessages = async () => { + await skipAllMessages(); + }; + const breadcrumbsItems = [ { title: t('subscription.subscriptionBreadcrumbs.home'), @@ -234,8 +240,10 @@ v-if="isSubscriptionOwnerOrAdmin(roles)" :topic="topicId" :subscription="subscriptionId" + :retransmitting="retransmitting" + :skippingAllMessages="skippingAllMessages" @retransmit="onRetransmit" - @skipAllMessages="skipAllMessages" + @skipAllMessages="onSkipAllMessages" /> - {{ $t('subscription.manageMessagesCard.retransmitButton') }} + + + {{ $t('subscription.manageMessagesCard.retransmitButton') }} + @@ -128,12 +135,16 @@ {{ $t('subscription.manageMessagesCard.skipAllMessagesTitle') }}

- {{ $t('subscription.manageMessagesCard.skipAllMessagesButton') }} + + + {{ $t('subscription.manageMessagesCard.skipAllMessagesButton') }} + diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/BatchConsumer.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/BatchConsumer.java index 272ca6287a..04047ed47d 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/BatchConsumer.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/BatchConsumer.java @@ -22,7 +22,7 @@ import pl.allegro.tech.hermes.api.BatchSubscriptionPolicy; import pl.allegro.tech.hermes.api.Subscription; import pl.allegro.tech.hermes.api.Topic; -import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; import pl.allegro.tech.hermes.common.message.wrapper.CompositeMessageContentWrapper; import pl.allegro.tech.hermes.common.metric.MetricsFacade; import pl.allegro.tech.hermes.consumers.consumer.batch.MessageBatch; @@ -239,11 +239,11 @@ public void commit(Set offsetsToCommit) { } @Override - public boolean moveOffset(PartitionOffset partitionOffset) { + public PartitionOffsets moveOffset(PartitionOffsets partitionOffsets) { if (receiver != null) { - return receiver.moveOffset(partitionOffset); + return receiver.moveOffset(partitionOffsets); } - return false; + return new PartitionOffsets(); } @Override diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/Consumer.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/Consumer.java index 858811f891..1b95b33399 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/Consumer.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/Consumer.java @@ -3,7 +3,7 @@ import java.util.Set; import pl.allegro.tech.hermes.api.Subscription; import pl.allegro.tech.hermes.api.Topic; -import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset; public interface Consumer { @@ -26,7 +26,7 @@ public interface Consumer { void commit(Set offsets); - boolean moveOffset(PartitionOffset subscriptionPartitionOffset); + PartitionOffsets moveOffset(PartitionOffsets subscriptionPartitionOffsets); Subscription getSubscription(); diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/SerialConsumer.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/SerialConsumer.java index a8f1bd3212..6d07d74a14 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/SerialConsumer.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/SerialConsumer.java @@ -11,7 +11,7 @@ import org.slf4j.LoggerFactory; import pl.allegro.tech.hermes.api.Subscription; import pl.allegro.tech.hermes.api.Topic; -import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; import pl.allegro.tech.hermes.common.metric.MetricsFacade; import pl.allegro.tech.hermes.consumers.CommonConsumerParameters; import pl.allegro.tech.hermes.consumers.consumer.converter.MessageConverterResolver; @@ -262,8 +262,8 @@ public void commit(Set offsets) { } @Override - public boolean moveOffset(PartitionOffset offset) { - return messageReceiver.moveOffset(offset); + public PartitionOffsets moveOffset(PartitionOffsets offsets) { + return messageReceiver.moveOffset(offsets); } @Override diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/batch/MessageBatchReceiver.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/batch/MessageBatchReceiver.java index 03d76d0162..526e0f9977 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/batch/MessageBatchReceiver.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/batch/MessageBatchReceiver.java @@ -16,7 +16,7 @@ import org.slf4j.LoggerFactory; import pl.allegro.tech.hermes.api.Subscription; import pl.allegro.tech.hermes.api.Topic; -import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; import pl.allegro.tech.hermes.common.message.wrapper.CompositeMessageContentWrapper; import pl.allegro.tech.hermes.common.message.wrapper.UnsupportedContentTypeException; import pl.allegro.tech.hermes.consumers.consumer.Message; @@ -180,8 +180,8 @@ public void commit(Set offsets) { receiver.commit(offsets); } - public boolean moveOffset(PartitionOffset offset) { - return receiver.moveOffset(offset); + public PartitionOffsets moveOffset(PartitionOffsets offsets) { + return receiver.moveOffset(offsets); } public Set getAssignedPartitions() { diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/ConsumerPartitionAssignmentState.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/ConsumerPartitionAssignmentState.java index ffc41462d9..ca30da47bb 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/ConsumerPartitionAssignmentState.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/ConsumerPartitionAssignmentState.java @@ -35,7 +35,7 @@ public void assign(SubscriptionName name, Collection partitions) { })); } - private void incrementTerm(SubscriptionName name) { + public void incrementTerm(SubscriptionName name) { terms.compute(name, ((subscriptionName, term) -> term == null ? 0L : term + 1L)); } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/kafka/broker/KafkaConsumerOffsetMover.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/kafka/broker/KafkaConsumerOffsetMover.java index 7f26db83f2..ef3028ffe2 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/kafka/broker/KafkaConsumerOffsetMover.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/kafka/broker/KafkaConsumerOffsetMover.java @@ -1,11 +1,16 @@ package pl.allegro.tech.hermes.consumers.consumer.offset.kafka.broker; +import java.util.LinkedHashMap; +import java.util.Map; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import pl.allegro.tech.hermes.api.SubscriptionName; import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; +import pl.allegro.tech.hermes.consumers.consumer.offset.ConsumerPartitionAssignmentState; public class KafkaConsumerOffsetMover { @@ -13,13 +18,38 @@ public class KafkaConsumerOffsetMover { private final SubscriptionName subscriptionName; private KafkaConsumer consumer; + private ConsumerPartitionAssignmentState partitionAssignmentState; - public KafkaConsumerOffsetMover(SubscriptionName subscriptionName, KafkaConsumer consumer) { + public KafkaConsumerOffsetMover( + SubscriptionName subscriptionName, + KafkaConsumer consumer, + ConsumerPartitionAssignmentState partitionAssignmentState) { this.subscriptionName = subscriptionName; this.consumer = consumer; + this.partitionAssignmentState = partitionAssignmentState; } - public boolean move(PartitionOffset offset) { + public PartitionOffsets move(PartitionOffsets offsets) { + PartitionOffsets movedOffsets = new PartitionOffsets(); + + for (PartitionOffset offset : offsets) { + if (move(offset)) { + movedOffsets.add(offset); + } + } + + commit(movedOffsets); + + if (!movedOffsets.isEmpty()) { + // Incrementing assignment term ensures that currently committed offsets won't be overwritten + // by the events from the past which are concurrently processed by the consumer + partitionAssignmentState.incrementTerm(subscriptionName); + } + + return movedOffsets; + } + + private boolean move(PartitionOffset offset) { try { TopicPartition tp = new TopicPartition(offset.getTopic().asString(), offset.getPartition()); if (consumer.assignment().contains(tp)) { @@ -46,4 +76,24 @@ public boolean move(PartitionOffset offset) { return false; } } + + private void commit(PartitionOffsets partitionOffsets) { + try { + Map offsetsToCommit = new LinkedHashMap<>(); + for (PartitionOffset partitionOffset : partitionOffsets) { + offsetsToCommit.put( + new TopicPartition( + partitionOffset.getTopic().asString(), partitionOffset.getPartition()), + new OffsetAndMetadata(partitionOffset.getOffset())); + } + if (!offsetsToCommit.isEmpty()) { + consumer.commitSync(offsetsToCommit); + } + } catch (Exception e) { + logger.error( + "Failed to commit offsets while trying to move them for subscription {}", + subscriptionName, + e); + } + } } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/MessageReceiver.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/MessageReceiver.java index 30997eb2bc..69f3920880 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/MessageReceiver.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/MessageReceiver.java @@ -3,7 +3,7 @@ import java.util.Optional; import java.util.Set; import pl.allegro.tech.hermes.api.Subscription; -import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; import pl.allegro.tech.hermes.consumers.consumer.Message; import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset; @@ -33,7 +33,7 @@ default void update(Subscription newSubscription) {} void commit(Set offsets); - boolean moveOffset(PartitionOffset offset); + PartitionOffsets moveOffset(PartitionOffsets offsets); Set getAssignedPartitions(); } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/ThrottlingMessageReceiver.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/ThrottlingMessageReceiver.java index 7febb6fd21..4e3ff42bb0 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/ThrottlingMessageReceiver.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/ThrottlingMessageReceiver.java @@ -4,7 +4,7 @@ import java.util.Set; import pl.allegro.tech.hermes.api.Subscription; import pl.allegro.tech.hermes.api.SubscriptionName; -import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; import pl.allegro.tech.hermes.common.metric.MetricsFacade; import pl.allegro.tech.hermes.consumers.consumer.Message; import pl.allegro.tech.hermes.consumers.consumer.idletime.IdleTimeCalculator; @@ -53,8 +53,8 @@ public void commit(Set offsets) { } @Override - public boolean moveOffset(PartitionOffset offset) { - return receiver.moveOffset(offset); + public PartitionOffsets moveOffset(PartitionOffsets offsets) { + return receiver.moveOffset(offsets); } @Override diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/UninitializedMessageReceiver.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/UninitializedMessageReceiver.java index f890174b58..f06e0a8136 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/UninitializedMessageReceiver.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/UninitializedMessageReceiver.java @@ -2,7 +2,7 @@ import java.util.Optional; import java.util.Set; -import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; import pl.allegro.tech.hermes.consumers.consumer.Message; import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset; @@ -18,7 +18,7 @@ public void commit(Set offsets) { } @Override - public boolean moveOffset(PartitionOffset offset) { + public PartitionOffsets moveOffset(PartitionOffsets offsets) { throw new ConsumerNotInitializedException(); } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/FilteringMessageReceiver.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/FilteringMessageReceiver.java index b6addea90d..b3c1e7a69d 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/FilteringMessageReceiver.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/FilteringMessageReceiver.java @@ -4,7 +4,7 @@ import java.util.Optional; import java.util.Set; import pl.allegro.tech.hermes.api.Subscription; -import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; import pl.allegro.tech.hermes.consumers.consumer.Message; import pl.allegro.tech.hermes.consumers.consumer.filtering.FilteredMessageHandler; import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset; @@ -65,8 +65,8 @@ public void commit(Set offsets) { } @Override - public boolean moveOffset(PartitionOffset offset) { - return receiver.moveOffset(offset); + public PartitionOffsets moveOffset(PartitionOffsets offsets) { + return receiver.moveOffset(offsets); } @Override diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaSingleThreadedMessageReceiver.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaSingleThreadedMessageReceiver.java index b1f6f9b86d..6e45559c46 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaSingleThreadedMessageReceiver.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaSingleThreadedMessageReceiver.java @@ -25,7 +25,7 @@ import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper; import pl.allegro.tech.hermes.common.kafka.KafkaTopic; import pl.allegro.tech.hermes.common.kafka.KafkaTopics; -import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets; import pl.allegro.tech.hermes.common.metric.MetricsFacade; import pl.allegro.tech.hermes.consumers.consumer.Message; import pl.allegro.tech.hermes.consumers.consumer.load.SubscriptionLoadRecorder; @@ -74,7 +74,9 @@ public KafkaSingleThreadedMessageReceiver( this.partitionAssignmentState = partitionAssignmentState; this.consumer = consumer; this.readQueue = new ArrayBlockingQueue<>(readQueueCapacity); - this.offsetMover = new KafkaConsumerOffsetMover(subscription.getQualifiedName(), consumer); + this.offsetMover = + new KafkaConsumerOffsetMover( + subscription.getQualifiedName(), consumer, partitionAssignmentState); Map topics = getKafkaTopics(topic, kafkaNamesMapper).stream() .collect(Collectors.toMap(t -> t.name().asString(), Function.identity())); @@ -195,6 +197,7 @@ public void commit(Set offsets) { private Map createOffset( Set partitionOffsets) { + Map offsetsData = new LinkedHashMap<>(); for (SubscriptionPartitionOffset partitionOffset : partitionOffsets) { TopicPartition topicAndPartition = @@ -223,8 +226,8 @@ private Map createOffset( } @Override - public boolean moveOffset(PartitionOffset offset) { - return offsetMover.move(offset); + public PartitionOffsets moveOffset(PartitionOffsets offsets) { + return offsetMover.move(offsets); } public Set getAssignedPartitions() { diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/Retransmitter.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/Retransmitter.java index 9ecde5980e..1df91a8b24 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/Retransmitter.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/Retransmitter.java @@ -33,36 +33,22 @@ public void reloadOffsets(SubscriptionName subscriptionName, Consumer consumer) brokersClusterName, consumer.getAssignedPartitions()); - for (PartitionOffset partitionOffset : offsets) { - if (moveOffset(subscriptionName, consumer, partitionOffset)) { - subscriptionOffsetChangeIndicator.removeOffset( - subscriptionName.getTopicName(), - subscriptionName.getName(), - brokersClusterName, - partitionOffset.getTopic(), - partitionOffset.getPartition()); - logger.info( - "Removed offset indicator for subscription={} and partition={}", - subscriptionName, - partitionOffset.getPartition()); - } + PartitionOffsets movedOffsets = consumer.moveOffset(offsets); + + for (PartitionOffset partitionOffset : movedOffsets) { + subscriptionOffsetChangeIndicator.removeOffset( + subscriptionName.getTopicName(), + subscriptionName.getName(), + brokersClusterName, + partitionOffset.getTopic(), + partitionOffset.getPartition()); + logger.info( + "Removed offset indicator for subscription={} and partition={}", + subscriptionName, + partitionOffset.getPartition()); } } catch (Exception ex) { throw new RetransmissionException(ex); } } - - private boolean moveOffset( - SubscriptionName subscriptionName, Consumer consumer, PartitionOffset partitionOffset) { - try { - return consumer.moveOffset(partitionOffset); - } catch (IllegalStateException ex) { - logger.warn( - "Cannot move offset for subscription={} and partition={} , possibly owned by different node", - subscriptionName, - partitionOffset.getPartition(), - ex); - return false; - } - } } diff --git a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerStub.groovy b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerStub.groovy index b503282b7b..e9445d016e 100644 --- a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerStub.groovy +++ b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerStub.groovy @@ -2,7 +2,7 @@ package pl.allegro.tech.hermes.consumers.supervisor.process import pl.allegro.tech.hermes.api.Subscription import pl.allegro.tech.hermes.api.Topic -import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets import pl.allegro.tech.hermes.consumers.consumer.Consumer import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset @@ -78,8 +78,8 @@ class ConsumerStub implements Consumer { } @Override - boolean moveOffset(PartitionOffset partitionOffset) { - return true + PartitionOffsets moveOffset(PartitionOffsets partitionOffset) { + return partitionOffset } boolean getInitialized() { diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/SubscriptionsEndpoint.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/SubscriptionsEndpoint.java index 4c92836529..faa1b7a5b4 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/SubscriptionsEndpoint.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/SubscriptionsEndpoint.java @@ -35,7 +35,6 @@ import pl.allegro.tech.hermes.api.Subscription; import pl.allegro.tech.hermes.api.SubscriptionHealth; import pl.allegro.tech.hermes.api.SubscriptionMetrics; -import pl.allegro.tech.hermes.api.SubscriptionName; import pl.allegro.tech.hermes.api.Topic; import pl.allegro.tech.hermes.api.TopicName; import pl.allegro.tech.hermes.management.api.auth.HermesSecurityAwareRequestUser; @@ -273,7 +272,7 @@ public Response retransmit( @Context ContainerRequestContext requestContext) { MultiDCOffsetChangeSummary summary = - multiDCAwareService.retransmit( + subscriptionService.retransmit( topicService.getTopicDetails(TopicName.fromQualifiedName(qualifiedTopicName)), subscriptionName, offsetRetransmissionDate.getRetransmissionDate().toInstant().toEpochMilli(), @@ -283,20 +282,6 @@ public Response retransmit( return Response.status(OK).entity(summary).build(); } - @POST - @Consumes(APPLICATION_JSON) - @Produces(APPLICATION_JSON) - @RolesAllowed({Roles.ADMIN}) - @Path("/{subscriptionName}/moveOffsetsToTheEnd") - public Response moveOffsetsToTheEnd( - @PathParam("topicName") String qualifiedTopicName, - @PathParam("subscriptionName") String subscriptionName) { - TopicName topicName = fromQualifiedName(qualifiedTopicName); - multiDCAwareService.moveOffsetsToTheEnd( - topicService.getTopicDetails(topicName), new SubscriptionName(subscriptionName, topicName)); - return responseStatus(OK); - } - @GET @Produces(APPLICATION_JSON) @Path("/{subscriptionName}/events/{messageId}/trace") diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java index 995c6b2370..0ec19b12a9 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java @@ -36,7 +36,6 @@ import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService; import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaBrokerTopicManagement; import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaConsumerGroupManager; -import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaConsumerManager; import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaRawMessageReader; import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaSingleMessageReader; import pl.allegro.tech.hermes.management.infrastructure.kafka.service.LogEndOffsetChecker; @@ -111,8 +110,7 @@ MultiDCAwareService multiDCAwareService( new LogEndOffsetChecker(consumerPool), brokerAdminClient, createConsumerGroupManager( - kafkaProperties, kafkaNamesMapper, brokerAdminClient), - createKafkaConsumerManager(kafkaProperties, kafkaNamesMapper)); + kafkaProperties, kafkaNamesMapper, brokerAdminClient)); }) .collect(toList()); @@ -138,12 +136,6 @@ private ConsumerGroupManager createConsumerGroupManager( : new NoOpConsumerGroupManager(); } - private KafkaConsumerManager createKafkaConsumerManager( - KafkaProperties kafkaProperties, KafkaNamesMapper kafkaNamesMapper) { - return new KafkaConsumerManager( - kafkaProperties, kafkaNamesMapper, kafkaProperties.getBrokerList()); - } - private SubscriptionOffsetChangeIndicator getRepository( List> repositories, KafkaProperties kafkaProperties) { diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/message/RetransmissionService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/message/RetransmissionService.java index 5ad8f6fc35..76e8e64546 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/message/RetransmissionService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/message/RetransmissionService.java @@ -6,8 +6,15 @@ public interface RetransmissionService { - List indicateOffsetChange( - Topic topic, String subscription, String brokersClusterName, long timestamp, boolean dryRun); + List fetchTopicOffsetsAt(Topic topic, Long timestamp); + + List fetchTopicEndOffsets(Topic topic); + + void indicateOffsetChange( + Topic topic, + String subscription, + String brokersClusterName, + List partitionOffsets); boolean areOffsetsMoved(Topic topic, String subscriptionName, String brokersClusterName); } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionService.java index c1fdd819c1..cac619a239 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionService.java @@ -51,7 +51,9 @@ import pl.allegro.tech.hermes.management.domain.subscription.health.SubscriptionHealthChecker; import pl.allegro.tech.hermes.management.domain.subscription.validator.SubscriptionValidator; import pl.allegro.tech.hermes.management.domain.topic.TopicService; +import pl.allegro.tech.hermes.management.infrastructure.kafka.MovingSubscriptionOffsetsValidationException; import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService; +import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCOffsetChangeSummary; import pl.allegro.tech.hermes.tracker.management.LogRepository; public class SubscriptionService { @@ -466,4 +468,42 @@ private List getSubscriptionsMetrics( }) .collect(toList()); } + + public MultiDCOffsetChangeSummary retransmit( + Topic topic, String subscriptionName, Long timestamp, boolean dryRun, RequestUser requester) { + Subscription subscription = getSubscriptionDetails(topic.getName(), subscriptionName); + + MultiDCOffsetChangeSummary multiDCOffsetChangeSummary = + multiDCAwareService.fetchTopicOffsetsAt(topic, timestamp); + + if (dryRun) return multiDCOffsetChangeSummary; + + /* + * The subscription state is used to determine how to move the offsets. + * When the subscription is ACTIVE, the management instance notifies consumers to change offsets. + * The consumers are responsible for moving their local offsets(KafkaConsumer::seek method) as well as committed ones on Kafka (KafkaConsumer::commitSync method). + * When the subscription is SUSPENDED, the management instance changes the commited offsets on kafka on its own (AdminClient::alterConsumerGroupOffsets). + * There is no active consumer to notify in that case. + */ + switch (subscription.getState()) { + case ACTIVE: + multiDCAwareService.moveOffsetsForActiveConsumers( + topic, + subscriptionName, + multiDCOffsetChangeSummary.getPartitionOffsetListPerBrokerName(), + requester); + break; + case SUSPENDED: + multiDCAwareService.moveOffsets( + topic, + subscriptionName, + multiDCOffsetChangeSummary.getPartitionOffsetListPerBrokerName()); + break; + case PENDING: + throw new MovingSubscriptionOffsetsValidationException( + "Cannot retransmit messages for subscription in PENDING state"); + } + + return multiDCOffsetChangeSummary; + } } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicContentTypeMigrationService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicContentTypeMigrationService.java index 44117189d9..1ab6675422 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicContentTypeMigrationService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicContentTypeMigrationService.java @@ -63,8 +63,13 @@ void waitUntilAllSubscriptionsHasConsumersAssigned( private void notifySingleSubscription( Topic topic, Instant beforeMigrationInstant, String subscriptionName, RequestUser requester) { - multiDCAwareService.retransmit( - topic, subscriptionName, beforeMigrationInstant.toEpochMilli(), false, requester); + multiDCAwareService.moveOffsetsForActiveConsumers( + topic, + subscriptionName, + multiDCAwareService + .fetchTopicOffsetsAt(topic, beforeMigrationInstant.toEpochMilli()) + .getPartitionOffsetListPerBrokerName(), + requester); } private void waitUntilOffsetsAvailableOnAllKafkaTopics( diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java index 474c99d202..3b1833a5ed 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java @@ -6,8 +6,10 @@ import java.time.Duration; import java.time.Instant; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.function.Consumer; @@ -19,6 +21,7 @@ import pl.allegro.tech.hermes.api.SubscriptionName; import pl.allegro.tech.hermes.api.Topic; import pl.allegro.tech.hermes.common.exception.InternalProcessingException; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; import pl.allegro.tech.hermes.management.domain.auth.RequestUser; import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor; import pl.allegro.tech.hermes.management.domain.retransmit.RetransmitCommand; @@ -62,36 +65,67 @@ public String readMessageFromPrimary( .readMessageFromPrimary(topic, partition, offset); } - public MultiDCOffsetChangeSummary retransmit( - Topic topic, String subscriptionName, Long timestamp, boolean dryRun, RequestUser requester) { + public MultiDCOffsetChangeSummary fetchTopicOffsetsAt(Topic topic, Long timestamp) { MultiDCOffsetChangeSummary multiDCOffsetChangeSummary = new MultiDCOffsetChangeSummary(); clusters.forEach( cluster -> multiDCOffsetChangeSummary.addPartitionOffsetList( - cluster.getClusterName(), - cluster.indicateOffsetChange(topic, subscriptionName, timestamp, dryRun))); - - if (!dryRun) { - logger.info( - "Starting retransmission for subscription {}. Requested by {}. Retransmission timestamp: {}", - topic.getQualifiedName() + "$" + subscriptionName, - requester.getUsername(), - timestamp); - multiDcExecutor.executeByUser( - new RetransmitCommand(new SubscriptionName(subscriptionName, topic.getName())), - requester); - clusters.forEach(clusters -> waitUntilOffsetsAreMoved(topic, subscriptionName)); - logger.info( - "Successfully moved offsets for retransmission of subscription {}. Requested by user: {}. Retransmission timestamp: {}", - topic.getQualifiedName() + "$" + subscriptionName, - requester.getUsername(), - timestamp); - } + cluster.getClusterName(), cluster.fetchTopicOffsetsAt(topic, timestamp))); return multiDCOffsetChangeSummary; } + public void moveOffsets( + Topic topic, + String subscriptionName, + Map> brokerPartitionOffsets) { + clusters.forEach( + cluster -> + cluster.validateIfOffsetsCanBeMoved( + topic, new SubscriptionName(subscriptionName, topic.getName()))); + + clusters.forEach( + cluster -> + cluster.moveOffsets( + new SubscriptionName(subscriptionName, topic.getName()), + brokerPartitionOffsets.getOrDefault( + cluster.getClusterName(), Collections.emptyList()))); + } + + public void moveOffsetsForActiveConsumers( + Topic topic, + String subscriptionName, + Map> brokerPartitionOffsets, + RequestUser requester) { + clusters.forEach( + cluster -> + cluster.validateIfOffsetsCanBeMovedByConsumers( + topic, new SubscriptionName(subscriptionName, topic.getName()))); + + clusters.forEach( + cluster -> + Optional.ofNullable(brokerPartitionOffsets.get(cluster.getClusterName())) + .ifPresent( + offsets -> cluster.indicateOffsetChange(topic, subscriptionName, offsets))); + + logger.info( + "Starting moving offsets for subscription {}. Requested by {}. Retransmission offsets: {}", + topic.getQualifiedName() + "$" + subscriptionName, + requester.getUsername(), + brokerPartitionOffsets); + + multiDcExecutor.executeByUser( + new RetransmitCommand(new SubscriptionName(subscriptionName, topic.getName())), requester); + clusters.forEach(clusters -> waitUntilOffsetsAreMoved(topic, subscriptionName)); + + logger.info( + "Successfully moved offsets for retransmission of subscription {}. Requested by user: {}. Retransmission offsets: {}", + topic.getQualifiedName() + "$" + subscriptionName, + requester.getUsername(), + brokerPartitionOffsets); + } + public boolean areOffsetsAvailableOnAllKafkaTopics(Topic topic) { return clusters.stream() .allMatch(cluster -> cluster.areOffsetsAvailableOnAllKafkaTopics(topic)); @@ -166,10 +200,6 @@ public List describeConsumerGroups(Topic topic, String subscripti .collect(toList()); } - public void moveOffsetsToTheEnd(Topic topic, SubscriptionName subscription) { - clusters.forEach(c -> c.moveOffsetsToTheEnd(topic, subscription)); - } - public void deleteConsumerGroupForDatacenter( SubscriptionName subscriptionName, String datacenter) { clusters.stream() diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/BrokersClusterService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/BrokersClusterService.java index 85752100eb..185048d93f 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/BrokersClusterService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/BrokersClusterService.java @@ -29,6 +29,7 @@ import pl.allegro.tech.hermes.api.Subscription; import pl.allegro.tech.hermes.api.SubscriptionName; import pl.allegro.tech.hermes.api.Topic; +import pl.allegro.tech.hermes.common.kafka.ConsumerGroupId; import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper; import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; import pl.allegro.tech.hermes.management.domain.message.RetransmissionService; @@ -51,7 +52,6 @@ public class BrokersClusterService { private final ConsumerGroupsDescriber consumerGroupsDescriber; private final AdminClient adminClient; private final ConsumerGroupManager consumerGroupManager; - private final KafkaConsumerManager kafkaConsumerManager; public BrokersClusterService( String datacenter, @@ -63,8 +63,7 @@ public BrokersClusterService( OffsetsAvailableChecker offsetsAvailableChecker, LogEndOffsetChecker logEndOffsetChecker, AdminClient adminClient, - ConsumerGroupManager consumerGroupManager, - KafkaConsumerManager kafkaConsumerManager) { + ConsumerGroupManager consumerGroupManager) { this.datacenter = datacenter; this.clusterName = clusterName; this.singleMessageReader = singleMessageReader; @@ -77,7 +76,6 @@ public BrokersClusterService( kafkaNamesMapper, adminClient, logEndOffsetChecker, clusterName); this.adminClient = adminClient; this.consumerGroupManager = consumerGroupManager; - this.kafkaConsumerManager = kafkaConsumerManager; } public String getClusterName() { @@ -97,10 +95,14 @@ public String readMessageFromPrimary(Topic topic, Integer partition, Long offset topic, kafkaNamesMapper.toKafkaTopics(topic).getPrimary(), partition, offset); } - public List indicateOffsetChange( - Topic topic, String subscriptionName, Long timestamp, boolean dryRun) { - return retransmissionService.indicateOffsetChange( - topic, subscriptionName, clusterName, timestamp, dryRun); + public void indicateOffsetChange( + Topic topic, String subscriptionName, List partitionOffsets) { + retransmissionService.indicateOffsetChange( + topic, subscriptionName, clusterName, partitionOffsets); + } + + public List fetchTopicOffsetsAt(Topic topic, Long timestamp) { + return retransmissionService.fetchTopicOffsetsAt(topic, timestamp); } public boolean areOffsetsAvailableOnAllKafkaTopics(Topic topic) { @@ -162,23 +164,16 @@ public Optional describeConsumerGroup(Topic topic, String subscri return consumerGroupsDescriber.describeConsumerGroup(topic, subscriptionName); } - public void moveOffsetsToTheEnd(Topic topic, SubscriptionName subscription) { - validateIfOffsetsCanBeMoved(topic, subscription); - - KafkaConsumer consumer = kafkaConsumerManager.createConsumer(subscription); - String kafkaTopicName = kafkaNamesMapper.toKafkaTopics(topic).getPrimary().name().asString(); - Set topicPartitions = getTopicPartitions(consumer, kafkaTopicName); - consumer.assign(topicPartitions); - - Map endOffsets = consumer.endOffsets(topicPartitions); - Map endOffsetsMetadata = buildOffsetsMetadata(endOffsets); - consumer.commitSync(endOffsetsMetadata); - consumer.close(); + public void moveOffsets(SubscriptionName subscription, List offsets) { + ConsumerGroupId consumerGroupId = kafkaNamesMapper.toConsumerGroupId(subscription); + Map offsetAndMetadata = buildOffsetsMetadata(offsets); + adminClient.alterConsumerGroupOffsets(consumerGroupId.asString(), offsetAndMetadata).all(); logger.info( - "Successfully moved offset to the end position for subscription {} and consumer group {}", + "Successfully moved offsets for subscription {} and consumer group {} to {}", subscription.getQualifiedName(), - kafkaNamesMapper.toConsumerGroupId(subscription)); + kafkaNamesMapper.toConsumerGroupId(subscription), + offsetAndMetadata.toString()); } private int numberOfAssignmentsForConsumersGroups(List consumerGroupsIds) @@ -193,11 +188,32 @@ private int numberOfAssignmentsForConsumersGroups(List consumerGroupsIds .size(); } - private void validateIfOffsetsCanBeMoved(Topic topic, SubscriptionName subscription) { + public void validateIfOffsetsCanBeMovedByConsumers(Topic topic, SubscriptionName subscription) { + describeConsumerGroup(topic, subscription.getName()) + .ifPresentOrElse( + group -> { + if (!group.isStable()) { + String s = + format( + "Consumer group %s for subscription %s is not stable.", + group.getGroupId(), subscription.getQualifiedName()); + throw new MovingSubscriptionOffsetsValidationException(s); + } + }, + () -> { + String s = + format( + "No consumer group for subscription %s exists.", + subscription.getQualifiedName()); + throw new MovingSubscriptionOffsetsValidationException(s); + }); + } + + public void validateIfOffsetsCanBeMoved(Topic topic, SubscriptionName subscription) { describeConsumerGroup(topic, subscription.getName()) .ifPresentOrElse( group -> { - if (!group.getMembers().isEmpty()) { + if (!group.isEmpty()) { String s = format( "Consumer group %s for subscription %s has still active members.", @@ -234,9 +250,13 @@ private Set getTopicPartitions( } private Map buildOffsetsMetadata( - Map offsets) { - return offsets.entrySet().stream() - .map(entry -> ImmutablePair.of(entry.getKey(), new OffsetAndMetadata(entry.getValue()))) + List offsets) { + return offsets.stream() + .map( + offset -> + ImmutablePair.of( + new TopicPartition(offset.getTopic().asString(), offset.getPartition()), + new OffsetAndMetadata(offset.getOffset()))) .collect(toMap(Pair::getKey, Pair::getValue)); } } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/KafkaRetransmissionService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/KafkaRetransmissionService.java index d805d639c7..8c8cc04182 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/KafkaRetransmissionService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/KafkaRetransmissionService.java @@ -37,63 +37,79 @@ public KafkaRetransmissionService( } @Override - public List indicateOffsetChange( - Topic topic, String subscription, String brokersClusterName, long timestamp, boolean dryRun) { + public void indicateOffsetChange( + Topic topic, + String subscription, + String brokersClusterName, + List partitionOffsets) { + for (PartitionOffset partitionOffset : partitionOffsets) { + subscriptionOffsetChange.setSubscriptionOffset( + topic.getName(), subscription, brokersClusterName, partitionOffset); + } + } + @Override + public boolean areOffsetsMoved(Topic topic, String subscriptionName, String brokersClusterName) { + return kafkaNamesMapper + .toKafkaTopics(topic) + .allMatch( + kafkaTopic -> { + List partitionIds = + brokerStorage.readPartitionsIds(kafkaTopic.name().asString()); + return subscriptionOffsetChange.areOffsetsMoved( + topic.getName(), subscriptionName, brokersClusterName, kafkaTopic, partitionIds); + }); + } + + private KafkaConsumer createKafkaConsumer(KafkaTopic kafkaTopic, int partition) { + return consumerPool.get(kafkaTopic, partition); + } + + public List fetchTopicEndOffsets(Topic topic) { + return fetchTopicOffsetsAt(topic, null); + } + + public List fetchTopicOffsetsAt(Topic topic, Long timestamp) { List partitionOffsetList = new ArrayList<>(); kafkaNamesMapper .toKafkaTopics(topic) .forEach( k -> { List partitionsIds = brokerStorage.readPartitionsIds(k.name().asString()); - for (Integer partitionId : partitionsIds) { KafkaConsumer consumer = createKafkaConsumer(k, partitionId); - long offset = - findClosestOffsetJustBeforeTimestamp(consumer, k, partitionId, timestamp); + long offset = getOffsetForTimestampOrEnd(timestamp, k, partitionId, consumer); PartitionOffset partitionOffset = new PartitionOffset(k.name(), offset, partitionId); partitionOffsetList.add(partitionOffset); - if (!dryRun) { - subscriptionOffsetChange.setSubscriptionOffset( - topic.getName(), subscription, brokersClusterName, partitionOffset); - } } }); return partitionOffsetList; } - @Override - public boolean areOffsetsMoved(Topic topic, String subscriptionName, String brokersClusterName) { - return kafkaNamesMapper - .toKafkaTopics(topic) - .allMatch( - kafkaTopic -> { - List partitionIds = - brokerStorage.readPartitionsIds(kafkaTopic.name().asString()); - return subscriptionOffsetChange.areOffsetsMoved( - topic.getName(), subscriptionName, brokersClusterName, kafkaTopic, partitionIds); - }); - } - - private KafkaConsumer createKafkaConsumer(KafkaTopic kafkaTopic, int partition) { - return consumerPool.get(kafkaTopic, partition); + private long getOffsetForTimestampOrEnd( + Long timestamp, + KafkaTopic kafkaTopic, + Integer partitionId, + KafkaConsumer consumer) { + long endOffset = getEndingOffset(consumer, kafkaTopic, partitionId); + return Optional.ofNullable(timestamp) + .flatMap(ts -> findClosestOffsetJustBeforeTimestamp(consumer, kafkaTopic, partitionId, ts)) + .orElse(endOffset); } - private long findClosestOffsetJustBeforeTimestamp( + private Optional findClosestOffsetJustBeforeTimestamp( KafkaConsumer consumer, KafkaTopic kafkaTopic, int partition, long timestamp) { - long endOffset = getEndingOffset(consumer, kafkaTopic, partition); TopicPartition topicPartition = new TopicPartition(kafkaTopic.name().asString(), partition); return Optional.ofNullable( consumer .offsetsForTimes(Collections.singletonMap(topicPartition, timestamp)) .get(topicPartition)) - .orElse(new OffsetAndTimestamp(endOffset, timestamp)) - .offset(); + .map(OffsetAndTimestamp::offset); } private long getEndingOffset( diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/OffsetNotFoundException.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/OffsetNotFoundException.java index 6220c31f3b..616dfa8b69 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/OffsetNotFoundException.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/OffsetNotFoundException.java @@ -7,7 +7,7 @@ class OffsetNotFoundException extends ManagementException { - OffsetNotFoundException(String message) { + public OffsetNotFoundException(String message) { super(message); } diff --git a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/HermesTestClient.java b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/HermesTestClient.java index c9a8293b0b..e75cc56ee3 100644 --- a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/HermesTestClient.java +++ b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/HermesTestClient.java @@ -119,6 +119,13 @@ public WebTestClient.ResponseSpec suspendSubscription(Topic topic, String subscr .is2xxSuccessful(); } + public WebTestClient.ResponseSpec activateSubscription(Topic topic, String subscription) { + return managementTestClient + .updateSubscriptionState(topic, subscription, Subscription.State.ACTIVE) + .expectStatus() + .is2xxSuccessful(); + } + public void waitUntilSubscriptionActivated(String topicQualifiedName, String subscriptionName) { waitAtMost(Duration.ofSeconds(10)) .untilAsserted( @@ -182,7 +189,7 @@ public void waitUntilConsumerCommitsOffset(String topicQualifiedName, String sub }); } - private long calculateCommittedMessages(String topicQualifiedName, String subscription) { + public long calculateCommittedMessages(String topicQualifiedName, String subscription) { AtomicLong messagesCommittedCount = new AtomicLong(0); List consumerGroups = getConsumerGroupsDescription(topicQualifiedName, subscription) @@ -550,9 +557,4 @@ public WebTestClient.ResponseSpec updateGroup(String groupName, Group group) { public List getGroups() { return managementTestClient.getGroups(); } - - public WebTestClient.ResponseSpec moveOffsetsToTheEnd( - String topicQualifiedName, String subscriptionName) { - return managementTestClient.moveOffsetsToTheEnd(topicQualifiedName, subscriptionName); - } } diff --git a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/ManagementTestClient.java b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/ManagementTestClient.java index 2735b5f710..9a36ee9d31 100644 --- a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/ManagementTestClient.java +++ b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/ManagementTestClient.java @@ -59,9 +59,6 @@ public class ManagementTestClient { private static final String TOPIC_PREVIEW_OFFSET = "/topics/{topicName}/preview/cluster/{brokersClusterName}/partition/{partition}/offset/{offset}"; - private static final String MOVE_SUBSCRIPTION_OFFSETS = - "/topics/{topicName}/subscriptions/{subscriptionName}/moveOffsetsToTheEnd"; - private static final String SET_READINESS = "/readiness/datacenters/{dc}"; private static final String GET_READINESS = "/readiness/datacenters"; @@ -804,15 +801,4 @@ public WebTestClient.ResponseSpec updateGroup(String groupName, Group group) { .body(Mono.just(group), Group.class) .exchange(); } - - public WebTestClient.ResponseSpec moveOffsetsToTheEnd( - String topicQualifiedName, String subscriptionName) { - return webTestClient - .post() - .uri( - UriBuilder.fromUri(managementContainerUrl) - .path(MOVE_SUBSCRIPTION_OFFSETS) - .build(topicQualifiedName, subscriptionName)) - .exchange(); - } } diff --git a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java index 94c7eb8f43..0daa5be05a 100644 --- a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java +++ b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java @@ -14,6 +14,8 @@ import java.util.stream.Collectors; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.springframework.test.web.reactive.server.WebTestClient; import pl.allegro.tech.hermes.api.ContentType; import pl.allegro.tech.hermes.api.OffsetRetransmissionDate; @@ -53,8 +55,10 @@ public class KafkaRetransmissionServiceTest { @RegisterExtension public static final TestSubscribersExtension subscribers = new TestSubscribersExtension(); - @Test - public void shouldMoveOffsetNearGivenTimestamp() throws InterruptedException { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void shouldMoveOffsetNearGivenTimestamp(boolean suspendedSubscription) + throws InterruptedException { // given final TestSubscriber subscriber = subscribers.createSubscriber(); final Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build()); @@ -71,6 +75,14 @@ public void shouldMoveOffsetNearGivenTimestamp() throws InterruptedException { publishAndConsumeMessages(messages2, topic, subscriber); hermes.api().waitUntilConsumerCommitsOffset(topic.getQualifiedName(), subscription.getName()); + long commitedMessages = + hermes.api().calculateCommittedMessages(topic.getQualifiedName(), subscription.getName()); + + if (suspendedSubscription) { + hermes.api().suspendSubscription(topic, subscription.getName()); + hermes.api().waitUntilSubscriptionSuspended(topic.getQualifiedName(), subscription.getName()); + } + // when WebTestClient.ResponseSpec response = hermes @@ -80,7 +92,25 @@ public void shouldMoveOffsetNearGivenTimestamp() throws InterruptedException { // then response.expectStatus().isOk(); + // Check if Kafka committed offsets were moved on retransmission + assertThat( + hermes + .api() + .calculateCommittedMessages(topic.getQualifiedName(), subscription.getName())) + .isLessThan(commitedMessages); + + if (suspendedSubscription) { + hermes.api().activateSubscription(topic, subscription.getName()); + hermes.api().waitUntilSubscriptionActivated(topic.getQualifiedName(), subscription.getName()); + } + messages2.forEach(subscriber::waitUntilReceived); + hermes.api().waitUntilConsumerCommitsOffset(topic.getQualifiedName(), subscription.getName()); + assertThat( + hermes + .api() + .calculateCommittedMessages(topic.getQualifiedName(), subscription.getName())) + .isEqualTo(commitedMessages); } @Test diff --git a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/SubscriptionManagementTest.java b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/SubscriptionManagementTest.java index 3cf0ff68b5..c09b4cf7cd 100644 --- a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/SubscriptionManagementTest.java +++ b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/SubscriptionManagementTest.java @@ -9,7 +9,6 @@ import static pl.allegro.tech.hermes.integrationtests.prometheus.SubscriptionMetrics.subscriptionMetrics; import static pl.allegro.tech.hermes.integrationtests.prometheus.TopicMetrics.topicMetrics; import static pl.allegro.tech.hermes.integrationtests.setup.HermesExtension.auditEvents; -import static pl.allegro.tech.hermes.integrationtests.setup.HermesExtension.brokerOperations; import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscription; import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscriptionWithRandomName; import static pl.allegro.tech.hermes.test.helper.builder.TopicBuilder.topicWithRandomName; @@ -18,7 +17,6 @@ import java.time.Duration; import java.util.Arrays; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.stream.Stream; import org.junit.jupiter.api.AfterEach; @@ -39,7 +37,6 @@ import pl.allegro.tech.hermes.api.TopicName; import pl.allegro.tech.hermes.api.TopicPartition; import pl.allegro.tech.hermes.api.TrackingMode; -import pl.allegro.tech.hermes.env.BrokerOperations; import pl.allegro.tech.hermes.integrationtests.prometheus.PrometheusExtension; import pl.allegro.tech.hermes.integrationtests.setup.HermesExtension; import pl.allegro.tech.hermes.integrationtests.subscriber.TestSubscriber; @@ -760,52 +757,4 @@ public void shouldReturnInflightSizeWhenSetToNonNullValue() { // then assertThat(response.getSerialSubscriptionPolicy().getInflightSize()).isEqualTo(42); } - - @Test - public void shouldMoveOffsetsToTheEnd() { - // given - TestSubscriber subscriber = subscribers.createSubscriber(503); - Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build()); - Subscription subscription = - hermes - .initHelper() - .createSubscription( - subscriptionWithRandomName(topic.getName(), subscriber.getEndpoint()) - .withSubscriptionPolicy(SubscriptionPolicy.create(Map.of("messageTtl", 3600))) - .build()); - List messages = List.of(MESSAGE.body(), MESSAGE.body(), MESSAGE.body(), MESSAGE.body()); - - // prevents from moving offsets during messages sending - messages.forEach( - message -> { - hermes.api().publishUntilSuccess(topic.getQualifiedName(), message); - subscriber.waitUntilReceived(message); - }); - - assertThat(allConsumerGroupOffsetsMovedToTheEnd(subscription)).isFalse(); - - hermes.api().deleteSubscription(topic.getQualifiedName(), subscription.getName()); - - // when - waitAtMost(Duration.ofSeconds(10)) - .untilAsserted( - () -> - hermes - .api() - .moveOffsetsToTheEnd(topic.getQualifiedName(), subscription.getName()) - .expectStatus() - .isOk()); - - // then - waitAtMost(Duration.ofSeconds(10)) - .untilAsserted( - () -> assertThat(allConsumerGroupOffsetsMovedToTheEnd(subscription)).isTrue()); - } - - private boolean allConsumerGroupOffsetsMovedToTheEnd(Subscription subscription) { - List partitionsOffsets = - brokerOperations.getTopicPartitionsOffsets(subscription.getQualifiedName()); - return !partitionsOffsets.isEmpty() - && partitionsOffsets.stream().allMatch(BrokerOperations.ConsumerGroupOffset::movedToEnd); - } }