Skip to content

Commit

Permalink
Merge branch 'master' into gradle_cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
kszapsza authored Jan 13, 2025
2 parents dfe12b6 + 80ab55c commit 6fde495
Show file tree
Hide file tree
Showing 42 changed files with 526 additions and 385 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ public Set<ConsumerGroupMember> getMembers() {
return members;
}

public boolean isStable() {
return state.equals("Stable");
}

public boolean isEmpty() {
return state.equals("Empty");
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,16 @@ public boolean equals(Object obj) {
public int hashCode() {
return Objects.hash(topic, partition, offset);
}

@Override
public String toString() {
return "PartitionOffset{"
+ "topic="
+ topic
+ ", partition="
+ partition
+ ", offset="
+ offset
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,8 @@ public PartitionOffsets addAll(PartitionOffsets offsets) {
public Iterator<PartitionOffset> iterator() {
return offsets.iterator();
}

public boolean isEmpty() {
return offsets.isEmpty();
}
}
8 changes: 3 additions & 5 deletions hermes-console/json-server/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ server.post('/query/subscriptions', (req, res) => {
res.jsonp(subscriptions);
});

server.post('/topics/*/subscriptions/*/moveOffsetsToTheEnd', (req, res) => {
res.sendStatus(200);
});

server.post('/topicSubscriptions', (req, res) => {
res.sendStatus(200);
});
Expand Down Expand Up @@ -83,7 +79,9 @@ server.post('/offline-retransmission/tasks', (req, res) => {
server.put(
'/topics/:topic/subscriptions/:subscroption/retransmission',
(req, res) => {
res.sendStatus(200);
setTimeout(() => {
res.sendStatus(200);
}, 2000);
},
);

Expand Down
9 changes: 0 additions & 9 deletions hermes-console/src/api/hermes-client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -310,15 +310,6 @@ export function fetchDashboardUrl(path: string): ResponsePromise<DashboardUrl> {
return axios.get<DashboardUrl>(path);
}

export function moveSubscriptionOffsets(
topicName: string,
subscription: string,
): ResponsePromise<null> {
return axios.post<null>(
`/topics/${topicName}/subscriptions/${subscription}/moveOffsetsToTheEnd`,
);
}

export function removeTopic(topic: String): ResponsePromise<void> {
return axios.delete(`/topics/${topic}`);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,9 @@ import { createTestingPinia } from '@pinia/testing';
import { dummyConsumerGroups } from '@/dummy/consumerGroups';
import { dummySubscription } from '@/dummy/subscription';
import { dummyTopic } from '@/dummy/topic';
import {
expectNotificationDispatched,
notificationStoreSpy,
} from '@/utils/test-utils';
import {
fetchConsumerGroupsErrorHandler,
fetchConsumerGroupsHandler,
moveSubscriptionOffsetsHandler,
} from '@/mocks/handlers';
import { setActivePinia } from 'pinia';
import { setupServer } from 'msw/node';
Expand Down Expand Up @@ -81,56 +76,4 @@ describe('useConsumerGroups', () => {
expect(error.value.fetchConsumerGroups).not.toBeNull();
});
});

it('should show message that moving offsets was successful', async () => {
// given
server.use(
moveSubscriptionOffsetsHandler({
topicName,
subscriptionName,
statusCode: 200,
}),
);
server.listen();
const notificationStore = notificationStoreSpy();

const { moveOffsets } = useConsumerGroups(topicName, subscriptionName);

// when
moveOffsets();

// then
await waitFor(() => {
expectNotificationDispatched(notificationStore, {
type: 'success',
title: 'notifications.subscriptionOffsets.move.success',
});
});
});

it('should show message that moving offsets was unsuccessful', async () => {
// given
server.use(
moveSubscriptionOffsetsHandler({
topicName,
subscriptionName,
statusCode: 500,
}),
);
server.listen();

const notificationStore = notificationStoreSpy();
const { moveOffsets } = useConsumerGroups(topicName, subscriptionName);

// when
moveOffsets();

// then
await waitFor(() => {
expectNotificationDispatched(notificationStore, {
type: 'error',
title: 'notifications.subscriptionOffsets.move.failure',
});
});
});
});
Original file line number Diff line number Diff line change
@@ -1,17 +1,10 @@
import { dispatchErrorNotification } from '@/utils/notification-utils';
import {
fetchConsumerGroups as getConsumerGroups,
moveSubscriptionOffsets,
} from '@/api/hermes-client';
import { fetchConsumerGroups as getConsumerGroups } from '@/api/hermes-client';
import { ref } from 'vue';
import { useGlobalI18n } from '@/i18n';
import { useNotificationsStore } from '@/store/app-notifications/useAppNotifications';
import type { ConsumerGroup } from '@/api/consumer-group';
import type { Ref } from 'vue';

export interface UseConsumerGroups {
consumerGroups: Ref<ConsumerGroup[] | undefined>;
moveOffsets: () => void;
loading: Ref<boolean>;
error: Ref<UseConsumerGroupsErrors>;
}
Expand Down Expand Up @@ -43,36 +36,10 @@ export function useConsumerGroups(
}
};

const moveOffsets = async () => {
const notificationsStore = useNotificationsStore();
try {
await moveSubscriptionOffsets(topicName, subscriptionName);
await notificationsStore.dispatchNotification({
title: useGlobalI18n().t(
'notifications.subscriptionOffsets.move.success',
{
subscriptionName,
},
),
text: '',
type: 'success',
});
} catch (e: any) {
await dispatchErrorNotification(
e,
notificationsStore,
useGlobalI18n().t('notifications.subscriptionOffsets.move.failure', {
subscriptionName,
}),
);
}
};

fetchConsumerGroups();

return {
consumerGroups,
moveOffsets,
loading,
error,
};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { afterEach } from 'vitest';
import { afterEach, expect } from 'vitest';
import {
createRetransmissionHandler,
fetchSubscriptionErrorHandler,
Expand All @@ -18,7 +18,6 @@ import {
dummySubscriptionHealth,
dummySubscriptionMetrics,
} from '@/dummy/subscription';
import { expect } from 'vitest';
import {
expectNotificationDispatched,
notificationStoreSpy,
Expand Down Expand Up @@ -391,6 +390,39 @@ describe('useSubscription', () => {
});
});

[200, 500].forEach((statusCode) => {
it(`should correctly manage the state of retransmission with status code ${statusCode}`, async () => {
// given
server.use(
createRetransmissionHandler({
topicName: dummySubscription.topicName,
subscriptionName: dummySubscription.name,
statusCode,
delayMs: 100,
}),
);
server.listen();

const { retransmitMessages, retransmitting } = useSubscription(
dummySubscription.topicName,
dummySubscription.name,
);

expect(retransmitting.value).toBeFalsy();

// when
retransmitMessages(new Date().toISOString());

// then
await waitFor(() => {
expect(retransmitting.value).toBeTruthy();
});
await waitFor(() => {
expect(retransmitting.value).toBeFalsy();
});
});
});

it('should show message that skipping all messages was successful', async () => {
// given
server.use(
Expand Down Expand Up @@ -448,6 +480,39 @@ describe('useSubscription', () => {
});
});
});

