Skip to content

Commit

Permalink
[ISSUE #6123] Fix flaky test in tiered storage (#6124)
Browse files Browse the repository at this point in the history
* fix flaky test in tiered storage

* add debug logs

* destroy metadata store and container manager after TieredMessageStoreTest

* remove debug logs
  • Loading branch information
ShadowySpirits authored Feb 21, 2023
1 parent 7049527 commit 4cc3311
Show file tree
Hide file tree
Showing 17 changed files with 251 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,17 @@ public class TieredContainerManager {
private final TieredMessageStoreConfig storeConfig;

public static TieredContainerManager getInstance(TieredMessageStoreConfig storeConfig) {
if (storeConfig == null) {
return instance;
}

if (instance == null) {
synchronized (TieredContainerManager.class) {
if (instance == null) {
try {
instance = new TieredContainerManager(storeConfig);
} catch (Exception ignored) {
} catch (Exception e) {
logger.error("TieredContainerManager#getInstance: create container manager failed", e);
}
}
}
Expand All @@ -58,6 +63,10 @@ public static TieredContainerManager getInstance(TieredMessageStoreConfig storeC
}

public static TieredIndexFile getIndexFile(TieredMessageStoreConfig storeConfig) {
if (storeConfig == null) {
return indexFile;
}

if (indexFile == null) {
synchronized (TieredContainerManager.class) {
if (indexFile == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class TieredFileQueue {
private final TieredMessageStoreConfig storeConfig;
private final TieredMetadataStore metadataStore;

private final List<TieredFileSegment> fileSegmentList = new ArrayList<>();
protected final List<TieredFileSegment> fileSegmentList = new ArrayList<>();
protected final List<TieredFileSegment> needCommitFileSegmentList = new CopyOnWriteArrayList<>();
private final ReentrantReadWriteLock fileSegmentLock = new ReentrantReadWriteLock();

Expand Down Expand Up @@ -130,7 +130,10 @@ public long getCommitMsgQueueOffset() {
}
}

private void loadFromMetadata() {
protected void loadFromMetadata() {
fileSegmentList.clear();
needCommitFileSegmentList.clear();

metadataStore.iterateFileSegment(fileType, messageQueue.getTopic(), messageQueue.getQueueId(), metadata -> {
if (metadata.getStatus() == FileSegmentMetadata.STATUS_DELETED) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,6 @@ public void createFile() {

@Override
public void destroyFile() {
if (file.exists()) {
file.delete();
}

try {
if (readFileChannel != null && readFileChannel.isOpen()) {
readFileChannel.close();
Expand All @@ -143,6 +139,10 @@ public void destroyFile() {
} catch (IOException e) {
logger.error("PosixFileSegment#destroyFile: destroy file {} failed: ", filepath, e);
}

if (file.exists()) {
file.delete();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ public static boolean isSystemTopic(final String topic) {
}

public static TieredMetadataStore getMetadataStore(TieredMessageStoreConfig storeConfig) {
if (storeConfig == null) {
return metadataStoreInstance;
}

if (metadataStoreInstance == null) {
synchronized (TieredMetadataStore.class) {
if (metadataStoreInstance == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,27 +44,27 @@
import org.mockito.Mockito;

public class TieredDispatcherTest {
TieredMessageStoreConfig storeConfig;
MessageQueue mq;
TieredMetadataStore metadataStore;
private TieredMessageStoreConfig storeConfig;
private MessageQueue mq;
private TieredMetadataStore metadataStore;

private final String storePath = FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID();

@Before
public void setUp() {
MemoryFileSegment.checkSize = false;
storeConfig = new TieredMessageStoreConfig();
storeConfig.setStorePathRootDir(FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID());
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegment");
storeConfig.setStorePathRootDir(storePath);
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegmentWithoutCheck");
storeConfig.setBrokerName(storeConfig.getBrokerName());
mq = new MessageQueue("TieredMessageQueueContainerTest", storeConfig.getBrokerName(), 0);
metadataStore = TieredStoreUtil.getMetadataStore(storeConfig);
}

@After
public void tearDown() throws IOException {
MemoryFileSegment.checkSize = true;
FileUtils.deleteDirectory(new File(FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID()));
TieredStoreUtil.getMetadataStore(storeConfig).destroy();
TieredContainerManager.getInstance(storeConfig).cleanup();
TieredStoreTestUtil.destroyContainerManager();
TieredStoreTestUtil.destroyMetadataStore();
TieredStoreTestUtil.destroyTempDir(storePath);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
import org.apache.rocketmq.tieredstore.container.TieredContainerManager;
import org.apache.rocketmq.tieredstore.container.TieredIndexFile;
import org.apache.rocketmq.tieredstore.container.TieredMessageQueueContainer;
import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
import org.apache.rocketmq.tieredstore.mock.MemoryFileSegment;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
Expand All @@ -51,30 +49,29 @@
import org.junit.Test;

public class TieredMessageFetcherTest {
TieredMessageStoreConfig storeConfig;
MessageQueue mq;
TieredMetadataStore metadataStore;
private TieredMessageStoreConfig storeConfig;
private MessageQueue mq;

private final String storePath = FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID();

@Before
public void setUp() {
MemoryFileSegment.checkSize = false;
storeConfig = new TieredMessageStoreConfig();
storeConfig.setStorePathRootDir(FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID());
storeConfig.setStorePathRootDir(storePath);
storeConfig.setBrokerName(storeConfig.getBrokerName());
storeConfig.setReadAheadCacheExpireDuration(Long.MAX_VALUE);
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegment");
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegmentWithoutCheck");
storeConfig.setTieredStoreIndexFileMaxHashSlotNum(2);
storeConfig.setTieredStoreIndexFileMaxIndexNum(3);
metadataStore = TieredStoreUtil.getMetadataStore(storeConfig);
mq = new MessageQueue("TieredMessageFetcherTest", storeConfig.getBrokerName(), 0);
TieredStoreUtil.getMetadataStore(storeConfig);
}

@After
public void tearDown() throws IOException {
MemoryFileSegment.checkSize = true;
FileUtils.deleteDirectory(new File(FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID()));
TieredStoreUtil.getMetadataStore(storeConfig).destroy();
TieredContainerManager.getInstance(storeConfig).cleanup();
TieredStoreTestUtil.destroyContainerManager();
TieredStoreTestUtil.destroyMetadataStore();
TieredStoreTestUtil.destroyTempDir(storePath);
}

public Triple<TieredMessageFetcher, ByteBuffer, ByteBuffer> buildFetcher() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,12 @@ public class TieredMessageStoreTest {
private Configuration configuration;
private TieredContainerManager containerManager;

private final String storePath = FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID();

@Before
public void setUp() {
storeConfig = new MessageStoreConfig();
storeConfig.setStorePathRootDir(FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID());
storeConfig.setStorePathRootDir(storePath);
mq = new MessageQueue("TieredMessageStoreTest", "broker", 0);

nextStore = Mockito.mock(DefaultMessageStore.class);
Expand Down Expand Up @@ -102,9 +104,9 @@ public void setUp() {

@After
public void tearDown() throws IOException {
FileUtils.deleteDirectory(new File(FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID()));
TieredStoreUtil.getMetadataStore(store.getStoreConfig()).destroy();
TieredContainerManager.getInstance(store.getStoreConfig()).cleanup();
TieredStoreTestUtil.destroyContainerManager();
TieredStoreTestUtil.destroyMetadataStore();
TieredStoreTestUtil.destroyTempDir(storePath);
}

private void mockContainer() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tieredstore;

import java.io.File;
import java.lang.reflect.Field;
import org.apache.commons.io.FileUtils;
import org.apache.rocketmq.tieredstore.container.TieredContainerManager;
import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
import org.junit.Assert;

public class TieredStoreTestUtil {
public static void destroyMetadataStore() {
TieredMetadataStore metadataStore = TieredStoreUtil.getMetadataStore(null);
if (metadataStore != null) {
metadataStore.destroy();
}
try {
Field field = TieredStoreUtil.class.getDeclaredField("metadataStoreInstance");
field.setAccessible(true);
field.set(null, null);
} catch (NoSuchFieldException | IllegalAccessException e) {
Assert.fail(e.getClass().getCanonicalName() + ": " + e.getMessage());
}
}

public static void destroyContainerManager() {
TieredContainerManager containerManager = TieredContainerManager.getInstance(null);
if (containerManager != null) {
containerManager.destroy();
}
try {
Field field = TieredContainerManager.class.getDeclaredField("instance");
field.setAccessible(true);
field.set(null, null);
} catch (NoSuchFieldException | IllegalAccessException e) {
Assert.fail(e.getClass().getCanonicalName() + ": " + e.getMessage());
}
}

public static void destroyTempDir(String storePath) {
try {
FileUtils.deleteDirectory(new File(storePath));
} catch (Exception ignore) {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
Expand All @@ -32,14 +33,16 @@
import org.junit.Test;

public class TieredContainerManagerTest {
TieredMessageStoreConfig storeConfig;
MessageQueue mq;
TieredMetadataStore metadataStore;
private TieredMessageStoreConfig storeConfig;
private MessageQueue mq;
private TieredMetadataStore metadataStore;

private final String storePath = FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID();

@Before
public void setUp() {
storeConfig = new TieredMessageStoreConfig();
storeConfig.setStorePathRootDir(FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID());
storeConfig.setStorePathRootDir(storePath);
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegment");
storeConfig.setBrokerName(storeConfig.getBrokerName());
mq = new MessageQueue("TieredContainerManagerTest", storeConfig.getBrokerName(), 0);
Expand All @@ -48,9 +51,9 @@ public void setUp() {

@After
public void tearDown() throws IOException {
FileUtils.deleteDirectory(new File(FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID()));
TieredStoreUtil.getMetadataStore(storeConfig).destroy();
TieredContainerManager.getInstance(storeConfig).cleanup();
TieredStoreTestUtil.destroyContainerManager();
TieredStoreTestUtil.destroyMetadataStore();
TieredStoreTestUtil.destroyTempDir(storePath);
}


Expand All @@ -64,7 +67,9 @@ public void testLoadAndDestroy() {
boolean load = containerManager.load();
Assert.assertTrue(load);

Awaitility.await().atMost(3, TimeUnit.SECONDS).until(() -> containerManager.getAllMQContainer().size() == 2);
Awaitility.await()
.atMost(3, TimeUnit.SECONDS)
.until(() -> containerManager.getAllMQContainer().size() == 2);

TieredMessageQueueContainer container = containerManager.getMQContainer(mq);
Assert.assertNotNull(container);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
import org.apache.rocketmq.tieredstore.mock.MemoryFileSegment;
Expand All @@ -33,21 +34,23 @@
import org.junit.Test;

public class TieredFileQueueTest {
TieredMessageStoreConfig storeConfig;
MessageQueue queue;
private TieredMessageStoreConfig storeConfig;
private MessageQueue queue;

private final String storePath = FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID();

@Before
public void setUp() {
storeConfig = new TieredMessageStoreConfig();
storeConfig.setStorePathRootDir(FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID());
storeConfig.setStorePathRootDir(storePath);
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegment");
queue = new MessageQueue("TieredFileQueueTest", storeConfig.getBrokerName(), 0);
}

@After
public void tearDown() throws IOException {
FileUtils.deleteDirectory(new File(FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID()));
TieredStoreUtil.getMetadataStore(storeConfig).destroy();
TieredStoreTestUtil.destroyMetadataStore();
TieredStoreTestUtil.destroyTempDir(storePath);
}

@Test
Expand Down Expand Up @@ -149,7 +152,7 @@ public void testCheckFileSize() throws ClassNotFoundException, NoSuchMethodExcep
TieredFileSegment fileSegment1 = new MemoryFileSegment(TieredFileSegment.FileSegmentType.CONSUME_QUEUE,
queue, 100, storeConfig);
fileSegment1.initPosition(fileSegment1.getSize() - 100);
fileSegment1.setFull(false);
fileSegment1.setFull();
metadataStore.updateFileSegment(fileSegment1);
metadataStore.updateFileSegment(fileSegment1);

Expand Down
Loading

0 comments on commit 4cc3311

Please sign in to comment.