Skip to content

Commit

Permalink
HDDS-11152. OMDoubleBuffer error when handling snapshot's background …
Browse files Browse the repository at this point in the history
…operations (apache#7112)
  • Loading branch information
hemantk-12 authored Aug 28, 2024
1 parent 5659b7e commit 23f3e5b
Show file tree
Hide file tree
Showing 19 changed files with 183 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,7 @@ private void init() throws Exception {
conf.setBoolean(OZONE_OM_ENABLE_FILESYSTEM_PATHS, enabledFileSystemPaths);
conf.set(OZONE_DEFAULT_BUCKET_LAYOUT, bucketLayout.name());
conf.setBoolean(OZONE_OM_SNAPSHOT_FORCE_FULL_DIFF, forceFullSnapshotDiff);
conf.setBoolean(OZONE_OM_SNAPSHOT_DIFF_DISABLE_NATIVE_LIBS,
disableNativeDiff);
conf.setBoolean(OZONE_OM_ENABLE_FILESYSTEM_PATHS, enabledFileSystemPaths);
conf.set(OZONE_DEFAULT_BUCKET_LAYOUT, bucketLayout.name());
conf.setBoolean(OZONE_OM_SNAPSHOT_FORCE_FULL_DIFF, forceFullSnapshotDiff);
conf.setBoolean(OZONE_OM_SNAPSHOT_DIFF_DISABLE_NATIVE_LIBS, disableNativeDiff);
conf.setEnum(HDDS_DB_PROFILE, DBProfile.TEST);
// Enable filesystem snapshot feature for the test regardless of the default
conf.setBoolean(OMConfigKeys.OZONE_FILESYSTEM_SNAPSHOT_ENABLED_KEY, true);
Expand Down Expand Up @@ -1481,10 +1477,8 @@ public void testSnapDiffCancel() throws Exception {
String toSnapshotTableKey =
SnapshotInfo.getTableKey(volumeName, bucketName, toSnapName);

UUID fromSnapshotID = ozoneManager.getOmSnapshotManager()
.getSnapshotInfo(fromSnapshotTableKey).getSnapshotId();
UUID toSnapshotID = ozoneManager.getOmSnapshotManager()
.getSnapshotInfo(toSnapshotTableKey).getSnapshotId();
UUID fromSnapshotID = SnapshotUtils.getSnapshotInfo(ozoneManager, fromSnapshotTableKey).getSnapshotId();
UUID toSnapshotID = SnapshotUtils.getSnapshotInfo(ozoneManager, toSnapshotTableKey).getSnapshotId();

// Construct SnapshotDiffJob table key.
String snapDiffJobKey = fromSnapshotID + DELIMITER + toSnapshotID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.db.RDBCheckpointUtils;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.TestDataUtil;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
Expand All @@ -34,20 +36,27 @@
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerDoubleBuffer;
import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPath;
import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.DONE;
import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.IN_PROGRESS;
import static org.apache.ozone.test.LambdaTestUtils.await;
Expand All @@ -72,6 +81,8 @@ public class TestOzoneManagerHASnapshot {
public static void staticInit() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setBoolean(OMConfigKeys.OZONE_FILESYSTEM_SNAPSHOT_ENABLED_KEY, true);
conf.setTimeDuration(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 1, TimeUnit.SECONDS);
conf.setTimeDuration(OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL, 1, TimeUnit.SECONDS);

cluster = MiniOzoneCluster.newHABuilder(conf)
.setOMServiceId("om-service-test")
Expand Down Expand Up @@ -265,4 +276,97 @@ private void createFileKey(OzoneBucket bucket, String keyName)
fileKey.write(value);
}
}

/**
 * Reproduces the HDDS-11152 scenario: a follower OM whose double buffer is lagging ends up with
 * purgeKey and purgeSnapshot transactions accumulated in the same batch.
 *
 * <p>Flow: create keys, suspend the leader's deleting services, delete some keys, create and then
 * delete a snapshot, stop the follower's double buffer, let the leader's services run, and finally
 * resume and flush the follower's double buffer. The snapshot must end up purged from the
 * snapshot info table on both the leader and the follower.
 */
@Test
public void testKeyAndSnapshotDeletionService() throws IOException, InterruptedException, TimeoutException {
  OzoneManager leader = cluster.getOMLeader();
  // Use OM 0 as the follower unless it is the current leader; fall back to OM 1 in that case.
  OzoneManager follower = leader != cluster.getOzoneManager(0)
      ? cluster.getOzoneManager(0)
      : cluster.getOzoneManager(1);

  int totalKeys = 5;
  List<String> createdKeys = new ArrayList<>();
  while (createdKeys.size() < totalKeys) {
    String name = "key-" + RandomStringUtils.randomNumeric(10);
    createFileKey(ozoneBucket, name);
    createdKeys.add(name);
  }

  // Suspend the key deleting service and then the snapshot deleting service on the leader,
  // so that the deleted keys get trapped in the snapshot.
  leader.getKeyManager().getDeletingService().suspend();
  leader.getKeyManager().getSnapshotDeletingService().suspend();

  // Delete the first half of the keys.
  for (String doomedKey : createdKeys.subList(0, totalKeys / 2)) {
    ozoneBucket.deleteKey(doomedKey);
  }

  String snapshotName = "snap-" + RandomStringUtils.randomNumeric(10);
  createSnapshot(volumeName, bucketName, snapshotName);
  store.deleteSnapshot(volumeName, bucketName, snapshotName);

  // Stop the follower's double buffer so that the key purge, snapshot delete and snapshot purge
  // transactions pile up instead of being flushed one by one.
  follower.getOmRatisServer().getOmStateMachine().getOzoneManagerDoubleBuffer().stopDaemon();

  // Let the key deleting service complete at least one more run on the leader.
  long keyDeletingRunsBefore = leader.getKeyManager().getDeletingService().getRunCount().get();
  leader.getKeyManager().getDeletingService().resume();
  GenericTestUtils.waitFor(
      () -> leader.getKeyManager().getDeletingService().getRunCount().get() > keyDeletingRunsBefore,
      1000, 60000);

  // Then let the snapshot deleting service complete at least one more run on the leader.
  long snapshotDeletingRunsBefore = leader.getKeyManager().getSnapshotDeletingService().getRunCount().get();
  leader.getKeyManager().getSnapshotDeletingService().resume();
  GenericTestUtils.waitFor(
      () -> leader.getKeyManager().getSnapshotDeletingService().getRunCount().get() > snapshotDeletingRunsBefore,
      1000, 60000);

  String snapshotTableKey = SnapshotInfo.getTableKey(volumeName, bucketName, snapshotName);
  checkSnapshotIsPurgedFromDB(leader, snapshotTableKey);

  // Resume the follower's double buffer and flush the transactions accumulated while it was stopped.
  OzoneManagerDoubleBuffer followerDoubleBuffer =
      follower.getOmRatisServer().getOmStateMachine().getOzoneManagerDoubleBuffer();
  followerDoubleBuffer.resume();
  // The flush is kicked off asynchronously; awaitFlush() below is what the test blocks on.
  CompletableFuture.runAsync(followerDoubleBuffer::flushTransactions);
  followerDoubleBuffer.awaitFlush();
  checkSnapshotIsPurgedFromDB(follower, snapshotTableKey);
}

/**
 * Creates a snapshot through the object store and waits for its checkpoint directory to show up
 * on the current OM leader.
 *
 * @param volName volume the snapshot belongs to
 * @param buckName bucket the snapshot belongs to
 * @param snapName name of the snapshot to create
 * @throws IOException if snapshot creation or lookup fails, or if the wait for the checkpoint
 *         directory reports that it does not exist
 */
private void createSnapshot(String volName, String buckName, String snapName) throws IOException {
  store.createSnapshot(volName, buckName, snapName);

  String snapshotTableKey = SnapshotInfo.getTableKey(volName, buckName, snapName);
  SnapshotInfo createdSnapshot = SnapshotUtils.getSnapshotInfo(cluster.getOMLeader(), snapshotTableKey);
  File checkpointDir = new File(getSnapshotPath(cluster.getOMLeader().getConfiguration(), createdSnapshot));

  boolean checkpointReady = RDBCheckpointUtils.waitForCheckpointDirectoryExist(checkpointDir);
  if (checkpointReady) {
    return;
  }
  throw new IOException("Snapshot directory doesn't exist");
}

/**
 * Polls the given OM's snapshot info table until the entry for {@code snapshotTableKey} is gone.
 * The table is checked every second for up to 60 seconds.
 *
 * @param ozoneManager OM whose metadata manager is queried
 * @param snapshotTableKey snapshot info table key that is expected to disappear
 * @throws InterruptedException declared by {@code GenericTestUtils.waitFor}
 * @throws TimeoutException declared by {@code GenericTestUtils.waitFor}
 */
private void checkSnapshotIsPurgedFromDB(OzoneManager ozoneManager, String snapshotTableKey)
    throws InterruptedException, TimeoutException {
  final int pollIntervalMs = 1000;
  final int timeoutMs = 60000;

  GenericTestUtils.waitFor(() -> {
    SnapshotInfo remaining;
    try {
      remaining = ozoneManager.getMetadataManager().getSnapshotInfoTable().get(snapshotTableKey);
    } catch (IOException ioe) {
      // The polling lambda cannot throw checked exceptions, so rethrow unchecked with the cause.
      throw new RuntimeException(ioe);
    }
    return remaining == null;
  }, pollIntervalMs, timeoutMs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIFF_REPORT_MAX_PAGE_SIZE;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIFF_REPORT_MAX_PAGE_SIZE_DEFAULT;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_KEY_NAME;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_SNAPSHOT_ERROR;
import static org.apache.hadoop.ozone.om.snapshot.SnapshotDiffManager.getSnapshotRootPath;
import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.checkSnapshotActive;
import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.dropColumnFamilyHandle;
Expand Down Expand Up @@ -674,19 +675,38 @@ private ReferenceCounted<OmSnapshot> getSnapshot(String snapshotTableKey, boolea
}

/**
* Returns true if the snapshot is in given status.
* @param key DB snapshot table key
* @param status SnapshotStatus
* @return true if the snapshot is in given status, false otherwise
* Returns OmSnapshot object and skips active check.
* This should only be used for API calls initiated by background service e.g. purgeKeys, purgeSnapshot,
* snapshotMoveDeletedKeys, and SetSnapshotProperty.
*/
public boolean isSnapshotStatus(String key,
SnapshotInfo.SnapshotStatus status)
throws IOException {
return getSnapshotInfo(key).getSnapshotStatus().equals(status);
public ReferenceCounted<OmSnapshot> getSnapshot(UUID snapshotId) throws IOException {
return snapshotCache.get(snapshotId);
}

public SnapshotInfo getSnapshotInfo(String key) throws IOException {
return SnapshotUtils.getSnapshotInfo(ozoneManager, key);
/**
 * Looks up a snapshot's {@link SnapshotInfo}, first through the regular table read and, if that
 * yields nothing, directly from RocksDB bypassing the table cache.
 * #################################################
 * NOTE: THIS SHOULD BE USED BY SNAPSHOT CACHE ONLY.
 * #################################################
 * Why the second lookup is needed: a lagging follower OM may receive purgeKeys or
 * snapshotMoveDeletedKeys for a snapshot and then purgeSnapshot for that same snapshot back to
 * back, so that purgeSnapshot's validateAndUpdateCache runs before the double buffer has flushed
 * the earlier purgeKeys or snapshotMoveDeletedKeys. This should not happen on the leader, because
 * SnapshotDeletingService verifies that deletedTable and deletedDirectoryTable in the DB hold no
 * entries for the bucket before it sends a purgeSnapshot. When it does happen, consulting only
 * the cache would make the addToBatch operation fail as it tries to open the snapshot DB to purge
 * keys, since the snapshot has already been purged from the SnapshotInfoTable cache. Checking the
 * table itself ensures the snapshot is found as long as it exists in either the cache or the DB.
 *
 * @param snapshotKey snapshot info table key of the snapshot
 * @return the snapshot info, never {@code null}
 * @throws IOException on lookup failure; an {@code OMException} with
 *         {@code INVALID_SNAPSHOT_ERROR} is thrown when neither lookup finds the snapshot
 */
private SnapshotInfo getSnapshotInfo(String snapshotKey) throws IOException {
  SnapshotInfo fromTable = ozoneManager.getMetadataManager().getSnapshotInfoTable().get(snapshotKey);
  if (fromTable != null) {
    return fromTable;
  }

  // Not visible through the regular read: fall back to reading the DB while skipping the cache.
  SnapshotInfo fromDb = ozoneManager.getMetadataManager().getSnapshotInfoTable().getSkipCache(snapshotKey);
  if (fromDb != null) {
    return fromDb;
  }

  throw new OMException("Snapshot '" + snapshotKey + "' is not found.", INVALID_SNAPSHOT_ERROR);
}

public static String getSnapshotPrefix(String snapshotName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,13 +362,16 @@ public synchronized void updateSnapshot(SnapshotInfo snapshotInfo) {
public synchronized boolean deleteSnapshot(SnapshotInfo snapshotInfo)
throws IOException {
validateSnapshotChain();
boolean status = deleteSnapshotGlobal(snapshotInfo.getSnapshotId()) &&
deleteSnapshotPath(snapshotInfo.getSnapshotPath(),
snapshotInfo.getSnapshotId());
if (status) {
snapshotIdToTableKey.remove(snapshotInfo.getSnapshotId());
}
return status;
return deleteSnapshotGlobal(snapshotInfo.getSnapshotId()) &&
deleteSnapshotPath(snapshotInfo.getSnapshotPath(), snapshotInfo.getSnapshotId());
}

/**
 * Removes the entry for the given snapshot ID from the {@code snapshotIdToTableKey} map.
 * {@code validateSnapshotChain()} is invoked first, so nothing is removed if that call throws.
 *
 * @param snapshotId ID of the snapshot whose map entry is removed
 * @throws IOException declared by this method; presumably raised by
 *         {@code validateSnapshotChain()} -- TODO confirm against its definition
 */
public synchronized void removeFromSnapshotIdToTable(UUID snapshotId) throws IOException {
validateSnapshotChain();
snapshotIdToTableKey.remove(snapshotId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.om.OMMetrics;
import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OzoneManager;
Expand Down Expand Up @@ -74,9 +75,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
OMMetrics omMetrics = ozoneManager.getMetrics();
try {
if (fromSnapshot != null) {
fromSnapshotInfo = ozoneManager.getMetadataManager()
.getSnapshotInfoTable()
.get(fromSnapshot);
fromSnapshotInfo = SnapshotUtils.getSnapshotInfo(ozoneManager, fromSnapshot);
}

for (OzoneManagerProtocolProtos.PurgePathRequest path : purgeRequests) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
Expand Down Expand Up @@ -74,14 +75,12 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
try {
SnapshotInfo fromSnapshotInfo = null;
if (fromSnapshot != null) {
fromSnapshotInfo = ozoneManager.getMetadataManager()
.getSnapshotInfoTable().get(fromSnapshot);
fromSnapshotInfo = SnapshotUtils.getSnapshotInfo(ozoneManager, fromSnapshot);
}
omClientResponse = new OMKeyPurgeResponse(omResponse.build(),
keysToBePurgedList, fromSnapshotInfo, keysToUpdateList);
} catch (IOException ex) {
omClientResponse = new OMKeyPurgeResponse(
createErrorOMResponse(omResponse, ex));
omClientResponse = new OMKeyPurgeResponse(createErrorOMResponse(omResponse, ex));
}

return omClientResponse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OmSnapshotManager;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.SnapshotChainManager;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
Expand Down Expand Up @@ -60,7 +59,6 @@ public OMSnapshotMoveDeletedKeysRequest(OMRequest omRequest) {
@Override
@DisallowedUntilLayoutVersion(FILESYSTEM_SNAPSHOT)
public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIndex termIndex) {
OmSnapshotManager omSnapshotManager = ozoneManager.getOmSnapshotManager();
OmMetadataManagerImpl omMetadataManager = (OmMetadataManagerImpl)
ozoneManager.getMetadataManager();
SnapshotChainManager snapshotChainManager =
Expand All @@ -78,8 +76,10 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
OzoneManagerProtocolProtos.OMResponse.Builder omResponse =
OmResponseUtil.getOMResponseBuilder(getOmRequest());
try {
nextSnapshot = SnapshotUtils.getNextActiveSnapshot(fromSnapshot,
snapshotChainManager, omSnapshotManager);
// Check the snapshot exists.
SnapshotUtils.getSnapshotInfo(ozoneManager, fromSnapshot.getTableKey());

nextSnapshot = SnapshotUtils.getNextActiveSnapshot(fromSnapshot, snapshotChainManager, ozoneManager);

// Get next non-deleted snapshot.
List<SnapshotMoveKeyInfos> nextDBKeysList =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OmSnapshotManager;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.SnapshotChainManager;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
Expand Down Expand Up @@ -72,7 +71,6 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn

final long trxnLogIndex = termIndex.getIndex();

OmSnapshotManager omSnapshotManager = ozoneManager.getOmSnapshotManager();
OmMetadataManagerImpl omMetadataManager = (OmMetadataManagerImpl)
ozoneManager.getMetadataManager();
SnapshotChainManager snapshotChainManager =
Expand Down Expand Up @@ -106,14 +104,12 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
}

SnapshotInfo nextSnapshot =
SnapshotUtils.getNextActiveSnapshot(fromSnapshot, snapshotChainManager, omSnapshotManager);
SnapshotUtils.getNextActiveSnapshot(fromSnapshot, snapshotChainManager, ozoneManager);

// Step 1: Update the deep clean flag for the next active snapshot
updateSnapshotInfoAndCache(nextSnapshot, omMetadataManager, trxnLogIndex);
// Step 2: Update the snapshot chain.
updateSnapshotChainAndCache(omMetadataManager, fromSnapshot, trxnLogIndex);
// Remove and close snapshot's RocksDB instance from SnapshotCache.
omSnapshotManager.invalidateCacheEntry(fromSnapshot.getSnapshotId());
// Step 3: Purge the snapshot from SnapshotInfoTable cache.
omMetadataManager.getSnapshotInfoTable()
.addCacheEntry(new CacheKey<>(fromSnapshot.getTableKey()), CacheValue.get(trxnLogIndex));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,7 @@ public void addToDBBatch(OMMetadataManager metadataManager,
.getOzoneManager().getOmSnapshotManager();

try (ReferenceCounted<OmSnapshot>
rcFromSnapshotInfo = omSnapshotManager.getSnapshot(
fromSnapshotInfo.getVolumeName(),
fromSnapshotInfo.getBucketName(),
fromSnapshotInfo.getName())) {
rcFromSnapshotInfo = omSnapshotManager.getSnapshot(fromSnapshotInfo.getSnapshotId())) {
OmSnapshot fromSnapshot = rcFromSnapshotInfo.get();
DBStore fromSnapshotStore = fromSnapshot.getMetadataManager()
.getStore();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,13 @@ public void addToDBBatch(OMMetadataManager omMetadataManager,

if (fromSnapshot != null) {
OmSnapshotManager omSnapshotManager =
((OmMetadataManagerImpl) omMetadataManager)
.getOzoneManager().getOmSnapshotManager();
((OmMetadataManagerImpl) omMetadataManager).getOzoneManager().getOmSnapshotManager();

try (ReferenceCounted<OmSnapshot> rcOmFromSnapshot =
omSnapshotManager.getSnapshot(
fromSnapshot.getVolumeName(),
fromSnapshot.getBucketName(),
fromSnapshot.getName())) {
omSnapshotManager.getSnapshot(fromSnapshot.getSnapshotId())) {

OmSnapshot fromOmSnapshot = rcOmFromSnapshot.get();
DBStore fromSnapshotStore =
fromOmSnapshot.getMetadataManager().getStore();
DBStore fromSnapshotStore = fromOmSnapshot.getMetadataManager().getStore();
// Init Batch Operation for snapshot db.
try (BatchOperation writeBatch =
fromSnapshotStore.initBatchOperation()) {
Expand Down
Loading

0 comments on commit 23f3e5b

Please sign in to comment.