[200, 500].forEach((statusCode) => {
it(`should correctly manage the state of skipping all messages with status code ${statusCode}`, async () => {
// given
server.use(
createRetransmissionHandler({
topicName: dummySubscription.topicName,
subscriptionName: dummySubscription.name,
statusCode,
delayMs: 100,
}),
);
server.listen();

const { skipAllMessages, skippingAllMessages } = useSubscription(
dummySubscription.topicName,
dummySubscription.name,
);

expect(skippingAllMessages.value).toBeFalsy();

// when
skipAllMessages();

// then
await waitFor(() => {
expect(skippingAllMessages.value).toBeTruthy();
});
await waitFor(() => {
expect(skippingAllMessages.value).toBeFalsy();
});
});
});
});

function expectErrors(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ export interface UseSubscription {
subscriptionLastUndeliveredMessage: Ref<SentMessageTrace | null>;
trackingUrls: Ref<TrackingUrl[] | undefined>;
loading: Ref<boolean>;
retransmitting: Ref<boolean>;
skippingAllMessages: Ref<boolean>;
error: Ref<UseSubscriptionsErrors>;
removeSubscription: () => Promise<boolean>;
suspendSubscription: () => Promise<boolean>;
Expand Down Expand Up @@ -73,7 +75,8 @@ export function useSubscription(
fetchSubscriptionLastUndeliveredMessage: null,
getSubscriptionTrackingUrls: null,
});

const retransmitting = ref(false);
const skippingAllMessages = ref(false);
const fetchSubscription = async () => {
try {
loading.value = true;
Expand Down Expand Up @@ -233,6 +236,7 @@ export function useSubscription(
};

const retransmitMessages = async (from: string): Promise<boolean> => {
retransmitting.value = true;
try {
await retransmitSubscriptionMessages(topicName, subscriptionName, {
retransmissionDate: from,
Expand All @@ -257,10 +261,13 @@ export function useSubscription(
}),
);
return false;
} finally {
retransmitting.value = false;
}
};

const skipAllMessages = async (): Promise<boolean> => {
skippingAllMessages.value = true;
const tomorrowDate = new Date();
tomorrowDate.setDate(tomorrowDate.getDate() + 1);
try {
Expand Down Expand Up @@ -290,6 +297,8 @@ export function useSubscription(
),
);
return false;
} finally {
skippingAllMessages.value = false;
}
};

Expand All @@ -305,6 +314,8 @@ export function useSubscription(
subscriptionLastUndeliveredMessage,
trackingUrls,
loading,
retransmitting,
skippingAllMessages,
error,
removeSubscription,
suspendSubscription,
Expand Down
4 changes: 0 additions & 4 deletions hermes-console/src/i18n/en-US/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -625,10 +625,6 @@ const en_US = {
reason: 'Reason',
timestamp: 'Timestamp',
},
moveOffsets: {
tooltip: 'Move subscription offsets to the end',
button: 'MOVE OFFSETS',
},
},
search: {
collection: {
Expand Down
Loading

0 comments on commit 6fde495

Please sign in to comment.