Skip to content

Commit

Permalink
[ISSUE #8463] Some statistical items should also be deleted to preven…
Browse files Browse the repository at this point in the history
…t memory leakage when a topic or group is deleted (#8464)

* Some important statistical items should also be deleted to prevent memory leakage when a topic or group is deleted

* Add UTs
  • Loading branch information
RongtongJin authored Aug 1, 2024
1 parent 69334a7 commit 3696be0
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,7 @@ public class Stats {
public static final String GROUP_GET_FALL_SIZE = "GROUP_GET_FALL_SIZE";
public static final String GROUP_GET_FALL_TIME = "GROUP_GET_FALL_TIME";
public static final String GROUP_GET_LATENCY = "GROUP_GET_LATENCY";
public static final String TOPIC_PUT_LATENCY = "TOPIC_PUT_LATENCY";
public static final String GROUP_ACK_NUMS = "GROUP_ACK_NUMS";
public static final String GROUP_CK_NUMS = "GROUP_CK_NUMS";
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ public class BrokerStatsManager {
@Deprecated public static final String COMMERCIAL_PERM_FAILURES = Stats.COMMERCIAL_PERM_FAILURES;

// Send message latency
public static final String TOPIC_PUT_LATENCY = "TOPIC_PUT_LATENCY";
public static final String GROUP_ACK_NUMS = "GROUP_ACK_NUMS";
public static final String GROUP_CK_NUMS = "GROUP_CK_NUMS";
@Deprecated public static final String TOPIC_PUT_LATENCY = "TOPIC_PUT_LATENCY";
@Deprecated public static final String GROUP_ACK_NUMS = "GROUP_ACK_NUMS";
@Deprecated public static final String GROUP_CK_NUMS = "GROUP_CK_NUMS";
public static final String DLQ_PUT_NUMS = "DLQ_PUT_NUMS";
public static final String BROKER_ACK_NUMS = "BROKER_ACK_NUMS";
public static final String BROKER_CK_NUMS = "BROKER_CK_NUMS";
Expand Down Expand Up @@ -179,10 +179,10 @@ public void init() {
this.statsTable.put(Stats.TOPIC_PUT_SIZE, new StatsItemSet(Stats.TOPIC_PUT_SIZE, this.scheduledExecutorService, log));
this.statsTable.put(Stats.GROUP_GET_NUMS, new StatsItemSet(Stats.GROUP_GET_NUMS, this.scheduledExecutorService, log));
this.statsTable.put(Stats.GROUP_GET_SIZE, new StatsItemSet(Stats.GROUP_GET_SIZE, this.scheduledExecutorService, log));
this.statsTable.put(GROUP_ACK_NUMS, new StatsItemSet(GROUP_ACK_NUMS, this.scheduledExecutorService, log));
this.statsTable.put(GROUP_CK_NUMS, new StatsItemSet(GROUP_CK_NUMS, this.scheduledExecutorService, log));
this.statsTable.put(Stats.GROUP_ACK_NUMS, new StatsItemSet(Stats.GROUP_ACK_NUMS, this.scheduledExecutorService, log));
this.statsTable.put(Stats.GROUP_CK_NUMS, new StatsItemSet(Stats.GROUP_CK_NUMS, this.scheduledExecutorService, log));
this.statsTable.put(Stats.GROUP_GET_LATENCY, new StatsItemSet(Stats.GROUP_GET_LATENCY, this.scheduledExecutorService, log));
this.statsTable.put(TOPIC_PUT_LATENCY, new StatsItemSet(TOPIC_PUT_LATENCY, this.scheduledExecutorService, log));
this.statsTable.put(Stats.TOPIC_PUT_LATENCY, new StatsItemSet(Stats.TOPIC_PUT_LATENCY, this.scheduledExecutorService, log));
this.statsTable.put(Stats.SNDBCK_PUT_NUMS, new StatsItemSet(Stats.SNDBCK_PUT_NUMS, this.scheduledExecutorService, log));
this.statsTable.put(DLQ_PUT_NUMS, new StatsItemSet(DLQ_PUT_NUMS, this.scheduledExecutorService, log));
this.statsTable.put(Stats.BROKER_PUT_NUMS, new StatsItemSet(Stats.BROKER_PUT_NUMS, this.scheduledExecutorService, log));
Expand Down Expand Up @@ -338,17 +338,22 @@ public void onTopicDeleted(final String topic) {
}
this.statsTable.get(Stats.GROUP_GET_NUMS).delValueByPrefixKey(topic, "@");
this.statsTable.get(Stats.GROUP_GET_SIZE).delValueByPrefixKey(topic, "@");
this.statsTable.get(Stats.GROUP_CK_NUMS).delValueByPrefixKey(topic, "@");
this.statsTable.get(Stats.GROUP_ACK_NUMS).delValueByPrefixKey(topic, "@");
this.statsTable.get(Stats.QUEUE_GET_NUMS).delValueByPrefixKey(topic, "@");
this.statsTable.get(Stats.QUEUE_GET_SIZE).delValueByPrefixKey(topic, "@");
this.statsTable.get(Stats.SNDBCK_PUT_NUMS).delValueByPrefixKey(topic, "@");
this.statsTable.get(Stats.GROUP_GET_LATENCY).delValueByInfixKey(topic, "@");
this.statsTable.get(Stats.TOPIC_PUT_LATENCY).delValueBySuffixKey(topic, "@");
this.momentStatsItemSetFallSize.delValueByInfixKey(topic, "@");
this.momentStatsItemSetFallTime.delValueByInfixKey(topic, "@");
}

public void onGroupDeleted(final String group) {
this.statsTable.get(Stats.GROUP_GET_NUMS).delValueBySuffixKey(group, "@");
this.statsTable.get(Stats.GROUP_GET_SIZE).delValueBySuffixKey(group, "@");
this.statsTable.get(Stats.GROUP_CK_NUMS).delValueBySuffixKey(group, "@");
this.statsTable.get(Stats.GROUP_ACK_NUMS).delValueBySuffixKey(group, "@");
if (enableQueueStat) {
this.statsTable.get(Stats.QUEUE_GET_NUMS).delValueBySuffixKey(group, "@");
this.statsTable.get(Stats.QUEUE_GET_SIZE).delValueBySuffixKey(group, "@");
Expand Down Expand Up @@ -434,12 +439,12 @@ public void incGroupGetNums(final String group, final String topic, final int in

public void incGroupCkNums(final String group, final String topic, final int incValue) {
final String statsKey = buildStatsKey(topic, group);
this.statsTable.get(GROUP_CK_NUMS).addValue(statsKey, incValue, 1);
this.statsTable.get(Stats.GROUP_CK_NUMS).addValue(statsKey, incValue, 1);
}

public void incGroupAckNums(final String group, final String topic, final int incValue) {
final String statsKey = buildStatsKey(topic, group);
this.statsTable.get(GROUP_ACK_NUMS).addValue(statsKey, incValue, 1);
this.statsTable.get(Stats.GROUP_ACK_NUMS).addValue(statsKey, incValue, 1);
}

public String buildStatsKey(String topic, String group) {
Expand Down Expand Up @@ -509,9 +514,8 @@ public void incTopicPutLatency(final String topic, final int queueId, final int
statsKey = new StringBuilder(6);
}
statsKey.append(queueId).append("@").append(topic);
this.statsTable.get(TOPIC_PUT_LATENCY).addValue(statsKey.toString(), incValue, 1);
this.statsTable.get(Stats.TOPIC_PUT_LATENCY).addValue(statsKey.toString(), incValue, 1);
}

public void incBrokerPutNums() {
this.statsTable.get(Stats.BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.junit.Test;

import static org.apache.rocketmq.common.stats.Stats.BROKER_PUT_NUMS;
import static org.apache.rocketmq.common.stats.Stats.GROUP_ACK_NUMS;
import static org.apache.rocketmq.common.stats.Stats.GROUP_CK_NUMS;
import static org.apache.rocketmq.common.stats.Stats.GROUP_GET_FALL_SIZE;
import static org.apache.rocketmq.common.stats.Stats.GROUP_GET_FALL_TIME;
import static org.apache.rocketmq.common.stats.Stats.GROUP_GET_LATENCY;
Expand All @@ -34,6 +36,7 @@
import static org.apache.rocketmq.common.stats.Stats.QUEUE_PUT_NUMS;
import static org.apache.rocketmq.common.stats.Stats.QUEUE_PUT_SIZE;
import static org.apache.rocketmq.common.stats.Stats.SNDBCK_PUT_NUMS;
import static org.apache.rocketmq.common.stats.Stats.TOPIC_PUT_LATENCY;
import static org.apache.rocketmq.common.stats.Stats.TOPIC_PUT_NUMS;
import static org.apache.rocketmq.common.stats.Stats.TOPIC_PUT_SIZE;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -139,8 +142,11 @@ public void testOnTopicDeleted() {
brokerStatsManager.incTopicPutSize(TOPIC, 100);
brokerStatsManager.incQueuePutNums(TOPIC, QUEUE_ID);
brokerStatsManager.incQueuePutSize(TOPIC, QUEUE_ID, 100);
brokerStatsManager.incTopicPutLatency(TOPIC, QUEUE_ID, 10);
brokerStatsManager.incGroupGetNums(GROUP_NAME, TOPIC, 1);
brokerStatsManager.incGroupGetSize(GROUP_NAME, TOPIC, 100);
brokerStatsManager.incGroupCkNums(GROUP_NAME, TOPIC, 1);
brokerStatsManager.incGroupAckNums(GROUP_NAME, TOPIC, 1);
brokerStatsManager.incQueueGetNums(GROUP_NAME, TOPIC, QUEUE_ID, 1);
brokerStatsManager.incQueueGetSize(GROUP_NAME, TOPIC, QUEUE_ID, 100);
brokerStatsManager.incSendBackNums(GROUP_NAME, TOPIC);
Expand All @@ -162,6 +168,9 @@ public void testOnTopicDeleted() {
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, "1@" + TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_SIZE, "1@" + TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_TIME, "1@" + TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_CK_NUMS, TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_ACK_NUMS, TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(TOPIC_PUT_LATENCY, QUEUE_ID + "@" + TOPIC));
}

@Test
Expand All @@ -174,6 +183,8 @@ public void testOnGroupDeleted() {
brokerStatsManager.incGroupGetLatency(GROUP_NAME, TOPIC, 1, 1);
brokerStatsManager.recordDiskFallBehindTime(GROUP_NAME, TOPIC, 1, 11L);
brokerStatsManager.recordDiskFallBehindSize(GROUP_NAME, TOPIC, 1, 11L);
brokerStatsManager.incGroupCkNums(GROUP_NAME, TOPIC, 1);
brokerStatsManager.incGroupAckNums(GROUP_NAME, TOPIC, 1);

brokerStatsManager.onGroupDeleted(GROUP_NAME);

Expand All @@ -185,6 +196,8 @@ public void testOnGroupDeleted() {
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, "1@" + TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_SIZE, "1@" + TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_TIME, "1@" + TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_CK_NUMS, TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_ACK_NUMS, TOPIC + "@" + GROUP_NAME));
}

@Test
Expand Down

0 comments on commit 3696be0

Please sign in to comment.