Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #8463] Some statistical items should also be deleted to prevent memory leakage when a topic or group is deleted #8464

Merged
merged 2 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,7 @@ public class Stats {
public static final String GROUP_GET_FALL_SIZE = "GROUP_GET_FALL_SIZE";
public static final String GROUP_GET_FALL_TIME = "GROUP_GET_FALL_TIME";
public static final String GROUP_GET_LATENCY = "GROUP_GET_LATENCY";
public static final String TOPIC_PUT_LATENCY = "TOPIC_PUT_LATENCY";
public static final String GROUP_ACK_NUMS = "GROUP_ACK_NUMS";
public static final String GROUP_CK_NUMS = "GROUP_CK_NUMS";
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ public class BrokerStatsManager {
@Deprecated public static final String COMMERCIAL_PERM_FAILURES = Stats.COMMERCIAL_PERM_FAILURES;

// Send message latency
public static final String TOPIC_PUT_LATENCY = "TOPIC_PUT_LATENCY";
public static final String GROUP_ACK_NUMS = "GROUP_ACK_NUMS";
public static final String GROUP_CK_NUMS = "GROUP_CK_NUMS";
@Deprecated public static final String TOPIC_PUT_LATENCY = "TOPIC_PUT_LATENCY";
@Deprecated public static final String GROUP_ACK_NUMS = "GROUP_ACK_NUMS";
@Deprecated public static final String GROUP_CK_NUMS = "GROUP_CK_NUMS";
public static final String DLQ_PUT_NUMS = "DLQ_PUT_NUMS";
public static final String BROKER_ACK_NUMS = "BROKER_ACK_NUMS";
public static final String BROKER_CK_NUMS = "BROKER_CK_NUMS";
Expand Down Expand Up @@ -179,10 +179,10 @@ public void init() {
this.statsTable.put(Stats.TOPIC_PUT_SIZE, new StatsItemSet(Stats.TOPIC_PUT_SIZE, this.scheduledExecutorService, log));
this.statsTable.put(Stats.GROUP_GET_NUMS, new StatsItemSet(Stats.GROUP_GET_NUMS, this.scheduledExecutorService, log));
this.statsTable.put(Stats.GROUP_GET_SIZE, new StatsItemSet(Stats.GROUP_GET_SIZE, this.scheduledExecutorService, log));
this.statsTable.put(GROUP_ACK_NUMS, new StatsItemSet(GROUP_ACK_NUMS, this.scheduledExecutorService, log));
this.statsTable.put(GROUP_CK_NUMS, new StatsItemSet(GROUP_CK_NUMS, this.scheduledExecutorService, log));
this.statsTable.put(Stats.GROUP_ACK_NUMS, new StatsItemSet(Stats.GROUP_ACK_NUMS, this.scheduledExecutorService, log));
this.statsTable.put(Stats.GROUP_CK_NUMS, new StatsItemSet(Stats.GROUP_CK_NUMS, this.scheduledExecutorService, log));
this.statsTable.put(Stats.GROUP_GET_LATENCY, new StatsItemSet(Stats.GROUP_GET_LATENCY, this.scheduledExecutorService, log));
this.statsTable.put(TOPIC_PUT_LATENCY, new StatsItemSet(TOPIC_PUT_LATENCY, this.scheduledExecutorService, log));
this.statsTable.put(Stats.TOPIC_PUT_LATENCY, new StatsItemSet(Stats.TOPIC_PUT_LATENCY, this.scheduledExecutorService, log));
this.statsTable.put(Stats.SNDBCK_PUT_NUMS, new StatsItemSet(Stats.SNDBCK_PUT_NUMS, this.scheduledExecutorService, log));
this.statsTable.put(DLQ_PUT_NUMS, new StatsItemSet(DLQ_PUT_NUMS, this.scheduledExecutorService, log));
this.statsTable.put(Stats.BROKER_PUT_NUMS, new StatsItemSet(Stats.BROKER_PUT_NUMS, this.scheduledExecutorService, log));
Expand Down Expand Up @@ -338,17 +338,22 @@ public void onTopicDeleted(final String topic) {
}
this.statsTable.get(Stats.GROUP_GET_NUMS).delValueByPrefixKey(topic, "@");
this.statsTable.get(Stats.GROUP_GET_SIZE).delValueByPrefixKey(topic, "@");
this.statsTable.get(Stats.GROUP_CK_NUMS).delValueByPrefixKey(topic, "@");
this.statsTable.get(Stats.GROUP_ACK_NUMS).delValueByPrefixKey(topic, "@");
this.statsTable.get(Stats.QUEUE_GET_NUMS).delValueByPrefixKey(topic, "@");
this.statsTable.get(Stats.QUEUE_GET_SIZE).delValueByPrefixKey(topic, "@");
this.statsTable.get(Stats.SNDBCK_PUT_NUMS).delValueByPrefixKey(topic, "@");
this.statsTable.get(Stats.GROUP_GET_LATENCY).delValueByInfixKey(topic, "@");
this.statsTable.get(Stats.TOPIC_PUT_LATENCY).delValueBySuffixKey(topic, "@");
this.momentStatsItemSetFallSize.delValueByInfixKey(topic, "@");
this.momentStatsItemSetFallTime.delValueByInfixKey(topic, "@");
}

public void onGroupDeleted(final String group) {
this.statsTable.get(Stats.GROUP_GET_NUMS).delValueBySuffixKey(group, "@");
this.statsTable.get(Stats.GROUP_GET_SIZE).delValueBySuffixKey(group, "@");
this.statsTable.get(Stats.GROUP_CK_NUMS).delValueBySuffixKey(group, "@");
this.statsTable.get(Stats.GROUP_ACK_NUMS).delValueBySuffixKey(group, "@");
if (enableQueueStat) {
this.statsTable.get(Stats.QUEUE_GET_NUMS).delValueBySuffixKey(group, "@");
this.statsTable.get(Stats.QUEUE_GET_SIZE).delValueBySuffixKey(group, "@");
Expand Down Expand Up @@ -434,12 +439,12 @@ public void incGroupGetNums(final String group, final String topic, final int in

public void incGroupCkNums(final String group, final String topic, final int incValue) {
final String statsKey = buildStatsKey(topic, group);
this.statsTable.get(GROUP_CK_NUMS).addValue(statsKey, incValue, 1);
this.statsTable.get(Stats.GROUP_CK_NUMS).addValue(statsKey, incValue, 1);
}

public void incGroupAckNums(final String group, final String topic, final int incValue) {
final String statsKey = buildStatsKey(topic, group);
this.statsTable.get(GROUP_ACK_NUMS).addValue(statsKey, incValue, 1);
this.statsTable.get(Stats.GROUP_ACK_NUMS).addValue(statsKey, incValue, 1);
}

public String buildStatsKey(String topic, String group) {
Expand Down Expand Up @@ -509,9 +514,8 @@ public void incTopicPutLatency(final String topic, final int queueId, final int
statsKey = new StringBuilder(6);
}
statsKey.append(queueId).append("@").append(topic);
this.statsTable.get(TOPIC_PUT_LATENCY).addValue(statsKey.toString(), incValue, 1);
this.statsTable.get(Stats.TOPIC_PUT_LATENCY).addValue(statsKey.toString(), incValue, 1);
}

public void incBrokerPutNums() {
this.statsTable.get(Stats.BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.junit.Test;

import static org.apache.rocketmq.common.stats.Stats.BROKER_PUT_NUMS;
import static org.apache.rocketmq.common.stats.Stats.GROUP_ACK_NUMS;
import static org.apache.rocketmq.common.stats.Stats.GROUP_CK_NUMS;
import static org.apache.rocketmq.common.stats.Stats.GROUP_GET_FALL_SIZE;
import static org.apache.rocketmq.common.stats.Stats.GROUP_GET_FALL_TIME;
import static org.apache.rocketmq.common.stats.Stats.GROUP_GET_LATENCY;
Expand All @@ -34,6 +36,7 @@
import static org.apache.rocketmq.common.stats.Stats.QUEUE_PUT_NUMS;
import static org.apache.rocketmq.common.stats.Stats.QUEUE_PUT_SIZE;
import static org.apache.rocketmq.common.stats.Stats.SNDBCK_PUT_NUMS;
import static org.apache.rocketmq.common.stats.Stats.TOPIC_PUT_LATENCY;
import static org.apache.rocketmq.common.stats.Stats.TOPIC_PUT_NUMS;
import static org.apache.rocketmq.common.stats.Stats.TOPIC_PUT_SIZE;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -139,8 +142,11 @@ public void testOnTopicDeleted() {
brokerStatsManager.incTopicPutSize(TOPIC, 100);
brokerStatsManager.incQueuePutNums(TOPIC, QUEUE_ID);
brokerStatsManager.incQueuePutSize(TOPIC, QUEUE_ID, 100);
brokerStatsManager.incTopicPutLatency(TOPIC, QUEUE_ID, 10);
brokerStatsManager.incGroupGetNums(GROUP_NAME, TOPIC, 1);
brokerStatsManager.incGroupGetSize(GROUP_NAME, TOPIC, 100);
brokerStatsManager.incGroupCkNums(GROUP_NAME, TOPIC, 1);
brokerStatsManager.incGroupAckNums(GROUP_NAME, TOPIC, 1);
brokerStatsManager.incQueueGetNums(GROUP_NAME, TOPIC, QUEUE_ID, 1);
brokerStatsManager.incQueueGetSize(GROUP_NAME, TOPIC, QUEUE_ID, 100);
brokerStatsManager.incSendBackNums(GROUP_NAME, TOPIC);
Expand All @@ -162,6 +168,9 @@ public void testOnTopicDeleted() {
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, "1@" + TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_SIZE, "1@" + TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_TIME, "1@" + TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_CK_NUMS, TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_ACK_NUMS, TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(TOPIC_PUT_LATENCY, QUEUE_ID + "@" + TOPIC));
}

@Test
Expand All @@ -174,6 +183,8 @@ public void testOnGroupDeleted() {
brokerStatsManager.incGroupGetLatency(GROUP_NAME, TOPIC, 1, 1);
brokerStatsManager.recordDiskFallBehindTime(GROUP_NAME, TOPIC, 1, 11L);
brokerStatsManager.recordDiskFallBehindSize(GROUP_NAME, TOPIC, 1, 11L);
brokerStatsManager.incGroupCkNums(GROUP_NAME, TOPIC, 1);
brokerStatsManager.incGroupAckNums(GROUP_NAME, TOPIC, 1);

brokerStatsManager.onGroupDeleted(GROUP_NAME);

Expand All @@ -185,6 +196,8 @@ public void testOnGroupDeleted() {
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, "1@" + TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_SIZE, "1@" + TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_TIME, "1@" + TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_CK_NUMS, TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_ACK_NUMS, TOPIC + "@" + GROUP_NAME));
}

@Test
Expand Down
Loading