From 120ad5a565ef4a428cb09b58e6555ca7512c534c Mon Sep 17 00:00:00 2001 From: lizhimins <707364882@qq.com> Date: Mon, 27 Nov 2023 11:36:33 +0800 Subject: [PATCH] [ISSUE #7585] Support message filtering in rocketmq tiered storage --- .../tieredstore/TieredMessageFetcher.java | 280 +++++++----------- .../common/GetMessageResultExt.java | 75 +++++ .../common/SelectBufferResult.java | 51 ++++ ...er.java => SelectBufferResultWrapper.java} | 53 ++-- .../tieredstore/file/CompositeFlatFile.java | 4 + .../metrics/TieredStoreMetricsManager.java | 4 +- .../provider/TieredFileSegment.java | 2 +- .../tieredstore/util/MessageBufferUtil.java | 71 +++-- .../tieredstore/TieredMessageFetcherTest.java | 9 +- .../util/MessageBufferUtilTest.java | 19 +- 10 files changed, 326 insertions(+), 242 deletions(-) create mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExt.java create mode 100644 tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResult.java rename tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/{SelectMappedBufferResultWrapper.java => SelectBufferResultWrapper.java} (55%) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java index f739773eb346..a48560fab09a 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java @@ -19,17 +19,14 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.Scheduler; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; -import com.google.common.collect.Sets; import io.opentelemetry.api.common.Attributes; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; import org.apache.commons.lang3.tuple.Pair; import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.message.MessageQueue; @@ -40,12 +37,13 @@ import org.apache.rocketmq.store.MessageFilter; import org.apache.rocketmq.store.QueryMessageResult; import org.apache.rocketmq.store.SelectMappedBufferResult; +import org.apache.rocketmq.tieredstore.common.GetMessageResultExt; import org.apache.rocketmq.tieredstore.common.InFlightRequestFuture; import org.apache.rocketmq.tieredstore.common.MessageCacheKey; -import org.apache.rocketmq.tieredstore.common.SelectMappedBufferResultWrapper; +import org.apache.rocketmq.tieredstore.common.SelectBufferResult; +import org.apache.rocketmq.tieredstore.common.SelectBufferResultWrapper; import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; -import org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode; import org.apache.rocketmq.tieredstore.exception.TieredStoreException; import org.apache.rocketmq.tieredstore.file.CompositeFlatFile; import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile; @@ -66,10 +64,10 @@ public class TieredMessageFetcher implements MessageStoreFetcher { private static final Logger LOGGER = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME); private final String brokerName; - private final TieredMessageStoreConfig storeConfig; private final TieredMetadataStore metadataStore; + private final TieredMessageStoreConfig storeConfig; private final TieredFlatFileManager flatFileManager; - private final Cache readAheadCache; + private final Cache readAheadCache; public TieredMessageFetcher(TieredMessageStoreConfig storeConfig) { this.storeConfig = storeConfig; @@ -79,7 +77,7 @@ public TieredMessageFetcher(TieredMessageStoreConfig storeConfig) { this.readAheadCache = this.initCache(storeConfig); } - private Cache initCache(TieredMessageStoreConfig storeConfig) { + private Cache initCache(TieredMessageStoreConfig storeConfig) { long memoryMaxSize = (long) (Runtime.getRuntime().maxMemory() * storeConfig.getReadAheadCacheSizeThresholdRate()); @@ -88,60 +86,35 @@ private Cache initCache(Tiered .expireAfterWrite(storeConfig.getReadAheadCacheExpireDuration(), TimeUnit.MILLISECONDS) .maximumWeight(memoryMaxSize) // Using the buffer size of messages to calculate memory usage - .weigher((MessageCacheKey key, SelectMappedBufferResultWrapper msg) -> msg.getDuplicateResult().getSize()) + .weigher((MessageCacheKey key, SelectBufferResultWrapper msg) -> msg.getBufferSize()) .recordStats() .build(); } - protected SelectMappedBufferResultWrapper putMessageToCache(CompositeFlatFile flatFile, - long queueOffset, SelectMappedBufferResult result, long minOffset, long maxOffset, int size) { - - return putMessageToCache(flatFile, queueOffset, result, minOffset, maxOffset, size, false); - } - - protected SelectMappedBufferResultWrapper putMessageToCache(CompositeFlatFile flatFile, - long queueOffset, SelectMappedBufferResult result, long minOffset, long maxOffset, int size, boolean used) { - - SelectMappedBufferResultWrapper wrapper = - new SelectMappedBufferResultWrapper(result, queueOffset, minOffset, maxOffset, size); - if (used) { - wrapper.addAccessCount(); - } - readAheadCache.put(new MessageCacheKey(flatFile, queueOffset), wrapper); - return wrapper; - } - - // Visible for metrics monitor - public Cache getMessageCache() { + @VisibleForTesting + public Cache getMessageCache() { return readAheadCache; } - // Waiting for the request in transit to complete - protected CompletableFuture getMessageFromCacheAsync( - CompositeQueueFlatFile flatFile, String group, long queueOffset, int maxCount) { - - return getMessageFromCacheAsync(flatFile, group, queueOffset, maxCount, true); + protected void putMessageToCache(CompositeFlatFile flatFile, SelectBufferResultWrapper result) { + readAheadCache.put(new MessageCacheKey(flatFile, result.getOffset()), result); } - @Nullable - protected SelectMappedBufferResultWrapper getMessageFromCache(CompositeFlatFile flatFile, long queueOffset) { - MessageCacheKey cacheKey = new MessageCacheKey(flatFile, queueOffset); - return readAheadCache.getIfPresent(cacheKey); + protected SelectBufferResultWrapper getMessageFromCache(CompositeFlatFile flatFile, long offset) { + return readAheadCache.getIfPresent(new MessageCacheKey(flatFile, offset)); } - protected void recordCacheAccess(CompositeFlatFile flatFile, String group, long queueOffset, - List resultWrapperList) { - if (resultWrapperList.size() > 0) { - queueOffset = resultWrapperList.get(resultWrapperList.size() - 1).getCurOffset(); + protected void recordCacheAccess(CompositeFlatFile flatFile, + String group, long offset, List resultWrapperList) { + if (!resultWrapperList.isEmpty()) { + offset = resultWrapperList.get(resultWrapperList.size() - 1).getOffset(); } - flatFile.recordGroupAccess(group, queueOffset); - for (SelectMappedBufferResultWrapper wrapper : resultWrapperList) { - wrapper.addAccessCount(); - if (wrapper.getAccessCount() >= flatFile.getActiveGroupCount()) { - MessageCacheKey cacheKey = new MessageCacheKey(flatFile, wrapper.getCurOffset()); - readAheadCache.invalidate(cacheKey); + flatFile.recordGroupAccess(group, offset); + resultWrapperList.forEach(wrapper -> { + if (wrapper.incrementAndGet() >= flatFile.getActiveGroupCount()) { + readAheadCache.invalidate(new MessageCacheKey(flatFile, wrapper.getOffset())); } - } + }); } private void prefetchMessage(CompositeQueueFlatFile flatFile, String group, int maxCount, long nextBeginOffset) { @@ -149,7 +122,6 @@ private void prefetchMessage(CompositeQueueFlatFile flatFile, String group, int return; } - MessageQueue mq = flatFile.getMessageQueue(); // make sure there is only one request per group and request range int prefetchBatchSize = Math.min(maxCount * flatFile.getReadAheadFactor(), storeConfig.getReadAheadMessageCountThreshold()); InFlightRequestFuture inflightRequest = flatFile.getInflightRequest(group, nextBeginOffset, prefetchBatchSize); @@ -166,13 +138,8 @@ private void prefetchMessage(CompositeQueueFlatFile flatFile, String group, int long maxOffsetOfLastRequest = inflightRequest.getLastFuture().join(); boolean lastRequestIsExpired = getMessageFromCache(flatFile, nextBeginOffset) == null; - // if message fetch by last request is expired, we need to prefetch the message from tiered store - int cacheRemainCount = (int) (maxOffsetOfLastRequest - nextBeginOffset); - LOGGER.debug("TieredMessageFetcher#preFetchMessage: group={}, nextBeginOffset={}, maxOffsetOfLastRequest={}, lastRequestIsExpired={}, cacheRemainCount={}", - group, nextBeginOffset, maxOffsetOfLastRequest, lastRequestIsExpired, cacheRemainCount); - - if (lastRequestIsExpired - || maxOffsetOfLastRequest != -1L && nextBeginOffset >= inflightRequest.getStartOffset()) { + if (lastRequestIsExpired || + maxOffsetOfLastRequest != -1L && nextBeginOffset >= inflightRequest.getStartOffset()) { long queueOffset; if (lastRequestIsExpired) { @@ -196,12 +163,12 @@ private void prefetchMessage(CompositeQueueFlatFile flatFile, String group, int long nextQueueOffset = queueOffset; if (flag == 1) { int firstBatchSize = factor % storeConfig.getReadAheadBatchSizeFactorThreshold() * maxCount; - CompletableFuture future = prefetchMessageThenPutToCache(flatFile, mq, nextQueueOffset, firstBatchSize); + CompletableFuture future = prefetchMessageThenPutToCache(flatFile, nextQueueOffset, firstBatchSize); futureList.add(Pair.of(firstBatchSize, future)); nextQueueOffset += firstBatchSize; } for (long i = 0; i < concurrency - flag; i++) { - CompletableFuture future = prefetchMessageThenPutToCache(flatFile, mq, nextQueueOffset + i * requestBatchSize, requestBatchSize); + CompletableFuture future = prefetchMessageThenPutToCache(flatFile, nextQueueOffset + i * requestBatchSize, requestBatchSize); futureList.add(Pair.of(requestBatchSize, future)); } flatFile.putInflightRequest(group, queueOffset, maxCount * factor, futureList); @@ -211,52 +178,41 @@ private void prefetchMessage(CompositeQueueFlatFile flatFile, String group, int } } - private CompletableFuture prefetchMessageThenPutToCache(CompositeQueueFlatFile flatFile, MessageQueue mq, - long queueOffset, int batchSize) { + private CompletableFuture prefetchMessageThenPutToCache( + CompositeQueueFlatFile flatFile, long queueOffset, int batchSize) { + + MessageQueue mq = flatFile.getMessageQueue(); return getMessageFromTieredStoreAsync(flatFile, queueOffset, batchSize) - .thenApplyAsync(result -> { + .thenApply(result -> { if (result.getStatus() != GetMessageStatus.FOUND) { - LOGGER.warn("TieredMessageFetcher#prefetchAndPutMsgToCache: read ahead failed: topic: {}, queue: {}, queue offset: {}, batch size: {}, result: {}", - mq.getTopic(), mq.getQueueId(), queueOffset, batchSize, result.getStatus()); + LOGGER.warn("MessageFetcher message then put to cache failed, result: {}, " + + "topic: {}, queue: {}, queue offset: {}, batch size: {}, result: {}", + result.getStatus(), mq.getTopic(), mq.getQueueId(), queueOffset, batchSize); return -1L; } - // put message into cache List offsetList = result.getMessageQueueOffset(); + List tagCodeList = result.getTagCodeList(); List msgList = result.getMessageMapedList(); - if (offsetList.size() != msgList.size()) { - LOGGER.error("TieredMessageFetcher#prefetchAndPutMsgToCache: read ahead failed, result is illegal: topic: {}, queue: {}, queue offset: {}, batch size: {}, offsetList size: {}, msgList size: {}", - mq.getTopic(), mq.getQueueId(), queueOffset, batchSize, offsetList.size(), msgList.size()); - return -1L; - } - if (offsetList.isEmpty()) { - LOGGER.error("TieredMessageFetcher#prefetchAndPutMsgToCache: read ahead failed, result is FOUND but msgList is empty: topic: {}, queue: {}, queue offset: {}, batch size: {}", - mq.getTopic(), mq.getQueueId(), queueOffset, batchSize); - return -1L; - } - Long minOffset = offsetList.get(0); - Long maxOffset = offsetList.get(offsetList.size() - 1); - int size = offsetList.size(); - for (int n = 0; n < offsetList.size(); n++) { - putMessageToCache(flatFile, offsetList.get(n), msgList.get(n), minOffset, maxOffset, size); + for (int i = 0; i < offsetList.size(); i++) { + SelectMappedBufferResult msg = msgList.get(i); + SelectBufferResultWrapper bufferResult = new SelectBufferResultWrapper( + msg, offsetList.get(i), tagCodeList.get(i), false); + this.putMessageToCache(flatFile, bufferResult); } - if (size != batchSize || maxOffset != queueOffset + batchSize - 1) { - LOGGER.warn("TieredMessageFetcher#prefetchAndPutMsgToCache: size not match: except: {}, actual: {}, queue offset: {}, min offset: {}, except offset: {}, max offset: {}", - batchSize, size, queueOffset, minOffset, queueOffset + batchSize - 1, maxOffset); - } - return maxOffset; - }, TieredStoreExecutor.fetchDataExecutor); + return offsetList.get(offsetList.size() - 1); + }); } - public CompletableFuture getMessageFromCacheAsync(CompositeQueueFlatFile flatFile, + public CompletableFuture getMessageFromCacheAsync(CompositeQueueFlatFile flatFile, String group, long queueOffset, int maxCount, boolean waitInflightRequest) { MessageQueue mq = flatFile.getMessageQueue(); long lastGetOffset = queueOffset - 1; - List resultWrapperList = new ArrayList<>(maxCount); + List resultWrapperList = new ArrayList<>(maxCount); for (int i = 0; i < maxCount; i++) { lastGetOffset++; - SelectMappedBufferResultWrapper wrapper = getMessageFromCache(flatFile, lastGetOffset); + SelectBufferResultWrapper wrapper = getMessageFromCache(flatFile, lastGetOffset); if (wrapper == null) { lastGetOffset--; break; @@ -293,7 +249,7 @@ public CompletableFuture getMessageFromCacheAsync(CompositeQue // try to get message from cache again when prefetch request is done for (int i = 0; i < maxCount - resultWrapperList.size(); i++) { lastGetOffset++; - SelectMappedBufferResultWrapper wrapper = getMessageFromCache(flatFile, lastGetOffset); + SelectBufferResultWrapper wrapper = getMessageFromCache(flatFile, lastGetOffset); if (wrapper == null) { lastGetOffset--; break; @@ -303,72 +259,71 @@ public CompletableFuture getMessageFromCacheAsync(CompositeQue recordCacheAccess(flatFile, group, queueOffset, resultWrapperList); - // if cache hit, result will be returned immediately and asynchronously prefetch messages for later requests - if (!resultWrapperList.isEmpty()) { - LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: cache hit: " + - "topic: {}, queue: {}, queue offset: {}, max message num: {}, cache hit num: {}", + if (resultWrapperList.isEmpty()) { + // If cache miss, pull messages immediately + LOGGER.info("MessageFetcher cache miss, topic: {}, queue: {}, offset: {}, maxCount: {}", + mq.getTopic(), mq.getQueueId(), queueOffset, maxCount); + } else { + // If cache hit, return buffer result immediately and asynchronously prefetch messages + LOGGER.debug("MessageFetcher cache hit, topic: {}, queue: {}, offset: {}, maxCount: {}, resultSize: {}", mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, resultWrapperList.size()); - prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1); + this.prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1); - GetMessageResult result = new GetMessageResult(); + GetMessageResultExt result = new GetMessageResultExt(); result.setStatus(GetMessageStatus.FOUND); result.setMinOffset(flatFile.getConsumeQueueMinOffset()); result.setMaxOffset(flatFile.getConsumeQueueCommitOffset()); result.setNextBeginOffset(queueOffset + resultWrapperList.size()); - resultWrapperList.forEach(wrapper -> result.addMessage(wrapper.getDuplicateResult(), wrapper.getCurOffset())); + resultWrapperList.forEach( + wrapper -> result.addMessage(wrapper.getDuplicateResult(), wrapper.getOffset())); return CompletableFuture.completedFuture(result); } - // if cache is miss, immediately pull messages - LOGGER.info("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: " + - "topic: {}, queue: {}, queue offset: {}, max message num: {}", - mq.getTopic(), mq.getQueueId(), queueOffset, maxCount); - - CompletableFuture resultFuture; + CompletableFuture resultFuture; synchronized (flatFile) { int batchSize = maxCount * storeConfig.getReadAheadMinFactor(); resultFuture = getMessageFromTieredStoreAsync(flatFile, queueOffset, batchSize) - .thenApplyAsync(result -> { + .thenApply(result -> { if (result.getStatus() != GetMessageStatus.FOUND) { return result; } - GetMessageResult newResult = new GetMessageResult(); - newResult.setStatus(GetMessageStatus.FOUND); - newResult.setMinOffset(flatFile.getConsumeQueueMinOffset()); - newResult.setMaxOffset(flatFile.getConsumeQueueCommitOffset()); + GetMessageResultExt newResult = new GetMessageResultExt(); List offsetList = result.getMessageQueueOffset(); + List tagCodeList = result.getTagCodeList(); List msgList = result.getMessageMapedList(); - Long minOffset = offsetList.get(0); - Long maxOffset = offsetList.get(offsetList.size() - 1); - int size = offsetList.size(); + for (int i = 0; i < offsetList.size(); i++) { - Long offset = offsetList.get(i); SelectMappedBufferResult msg = msgList.get(i); - // put message into cache - SelectMappedBufferResultWrapper resultWrapper = putMessageToCache(flatFile, offset, msg, minOffset, maxOffset, size, true); - // try to meet maxCount + SelectBufferResultWrapper bufferResult = new SelectBufferResultWrapper( + msg, offsetList.get(i), tagCodeList.get(i), true); + this.putMessageToCache(flatFile, bufferResult); if (newResult.getMessageMapedList().size() < maxCount) { - newResult.addMessage(resultWrapper.getDuplicateResult(), offset); + newResult.addMessage(msg, offsetList.get(i)); } } + + newResult.setStatus(GetMessageStatus.FOUND); + newResult.setMinOffset(flatFile.getConsumeQueueMinOffset()); + newResult.setMaxOffset(flatFile.getConsumeQueueCommitOffset()); newResult.setNextBeginOffset(queueOffset + newResult.getMessageMapedList().size()); return newResult; - }, TieredStoreExecutor.fetchDataExecutor); + }); List>> futureList = new ArrayList<>(); CompletableFuture inflightRequestFuture = resultFuture.thenApply(result -> - result.getStatus() == GetMessageStatus.FOUND ? result.getMessageQueueOffset().get(result.getMessageQueueOffset().size() - 1) : -1L); + result.getStatus() == GetMessageStatus.FOUND ? + result.getMessageQueueOffset().get(result.getMessageQueueOffset().size() - 1) : -1L); futureList.add(Pair.of(batchSize, inflightRequestFuture)); flatFile.putInflightRequest(group, queueOffset, batchSize, futureList); } return resultFuture; } - public CompletableFuture getMessageFromTieredStoreAsync(CompositeQueueFlatFile flatFile, - long queueOffset, int batchSize) { + public CompletableFuture getMessageFromTieredStoreAsync( + CompositeQueueFlatFile flatFile, long queueOffset, int batchSize) { - GetMessageResult result = new GetMessageResult(); + GetMessageResultExt result = new GetMessageResultExt(); result.setMinOffset(flatFile.getConsumeQueueMinOffset()); result.setMaxOffset(flatFile.getConsumeQueueCommitOffset()); CompletableFuture readConsumeQueueFuture; @@ -389,66 +344,53 @@ public CompletableFuture getMessageFromTieredStoreAsync(Compos } } - CompletableFuture readCommitLogFuture = readConsumeQueueFuture.thenComposeAsync(cqBuffer -> { + CompletableFuture readCommitLogFuture = readConsumeQueueFuture.thenCompose(cqBuffer -> { long firstCommitLogOffset = CQItemBufferUtil.getCommitLogOffset(cqBuffer); cqBuffer.position(cqBuffer.remaining() - TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE); long lastCommitLogOffset = CQItemBufferUtil.getCommitLogOffset(cqBuffer); if (lastCommitLogOffset < firstCommitLogOffset) { - MessageQueue mq = flatFile.getMessageQueue(); - LOGGER.error("TieredMessageFetcher#getMessageFromTieredStoreAsync: message is not in order, try to fetch data in next store, topic: {}, queueId: {}, batch size: {}, queue offset {}", - mq.getTopic(), mq.getQueueId(), batchSize, queueOffset); - throw new TieredStoreException(TieredStoreErrorCode.ILLEGAL_OFFSET, "message is not in order"); + LOGGER.error("MessageFetcher#getMessageFromTieredStoreAsync, last offset is smaller than first offset, " + + "topic: {} queueId: {}, offset: {}, firstOffset: {}, lastOffset: {}", + flatFile.getFilePath(), queueOffset, firstCommitLogOffset, lastCommitLogOffset); + return CompletableFuture.completedFuture(ByteBuffer.allocate(0)); } - long length = lastCommitLogOffset - firstCommitLogOffset + CQItemBufferUtil.getSize(cqBuffer); - // prevent OOM - long originLength = length; - while (cqBuffer.limit() > TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE && length > storeConfig.getReadAheadMessageSizeThreshold()) { + // Get the total size of the data by reducing the length limit of cq to prevent OOM + long length = lastCommitLogOffset - firstCommitLogOffset + CQItemBufferUtil.getSize(cqBuffer); + while (cqBuffer.limit() > TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE && + length > storeConfig.getReadAheadMessageSizeThreshold()) { cqBuffer.limit(cqBuffer.position()); cqBuffer.position(cqBuffer.limit() - TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE); - length = CQItemBufferUtil.getCommitLogOffset(cqBuffer) - firstCommitLogOffset + CQItemBufferUtil.getSize(cqBuffer); - } - - if (originLength != length) { - MessageQueue mq = flatFile.getMessageQueue(); - LOGGER.info("TieredMessageFetcher#getMessageFromTieredStoreAsync: msg data is too large, topic: {}, queueId: {}, batch size: {}, fix it from {} to {}", - mq.getTopic(), mq.getQueueId(), batchSize, originLength, length); + length = CQItemBufferUtil.getCommitLogOffset(cqBuffer) + - firstCommitLogOffset + CQItemBufferUtil.getSize(cqBuffer); } return flatFile.getCommitLogAsync(firstCommitLogOffset, (int) length); - }, TieredStoreExecutor.fetchDataExecutor); + }); - return readConsumeQueueFuture.thenCombineAsync(readCommitLogFuture, (cqBuffer, msgBuffer) -> { - List> msgList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer); - if (!msgList.isEmpty()) { - int requestSize = cqBuffer.remaining() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE; + return readConsumeQueueFuture.thenCombine(readCommitLogFuture, (cqBuffer, msgBuffer) -> { + List bufferList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer); + int requestSize = cqBuffer.remaining() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE; + if (bufferList.isEmpty()) { + result.setStatus(GetMessageStatus.NO_MATCHED_MESSAGE); + result.setNextBeginOffset(queueOffset + requestSize); + } else { result.setStatus(GetMessageStatus.FOUND); - result.setNextBeginOffset(queueOffset + msgList.size()); - msgList.forEach(pair -> { - msgBuffer.position(pair.getLeft()); - ByteBuffer slice = msgBuffer.slice(); - slice.limit(pair.getRight()); - result.addMessage(new SelectMappedBufferResult(pair.getLeft(), slice, pair.getRight(), null), MessageBufferUtil.getQueueOffset(slice)); - }); - if (requestSize != msgList.size()) { - Set requestOffsetSet = new HashSet<>(); - for (int i = 0; i < requestSize; i++) { - requestOffsetSet.add(queueOffset + i); - } - LOGGER.error("TieredMessageFetcher#getMessageFromTieredStoreAsync: split message buffer failed, batch size: {}, request message count: {}, actual message count: {}, these messages may lost: {}", batchSize, requestSize, msgList.size(), Sets.difference(requestOffsetSet, Sets.newHashSet(result.getMessageQueueOffset()))); - } else if (requestSize != batchSize) { - LOGGER.debug("TieredMessageFetcher#getMessageFromTieredStoreAsync: message count does not meet batch size, maybe dispatch delay: batch size: {}, request message count: {}", batchSize, requestSize); + result.setNextBeginOffset(queueOffset + requestSize); + + for (SelectBufferResult bufferResult : bufferList) { + ByteBuffer slice = bufferResult.getByteBuffer().slice(); + slice.limit(bufferResult.getSize()); + SelectMappedBufferResult msg = new SelectMappedBufferResult(bufferResult.getStartOffset(), + bufferResult.getByteBuffer(), bufferResult.getSize(), null); + result.addMessage(msg, MessageBufferUtil.getQueueOffset(slice), bufferResult.getTagCode()); } - return result; } - long nextBeginOffset = queueOffset + cqBuffer.remaining() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE; - LOGGER.error("TieredMessageFetcher#getMessageFromTieredStoreAsync: split message buffer failed, consume queue buffer size: {}, message buffer size: {}, change offset from {} to {}", cqBuffer.remaining(), msgBuffer.remaining(), queueOffset, nextBeginOffset); - result.setStatus(GetMessageStatus.MESSAGE_WAS_REMOVING); - result.setNextBeginOffset(nextBeginOffset); return result; - }, TieredStoreExecutor.fetchDataExecutor).exceptionally(e -> { + }).exceptionally(e -> { MessageQueue mq = flatFile.getMessageQueue(); - LOGGER.warn("TieredMessageFetcher#getMessageFromTieredStoreAsync: get message failed: topic: {} queueId: {}", mq.getTopic(), mq.getQueueId(), e); + LOGGER.warn("MessageFetcher#getMessageFromTieredStoreAsync failed, " + + "topic: {} queueId: {}, offset: {}, batchSize: {}", mq.getTopic(), mq.getQueueId(), queueOffset, batchSize, e); result.setStatus(GetMessageStatus.OFFSET_FOUND_NULL); result.setNextBeginOffset(queueOffset); return result; @@ -498,7 +440,8 @@ public CompletableFuture getMessageAsync( return CompletableFuture.completedFuture(result); } - return getMessageFromCacheAsync(flatFile, group, queueOffset, maxCount); + return getMessageFromCacheAsync(flatFile, group, queueOffset, maxCount, true) + .thenApply(messageResultExt -> messageResultExt.doFilterMessage(messageFilter)); } @Override @@ -546,7 +489,7 @@ public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, Bo return flatFile.getOffsetInConsumeQueueByTime(timestamp, type); } catch (Exception e) { LOGGER.error("TieredMessageFetcher#getOffsetInQueueByTime: " + - "get offset in queue by time failed: topic: {}, queue: {}, timestamp: {}, type: {}", + "get offset in queue by time failed: topic: {}, queue: {}, timestamp: {}, type: {}", topic, queueId, timestamp, type, e); } return -1L; @@ -598,7 +541,8 @@ public CompletableFuture queryMessageAsync( return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).thenApply(v -> result); }).whenComplete((result, throwable) -> { if (result != null) { - LOGGER.info("MessageFetcher#queryMessageAsync, query result: {}, topic: {}, topicId: {}, key: {}, maxCount: {}, timestamp: {}-{}", + LOGGER.info("MessageFetcher#queryMessageAsync, " + + "query result: {}, topic: {}, topicId: {}, key: {}, maxCount: {}, timestamp: {}-{}", result.getMessageBufferList().size(), topic, topicId, key, maxCount, begin, end); } }); diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExt.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExt.java new file mode 100644 index 000000000000..a00c72dbb630 --- /dev/null +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/GetMessageResultExt.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.tieredstore.common; + +import java.util.ArrayList; +import java.util.List; +import org.apache.rocketmq.store.GetMessageResult; +import org.apache.rocketmq.store.GetMessageStatus; +import org.apache.rocketmq.store.MessageFilter; +import org.apache.rocketmq.store.SelectMappedBufferResult; +import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; + +public class GetMessageResultExt extends GetMessageResult { + + private final List tagCodeList; + + public GetMessageResultExt() { + this.tagCodeList = new ArrayList<>(); + } + + public void addMessage(SelectMappedBufferResult bufferResult, long queueOffset, long tagCode) { + super.addMessage(bufferResult, queueOffset); + this.tagCodeList.add(tagCode); + } + + public List getTagCodeList() { + return tagCodeList; + } + + public GetMessageResult doFilterMessage(MessageFilter messageFilter) { + if (GetMessageStatus.FOUND.equals(super.getStatus()) || messageFilter == null) { + return this; + } + + GetMessageResult result = new GetMessageResult(); + result.setMinOffset(this.getMinOffset()); + result.setMaxOffset(this.getMaxOffset()); + result.setNextBeginOffset(this.getNextBeginOffset()); + + for (int i = 0; i < this.getMessageMapedList().size(); i++) { + if (!messageFilter.isMatchedByConsumeQueue(this.tagCodeList.get(i), null)) { + continue; + } + + SelectMappedBufferResult bufferResult = this.getMessageMapedList().get(i); + if (!messageFilter.isMatchedByCommitLog(bufferResult.getByteBuffer().slice(), null)) { + continue; + } + + result.addMessage(new SelectMappedBufferResult(bufferResult.getStartOffset(), + bufferResult.getByteBuffer(), bufferResult.getSize(), null), + MessageBufferUtil.getQueueOffset(bufferResult.getByteBuffer())); + } + + if (result.getBufferTotalSize() == 0) { + result.setStatus(GetMessageStatus.NO_MATCHED_MESSAGE); + } + return result; + } +} diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResult.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResult.java new file mode 100644 index 000000000000..d265ed0fc42f --- /dev/null +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResult.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.tieredstore.common; + +import java.nio.ByteBuffer; + +public class SelectBufferResult { + + private final ByteBuffer byteBuffer; + private final long startOffset; + private final int size; + private final long tagCode; + + public SelectBufferResult(ByteBuffer byteBuffer, long startOffset, int size, long tagCode) { + this.startOffset = startOffset; + this.byteBuffer = byteBuffer; + this.size = size; + this.tagCode = tagCode; + } + + public ByteBuffer getByteBuffer() { + return byteBuffer; + } + + public long getStartOffset() { + return startOffset; + } + + public int getSize() { + return size; + } + + public long getTagCode() { + return tagCode; + } +} diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectMappedBufferResultWrapper.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResultWrapper.java similarity index 55% rename from tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectMappedBufferResultWrapper.java rename to tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResultWrapper.java index af0785f712c5..4f9f00a074cc 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectMappedBufferResultWrapper.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/SelectBufferResultWrapper.java @@ -16,32 +16,21 @@ */ package org.apache.rocketmq.tieredstore.common; -import java.util.concurrent.atomic.LongAdder; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.store.SelectMappedBufferResult; -public class SelectMappedBufferResultWrapper { +public class SelectBufferResultWrapper { private final SelectMappedBufferResult result; - private final LongAdder accessCount; - - private final long curOffset; - private final long minOffset; - private final long maxOffset; - private final long size; - - public SelectMappedBufferResultWrapper( - SelectMappedBufferResult result, long curOffset, long minOffset, long maxOffset, long size) { + private final long offset; + private final long tagCode; + private final AtomicInteger accessCount; + public SelectBufferResultWrapper(SelectMappedBufferResult result, long offset, long tagCode, boolean used) { this.result = result; - this.accessCount = new LongAdder(); - this.curOffset = curOffset; - this.minOffset = minOffset; - this.maxOffset = maxOffset; - this.size = size; - } - - public SelectMappedBufferResult getResult() { - return result; + this.offset = offset; + this.tagCode = tagCode; + this.accessCount = new AtomicInteger(used ? 1 : 0); } public SelectMappedBufferResult getDuplicateResult() { @@ -53,27 +42,23 @@ public SelectMappedBufferResult getDuplicateResult() { result.getMappedFile()); } - public long getCurOffset() { - return curOffset; - } - - public long getMinOffset() { - return minOffset; + public long getOffset() { + return offset; } - public long getMaxOffset() { - return maxOffset; + public int getBufferSize() { + return this.result.getSize(); } - public long getSize() { - return size; + public long getTagCode() { + return tagCode; } - public void addAccessCount() { - accessCount.increment(); + public int incrementAndGet() { + return accessCount.incrementAndGet(); } - public long getAccessCount() { - return accessCount.sum(); + public int getAccessCount() { + return accessCount.get(); } } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java index 5ad3a6ff3207..2d5d8c9cb6c4 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java @@ -99,6 +99,10 @@ public boolean isClosed() { return closed; } + public String getFilePath() { + return filePath; + } + public ReentrantLock getCompositeFlatFileLock() { return compositeFlatFileLock; } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java index d8a07f0a75fe..2b9fc59d821c 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java @@ -46,7 +46,7 @@ import org.apache.rocketmq.tieredstore.TieredMessageFetcher; import org.apache.rocketmq.tieredstore.common.FileSegmentType; import org.apache.rocketmq.tieredstore.common.MessageCacheKey; -import org.apache.rocketmq.tieredstore.common.SelectMappedBufferResultWrapper; +import org.apache.rocketmq.tieredstore.common.SelectBufferResultWrapper; import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile; import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager; @@ -265,7 +265,7 @@ public static void init(Meter meter, Supplier attributesBuild .setUnit("bytes") .ofLongs() .buildWithCallback(measurement -> { - Optional> eviction = fetcher.getMessageCache().policy().eviction(); + Optional> eviction = fetcher.getMessageCache().policy().eviction(); eviction.ifPresent(resultEviction -> measurement.record(resultEviction.weightedSize().orElse(0), newAttributesBuilder().build())); }); diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java index aad42de98d84..5e3d8c5624fa 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java @@ -295,7 +295,7 @@ public CompletableFuture readAsync(long position, int length) { return future; } if (position + length > commitPosition) { - logger.warn("TieredFileSegment#readAsync request position + length is greater than commit position," + + logger.debug("TieredFileSegment#readAsync request position + length is greater than commit position," + " correct length using commit position, file: {}, request position: {}, commit position:{}, change length from {} to {}", getPath(), position, commitPosition, length, commitPosition - position); length = (int) (commitPosition - position); diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtil.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtil.java index 6db45a7479e6..2c4a6e5784bf 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtil.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtil.java @@ -20,11 +20,11 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import org.apache.commons.lang3.tuple.Pair; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.tieredstore.common.SelectBufferResult; import org.apache.rocketmq.tieredstore.file.TieredCommitLog; import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue; @@ -113,53 +113,72 @@ public static Map getProperties(ByteBuffer message) { return MessageDecoder.decodeProperties(slice); } - public static List> splitMessageBuffer( - ByteBuffer cqBuffer, ByteBuffer msgBuffer) { + public static List splitMessageBuffer(ByteBuffer cqBuffer, ByteBuffer msgBuffer) { + cqBuffer.rewind(); msgBuffer.rewind(); - List> messageList = new ArrayList<>(cqBuffer.remaining() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE); + + List bufferResultList = new ArrayList<>( + cqBuffer.remaining() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE); + + if (msgBuffer.remaining() == 0) { + logger.error("MessageBufferUtil#splitMessage, msg buffer length is zero"); + return bufferResultList; + } + if (cqBuffer.remaining() % TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE != 0) { - logger.warn("MessageBufferUtil#splitMessage: consume queue buffer size {} is not an integer multiple of CONSUME_QUEUE_STORE_UNIT_SIZE {}", - cqBuffer.remaining(), TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE); - return messageList; + logger.error("MessageBufferUtil#splitMessage, consume queue buffer size incorrect, {}", cqBuffer.remaining()); + return bufferResultList; } + try { - long startCommitLogOffset = CQItemBufferUtil.getCommitLogOffset(cqBuffer); - for (int pos = cqBuffer.position(); pos < cqBuffer.limit(); pos += TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE) { - cqBuffer.position(pos); - int diff = (int) (CQItemBufferUtil.getCommitLogOffset(cqBuffer) - startCommitLogOffset); - int size = CQItemBufferUtil.getSize(cqBuffer); - if (diff + size > msgBuffer.limit()) { - logger.error("MessageBufferUtil#splitMessage: message buffer size is incorrect: record in consume queue: {}, actual: {}", diff + size, msgBuffer.remaining()); - return messageList; + long firstCommitLogOffset = CQItemBufferUtil.getCommitLogOffset(cqBuffer); + + for (int position = cqBuffer.position(); position < cqBuffer.limit(); + position += TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE) { + + cqBuffer.position(position); + long logOffset = CQItemBufferUtil.getCommitLogOffset(cqBuffer); + int bufferSize = CQItemBufferUtil.getSize(cqBuffer); + long tagCode = CQItemBufferUtil.getTagCode(cqBuffer); + + int offset = (int) (logOffset - firstCommitLogOffset); + if (offset + bufferSize > msgBuffer.limit()) { + logger.error("MessageBufferUtil#splitMessage, message buffer size incorrect. " + + "Expect length in consume queue: {}, actual length: {}", offset + bufferSize, msgBuffer.limit()); + break; } - msgBuffer.position(diff); + msgBuffer.position(offset); int magicCode = getMagicCode(msgBuffer); if (magicCode == TieredCommitLog.BLANK_MAGIC_CODE) { - logger.warn("MessageBufferUtil#splitMessage: message decode error: blank magic code, this message may be coda, try to fix offset"); - diff = diff + TieredCommitLog.CODA_SIZE; - msgBuffer.position(diff); + offset += TieredCommitLog.CODA_SIZE; + msgBuffer.position(offset); magicCode = getMagicCode(msgBuffer); } - if (magicCode != MessageDecoder.MESSAGE_MAGIC_CODE && magicCode != MessageDecoder.MESSAGE_MAGIC_CODE_V2) { - logger.warn("MessageBufferUtil#splitMessage: message decode error: unknown magic code"); + if (magicCode != MessageDecoder.MESSAGE_MAGIC_CODE && + magicCode != MessageDecoder.MESSAGE_MAGIC_CODE_V2) { + logger.warn("MessageBufferUtil#splitMessage, found unknown magic code. " + + "Message offset: {}, wrong magic code: {}", offset, magicCode); continue; } - if (getTotalSize(msgBuffer) != size) { - logger.warn("MessageBufferUtil#splitMessage: message size is not right: except: {}, actual: {}", size, getTotalSize(msgBuffer)); + if (bufferSize != getTotalSize(msgBuffer)) { + logger.warn("MessageBufferUtil#splitMessage, message length in commitlog incorrect. " + + "Except length in commitlog: {}, actual: {}", getTotalSize(msgBuffer), bufferSize); continue; } - messageList.add(Pair.of(diff, size)); + ByteBuffer sliceBuffer = msgBuffer.slice(); + sliceBuffer.limit(bufferSize); + bufferResultList.add(new SelectBufferResult(sliceBuffer, offset, bufferSize, tagCode)); } } catch (Exception e) { - logger.error("MessageBufferUtil#splitMessage: split message failed, maybe decode consume queue item failed", e); + logger.error("MessageBufferUtil#splitMessage, split message buffer error", e); } finally { cqBuffer.rewind(); msgBuffer.rewind(); } - return messageList; + return bufferResultList; } } diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java index 4e0d7e697941..4e8287533f57 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java @@ -31,7 +31,7 @@ import org.apache.rocketmq.store.QueryMessageResult; import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.tieredstore.common.AppendResult; -import org.apache.rocketmq.tieredstore.common.SelectMappedBufferResultWrapper; +import org.apache.rocketmq.tieredstore.common.SelectBufferResultWrapper; import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; import org.apache.rocketmq.tieredstore.file.CompositeFlatFile; @@ -143,17 +143,18 @@ public void testGetMessageFromCacheAsync() { fetcher.recordCacheAccess(flatFile, "prevent-invalid-cache", 0, new ArrayList<>()); Assert.assertEquals(0, fetcher.getMessageCache().estimatedSize()); - fetcher.putMessageToCache(flatFile, 0, new SelectMappedBufferResult(0, msg1, msg1.remaining(), null), 0, 0, 1); + SelectMappedBufferResult bufferResult = new SelectMappedBufferResult(0, msg1, msg1.remaining(), null); + fetcher.putMessageToCache(flatFile, new SelectBufferResultWrapper(bufferResult, 0, 0, false)); Assert.assertEquals(1, fetcher.getMessageCache().estimatedSize()); - GetMessageResult getMessageResult = fetcher.getMessageFromCacheAsync(flatFile, "group", 0, 32).join(); + GetMessageResult getMessageResult = fetcher.getMessageFromCacheAsync(flatFile, "group", 0, 32, true).join(); Assert.assertEquals(GetMessageStatus.FOUND, getMessageResult.getStatus()); Assert.assertEquals(1, getMessageResult.getMessageBufferList().size()); Assert.assertEquals(msg1, getMessageResult.getMessageBufferList().get(0)); Awaitility.waitAtMost(3, TimeUnit.SECONDS) .until(() -> fetcher.getMessageCache().estimatedSize() == 2); - ArrayList wrapperList = new ArrayList<>(); + ArrayList wrapperList = new ArrayList<>(); wrapperList.add(fetcher.getMessageFromCache(flatFile, 0)); fetcher.recordCacheAccess(flatFile, "prevent-invalid-cache", 0, wrapperList); Assert.assertEquals(1, fetcher.getMessageCache().estimatedSize()); diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java index 68277cacc5e1..a0b43894817d 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java @@ -22,9 +22,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.commons.lang3.tuple.Pair; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.tieredstore.common.SelectBufferResult; import org.apache.rocketmq.tieredstore.file.TieredCommitLog; import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue; import org.junit.Assert; @@ -206,10 +206,12 @@ public void testSplitMessages() { cqBuffer.flip(); cqBuffer1.rewind(); cqBuffer2.rewind(); - List> msgList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer); + List msgList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer); Assert.assertEquals(2, msgList.size()); - Assert.assertEquals(Pair.of(0, MSG_LEN), msgList.get(0)); - Assert.assertEquals(Pair.of(MSG_LEN + TieredCommitLog.CODA_SIZE, MSG_LEN), msgList.get(1)); + Assert.assertEquals(0, msgList.get(0).getStartOffset()); + Assert.assertEquals(MSG_LEN, msgList.get(0).getSize()); + Assert.assertEquals(MSG_LEN + TieredCommitLog.CODA_SIZE, msgList.get(1).getStartOffset()); + Assert.assertEquals(MSG_LEN, msgList.get(1).getSize()); cqBuffer = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE * 2); cqBuffer.put(cqBuffer1); @@ -219,7 +221,8 @@ public void testSplitMessages() { cqBuffer4.rewind(); msgList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer); Assert.assertEquals(1, msgList.size()); - Assert.assertEquals(Pair.of(0, MSG_LEN), msgList.get(0)); + Assert.assertEquals(0, msgList.get(0).getStartOffset()); + Assert.assertEquals(MSG_LEN, msgList.get(0).getSize()); cqBuffer = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE * 3); cqBuffer.put(cqBuffer1); @@ -227,8 +230,10 @@ public void testSplitMessages() { cqBuffer.flip(); msgList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer); Assert.assertEquals(2, msgList.size()); - Assert.assertEquals(Pair.of(0, MSG_LEN), msgList.get(0)); - Assert.assertEquals(Pair.of(MSG_LEN + TieredCommitLog.CODA_SIZE, MSG_LEN), msgList.get(1)); + Assert.assertEquals(0, msgList.get(0).getStartOffset()); + Assert.assertEquals(MSG_LEN, msgList.get(0).getSize()); + Assert.assertEquals(MSG_LEN + TieredCommitLog.CODA_SIZE, msgList.get(1).getStartOffset()); + Assert.assertEquals(MSG_LEN, msgList.get(1).getSize()); cqBuffer = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE); cqBuffer.put(cqBuffer5);