HDDS-11914. Snapshot diff should not filter SST Files based by reading SST file reader (#7563)
swamirishi authored Dec 13, 2024
1 parent 1835326 commit 7a46080
Showing 9 changed files with 429 additions and 240 deletions.
File: ManagedRocksDB.java (org.apache.hadoop.hdds.utils.db.managed)
@@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hdds.utils.db.managed;

import org.apache.commons.io.FilenameUtils;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.DBOptions;
@@ -31,6 +32,8 @@
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Managed {@link RocksDB}.
@@ -102,4 +105,14 @@ public void deleteFile(LiveFileMetaData fileToBeDeleted)
File file = new File(fileToBeDeleted.path(), fileToBeDeleted.fileName());
ManagedRocksObjectUtils.waitForFileDelete(file, Duration.ofSeconds(60));
}

public static Map<String, LiveFileMetaData> getLiveMetadataForSSTFiles(RocksDB db) {
return db.getLiveFilesMetaData().stream().collect(
Collectors.toMap(liveFileMetaData -> FilenameUtils.getBaseName(liveFileMetaData.fileName()),
liveFileMetaData -> liveFileMetaData));
}

public Map<String, LiveFileMetaData> getLiveMetadataForSSTFiles() {
return getLiveMetadataForSSTFiles(this.get());
}
}
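For context, a minimal usage sketch of the new helper (not part of this commit). It assumes an already-open ManagedRocksDB named db, and uses only the LiveFileMetaData accessors that appear elsewhere in this diff:

import java.util.Map;
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
import org.rocksdb.LiveFileMetaData;

public class LiveSstMetadataExample {
  // Prints each live SST file (keyed by base name, e.g. "000050")
  // together with its column family and smallest/largest key.
  public static void printLiveSstFiles(ManagedRocksDB db) {
    Map<String, LiveFileMetaData> liveFiles = db.getLiveMetadataForSSTFiles();
    liveFiles.forEach((baseName, meta) ->
        System.out.printf("%s: cf=%s range=[%s, %s]%n",
            baseName,
            StringUtils.bytes2String(meta.columnFamilyName()),
            StringUtils.bytes2String(meta.smallestKey()),
            StringUtils.bytes2String(meta.largestKey())));
  }
}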
File: CompactionFileInfo.java (org.apache.ozone.compaction.log)
@@ -20,7 +20,9 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.rocksdb.LiveFileMetaData;

import java.util.Objects;

@@ -128,6 +130,16 @@ public Builder setColumnFamily(String columnFamily) {
return this;
}

public Builder setValues(LiveFileMetaData fileMetaData) {
if (fileMetaData != null) {
String columnFamilyName = StringUtils.bytes2String(fileMetaData.columnFamilyName());
String startRangeValue = StringUtils.bytes2String(fileMetaData.smallestKey());
String endRangeValue = StringUtils.bytes2String(fileMetaData.largestKey());
this.setColumnFamily(columnFamilyName).setStartRange(startRangeValue).setEndRange(endRangeValue);
}
return this;
}

public CompactionFileInfo build() {
if ((startRange != null || endRange != null || columnFamily != null) &&
(startRange == null || endRange == null || columnFamily == null)) {
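A short sketch (not from the commit) of how the new setValues hook is meant to be driven; the helper name fromLiveMetadata is hypothetical. Note that setValues tolerates a null LiveFileMetaData, leaving the column family and key range unset, and build() still succeeds in that case:

import org.apache.ozone.compaction.log.CompactionFileInfo;
import org.rocksdb.LiveFileMetaData;

public class CompactionFileInfoExample {
  // meta may be null, e.g. when the SST file is no longer live;
  // setValues is then a no-op and the resulting info carries only the name.
  public static CompactionFileInfo fromLiveMetadata(String baseName,
      LiveFileMetaData meta) {
    return new CompactionFileInfo.Builder(baseName)
        .setValues(meta)
        .build();
  }
}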
File: CompactionNode.java (org.apache.ozone.rocksdiff)
@@ -17,6 +17,8 @@
*/
package org.apache.ozone.rocksdiff;

import org.apache.ozone.compaction.log.CompactionFileInfo;

/**
* Node in the compaction DAG that represents an SST file.
*/
@@ -48,6 +50,11 @@ public CompactionNode(String file, long numKeys, long seqNum,
this.columnFamily = columnFamily;
}

public CompactionNode(CompactionFileInfo compactionFileInfo) {
this(compactionFileInfo.getFileName(), -1, -1, compactionFileInfo.getStartKey(),
compactionFileInfo.getEndKey(), compactionFileInfo.getColumnFamily());
}

@Override
public String toString() {
return String.format("Node{%s}", fileName);
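A minimal sketch (not part of the commit) showing the new constructor in use; the file name, key range, and column family values are made up for illustration. Since a CompactionFileInfo carries no key count or sequence number, the resulting node reports -1 for both:

import org.apache.ozone.compaction.log.CompactionFileInfo;
import org.apache.ozone.rocksdiff.CompactionNode;

public class CompactionNodeExample {
  public static CompactionNode exampleNode() {
    // Hypothetical values for illustration only.
    CompactionFileInfo info = new CompactionFileInfo.Builder("000050")
        .setStartRange("/vol1/bucket1/key-a")
        .setEndRange("/vol1/bucket1/key-z")
        .setColumnFamily("keyTable")
        .build();
    return new CompactionNode(info); // numKeys and seqNum default to -1
  }
}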
File: RocksDBCheckpointDiffer.java (org.apache.ozone.rocksdiff)
@@ -26,6 +26,7 @@
import java.io.FileNotFoundException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
@@ -34,6 +35,7 @@

import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -42,11 +44,9 @@
import org.apache.hadoop.hdds.utils.Scheduler;
import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReader;
import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReaderIterator;
import org.apache.ozone.compaction.log.CompactionFileInfo;
import org.apache.ozone.compaction.log.CompactionLogEntry;
import org.apache.ozone.rocksdb.util.RdbUtil;
@@ -74,7 +74,6 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -174,6 +173,7 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,

private ColumnFamilyHandle compactionLogTableCFHandle;
private ManagedRocksDB activeRocksDB;
private ConcurrentMap<String, CompactionFileInfo> inflightCompactions;

/**
* For snapshot diff calculation we only need to track following column
@@ -245,6 +245,7 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
} else {
this.scheduler = null;
}
this.inflightCompactions = new ConcurrentHashMap<>();
}

private String createCompactionLogDir(String metadataDirName,
@@ -463,7 +464,7 @@ public void onCompactionBegin(RocksDB db,
return;
}
}

inflightCompactions.putAll(toFileInfoList(compactionJobInfo.inputFiles(), db));
for (String file : compactionJobInfo.inputFiles()) {
createLink(Paths.get(sstBackupDir, new File(file).getName()),
Paths.get(file));
@@ -484,25 +485,28 @@ public void onCompactionCompleted(RocksDB db,
}

long trxId = db.getLatestSequenceNumber();

Map<String, CompactionFileInfo> inputFileCompactions = toFileInfoList(compactionJobInfo.inputFiles(), db);
CompactionLogEntry.Builder builder;
try (ManagedOptions options = new ManagedOptions();
ManagedReadOptions readOptions = new ManagedReadOptions()) {
builder = new CompactionLogEntry.Builder(trxId,
System.currentTimeMillis(),
toFileInfoList(compactionJobInfo.inputFiles(), options,
readOptions),
toFileInfoList(compactionJobInfo.outputFiles(), options,
readOptions));
}
builder = new CompactionLogEntry.Builder(trxId,
System.currentTimeMillis(),
inputFileCompactions.keySet().stream()
.map(inputFile -> {
if (!inflightCompactions.containsKey(inputFile)) {
LOG.warn("Input file not found in inflightCompactionsMap : {} which should have been added on " +
"compactionBeginListener.",
inputFile);
}
return inflightCompactions.getOrDefault(inputFile, inputFileCompactions.get(inputFile));
})
.collect(Collectors.toList()),
new ArrayList<>(toFileInfoList(compactionJobInfo.outputFiles(), db).values()));

if (LOG.isDebugEnabled()) {
builder = builder.setCompactionReason(
compactionJobInfo.compactionReason().toString());
}

CompactionLogEntry compactionLogEntry = builder.build();

synchronized (this) {
if (closed) {
return;
@@ -521,6 +525,9 @@ public void onCompactionCompleted(RocksDB db,
populateCompactionDAG(compactionLogEntry.getInputFileInfoList(),
compactionLogEntry.getOutputFileInfoList(),
compactionLogEntry.getDbSequenceNumber());
for (String inputFile : inputFileCompactions.keySet()) {
inflightCompactions.remove(inputFile);
}
}
}
};
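The two listeners above carry the heart of the fix: input-file metadata is captured at compaction begin, while the inputs are still live, and consumed at completion, when RocksDB may no longer report them as live files. A self-contained sketch of that pattern (all names here are illustrative, not the Ozone API):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class InflightCompactionCache<M> {
  private final ConcurrentMap<String, M> inflight = new ConcurrentHashMap<>();

  // Called from the compaction-begin hook: snapshot metadata for the
  // input files while the DB still lists them as live.
  public void onBegin(Map<String, M> inputFileMetadata) {
    inflight.putAll(inputFileMetadata);
  }

  // Called from the compaction-completed hook: prefer the snapshot taken
  // at begin, fall back to the current live view, then clean up.
  public Map<String, M> onComplete(List<String> inputFiles,
      Map<String, M> nowLive) {
    Map<String, M> result = new HashMap<>();
    for (String file : inputFiles) {
      result.put(file, inflight.getOrDefault(file, nowLive.get(file)));
      inflight.remove(file);
    }
    return result;
  }
}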
@@ -789,7 +796,7 @@ private void preconditionChecksForLoadAllCompactionLogs() {
* and appends the extension '.sst'.
*/
private String getSSTFullPath(String sstFilenameWithoutExtension,
String dbPath) {
String... dbPaths) {

// Try to locate the SST in the backup dir first
final Path sstPathInBackupDir = Paths.get(sstBackupDir,
@@ -800,11 +807,13 @@ private String getSSTFullPath(String sstFilenameWithoutExtension,

// SST file does not exist in the SST backup dir, this means the SST file
// has not gone through any compactions yet and is only available in the
// src DB directory
final Path sstPathInDBDir = Paths.get(dbPath,
sstFilenameWithoutExtension + SST_FILE_EXTENSION);
if (Files.exists(sstPathInDBDir)) {
return sstPathInDBDir.toString();
// src DB directory or the dest DB directory
for (String dbPath : dbPaths) {
final Path sstPathInDBDir = Paths.get(dbPath,
sstFilenameWithoutExtension + SST_FILE_EXTENSION);
if (Files.exists(sstPathInDBDir)) {
return sstPathInDBDir.toString();
}
}

// TODO: More graceful error handling?
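With the varargs signature, the lookup falls back across several DB directories: the SST backup dir is probed first, then each candidate DB dir in the order given. A standalone sketch of that search order (directory names hypothetical, not the Ozone method itself):

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class SstPathLookupExample {
  private static final String SST_FILE_EXTENSION = ".sst";

  // Probe the backup dir first, then each DB dir in the order supplied.
  public static String findSst(String sstBaseName, String sstBackupDir,
      String... dbPaths) {
    Path inBackup = Paths.get(sstBackupDir, sstBaseName + SST_FILE_EXTENSION);
    if (Files.exists(inBackup)) {
      return inBackup.toString();
    }
    for (String dbPath : dbPaths) {
      Path inDb = Paths.get(dbPath, sstBaseName + SST_FILE_EXTENSION);
      if (Files.exists(inDb)) {
        return inDb.toString();
      }
    }
    return null; // caller decides how a missing file is handled
  }
}

Calling findSst("000050", backupDir, srcDbPath, destDbPath) mirrors the updated call site below, which now passes both src.getDbPath() and dest.getDbPath().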
@@ -825,18 +834,16 @@ private String getSSTFullPath(String sstFilenameWithoutExtension,
* e.g. ["/path/to/sstBackupDir/000050.sst",
* "/path/to/sstBackupDir/000060.sst"]
*/
public synchronized Optional<List<String>> getSSTDiffListWithFullPath(
DifferSnapshotInfo src,
DifferSnapshotInfo dest,
String sstFilesDirForSnapDiffJob
) throws IOException {
public synchronized Optional<List<String>> getSSTDiffListWithFullPath(DifferSnapshotInfo src,
DifferSnapshotInfo dest,
String sstFilesDirForSnapDiffJob) {

Optional<List<String>> sstDiffList = getSSTDiffList(src, dest);

return sstDiffList.map(diffList -> diffList.stream()
.map(
sst -> {
String sstFullPath = getSSTFullPath(sst, src.getDbPath());
String sstFullPath = getSSTFullPath(sst, src.getDbPath(), dest.getDbPath());
Path link = Paths.get(sstFilesDirForSnapDiffJob,
sst + SST_FILE_EXTENSION);
Path srcFile = Paths.get(sstFullPath);
@@ -858,7 +865,7 @@ public synchronized Optional<List<String>> getSSTDiffListWithFullPath(
* @return A list of SST files without extension. e.g. ["000050", "000060"]
*/
public synchronized Optional<List<String>> getSSTDiffList(DifferSnapshotInfo src,
DifferSnapshotInfo dest) throws IOException {
DifferSnapshotInfo dest) {

// TODO: Reject or swap if dest is taken after src, once snapshot chain
// integration is done.
@@ -900,29 +907,12 @@ public synchronized Optional<List<String>> getSSTDiffList(DifferSnapshotInfo src
}

if (src.getTablePrefixes() != null && !src.getTablePrefixes().isEmpty()) {
filterRelevantSstFilesFullPath(fwdDAGDifferentFiles,
src.getTablePrefixes());
RocksDiffUtils.filterRelevantSstFiles(fwdDAGDifferentFiles, src.getTablePrefixes(), compactionNodeMap,
src.getRocksDB(), dest.getRocksDB());
}
return Optional.of(new ArrayList<>(fwdDAGDifferentFiles));
}

/**
* construct absolute sst file path first and
* filter the files.
*/
public void filterRelevantSstFilesFullPath(Set<String> inputFiles,
Map<String, String> tableToPrefixMap) throws IOException {
for (Iterator<String> fileIterator =
inputFiles.iterator(); fileIterator.hasNext();) {
String filename = fileIterator.next();
String filepath = getAbsoluteSstFilePath(filename);
if (!RocksDiffUtils.doesSstFileContainKeyRange(filepath,
tableToPrefixMap)) {
fileIterator.remove();
}
}
}
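The removed filterRelevantSstFilesFullPath above opened every candidate SST with an SST file reader to test its key range; the replacement consults metadata already held in the compaction node map and the two live RocksDB handles. A rough, illustrative sketch of prefix filtering from cached key ranges (the overlap test and all names are simplifications, not the RocksDiffUtils implementation):

import java.util.Iterator;
import java.util.Map;
import java.util.Set;

public class PrefixRangeFilterExample {
  // Keeps an SST file only if some table prefix can overlap its
  // [startKey, endKey] range; files without cached metadata are kept,
  // so filtering never drops a file it cannot reason about.
  public static void filterByPrefix(Set<String> files,
      Map<String, String[]> fileKeyRanges,   // file -> {startKey, endKey}
      Map<String, String> tableToPrefixMap) {
    for (Iterator<String> it = files.iterator(); it.hasNext();) {
      String[] range = fileKeyRanges.get(it.next());
      if (range == null) {
        continue;
      }
      boolean overlaps = tableToPrefixMap.values().stream().anyMatch(p ->
          (range[0].compareTo(p) <= 0 && p.compareTo(range[1]) <= 0)
              || range[0].startsWith(p));
      if (!overlaps) {
        it.remove();
      }
    }
  }
}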

/**
* Core getSSTDiffList logic.
* <p>
@@ -1476,55 +1466,22 @@ public void pngPrintMutableGraph(String filePath, GraphType graphType)
graph.generateImage(filePath);
}

private List<CompactionFileInfo> toFileInfoList(List<String> sstFiles,
ManagedOptions options,
ManagedReadOptions readOptions
) {
private Map<String, CompactionFileInfo> toFileInfoList(List<String> sstFiles, RocksDB db) {
if (CollectionUtils.isEmpty(sstFiles)) {
return Collections.emptyList();
return Collections.emptyMap();
}

List<CompactionFileInfo> response = new ArrayList<>();

Map<String, LiveFileMetaData> liveFileMetaDataMap = ManagedRocksDB.getLiveMetadataForSSTFiles(db);
Map<String, CompactionFileInfo> response = new HashMap<>();
for (String sstFile : sstFiles) {
CompactionFileInfo fileInfo = toFileInfo(sstFile, options, readOptions);
response.add(fileInfo);
String fileName = FilenameUtils.getBaseName(sstFile);
CompactionFileInfo fileInfo =
new CompactionFileInfo.Builder(fileName).setValues(liveFileMetaDataMap.get(fileName)).build();
response.put(sstFile, fileInfo);
}
return response;
}

private CompactionFileInfo toFileInfo(String sstFile,
ManagedOptions options,
ManagedReadOptions readOptions) {
final int fileNameOffset = sstFile.lastIndexOf("/") + 1;
String fileName = sstFile.substring(fileNameOffset,
sstFile.length() - SST_FILE_EXTENSION_LENGTH);
CompactionFileInfo.Builder fileInfoBuilder =
new CompactionFileInfo.Builder(fileName);

try (ManagedSstFileReader fileReader = new ManagedSstFileReader(options)) {
fileReader.open(sstFile);
String columnFamily = StringUtils.bytes2String(fileReader.getTableProperties().getColumnFamilyName());
try (ManagedSstFileReaderIterator iterator =
ManagedSstFileReaderIterator.managed(fileReader.newIterator(readOptions))) {
iterator.get().seekToFirst();
String startKey = StringUtils.bytes2String(iterator.get().key());
iterator.get().seekToLast();
String endKey = StringUtils.bytes2String(iterator.get().key());
fileInfoBuilder.setStartRange(startKey)
.setEndRange(endKey)
.setColumnFamily(columnFamily);
}
} catch (RocksDBException rocksDBException) {
// Ideally it should not happen. If it does just log the exception.
// And let the compaction complete without the exception.
// Throwing exception in compaction listener could fail the RocksDB.
// In case of exception, compaction node will be missing start key,
// end key and column family. And during diff calculation it will
// continue the traversal as it was before HDDS-8940.
LOG.warn("Failed to read SST file: {}.", sstFile, rocksDBException);
}
return fileInfoBuilder.build();
}

ConcurrentMap<String, CompactionFileInfo> getInflightCompactions() {
return inflightCompactions;
}

}
