From 4cc33115903a8eeaba30d635e1f2dc356770e5f7 Mon Sep 17 00:00:00 2001 From: SSpirits Date: Tue, 21 Feb 2023 10:09:05 +0800 Subject: [PATCH] [ISSUE #6123] Fix flaky test in tiered storage (#6124) * fix flaky test in tiered storage * add debug logs * destroy metadata store and container manager after TieredMessageStoreTest * remove debug logs --- .../container/TieredContainerManager.java | 11 +++- .../container/TieredFileQueue.java | 7 +- .../provider/posix/PosixFileSegment.java | 8 +-- .../tieredstore/util/TieredStoreUtil.java | 4 ++ .../tieredstore/TieredDispatcherTest.java | 20 +++--- .../tieredstore/TieredMessageFetcherTest.java | 23 +++---- .../tieredstore/TieredMessageStoreTest.java | 10 +-- .../tieredstore/TieredStoreTestUtil.java | 62 ++++++++++++++++++ .../container/TieredContainerManagerTest.java | 21 +++--- .../container/TieredFileQueueTest.java | 15 +++-- .../container/TieredIndexFileTest.java | 22 +++---- .../TieredMessageQueueContainerTest.java | 22 ++++--- .../metadata/MetadataStoreTest.java | 20 +++--- .../TieredStoreMetricsManagerTest.java | 10 +++ .../tieredstore/mock/MemoryFileSegment.java | 4 +- .../mock/MemoryFileSegmentWithoutCheck.java | 64 +++++++++++++++++++ .../provider/posix/PosixFileSegmentTest.java | 13 ++-- 17 files changed, 251 insertions(+), 85 deletions(-) create mode 100644 tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredStoreTestUtil.java create mode 100644 tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegmentWithoutCheck.java diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/container/TieredContainerManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/container/TieredContainerManager.java index ca2f4f81f1f..94f1e048ddc 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/container/TieredContainerManager.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/container/TieredContainerManager.java @@ -44,12 +44,17 @@ public class TieredContainerManager { private final TieredMessageStoreConfig storeConfig; public static TieredContainerManager getInstance(TieredMessageStoreConfig storeConfig) { + if (storeConfig == null) { + return instance; + } + if (instance == null) { synchronized (TieredContainerManager.class) { if (instance == null) { try { instance = new TieredContainerManager(storeConfig); - } catch (Exception ignored) { + } catch (Exception e) { + logger.error("TieredContainerManager#getInstance: create container manager failed", e); } } } @@ -58,6 +63,10 @@ public static TieredContainerManager getInstance(TieredMessageStoreConfig storeC } public static TieredIndexFile getIndexFile(TieredMessageStoreConfig storeConfig) { + if (storeConfig == null) { + return indexFile; + } + if (indexFile == null) { synchronized (TieredContainerManager.class) { if (indexFile == null) { diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/container/TieredFileQueue.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/container/TieredFileQueue.java index 8ad1b1491e7..1640e8daf13 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/container/TieredFileQueue.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/container/TieredFileQueue.java @@ -51,7 +51,7 @@ public class TieredFileQueue { private final TieredMessageStoreConfig storeConfig; private final TieredMetadataStore metadataStore; - private final List fileSegmentList = new ArrayList<>(); + protected final List fileSegmentList = new ArrayList<>(); protected final List needCommitFileSegmentList = new CopyOnWriteArrayList<>(); private final ReentrantReadWriteLock fileSegmentLock = new ReentrantReadWriteLock(); @@ -130,7 +130,10 @@ public long getCommitMsgQueueOffset() { } } - private void loadFromMetadata() { + protected void loadFromMetadata() { + fileSegmentList.clear(); + needCommitFileSegmentList.clear(); + metadataStore.iterateFileSegment(fileType, messageQueue.getTopic(), messageQueue.getQueueId(), metadata -> { if (metadata.getStatus() == FileSegmentMetadata.STATUS_DELETED) { return; diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java index 9def6bd29af..b83967db27f 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java @@ -129,10 +129,6 @@ public void createFile() { @Override public void destroyFile() { - if (file.exists()) { - file.delete(); - } - try { if (readFileChannel != null && readFileChannel.isOpen()) { readFileChannel.close(); @@ -143,6 +139,10 @@ public void destroyFile() { } catch (IOException e) { logger.error("PosixFileSegment#destroyFile: destroy file {} failed: ", filepath, e); } + + if (file.exists()) { + file.delete(); + } } @Override diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/TieredStoreUtil.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/TieredStoreUtil.java index c41e5a48e29..d1ba8f761b1 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/TieredStoreUtil.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/TieredStoreUtil.java @@ -135,6 +135,10 @@ public static boolean isSystemTopic(final String topic) { } public static TieredMetadataStore getMetadataStore(TieredMessageStoreConfig storeConfig) { + if (storeConfig == null) { + return metadataStoreInstance; + } + if (metadataStoreInstance == null) { synchronized (TieredMetadataStore.class) { if (metadataStoreInstance == null) { diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java index 33e908824ca..a89f736e82d 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java @@ -44,16 +44,17 @@ import org.mockito.Mockito; public class TieredDispatcherTest { - TieredMessageStoreConfig storeConfig; - MessageQueue mq; - TieredMetadataStore metadataStore; + private TieredMessageStoreConfig storeConfig; + private MessageQueue mq; + private TieredMetadataStore metadataStore; + + private final String storePath = FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID(); @Before public void setUp() { - MemoryFileSegment.checkSize = false; storeConfig = new TieredMessageStoreConfig(); - storeConfig.setStorePathRootDir(FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID()); - storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegment"); + storeConfig.setStorePathRootDir(storePath); + storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegmentWithoutCheck"); storeConfig.setBrokerName(storeConfig.getBrokerName()); mq = new MessageQueue("TieredMessageQueueContainerTest", storeConfig.getBrokerName(), 0); metadataStore = TieredStoreUtil.getMetadataStore(storeConfig); @@ -61,10 +62,9 @@ public void setUp() { @After public void tearDown() throws IOException { - MemoryFileSegment.checkSize = true; - FileUtils.deleteDirectory(new File(FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID())); - TieredStoreUtil.getMetadataStore(storeConfig).destroy(); - TieredContainerManager.getInstance(storeConfig).cleanup(); + TieredStoreTestUtil.destroyContainerManager(); + TieredStoreTestUtil.destroyMetadataStore(); + TieredStoreTestUtil.destroyTempDir(storePath); } @Test diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java index 9dd94ccf654..2d2c5d5f244 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java @@ -38,8 +38,6 @@ import org.apache.rocketmq.tieredstore.container.TieredContainerManager; import org.apache.rocketmq.tieredstore.container.TieredIndexFile; import org.apache.rocketmq.tieredstore.container.TieredMessageQueueContainer; -import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore; -import org.apache.rocketmq.tieredstore.mock.MemoryFileSegment; import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest; import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; @@ -51,30 +49,29 @@ import org.junit.Test; public class TieredMessageFetcherTest { - TieredMessageStoreConfig storeConfig; - MessageQueue mq; - TieredMetadataStore metadataStore; + private TieredMessageStoreConfig storeConfig; + private MessageQueue mq; + + private final String storePath = FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID(); @Before public void setUp() { - MemoryFileSegment.checkSize = false; storeConfig = new TieredMessageStoreConfig(); - storeConfig.setStorePathRootDir(FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID()); + storeConfig.setStorePathRootDir(storePath); storeConfig.setBrokerName(storeConfig.getBrokerName()); storeConfig.setReadAheadCacheExpireDuration(Long.MAX_VALUE); - storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegment"); + storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegmentWithoutCheck"); storeConfig.setTieredStoreIndexFileMaxHashSlotNum(2); storeConfig.setTieredStoreIndexFileMaxIndexNum(3); - metadataStore = TieredStoreUtil.getMetadataStore(storeConfig); mq = new MessageQueue("TieredMessageFetcherTest", storeConfig.getBrokerName(), 0); + TieredStoreUtil.getMetadataStore(storeConfig); } @After public void tearDown() throws IOException { - MemoryFileSegment.checkSize = true; - FileUtils.deleteDirectory(new File(FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID())); - TieredStoreUtil.getMetadataStore(storeConfig).destroy(); - TieredContainerManager.getInstance(storeConfig).cleanup(); + TieredStoreTestUtil.destroyContainerManager(); + TieredStoreTestUtil.destroyMetadataStore(); + TieredStoreTestUtil.destroyTempDir(storePath); } public Triple buildFetcher() { diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java index 800b109384a..c16ba141cce 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java @@ -67,10 +67,12 @@ public class TieredMessageStoreTest { private Configuration configuration; private TieredContainerManager containerManager; + private final String storePath = FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID(); + @Before public void setUp() { storeConfig = new MessageStoreConfig(); - storeConfig.setStorePathRootDir(FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID()); + storeConfig.setStorePathRootDir(storePath); mq = new MessageQueue("TieredMessageStoreTest", "broker", 0); nextStore = Mockito.mock(DefaultMessageStore.class); @@ -102,9 +104,9 @@ public void setUp() { @After public void tearDown() throws IOException { - FileUtils.deleteDirectory(new File(FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID())); - TieredStoreUtil.getMetadataStore(store.getStoreConfig()).destroy(); - TieredContainerManager.getInstance(store.getStoreConfig()).cleanup(); + TieredStoreTestUtil.destroyContainerManager(); + TieredStoreTestUtil.destroyMetadataStore(); + TieredStoreTestUtil.destroyTempDir(storePath); } private void mockContainer() { diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredStoreTestUtil.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredStoreTestUtil.java new file mode 100644 index 00000000000..c537a83c9f7 --- /dev/null +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredStoreTestUtil.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tieredstore; + +import java.io.File; +import java.lang.reflect.Field; +import org.apache.commons.io.FileUtils; +import org.apache.rocketmq.tieredstore.container.TieredContainerManager; +import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore; +import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; +import org.junit.Assert; + +public class TieredStoreTestUtil { + public static void destroyMetadataStore() { + TieredMetadataStore metadataStore = TieredStoreUtil.getMetadataStore(null); + if (metadataStore != null) { + metadataStore.destroy(); + } + try { + Field field = TieredStoreUtil.class.getDeclaredField("metadataStoreInstance"); + field.setAccessible(true); + field.set(null, null); + } catch (NoSuchFieldException | IllegalAccessException e) { + Assert.fail(e.getClass().getCanonicalName() + ": " + e.getMessage()); + } + } + + public static void destroyContainerManager() { + TieredContainerManager containerManager = TieredContainerManager.getInstance(null); + if (containerManager != null) { + containerManager.destroy(); + } + try { + Field field = TieredContainerManager.class.getDeclaredField("instance"); + field.setAccessible(true); + field.set(null, null); + } catch (NoSuchFieldException | IllegalAccessException e) { + Assert.fail(e.getClass().getCanonicalName() + ": " + e.getMessage()); + } + } + + public static void destroyTempDir(String storePath) { + try { + FileUtils.deleteDirectory(new File(storePath)); + } catch (Exception ignore) { + } + } +} diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredContainerManagerTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredContainerManagerTest.java index 1c8254d9805..2f8ad36154c 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredContainerManagerTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredContainerManagerTest.java @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.tieredstore.TieredStoreTestUtil; import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore; import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; @@ -32,14 +33,16 @@ import org.junit.Test; public class TieredContainerManagerTest { - TieredMessageStoreConfig storeConfig; - MessageQueue mq; - TieredMetadataStore metadataStore; + private TieredMessageStoreConfig storeConfig; + private MessageQueue mq; + private TieredMetadataStore metadataStore; + + private final String storePath = FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID(); @Before public void setUp() { storeConfig = new TieredMessageStoreConfig(); - storeConfig.setStorePathRootDir(FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID()); + storeConfig.setStorePathRootDir(storePath); storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegment"); storeConfig.setBrokerName(storeConfig.getBrokerName()); mq = new MessageQueue("TieredContainerManagerTest", storeConfig.getBrokerName(), 0); @@ -48,9 +51,9 @@ public void setUp() { @After public void tearDown() throws IOException { - FileUtils.deleteDirectory(new File(FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID())); - TieredStoreUtil.getMetadataStore(storeConfig).destroy(); - TieredContainerManager.getInstance(storeConfig).cleanup(); + TieredStoreTestUtil.destroyContainerManager(); + TieredStoreTestUtil.destroyMetadataStore(); + TieredStoreTestUtil.destroyTempDir(storePath); } @@ -64,7 +67,9 @@ public void testLoadAndDestroy() { boolean load = containerManager.load(); Assert.assertTrue(load); - Awaitility.await().atMost(3, TimeUnit.SECONDS).until(() -> containerManager.getAllMQContainer().size() == 2); + Awaitility.await() + .atMost(3, TimeUnit.SECONDS) + .until(() -> containerManager.getAllMQContainer().size() == 2); TieredMessageQueueContainer container = containerManager.getMQContainer(mq); Assert.assertNotNull(container); diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredFileQueueTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredFileQueueTest.java index 6385fa281f9..60f751a623d 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredFileQueueTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredFileQueueTest.java @@ -22,6 +22,7 @@ import java.util.UUID; import org.apache.commons.io.FileUtils; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.tieredstore.TieredStoreTestUtil; import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore; import org.apache.rocketmq.tieredstore.mock.MemoryFileSegment; @@ -33,21 +34,23 @@ import org.junit.Test; public class TieredFileQueueTest { - TieredMessageStoreConfig storeConfig; - MessageQueue queue; + private TieredMessageStoreConfig storeConfig; + private MessageQueue queue; + + private final String storePath = FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID(); @Before public void setUp() { storeConfig = new TieredMessageStoreConfig(); - storeConfig.setStorePathRootDir(FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID()); + storeConfig.setStorePathRootDir(storePath); storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegment"); queue = new MessageQueue("TieredFileQueueTest", storeConfig.getBrokerName(), 0); } @After public void tearDown() throws IOException { - FileUtils.deleteDirectory(new File(FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID())); - TieredStoreUtil.getMetadataStore(storeConfig).destroy(); + TieredStoreTestUtil.destroyMetadataStore(); + TieredStoreTestUtil.destroyTempDir(storePath); } @Test @@ -149,7 +152,7 @@ public void testCheckFileSize() throws ClassNotFoundException, NoSuchMethodExcep TieredFileSegment fileSegment1 = new MemoryFileSegment(TieredFileSegment.FileSegmentType.CONSUME_QUEUE, queue, 100, storeConfig); fileSegment1.initPosition(fileSegment1.getSize() - 100); - fileSegment1.setFull(false); + fileSegment1.setFull(); metadataStore.updateFileSegment(fileSegment1); metadataStore.updateFileSegment(fileSegment1); diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredIndexFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredIndexFileTest.java index 0824cf35d23..6a114e7ca89 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredIndexFileTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredIndexFileTest.java @@ -26,9 +26,8 @@ import org.apache.commons.lang3.SystemUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.tieredstore.TieredStoreTestUtil; import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; -import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore; -import org.apache.rocketmq.tieredstore.mock.MemoryFileSegment; import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; import org.awaitility.Awaitility; import org.junit.After; @@ -39,27 +38,26 @@ import org.junit.Test; public class TieredIndexFileTest { - MessageQueue mq; - TieredMessageStoreConfig storeConfig; - TieredMetadataStore metadataStore; + private MessageQueue mq; + private TieredMessageStoreConfig storeConfig; + + private final String storePath = FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID(); @Before public void setUp() { - MemoryFileSegment.checkSize = false; storeConfig = new TieredMessageStoreConfig(); - storeConfig.setStorePathRootDir(FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID()); - storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegment"); + storeConfig.setStorePathRootDir(storePath); + storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegmentWithoutCheck"); storeConfig.setTieredStoreIndexFileMaxHashSlotNum(2); storeConfig.setTieredStoreIndexFileMaxIndexNum(3); mq = new MessageQueue("TieredIndexFileTest", storeConfig.getBrokerName(), 1); - metadataStore = TieredStoreUtil.getMetadataStore(storeConfig); + TieredStoreUtil.getMetadataStore(storeConfig); } @After public void tearDown() throws IOException { - MemoryFileSegment.checkSize = true; - FileUtils.deleteDirectory(new File(FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID())); -// metadataStore.reLoadStore(); + TieredStoreTestUtil.destroyMetadataStore(); + TieredStoreTestUtil.destroyTempDir(storePath); } @Ignore diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredMessageQueueContainerTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredMessageQueueContainerTest.java index a9eb444c90f..11afa362b24 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredMessageQueueContainerTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredMessageQueueContainerTest.java @@ -24,6 +24,7 @@ import org.apache.commons.io.FileUtils; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.store.DispatchRequest; +import org.apache.rocketmq.tieredstore.TieredStoreTestUtil; import org.apache.rocketmq.tieredstore.common.AppendResult; import org.apache.rocketmq.tieredstore.common.BoundaryType; import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; @@ -40,14 +41,16 @@ import org.junit.Test; public class TieredMessageQueueContainerTest { - TieredMessageStoreConfig storeConfig; - MessageQueue mq; - TieredMetadataStore metadataStore; + private TieredMessageStoreConfig storeConfig; + private MessageQueue mq; + private TieredMetadataStore metadataStore; + + private final String storePath = FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID(); @Before public void setUp() { storeConfig = new TieredMessageStoreConfig(); - storeConfig.setStorePathRootDir(FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID()); + storeConfig.setStorePathRootDir(storePath); storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegment"); storeConfig.setCommitLogRollingInterval(0); storeConfig.setCommitLogRollingMinimumSize(999); @@ -57,14 +60,13 @@ public void setUp() { @After public void tearDown() throws IOException { - MemoryFileSegment.checkSize = true; - FileUtils.deleteDirectory(new File(FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID())); - TieredStoreUtil.getMetadataStore(storeConfig).destroy(); - TieredContainerManager.getInstance(storeConfig).cleanup(); + TieredStoreTestUtil.destroyContainerManager(); + TieredStoreTestUtil.destroyMetadataStore(); + TieredStoreTestUtil.destroyTempDir(storePath); } @Test - public void testAppendCommitLog() throws ClassNotFoundException, NoSuchMethodException, IOException { + public void testAppendCommitLog() throws ClassNotFoundException, NoSuchMethodException { TieredMessageQueueContainer container = new TieredMessageQueueContainer(mq, storeConfig); ByteBuffer message = MessageBufferUtilTest.buildMessageBuffer(); AppendResult result = container.appendCommitLog(message); @@ -136,7 +138,7 @@ public void testAppendConsumeQueue() throws ClassNotFoundException, NoSuchMethod @Test public void testBinarySearchInQueueByTime() throws ClassNotFoundException, NoSuchMethodException { - MemoryFileSegment.checkSize = false; + storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegmentWithoutCheck"); TieredMessageQueueContainer container = new TieredMessageQueueContainer(mq, storeConfig); container.initOffset(50); diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metadata/MetadataStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metadata/MetadataStoreTest.java index 4832d1246c5..96539d1c449 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metadata/MetadataStoreTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metadata/MetadataStoreTest.java @@ -26,27 +26,29 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.io.FileUtils; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.tieredstore.TieredStoreTestUtil; import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; import org.apache.rocketmq.tieredstore.container.TieredCommitLog; import org.apache.rocketmq.tieredstore.mock.MemoryFileSegment; import org.apache.rocketmq.tieredstore.provider.TieredFileSegment; -import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; public class MetadataStoreTest { - MessageQueue mq0; - MessageQueue mq1; - MessageQueue mq2; - TieredMessageStoreConfig storeConfig; - TieredMetadataStore metadataStore; + private MessageQueue mq0; + private MessageQueue mq1; + private MessageQueue mq2; + private TieredMessageStoreConfig storeConfig; + private TieredMetadataStore metadataStore; + + private final String storePath = FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID(); @Before public void setUp() { storeConfig = new TieredMessageStoreConfig(); - storeConfig.setStorePathRootDir(FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID()); + storeConfig.setStorePathRootDir(storePath); mq0 = new MessageQueue("MetadataStoreTest0", storeConfig.getBrokerName(), 0); mq1 = new MessageQueue("MetadataStoreTest1", storeConfig.getBrokerName(), 0); mq2 = new MessageQueue("MetadataStoreTest1", storeConfig.getBrokerName(), 1); @@ -55,8 +57,8 @@ public void setUp() { @After public void tearDown() throws IOException { - FileUtils.deleteDirectory(new File(FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID())); - TieredStoreUtil.getMetadataStore(storeConfig).destroy(); + TieredStoreTestUtil.destroyMetadataStore(); + TieredStoreTestUtil.destroyTempDir(storePath); } @Test diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java index dea8f503f82..170728d4b83 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java @@ -17,12 +17,22 @@ package org.apache.rocketmq.tieredstore.metrics; import io.opentelemetry.sdk.OpenTelemetrySdk; +import java.io.IOException; import org.apache.rocketmq.tieredstore.TieredMessageFetcher; +import org.apache.rocketmq.tieredstore.TieredStoreTestUtil; import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; +import org.junit.After; import org.junit.Test; public class TieredStoreMetricsManagerTest { + @After + public void tearDown() throws IOException { + TieredStoreTestUtil.destroyContainerManager(); + TieredStoreTestUtil.destroyMetadataStore(); + } + + @Test public void getMetricsView() { TieredStoreMetricsManager.getMetricsView(); diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegment.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegment.java index 25f4a6b6c54..254b151e64e 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegment.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegment.java @@ -26,11 +26,11 @@ import org.junit.Assert; public class MemoryFileSegment extends TieredFileSegment { - private final ByteBuffer memStore; + protected final ByteBuffer memStore; public CompletableFuture blocker; - public static boolean checkSize = true; + protected boolean checkSize = true; public MemoryFileSegment(TieredFileSegment.FileSegmentType fileType, MessageQueue messageQueue, long baseOffset, TieredMessageStoreConfig storeConfig) { diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegmentWithoutCheck.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegmentWithoutCheck.java new file mode 100644 index 00000000000..f7e5488da8b --- /dev/null +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegmentWithoutCheck.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tieredstore.mock; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; +import org.junit.Assert; + +public class MemoryFileSegmentWithoutCheck extends MemoryFileSegment { + + public MemoryFileSegmentWithoutCheck(FileSegmentType fileType, + MessageQueue messageQueue, long baseOffset, + TieredMessageStoreConfig storeConfig) { + super(fileType, messageQueue, baseOffset, storeConfig); + } + + @Override + public long getSize() { + return 0; + } + + @Override + public CompletableFuture commit0(TieredFileSegmentInputStream inputStream, long position, int length, + boolean append) { + try { + if (blocker != null && !blocker.get()) { + throw new IllegalStateException(); + } + } catch (InterruptedException | ExecutionException e) { + Assert.fail(e.getMessage()); + } + + byte[] buffer = new byte[1024]; + + int startPos = memStore.position(); + try { + int len; + while ((len = inputStream.read(buffer)) > 0) { + memStore.put(buffer, 0, len); + } + Assert.assertEquals(length, memStore.position() - startPos); + } catch (Exception e) { + Assert.fail(e.getMessage()); + return CompletableFuture.completedFuture(false); + } + return CompletableFuture.completedFuture(true); + } +} diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java index 0f2ee2f3796..736da0637c5 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java @@ -25,6 +25,7 @@ import java.util.UUID; import org.apache.commons.io.FileUtils; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.tieredstore.TieredStoreTestUtil; import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; import org.apache.rocketmq.tieredstore.provider.TieredFileSegment; import org.junit.After; @@ -33,19 +34,23 @@ import org.junit.Test; public class PosixFileSegmentTest { - TieredMessageStoreConfig storeConfig; - MessageQueue mq; + private TieredMessageStoreConfig storeConfig; + private MessageQueue mq; + + private final String storePath = FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID(); @Before public void setUp() { storeConfig = new TieredMessageStoreConfig(); - storeConfig.setTieredStoreFilepath(FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID()); + storeConfig.setTieredStoreFilepath(storePath); mq = new MessageQueue("OSSFileSegmentTest", "broker", 0); } @After public void tearDown() throws IOException { - FileUtils.deleteDirectory(new File(FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID())); + TieredStoreTestUtil.destroyContainerManager(); + TieredStoreTestUtil.destroyMetadataStore(); + TieredStoreTestUtil.destroyTempDir(storePath); } @Test