From 7a460803f5093e300dc30ab4130c77f720fd956b Mon Sep 17 00:00:00 2001 From: Swaminathan Balachandran <47532440+swamirishi@users.noreply.github.com> Date: Fri, 13 Dec 2024 08:53:00 -0800 Subject: [PATCH] HDDS-11914. Snapshot diff should not filter SST Files based by reading SST file reader (#7563) --- .../hdds/utils/db/managed/ManagedRocksDB.java | 13 + .../compaction/log/CompactionFileInfo.java | 12 + .../ozone/rocksdiff/CompactionNode.java | 7 + .../rocksdiff/RocksDBCheckpointDiffer.java | 137 ++++------ .../ozone/rocksdiff/RocksDiffUtils.java | 73 +++-- .../TestRocksDBCheckpointDiffer.java | 250 +++++++++++------- .../ozone/rocksdiff/TestRocksDiffUtils.java | 121 +++++++++ .../om/snapshot/SnapshotDiffManager.java | 25 +- .../om/snapshot/TestSnapshotDiffManager.java | 31 ++- 9 files changed, 429 insertions(+), 240 deletions(-) diff --git a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksDB.java b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksDB.java index 5a5a577351b..ead43e9aaf8 100644 --- a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksDB.java +++ b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksDB.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hdds.utils.db.managed; +import org.apache.commons.io.FilenameUtils; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.DBOptions; @@ -31,6 +32,8 @@ import java.io.IOException; import java.time.Duration; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; /** * Managed {@link RocksDB}. @@ -102,4 +105,14 @@ public void deleteFile(LiveFileMetaData fileToBeDeleted) File file = new File(fileToBeDeleted.path(), fileToBeDeleted.fileName()); ManagedRocksObjectUtils.waitForFileDelete(file, Duration.ofSeconds(60)); } + + public static Map getLiveMetadataForSSTFiles(RocksDB db) { + return db.getLiveFilesMetaData().stream().collect( + Collectors.toMap(liveFileMetaData -> FilenameUtils.getBaseName(liveFileMetaData.fileName()), + liveFileMetaData -> liveFileMetaData)); + } + + public Map getLiveMetadataForSSTFiles() { + return getLiveMetadataForSSTFiles(this.get()); + } } diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/compaction/log/CompactionFileInfo.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/compaction/log/CompactionFileInfo.java index fa0d1f5491d..2d67d5003ae 100644 --- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/compaction/log/CompactionFileInfo.java +++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/compaction/log/CompactionFileInfo.java @@ -20,7 +20,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.StringUtils; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.rocksdb.LiveFileMetaData; import java.util.Objects; @@ -128,6 +130,16 @@ public Builder setColumnFamily(String columnFamily) { return this; } + public Builder setValues(LiveFileMetaData fileMetaData) { + if (fileMetaData != null) { + String columnFamilyName = StringUtils.bytes2String(fileMetaData.columnFamilyName()); + String startRangeValue = StringUtils.bytes2String(fileMetaData.smallestKey()); + String endRangeValue = StringUtils.bytes2String(fileMetaData.largestKey()); + this.setColumnFamily(columnFamilyName).setStartRange(startRangeValue).setEndRange(endRangeValue); + } + return this; + } + public CompactionFileInfo build() { if ((startRange != null || endRange != null || columnFamily != null) && (startRange == null || endRange == null || columnFamily == null)) { diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/CompactionNode.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/CompactionNode.java index f8133e6b92f..45a21970966 100644 --- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/CompactionNode.java +++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/CompactionNode.java @@ -17,6 +17,8 @@ */ package org.apache.ozone.rocksdiff; +import org.apache.ozone.compaction.log.CompactionFileInfo; + /** * Node in the compaction DAG that represents an SST file. */ @@ -48,6 +50,11 @@ public CompactionNode(String file, long numKeys, long seqNum, this.columnFamily = columnFamily; } + public CompactionNode(CompactionFileInfo compactionFileInfo) { + this(compactionFileInfo.getFileName(), -1, -1, compactionFileInfo.getStartKey(), + compactionFileInfo.getEndKey(), compactionFileInfo.getColumnFamily()); + } + @Override public String toString() { return String.format("Node{%s}", fileName); diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java index 548623f2594..930c2a269b5 100644 --- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java +++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java @@ -26,6 +26,7 @@ import java.io.FileNotFoundException; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentMap; @@ -34,6 +35,7 @@ import com.google.protobuf.InvalidProtocolBufferException; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.StringUtils; import org.apache.hadoop.hdds.conf.ConfigurationSource; @@ -42,11 +44,9 @@ import org.apache.hadoop.hdds.utils.Scheduler; import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions; import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions; -import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions; import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator; import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB; import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReader; -import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReaderIterator; import org.apache.ozone.compaction.log.CompactionFileInfo; import org.apache.ozone.compaction.log.CompactionLogEntry; import org.apache.ozone.rocksdb.util.RdbUtil; @@ -74,7 +74,6 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -174,6 +173,7 @@ public class RocksDBCheckpointDiffer implements AutoCloseable, private ColumnFamilyHandle compactionLogTableCFHandle; private ManagedRocksDB activeRocksDB; + private ConcurrentMap inflightCompactions; /** * For snapshot diff calculation we only need to track following column @@ -245,6 +245,7 @@ public class RocksDBCheckpointDiffer implements AutoCloseable, } else { this.scheduler = null; } + this.inflightCompactions = new ConcurrentHashMap<>(); } private String createCompactionLogDir(String metadataDirName, @@ -463,7 +464,7 @@ public void onCompactionBegin(RocksDB db, return; } } - + inflightCompactions.putAll(toFileInfoList(compactionJobInfo.inputFiles(), db)); for (String file : compactionJobInfo.inputFiles()) { createLink(Paths.get(sstBackupDir, new File(file).getName()), Paths.get(file)); @@ -484,17 +485,21 @@ public void onCompactionCompleted(RocksDB db, } long trxId = db.getLatestSequenceNumber(); - + Map inputFileCompactions = toFileInfoList(compactionJobInfo.inputFiles(), db); CompactionLogEntry.Builder builder; - try (ManagedOptions options = new ManagedOptions(); - ManagedReadOptions readOptions = new ManagedReadOptions()) { - builder = new CompactionLogEntry.Builder(trxId, - System.currentTimeMillis(), - toFileInfoList(compactionJobInfo.inputFiles(), options, - readOptions), - toFileInfoList(compactionJobInfo.outputFiles(), options, - readOptions)); - } + builder = new CompactionLogEntry.Builder(trxId, + System.currentTimeMillis(), + inputFileCompactions.keySet().stream() + .map(inputFile -> { + if (!inflightCompactions.containsKey(inputFile)) { + LOG.warn("Input file not found in inflightCompactionsMap : {} which should have been added on " + + "compactionBeginListener.", + inputFile); + } + return inflightCompactions.getOrDefault(inputFile, inputFileCompactions.get(inputFile)); + }) + .collect(Collectors.toList()), + new ArrayList<>(toFileInfoList(compactionJobInfo.outputFiles(), db).values())); if (LOG.isDebugEnabled()) { builder = builder.setCompactionReason( @@ -502,7 +507,6 @@ public void onCompactionCompleted(RocksDB db, } CompactionLogEntry compactionLogEntry = builder.build(); - synchronized (this) { if (closed) { return; @@ -521,6 +525,9 @@ public void onCompactionCompleted(RocksDB db, populateCompactionDAG(compactionLogEntry.getInputFileInfoList(), compactionLogEntry.getOutputFileInfoList(), compactionLogEntry.getDbSequenceNumber()); + for (String inputFile : inputFileCompactions.keySet()) { + inflightCompactions.remove(inputFile); + } } } }; @@ -789,7 +796,7 @@ private void preconditionChecksForLoadAllCompactionLogs() { * and appends the extension '.sst'. */ private String getSSTFullPath(String sstFilenameWithoutExtension, - String dbPath) { + String... dbPaths) { // Try to locate the SST in the backup dir first final Path sstPathInBackupDir = Paths.get(sstBackupDir, @@ -800,11 +807,13 @@ private String getSSTFullPath(String sstFilenameWithoutExtension, // SST file does not exist in the SST backup dir, this means the SST file // has not gone through any compactions yet and is only available in the - // src DB directory - final Path sstPathInDBDir = Paths.get(dbPath, - sstFilenameWithoutExtension + SST_FILE_EXTENSION); - if (Files.exists(sstPathInDBDir)) { - return sstPathInDBDir.toString(); + // src DB directory or destDB directory + for (String dbPath : dbPaths) { + final Path sstPathInDBDir = Paths.get(dbPath, + sstFilenameWithoutExtension + SST_FILE_EXTENSION); + if (Files.exists(sstPathInDBDir)) { + return sstPathInDBDir.toString(); + } } // TODO: More graceful error handling? @@ -825,18 +834,16 @@ private String getSSTFullPath(String sstFilenameWithoutExtension, * e.g. ["/path/to/sstBackupDir/000050.sst", * "/path/to/sstBackupDir/000060.sst"] */ - public synchronized Optional> getSSTDiffListWithFullPath( - DifferSnapshotInfo src, - DifferSnapshotInfo dest, - String sstFilesDirForSnapDiffJob - ) throws IOException { + public synchronized Optional> getSSTDiffListWithFullPath(DifferSnapshotInfo src, + DifferSnapshotInfo dest, + String sstFilesDirForSnapDiffJob) { Optional> sstDiffList = getSSTDiffList(src, dest); return sstDiffList.map(diffList -> diffList.stream() .map( sst -> { - String sstFullPath = getSSTFullPath(sst, src.getDbPath()); + String sstFullPath = getSSTFullPath(sst, src.getDbPath(), dest.getDbPath()); Path link = Paths.get(sstFilesDirForSnapDiffJob, sst + SST_FILE_EXTENSION); Path srcFile = Paths.get(sstFullPath); @@ -858,7 +865,7 @@ public synchronized Optional> getSSTDiffListWithFullPath( * @return A list of SST files without extension. e.g. ["000050", "000060"] */ public synchronized Optional> getSSTDiffList(DifferSnapshotInfo src, - DifferSnapshotInfo dest) throws IOException { + DifferSnapshotInfo dest) { // TODO: Reject or swap if dest is taken after src, once snapshot chain // integration is done. @@ -900,29 +907,12 @@ public synchronized Optional> getSSTDiffList(DifferSnapshotInfo src } if (src.getTablePrefixes() != null && !src.getTablePrefixes().isEmpty()) { - filterRelevantSstFilesFullPath(fwdDAGDifferentFiles, - src.getTablePrefixes()); + RocksDiffUtils.filterRelevantSstFiles(fwdDAGDifferentFiles, src.getTablePrefixes(), compactionNodeMap, + src.getRocksDB(), dest.getRocksDB()); } return Optional.of(new ArrayList<>(fwdDAGDifferentFiles)); } - /** - * construct absolute sst file path first and - * filter the files. - */ - public void filterRelevantSstFilesFullPath(Set inputFiles, - Map tableToPrefixMap) throws IOException { - for (Iterator fileIterator = - inputFiles.iterator(); fileIterator.hasNext();) { - String filename = fileIterator.next(); - String filepath = getAbsoluteSstFilePath(filename); - if (!RocksDiffUtils.doesSstFileContainKeyRange(filepath, - tableToPrefixMap)) { - fileIterator.remove(); - } - } - } - /** * Core getSSTDiffList logic. *

@@ -1476,55 +1466,22 @@ public void pngPrintMutableGraph(String filePath, GraphType graphType) graph.generateImage(filePath); } - private List toFileInfoList(List sstFiles, - ManagedOptions options, - ManagedReadOptions readOptions - ) { + private Map toFileInfoList(List sstFiles, RocksDB db) { if (CollectionUtils.isEmpty(sstFiles)) { - return Collections.emptyList(); + return Collections.emptyMap(); } - - List response = new ArrayList<>(); - + Map liveFileMetaDataMap = ManagedRocksDB.getLiveMetadataForSSTFiles(db); + Map response = new HashMap<>(); for (String sstFile : sstFiles) { - CompactionFileInfo fileInfo = toFileInfo(sstFile, options, readOptions); - response.add(fileInfo); + String fileName = FilenameUtils.getBaseName(sstFile); + CompactionFileInfo fileInfo = + new CompactionFileInfo.Builder(fileName).setValues(liveFileMetaDataMap.get(fileName)).build(); + response.put(sstFile, fileInfo); } return response; } - private CompactionFileInfo toFileInfo(String sstFile, - ManagedOptions options, - ManagedReadOptions readOptions) { - final int fileNameOffset = sstFile.lastIndexOf("/") + 1; - String fileName = sstFile.substring(fileNameOffset, - sstFile.length() - SST_FILE_EXTENSION_LENGTH); - CompactionFileInfo.Builder fileInfoBuilder = - new CompactionFileInfo.Builder(fileName); - - try (ManagedSstFileReader fileReader = new ManagedSstFileReader(options)) { - fileReader.open(sstFile); - String columnFamily = StringUtils.bytes2String(fileReader.getTableProperties().getColumnFamilyName()); - try (ManagedSstFileReaderIterator iterator = - ManagedSstFileReaderIterator.managed(fileReader.newIterator(readOptions))) { - iterator.get().seekToFirst(); - String startKey = StringUtils.bytes2String(iterator.get().key()); - iterator.get().seekToLast(); - String endKey = StringUtils.bytes2String(iterator.get().key()); - fileInfoBuilder.setStartRange(startKey) - .setEndRange(endKey) - .setColumnFamily(columnFamily); - } - } catch (RocksDBException rocksDBException) { - // Ideally it should not happen. If it does just log the exception. - // And let the compaction complete without the exception. - // Throwing exception in compaction listener could fail the RocksDB. - // In case of exception, compaction node will be missing start key, - // end key and column family. And during diff calculation it will - // continue the traversal as it was before HDDS-8940. - LOG.warn("Failed to read SST file: {}.", sstFile, rocksDBException); - } - return fileInfoBuilder.build(); + ConcurrentMap getInflightCompactions() { + return inflightCompactions; } - } diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java index 576b932891b..6f044e165a0 100644 --- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java +++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java @@ -19,22 +19,20 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.commons.collections.MapUtils; +import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions; -import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions; -import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReader; -import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReaderIterator; -import org.rocksdb.TableProperties; -import org.rocksdb.RocksDBException; +import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB; +import org.apache.ozone.compaction.log.CompactionFileInfo; +import org.rocksdb.LiveFileMetaData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Set; -import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX; /** @@ -75,46 +73,39 @@ public static String constructBucketKey(String keyName) { } public static void filterRelevantSstFiles(Set inputFiles, - Map tableToPrefixMap) throws IOException { + Map tableToPrefixMap, + ManagedRocksDB... dbs) { + filterRelevantSstFiles(inputFiles, tableToPrefixMap, Collections.emptyMap(), dbs); + } + + /** + * Filter sst files based on prefixes. + */ + public static void filterRelevantSstFiles(Set inputFiles, + Map tableToPrefixMap, + Map preExistingCompactionNodes, + ManagedRocksDB... dbs) { + Map liveFileMetaDataMap = new HashMap<>(); + int dbIdx = 0; for (Iterator fileIterator = inputFiles.iterator(); fileIterator.hasNext();) { - String filepath = fileIterator.next(); - if (!RocksDiffUtils.doesSstFileContainKeyRange(filepath, - tableToPrefixMap)) { - fileIterator.remove(); + String filename = FilenameUtils.getBaseName(fileIterator.next()); + while (!preExistingCompactionNodes.containsKey(filename) && !liveFileMetaDataMap.containsKey(filename) + && dbIdx < dbs.length) { + liveFileMetaDataMap.putAll(dbs[dbIdx].getLiveMetadataForSSTFiles()); + dbIdx += 1; } - } - } - - public static boolean doesSstFileContainKeyRange(String filepath, - Map tableToPrefixMap) throws IOException { - - try ( - ManagedOptions options = new ManagedOptions(); - ManagedSstFileReader sstFileReader = new ManagedSstFileReader(options)) { - sstFileReader.open(filepath); - TableProperties properties = sstFileReader.getTableProperties(); - String tableName = new String(properties.getColumnFamilyName(), UTF_8); - if (tableToPrefixMap.containsKey(tableName)) { - String prefix = tableToPrefixMap.get(tableName); - - try ( - ManagedReadOptions readOptions = new ManagedReadOptions(); - ManagedSstFileReaderIterator iterator = ManagedSstFileReaderIterator.managed( - sstFileReader.newIterator(readOptions))) { - iterator.get().seek(prefix.getBytes(UTF_8)); - String seekResultKey = new String(iterator.get().key(), UTF_8); - return seekResultKey.startsWith(prefix); - } + CompactionNode compactionNode = preExistingCompactionNodes.get(filename); + if (compactionNode == null) { + compactionNode = new CompactionNode(new CompactionFileInfo.Builder(filename) + .setValues(liveFileMetaDataMap.get(filename)).build()); + } + if (shouldSkipNode(compactionNode, tableToPrefixMap)) { + fileIterator.remove(); } - return false; - } catch (RocksDBException e) { - LOG.error("Failed to read SST File ", e); - throw new IOException(e); } } - @VisibleForTesting static boolean shouldSkipNode(CompactionNode node, Map columnFamilyToPrefixMap) { diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java index 438a0e74f23..4f04abb8b5b 100644 --- a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java +++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java @@ -21,6 +21,7 @@ import static java.util.Arrays.asList; import static java.util.concurrent.TimeUnit.MINUTES; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.graph.GraphBuilder; import java.io.File; @@ -49,10 +50,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; import com.google.common.graph.MutableGraph; +import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.hdds.StringUtils; import org.apache.hadoop.hdds.conf.ConfigurationSource; @@ -107,6 +110,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -237,26 +241,26 @@ public void cleanUp() { } } - private static List getPrunedCompactionEntries(boolean prune) { + private static List getPrunedCompactionEntries(boolean prune, Map metadata) { List entries = new ArrayList<>(); if (!prune) { entries.add(createCompactionEntry(1, now(), Arrays.asList("1", "2"), - Arrays.asList("4", "5"))); + Arrays.asList("4", "5"), metadata)); } entries.addAll(Arrays.asList(createCompactionEntry(2, now(), Arrays.asList("4", "5"), - Collections.singletonList("10")), + Collections.singletonList("10"), metadata), createCompactionEntry(3, now(), Arrays.asList("3", "13", "14"), - Arrays.asList("6", "7")), + Arrays.asList("6", "7"), metadata), createCompactionEntry(4, now(), Arrays.asList("6", "7"), - Collections.singletonList("11")))); + Collections.singletonList("11"), metadata))); return entries; } @@ -342,10 +346,12 @@ private static Stream casesGetSSTDiffListWithoutDB() { "/path/to/dbcp3", UUID.randomUUID(), 17975L, null, Mockito.mock(ManagedRocksDB.class)); DifferSnapshotInfo snapshotInfo4 = new DifferSnapshotInfo( "/path/to/dbcp4", UUID.randomUUID(), 18000L, null, Mockito.mock(ManagedRocksDB.class)); + + Map prefixMap = ImmutableMap.of("col1", "c", "col2", "d"); DifferSnapshotInfo snapshotInfo5 = new DifferSnapshotInfo( - "/path/to/dbcp2", UUID.randomUUID(), 0L, null, Mockito.mock(ManagedRocksDB.class)); + "/path/to/dbcp2", UUID.randomUUID(), 0L, prefixMap, Mockito.mock(ManagedRocksDB.class)); DifferSnapshotInfo snapshotInfo6 = new DifferSnapshotInfo( - "/path/to/dbcp2", UUID.randomUUID(), 100L, null, Mockito.mock(ManagedRocksDB.class)); + "/path/to/dbcp2", UUID.randomUUID(), 100L, prefixMap, Mockito.mock(ManagedRocksDB.class)); Set snapshotSstFiles1 = ImmutableSet.of("000059", "000053"); Set snapshotSstFiles2 = ImmutableSet.of("000088", "000059", @@ -377,7 +383,7 @@ private static Stream casesGetSSTDiffListWithoutDB() { "000095"), ImmutableSet.of("000066", "000105", "000080", "000087", "000073", "000095"), - false), + false, Collections.emptyMap()), Arguments.of("Test 2: Compaction log file crafted input: " + "One source ('to' snapshot) SST file is never compacted " + "(newly flushed)", @@ -390,7 +396,7 @@ private static Stream casesGetSSTDiffListWithoutDB() { ImmutableSet.of("000088", "000105", "000059", "000053", "000095"), ImmutableSet.of("000108"), ImmutableSet.of("000108"), - false), + false, Collections.emptyMap()), Arguments.of("Test 3: Compaction log file crafted input: " + "Same SST files found during SST expansion", compactionLog, @@ -402,7 +408,7 @@ private static Stream casesGetSSTDiffListWithoutDB() { ImmutableSet.of("000066", "000059", "000053"), ImmutableSet.of("000080", "000087", "000073", "000095"), ImmutableSet.of("000080", "000087", "000073", "000095"), - false), + false, Collections.emptyMap()), Arguments.of("Test 4: Compaction log file crafted input: " + "Skipping known processed SST.", compactionLog, @@ -414,7 +420,7 @@ private static Stream casesGetSSTDiffListWithoutDB() { Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), - true), + true, Collections.emptyMap()), Arguments.of("Test 5: Compaction log file hit snapshot" + " generation early exit condition", compactionLog, @@ -426,7 +432,7 @@ private static Stream casesGetSSTDiffListWithoutDB() { ImmutableSet.of("000059", "000053"), ImmutableSet.of("000066", "000080", "000087", "000073", "000062"), ImmutableSet.of("000066", "000080", "000087", "000073", "000062"), - false), + false, Collections.emptyMap()), Arguments.of("Test 6: Compaction log table regular case. " + "Expands expandable SSTs in the initial diff.", null, @@ -440,7 +446,7 @@ private static Stream casesGetSSTDiffListWithoutDB() { "000095"), ImmutableSet.of("000066", "000105", "000080", "000087", "000073", "000095"), - false), + false, Collections.emptyMap()), Arguments.of("Test 7: Compaction log table crafted input: " + "One source ('to' snapshot) SST file is never compacted " + "(newly flushed)", @@ -453,7 +459,7 @@ private static Stream casesGetSSTDiffListWithoutDB() { ImmutableSet.of("000088", "000105", "000059", "000053", "000095"), ImmutableSet.of("000108"), ImmutableSet.of("000108"), - false), + false, Collections.emptyMap()), Arguments.of("Test 8: Compaction log table crafted input: " + "Same SST files found during SST expansion", null, @@ -465,7 +471,7 @@ private static Stream casesGetSSTDiffListWithoutDB() { ImmutableSet.of("000066", "000059", "000053"), ImmutableSet.of("000080", "000087", "000073", "000095"), ImmutableSet.of("000080", "000087", "000073", "000095"), - false), + false, Collections.emptyMap()), Arguments.of("Test 9: Compaction log table crafted input: " + "Skipping known processed SST.", null, @@ -477,7 +483,7 @@ private static Stream casesGetSSTDiffListWithoutDB() { Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), - true), + true, Collections.emptyMap()), Arguments.of("Test 10: Compaction log table hit snapshot " + "generation early exit condition", null, @@ -489,11 +495,11 @@ private static Stream casesGetSSTDiffListWithoutDB() { ImmutableSet.of("000059", "000053"), ImmutableSet.of("000066", "000080", "000087", "000073", "000062"), ImmutableSet.of("000066", "000080", "000087", "000073", "000062"), - false), + false, Collections.emptyMap()), Arguments.of("Test 11: Older Compaction log got pruned and source snapshot delta files would be " + "unreachable", null, - getPrunedCompactionEntries(false), + getPrunedCompactionEntries(false, Collections.emptyMap()), snapshotInfo6, snapshotInfo5, ImmutableSet.of("10", "11", "8", "9", "12"), @@ -501,11 +507,11 @@ private static Stream casesGetSSTDiffListWithoutDB() { ImmutableSet.of("1", "3", "13", "14"), ImmutableSet.of("2", "8", "9", "12"), ImmutableSet.of("2", "8", "9", "12"), - false), + false, Collections.emptyMap()), Arguments.of("Test 12: Older Compaction log got pruned and source snapshot delta files would be " + "unreachable", null, - getPrunedCompactionEntries(true), + getPrunedCompactionEntries(true, Collections.emptyMap()), snapshotInfo6, snapshotInfo5, ImmutableSet.of("10", "11", "8", "9", "12"), @@ -513,7 +519,38 @@ private static Stream casesGetSSTDiffListWithoutDB() { ImmutableSet.of("3", "13", "14"), ImmutableSet.of("4", "5", "8", "9", "12"), null, - false) + false, Collections.emptyMap()), + Arguments.of("Test 13: Compaction log to test filtering logic based on range and column family", + null, + getPrunedCompactionEntries(false, + new HashMap() {{ + put("1", new String[]{"a", "c", "col1"}); + put("3", new String[]{"a", "d", "col2"}); + put("13", new String[]{"a", "c", "col13"}); + put("14", new String[]{"a", "c", "col1"}); + put("2", new String[]{"a", "c", "col1"}); + put("4", new String[]{"a", "b", "col1"}); + put("5", new String[]{"b", "b", "col1"}); + put("10", new String[]{"a", "b", "col1"}); + put("8", new String[]{"a", "b", "col1"}); + put("6", new String[]{"a", "z", "col13"}); + put("7", new String[]{"a", "z", "col13"}); + }}), + snapshotInfo6, + snapshotInfo5, + ImmutableSet.of("10", "11", "8", "9", "12", "15"), + ImmutableSet.of("1", "3", "13", "14"), + ImmutableSet.of("1", "13", "3", "14"), + ImmutableSet.of("2", "8", "9", "12", "15"), + ImmutableSet.of("2", "9", "12"), + false, + ImmutableMap.of( + "2", new String[]{"a", "b", "col1"}, + "12", new String[]{"a", "d", "col2"}, + "8", new String[]{"a", "b", "col1"}, + "9", new String[]{"a", "c", "col1"}, + "15", new String[]{"a", "z", "col13"} + )) ); } @@ -535,68 +572,36 @@ public void testGetSSTDiffListWithoutDB(String description, Set expectedSameSstFiles, Set expectedDiffSstFiles, Set expectedSSTDiffFiles, - boolean expectingException) throws IOException { - - boolean exceptionThrown = false; - if (compactionLog != null) { - // Construct DAG from compaction log input - Arrays.stream(compactionLog.split("\n")).forEach( - rocksDBCheckpointDiffer::processCompactionLogLine); - } else if (compactionLogEntries != null) { - compactionLogEntries.forEach(entry -> - rocksDBCheckpointDiffer.addToCompactionLogTable(entry)); - } else { - throw new IllegalArgumentException("One of compactionLog and " + - "compactionLogEntries should be non-null."); - } - rocksDBCheckpointDiffer.loadAllCompactionLogs(); - - Set actualSameSstFiles = new HashSet<>(); - Set actualDiffSstFiles = new HashSet<>(); - - try { - rocksDBCheckpointDiffer.internalGetSSTDiffList( - srcSnapshot, - destSnapshot, - srcSnapshotSstFiles, - destSnapshotSstFiles, - actualSameSstFiles, - actualDiffSstFiles); - } catch (RuntimeException rtEx) { - if (!expectingException) { - fail("Unexpected exception thrown in test."); + boolean expectingException, + Map metaDataMap) { + try (MockedStatic mockedRocksdiffUtil = Mockito.mockStatic(RocksDiffUtils.class, + Mockito.CALLS_REAL_METHODS)) { + mockedRocksdiffUtil.when(() -> RocksDiffUtils.constructBucketKey(anyString())).thenAnswer(i -> i.getArgument(0)); + boolean exceptionThrown = false; + if (compactionLog != null) { + // Construct DAG from compaction log input + Arrays.stream(compactionLog.split("\n")).forEach( + rocksDBCheckpointDiffer::processCompactionLogLine); + } else if (compactionLogEntries != null) { + compactionLogEntries.forEach(entry -> + rocksDBCheckpointDiffer.addToCompactionLogTable(entry)); } else { - exceptionThrown = true; + throw new IllegalArgumentException("One of compactionLog and " + + "compactionLogEntries should be non-null."); } - } + rocksDBCheckpointDiffer.loadAllCompactionLogs(); - if (expectingException && !exceptionThrown) { - fail("Expecting exception but none thrown."); - } + Set actualSameSstFiles = new HashSet<>(); + Set actualDiffSstFiles = new HashSet<>(); - // Check same and different SST files result - assertEquals(expectedSameSstFiles, actualSameSstFiles); - assertEquals(expectedDiffSstFiles, actualDiffSstFiles); - try (MockedStatic mockedHandler = Mockito.mockStatic(RdbUtil.class, Mockito.CALLS_REAL_METHODS)) { - RocksDB rocksDB = Mockito.mock(RocksDB.class); - Mockito.when(rocksDB.getName()).thenReturn("dummy"); - Mockito.when(srcSnapshot.getRocksDB().get()).thenReturn(rocksDB); - Mockito.when(destSnapshot.getRocksDB().get()).thenReturn(rocksDB); - mockedHandler.when(() -> RdbUtil.getLiveSSTFilesForCFs(any(), any())) - .thenAnswer(i -> { - Set sstFiles = i.getArgument(0).equals(srcSnapshot.getRocksDB()) ? srcSnapshotSstFiles - : destSnapshotSstFiles; - return sstFiles.stream().map(fileName -> { - LiveFileMetaData liveFileMetaData = Mockito.mock(LiveFileMetaData.class); - Mockito.when(liveFileMetaData.fileName()).thenReturn("/" + fileName + SST_FILE_EXTENSION); - return liveFileMetaData; - }).collect(Collectors.toList()); - }); try { - Assertions.assertEquals(Optional.ofNullable(expectedSSTDiffFiles) - .map(files -> files.stream().sorted().collect(Collectors.toList())).orElse(null), - rocksDBCheckpointDiffer.getSSTDiffList(srcSnapshot, destSnapshot) - .map(i -> i.stream().sorted().collect(Collectors.toList())).orElse(null)); + rocksDBCheckpointDiffer.internalGetSSTDiffList( + srcSnapshot, + destSnapshot, + srcSnapshotSstFiles, + destSnapshotSstFiles, + actualSameSstFiles, + actualDiffSstFiles); } catch (RuntimeException rtEx) { if (!expectingException) { fail("Unexpected exception thrown in test."); @@ -604,9 +609,56 @@ public void testGetSSTDiffListWithoutDB(String description, exceptionThrown = true; } } - } - if (expectingException && !exceptionThrown) { - fail("Expecting exception but none thrown."); + + if (expectingException && !exceptionThrown) { + fail("Expecting exception but none thrown."); + } + + // Check same and different SST files result + assertEquals(expectedSameSstFiles, actualSameSstFiles); + assertEquals(expectedDiffSstFiles, actualDiffSstFiles); + try (MockedStatic mockedHandler = Mockito.mockStatic(RdbUtil.class, Mockito.CALLS_REAL_METHODS)) { + RocksDB rocksDB = Mockito.mock(RocksDB.class); + Mockito.when(rocksDB.getName()).thenReturn("dummy"); + Mockito.when(srcSnapshot.getRocksDB().get()).thenReturn(rocksDB); + Mockito.when(destSnapshot.getRocksDB().get()).thenReturn(rocksDB); + Mockito.when(srcSnapshot.getRocksDB().getLiveMetadataForSSTFiles()) + .thenAnswer(invocation -> srcSnapshotSstFiles.stream().filter(metaDataMap::containsKey).map(file -> { + LiveFileMetaData liveFileMetaData = Mockito.mock(LiveFileMetaData.class); + String[] metaData = metaDataMap.get(file); + Mockito.when(liveFileMetaData.fileName()).thenReturn("/" + file + SST_FILE_EXTENSION); + Mockito.when(liveFileMetaData.smallestKey()).thenReturn(metaData[0].getBytes(UTF_8)); + Mockito.when(liveFileMetaData.largestKey()).thenReturn(metaData[1].getBytes(UTF_8)); + Mockito.when(liveFileMetaData.columnFamilyName()).thenReturn(metaData[2].getBytes(UTF_8)); + return liveFileMetaData; + }).collect(Collectors.toMap(liveFileMetaData -> FilenameUtils.getBaseName(liveFileMetaData.fileName()), + Function.identity()))); + mockedHandler.when(() -> RdbUtil.getLiveSSTFilesForCFs(any(), any())) + .thenAnswer(i -> { + Set sstFiles = i.getArgument(0).equals(srcSnapshot.getRocksDB()) ? srcSnapshotSstFiles + : destSnapshotSstFiles; + return sstFiles.stream().map(fileName -> { + LiveFileMetaData liveFileMetaData = Mockito.mock(LiveFileMetaData.class); + Mockito.when(liveFileMetaData.fileName()).thenReturn("/" + fileName + SST_FILE_EXTENSION); + return liveFileMetaData; + }).collect(Collectors.toList()); + }); + try { + Assertions.assertEquals(Optional.ofNullable(expectedSSTDiffFiles) + .map(files -> files.stream().sorted().collect(Collectors.toList())).orElse(null), + rocksDBCheckpointDiffer.getSSTDiffList(srcSnapshot, destSnapshot) + .map(i -> i.stream().sorted().collect(Collectors.toList())).orElse(null)); + } catch (RuntimeException rtEx) { + if (!expectingException) { + fail("Unexpected exception thrown in test."); + } else { + exceptionThrown = true; + } + } + } + if (expectingException && !exceptionThrown) { + fail("Expecting exception but none thrown."); + } } } @@ -640,7 +692,12 @@ void testDifferWithDB() throws Exception { "000017.sst", "000019.sst", "000021.sst", "000023.sst", "000024.sst", "000026.sst", "000029.sst")); } - + rocksDBCheckpointDiffer.getForwardCompactionDAG().nodes().stream().forEach(compactionNode -> { + Assertions.assertNotNull(compactionNode.getStartKey()); + Assertions.assertNotNull(compactionNode.getEndKey()); + }); + GenericTestUtils.waitFor(() -> rocksDBCheckpointDiffer.getInflightCompactions().isEmpty(), 1000, + 10000); if (LOG.isDebugEnabled()) { rocksDBCheckpointDiffer.dumpCompactionNodeTable(); } @@ -1553,19 +1610,30 @@ private static Stream sstFilePruningScenarios() { ); } - private static CompactionLogEntry createCompactionEntry( - long dbSequenceNumber, - long compactionTime, - List inputFiles, - List outputFiles - ) { + private static CompactionLogEntry createCompactionEntry(long dbSequenceNumber, + long compactionTime, + List inputFiles, + List outputFiles) { + return createCompactionEntry(dbSequenceNumber, compactionTime, inputFiles, outputFiles, Collections.emptyMap()); + } + + private static CompactionLogEntry createCompactionEntry(long dbSequenceNumber, + long compactionTime, + List inputFiles, + List outputFiles, + Map metadata) { return new CompactionLogEntry.Builder(dbSequenceNumber, compactionTime, - toFileInfoList(inputFiles), toFileInfoList(outputFiles)).build(); + toFileInfoList(inputFiles, metadata), toFileInfoList(outputFiles, metadata)).build(); } - private static List toFileInfoList(List files) { + private static List toFileInfoList(List files, + Map metadata) { return files.stream() - .map(fileName -> new CompactionFileInfo.Builder(fileName).build()) + .map(fileName -> new CompactionFileInfo.Builder(fileName) + .setStartRange(Optional.ofNullable(metadata.get(fileName)).map(meta -> meta[0]).orElse(null)) + .setEndRange(Optional.ofNullable(metadata.get(fileName)).map(meta -> meta[1]).orElse(null)) + .setColumnFamily(Optional.ofNullable(metadata.get(fileName)).map(meta -> meta[2]).orElse(null)) + .build()) .collect(Collectors.toList()); } diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDiffUtils.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDiffUtils.java index 67233676f0b..ef92aa2c17c 100644 --- a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDiffUtils.java +++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDiffUtils.java @@ -18,10 +18,32 @@ package org.apache.ozone.rocksdiff; +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB; +import org.assertj.core.util.Sets; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.rocksdb.LiveFileMetaData; +import org.rocksdb.RocksDB; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.params.provider.Arguments.arguments; +import static org.mockito.ArgumentMatchers.anyString; /** * Class to test RocksDiffUtils. @@ -54,4 +76,103 @@ public void testFilterFunction() { "/volume/bucket/key-1", "/volume/bucket2/key-97")); } + + public static Stream values() { + return Stream.of( + arguments("validColumnFamily", "invalidColumnFamily", "a", "d", "b", "f"), + arguments("validColumnFamily", "invalidColumnFamily", "a", "d", "e", "f"), + arguments("validColumnFamily", "invalidColumnFamily", "a", "d", "a", "f"), + arguments("validColumnFamily", "validColumnFamily", "a", "d", "e", "g"), + arguments("validColumnFamily", "validColumnFamily", "e", "g", "a", "d"), + arguments("validColumnFamily", "validColumnFamily", "b", "b", "e", "g"), + arguments("validColumnFamily", "validColumnFamily", "a", "d", "e", "e") + ); + } + + @ParameterizedTest + @MethodSource("values") + public void testFilterRelevantSstFilesWithPreExistingCompactionInfo(String validSSTColumnFamilyName, + String invalidColumnFamilyName, + String validSSTFileStartRange, + String validSSTFileEndRange, + String invalidSSTFileStartRange, + String invalidSSTFileEndRange) { + try (MockedStatic mockedHandler = Mockito.mockStatic(RocksDiffUtils.class, + Mockito.CALLS_REAL_METHODS)) { + mockedHandler.when(() -> RocksDiffUtils.constructBucketKey(anyString())).thenAnswer(i -> i.getArgument(0)); + String validSstFile = "filePath/validSSTFile.sst"; + String invalidSstFile = "filePath/invalidSSTFile.sst"; + String untrackedSstFile = "filePath/untrackedSSTFile.sst"; + String expectedPrefix = String.valueOf((char)(((int)validSSTFileEndRange.charAt(0) + + validSSTFileStartRange.charAt(0)) / 2)); + Set sstFile = Sets.newTreeSet(validSstFile, invalidSstFile, untrackedSstFile); + RocksDiffUtils.filterRelevantSstFiles(sstFile, ImmutableMap.of(validSSTColumnFamilyName, expectedPrefix), + ImmutableMap.of("validSSTFile", new CompactionNode(validSstFile, 0, 0, validSSTFileStartRange, + validSSTFileEndRange, validSSTColumnFamilyName), "invalidSSTFile", + new CompactionNode(invalidSstFile, 0, 0, invalidSSTFileStartRange, + invalidSSTFileEndRange, invalidColumnFamilyName))); + Assertions.assertEquals(Sets.newTreeSet(validSstFile, untrackedSstFile), sstFile); + } + + } + + private LiveFileMetaData getMockedLiveFileMetadata(String columnFamilyName, String startRange, + String endRange, + String name) { + LiveFileMetaData liveFileMetaData = Mockito.mock(LiveFileMetaData.class); + Mockito.when(liveFileMetaData.largestKey()).thenReturn(endRange.getBytes(StandardCharsets.UTF_8)); + Mockito.when(liveFileMetaData.columnFamilyName()).thenReturn(columnFamilyName.getBytes(StandardCharsets.UTF_8)); + Mockito.when(liveFileMetaData.smallestKey()).thenReturn(startRange.getBytes(StandardCharsets.UTF_8)); + Mockito.when(liveFileMetaData.fileName()).thenReturn("basePath/" + name + ".sst"); + return liveFileMetaData; + } + + @ParameterizedTest + @MethodSource("values") + public void testFilterRelevantSstFilesFromDB(String validSSTColumnFamilyName, + String invalidColumnFamilyName, + String validSSTFileStartRange, + String validSSTFileEndRange, + String invalidSSTFileStartRange, + String invalidSSTFileEndRange) { + try (MockedStatic mockedHandler = Mockito.mockStatic(RocksDiffUtils.class, + Mockito.CALLS_REAL_METHODS)) { + mockedHandler.when(() -> RocksDiffUtils.constructBucketKey(anyString())).thenAnswer(i -> i.getArgument(0)); + for (int numberOfDBs = 1; numberOfDBs < 10; numberOfDBs++) { + String validSstFile = "filePath/validSSTFile.sst"; + String invalidSstFile = "filePath/invalidSSTFile.sst"; + String untrackedSstFile = "filePath/untrackedSSTFile.sst"; + int expectedDBKeyIndex = numberOfDBs / 2; + ManagedRocksDB[] rocksDBs = + IntStream.range(0, numberOfDBs).mapToObj(i -> Mockito.mock(ManagedRocksDB.class)) + .collect(Collectors.toList()).toArray(new ManagedRocksDB[numberOfDBs]); + for (int i = 0; i < numberOfDBs; i++) { + ManagedRocksDB managedRocksDB = rocksDBs[i]; + RocksDB mockedRocksDB = Mockito.mock(RocksDB.class); + Mockito.when(managedRocksDB.get()).thenReturn(mockedRocksDB); + if (i == expectedDBKeyIndex) { + LiveFileMetaData validLiveFileMetaData = getMockedLiveFileMetadata(validSSTColumnFamilyName, + validSSTFileStartRange, validSSTFileEndRange, "validSSTFile"); + LiveFileMetaData invalidLiveFileMetaData = getMockedLiveFileMetadata(invalidColumnFamilyName, + invalidSSTFileStartRange, invalidSSTFileEndRange, "invalidSSTFile"); + List liveFileMetaDatas = Arrays.asList(validLiveFileMetaData, invalidLiveFileMetaData); + Mockito.when(mockedRocksDB.getLiveFilesMetaData()).thenReturn(liveFileMetaDatas); + } else { + Mockito.when(mockedRocksDB.getLiveFilesMetaData()).thenReturn(Collections.emptyList()); + } + Mockito.when(managedRocksDB.getLiveMetadataForSSTFiles()) + .thenAnswer(invocation -> ManagedRocksDB.getLiveMetadataForSSTFiles(mockedRocksDB)); + } + + String expectedPrefix = String.valueOf((char)(((int)validSSTFileEndRange.charAt(0) + + validSSTFileStartRange.charAt(0)) / 2)); + Set sstFile = Sets.newTreeSet(validSstFile, invalidSstFile, untrackedSstFile); + RocksDiffUtils.filterRelevantSstFiles(sstFile, ImmutableMap.of(validSSTColumnFamilyName, expectedPrefix), + Collections.emptyMap(), rocksDBs); + Assertions.assertEquals(Sets.newTreeSet(validSstFile, untrackedSstFile), sstFile); + } + + } + + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java index b3d47fece9d..8add87f0633 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java @@ -1032,8 +1032,10 @@ private void getDeltaFilesAndDiffKeysToObjectIdToKeyMap( // tombstone is not loaded. // TODO: [SNAPSHOT] Update Rocksdb SSTFileIterator to read tombstone if (skipNativeDiff || !isNativeLibsLoaded) { - deltaFiles.addAll(getSSTFileListForSnapshot(fromSnapshot, - tablesToLookUp)); + Set inputFiles = getSSTFileListForSnapshot(fromSnapshot, tablesToLookUp); + ManagedRocksDB fromDB = ((RDBStore)fromSnapshot.getMetadataManager().getStore()).getDb().getManagedRocksDb(); + RocksDiffUtils.filterRelevantSstFiles(inputFiles, tablePrefixes, fromDB); + deltaFiles.addAll(inputFiles); } addToObjectIdMap(fsTable, tsTable, deltaFiles, !skipNativeDiff && isNativeLibsLoaded, @@ -1153,21 +1155,16 @@ Set getDeltaFiles(OmSnapshot fromSnapshot, LOG.warn("RocksDBCheckpointDiffer is not available, falling back to" + " slow path"); } - - Set fromSnapshotFiles = - RdbUtil.getSSTFilesForComparison( - ((RDBStore)fromSnapshot.getMetadataManager().getStore()) - .getDb().getManagedRocksDb(), - tablesToLookUp); - Set toSnapshotFiles = - RdbUtil.getSSTFilesForComparison( - ((RDBStore)toSnapshot.getMetadataManager().getStore()).getDb() - .getManagedRocksDb(), - tablesToLookUp); + ManagedRocksDB fromDB = ((RDBStore)fromSnapshot.getMetadataManager().getStore()) + .getDb().getManagedRocksDb(); + ManagedRocksDB toDB = ((RDBStore)toSnapshot.getMetadataManager().getStore()) + .getDb().getManagedRocksDb(); + Set fromSnapshotFiles = getSSTFileListForSnapshot(fromSnapshot, tablesToLookUp); + Set toSnapshotFiles = getSSTFileListForSnapshot(toSnapshot, tablesToLookUp); Set diffFiles = new HashSet<>(); diffFiles.addAll(fromSnapshotFiles); diffFiles.addAll(toSnapshotFiles); - RocksDiffUtils.filterRelevantSstFiles(diffFiles, tablePrefixes); + RocksDiffUtils.filterRelevantSstFiles(diffFiles, tablePrefixes, fromDB, toDB); deltaFiles = Optional.of(diffFiles); } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java index 75a635cc887..037e54d0008 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java @@ -69,6 +69,7 @@ import org.apache.ratis.util.TimeDuration; import jakarta.annotation.Nonnull; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -93,7 +94,6 @@ import org.rocksdb.RocksIterator; import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -483,7 +483,8 @@ public void testGetDeltaFilesWithFullDiff(int numberOfFiles, }); mockedRocksDiffUtils.when(() -> - RocksDiffUtils.filterRelevantSstFiles(anySet(), anyMap())) + RocksDiffUtils.filterRelevantSstFiles(anySet(), anyMap(), anyMap(), any(ManagedRocksDB.class), + any(ManagedRocksDB.class))) .thenAnswer((Answer) invocationOnMock -> { invocationOnMock.getArgument(0, Set.class).stream() .findAny().ifPresent(val -> { @@ -550,7 +551,8 @@ public void testGetDeltaFilesWithDifferThrowException(int numberOfFiles) }); mockedRocksDiffUtils.when(() -> - RocksDiffUtils.filterRelevantSstFiles(anySet(), anyMap())) + RocksDiffUtils.filterRelevantSstFiles(anySet(), anyMap(), anyMap(), any(ManagedRocksDB.class), + any(ManagedRocksDB.class))) .thenAnswer((Answer) invocationOnMock -> { invocationOnMock.getArgument(0, Set.class).stream() .findAny().ifPresent(val -> { @@ -567,7 +569,7 @@ public void testGetDeltaFilesWithDifferThrowException(int numberOfFiles) when(snapshotInfoTable.get(SnapshotInfo.getTableKey(VOLUME_NAME, BUCKET_NAME, snap2.toString()))) .thenReturn(getSnapshotInfoInstance(VOLUME_NAME, BUCKET_NAME, snap2.toString(), snap2)); - doThrow(new FileNotFoundException("File not found exception.")) + doThrow(new RuntimeException("File not found exception.")) .when(differ) .getSSTDiffListWithFullPath( any(DifferSnapshotInfo.class), @@ -1518,6 +1520,27 @@ private void setupMocksForRunningASnapDiff( when(bucketInfoTable.get(bucketKey)).thenReturn(bucketInfo); } + @Test + public void testGetDeltaFilesWithFullDiff() throws IOException { + SnapshotDiffManager spy = spy(snapshotDiffManager); + OmSnapshot fromSnapshot = getMockedOmSnapshot(UUID.randomUUID()); + OmSnapshot toSnapshot = getMockedOmSnapshot(UUID.randomUUID()); + Mockito.doAnswer(invocation -> { + OmSnapshot snapshot = invocation.getArgument(0); + if (snapshot == fromSnapshot) { + return Sets.newHashSet("1", "2", "3"); + } + if (snapshot == toSnapshot) { + return Sets.newHashSet("3", "4", "5"); + } + return Sets.newHashSet("6", "7", "8"); + }).when(spy).getSSTFileListForSnapshot(Mockito.any(OmSnapshot.class), + Mockito.anyList()); + Set deltaFiles = spy.getDeltaFiles(fromSnapshot, toSnapshot, Collections.emptyList(), snapshotInfo, + snapshotInfo, true, Collections.emptyMap(), null); + Assertions.assertEquals(Sets.newHashSet("1", "2", "3", "4", "5"), deltaFiles); + } + @Test public void testGetSnapshotDiffReportHappyCase() throws Exception { SnapshotInfo fromSnapInfo = snapshotInfo;