Skip to content

Commit

Permalink
[Kernel] [Cleanup] Move SnapshotManager::checkpoint to `Checkpointe…
Browse files Browse the repository at this point in the history
…r` class (delta-io#4147)

#### Which Delta project/connector is this regarding?

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [X] Kernel
- [ ] Other (fill in here)

## Description

This PR moves `SnapshotManager::checkpoint` to `Checkpointer` class.
This reduces coupling and increases cohesion.

I also tidy up some code and logger statements along the way.

## How was this patch tested?

Just a refactor. Existing UTs.

## Does this PR introduce _any_ user-facing changes?

No.
  • Loading branch information
scottsand-db authored Feb 12, 2025
1 parent 34740f2 commit b02edc8
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.delta.kernel.exceptions.KernelException;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.checkpoints.Checkpointer;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.metrics.SnapshotQueryContext;
import io.delta.kernel.internal.metrics.SnapshotReportImpl;
Expand Down Expand Up @@ -74,15 +75,17 @@ public static Table forPath(Engine engine, String path, Clock clock) {
return new TableImpl(resolvedPath, clock);
}

private final SnapshotManager snapshotManager;
private final String tablePath;
private final Checkpointer checkpointer;
private final SnapshotManager snapshotManager;
private final Clock clock;

public TableImpl(String tablePath, Clock clock) {
this.tablePath = tablePath;
final Path dataPath = new Path(tablePath);
final Path logPath = new Path(dataPath, "_delta_log");
this.snapshotManager = new SnapshotManager(logPath, dataPath);
this.checkpointer = new Checkpointer(logPath);
this.snapshotManager = new SnapshotManager(dataPath);
this.clock = clock;
}

Expand Down Expand Up @@ -131,7 +134,9 @@ public Snapshot getSnapshotAsOfTimestamp(Engine engine, long millisSinceEpochUTC
@Override
public void checkpoint(Engine engine, long version)
throws TableNotFoundException, CheckpointAlreadyExistsException, IOException {
snapshotManager.checkpoint(engine, clock, version);
final SnapshotImpl snapshotToCheckpoint =
(SnapshotImpl) getSnapshotAsOfVersion(engine, version);
checkpointer.checkpoint(engine, clock, snapshotToCheckpoint);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,104 @@
package io.delta.kernel.internal.checkpoints;

import static io.delta.kernel.internal.DeltaErrors.wrapEngineExceptionThrowsIO;
import static io.delta.kernel.internal.TableConfig.EXPIRED_LOG_CLEANUP_ENABLED;
import static io.delta.kernel.internal.TableConfig.LOG_RETENTION;
import static io.delta.kernel.internal.TableFeatures.validateWriteSupportedTable;
import static io.delta.kernel.internal.snapshot.MetadataCleanup.cleanupExpiredLogs;
import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;

import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.CheckpointAlreadyExistsException;
import io.delta.kernel.exceptions.KernelEngineException;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.replay.CreateCheckpointIterator;
import io.delta.kernel.internal.util.*;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;
import java.io.*;
import java.nio.file.FileAlreadyExistsException;
import java.util.*;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Class to load and write the {@link CheckpointMetaData} from `_last_checkpoint` file. */
public class Checkpointer {

////////////////////////////////
// Static variables / methods //
////////////////////////////////

private static final Logger logger = LoggerFactory.getLogger(Checkpointer.class);

private static final int READ_LAST_CHECKPOINT_FILE_MAX_RETRIES = 3;

/** The name of the last checkpoint file */
public static final String LAST_CHECKPOINT_FILE_NAME = "_last_checkpoint";

public static void checkpoint(Engine engine, Clock clock, SnapshotImpl snapshot)
throws TableNotFoundException, IOException {
final Path tablePath = snapshot.getDataPath();
final Path logPath = snapshot.getLogPath();
final long version = snapshot.getVersion();

logger.info("{}: Starting checkpoint for version: {}", tablePath, version);

// Check if writing to the given table protocol version/features is supported in Kernel
validateWriteSupportedTable(
snapshot.getProtocol(),
snapshot.getMetadata(),
snapshot.getSchema(),
snapshot.getDataPath().toString());

final Path checkpointPath = FileNames.checkpointFileSingular(logPath, version);

long numberOfAddFiles = 0;
try (CreateCheckpointIterator checkpointDataIter =
snapshot.getCreateCheckpointIterator(engine)) {
// Write the iterator actions to the checkpoint using the Parquet handler
wrapEngineExceptionThrowsIO(
() -> {
engine
.getParquetHandler()
.writeParquetFileAtomically(checkpointPath.toString(), checkpointDataIter);

logger.info("{}: Finished writing checkpoint file for version: {}", tablePath, version);

return null;
},
"Writing checkpoint file %s",
checkpointPath.toString());

// Get the metadata of the checkpoint file
numberOfAddFiles = checkpointDataIter.getNumberOfAddActions();
} catch (FileAlreadyExistsException faee) {
throw new CheckpointAlreadyExistsException(version);
}

final CheckpointMetaData checkpointMetaData =
new CheckpointMetaData(version, numberOfAddFiles, Optional.empty());

new Checkpointer(logPath).writeLastCheckpointFile(engine, checkpointMetaData);

logger.info(
"{}: Finished writing last checkpoint metadata file for version: {}", tablePath, version);

// Clean up delta log files if enabled.
final Metadata metadata = snapshot.getMetadata();
if (EXPIRED_LOG_CLEANUP_ENABLED.fromMetadata(metadata)) {
cleanupExpiredLogs(engine, clock, tablePath, LOG_RETENTION.fromMetadata(metadata));
} else {
logger.info(
"{}: Log cleanup is disabled. Skipping the deletion of expired log files", tablePath);
}
}

/**
* Given a list of checkpoint files, pick the latest complete checkpoint instance which is not
* later than `notLaterThan`.
Expand Down Expand Up @@ -82,8 +157,8 @@ public static Optional<CheckpointInstance> findLastCompleteCheckpointBefore(
* Helper method for `findLastCompleteCheckpointBefore` which also return the number of files
* searched. This helps in testing
*/
protected static Tuple2<Optional<CheckpointInstance>, Long>
findLastCompleteCheckpointBeforeHelper(Engine engine, Path tableLogPath, long version) {
public static Tuple2<Optional<CheckpointInstance>, Long> findLastCompleteCheckpointBeforeHelper(
Engine engine, Path tableLogPath, long version) {
CheckpointInstance upperBoundCheckpoint = new CheckpointInstance(version);
logger.info("Try to find the last complete checkpoint before version {}", version);

Expand Down Expand Up @@ -166,11 +241,15 @@ private static boolean validCheckpointFile(FileStatus fileStatus) {
&& fileStatus.getSize() > 0;
}

////////////////////////////////
// Member variables / methods //
////////////////////////////////

/** The path to the file that holds metadata about the most recent checkpoint. */
private final Path lastCheckpointFilePath;

public Checkpointer(Path tableLogPath) {
this.lastCheckpointFilePath = new Path(tableLogPath, LAST_CHECKPOINT_FILE_NAME);
public Checkpointer(Path logPath) {
this.lastCheckpointFilePath = new Path(logPath, LAST_CHECKPOINT_FILE_NAME);
}

/** Returns information about the most recent checkpoint. */
Expand Down Expand Up @@ -204,19 +283,24 @@ public void writeLastCheckpointFile(Engine engine, CheckpointMetaData checkpoint
/**
* Loads the checkpoint metadata from the _last_checkpoint file.
*
* <p>
*
* @param engine {@link Engine instance to use}
* @param tries Number of times already tried to load the metadata before this call.
*/
private Optional<CheckpointMetaData> loadMetadataFromFile(Engine engine, int tries) {
if (tries >= 3) {
if (tries >= READ_LAST_CHECKPOINT_FILE_MAX_RETRIES) {
// We have tried 3 times and failed. Assume the checkpoint metadata file is corrupt.
logger.warn(
"Failed to load checkpoint metadata from file {} after 3 attempts.",
lastCheckpointFilePath);
"Failed to load checkpoint metadata from file {} after {} attempts.",
lastCheckpointFilePath,
READ_LAST_CHECKPOINT_FILE_MAX_RETRIES);
return Optional.empty();
}

logger.info(
"Loading last checkpoint from the _last_checkpoint file. Attempt: {} / {}",
tries + 1,
READ_LAST_CHECKPOINT_FILE_MAX_RETRIES);

try {
// Use arbitrary values for size and mod time as they are not available.
// We could list and find the values, but it is an unnecessary FS call.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,25 @@

package io.delta.kernel.internal.snapshot;

import static io.delta.kernel.internal.DeltaErrors.wrapEngineExceptionThrowsIO;
import static io.delta.kernel.internal.TableConfig.EXPIRED_LOG_CLEANUP_ENABLED;
import static io.delta.kernel.internal.TableConfig.LOG_RETENTION;
import static io.delta.kernel.internal.TableFeatures.validateWriteSupportedTable;
import static io.delta.kernel.internal.replay.LogReplayUtils.assertLogFilesBelongToTable;
import static io.delta.kernel.internal.snapshot.MetadataCleanup.cleanupExpiredLogs;
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static java.lang.String.format;

import io.delta.kernel.*;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.CheckpointAlreadyExistsException;
import io.delta.kernel.exceptions.InvalidTableException;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.*;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.annotation.VisibleForTesting;
import io.delta.kernel.internal.checkpoints.*;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.lang.ListUtils;
import io.delta.kernel.internal.metrics.SnapshotQueryContext;
import io.delta.kernel.internal.replay.CreateCheckpointIterator;
import io.delta.kernel.internal.replay.LogReplay;
import io.delta.kernel.internal.util.Clock;
import io.delta.kernel.internal.util.FileNames;
import io.delta.kernel.internal.util.FileNames.DeltaLogFileType;
import io.delta.kernel.internal.util.Tuple2;
import io.delta.kernel.utils.FileStatus;
import java.io.*;
import java.nio.file.FileAlreadyExistsException;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
Expand All @@ -60,13 +49,13 @@ public class SnapshotManager {
*/
private final AtomicReference<SnapshotHint> latestSnapshotHint;

private final Path logPath;
private final Path tablePath;
private final Path logPath;

public SnapshotManager(Path logPath, Path tablePath) {
public SnapshotManager(Path tablePath) {
this.latestSnapshotHint = new AtomicReference<>();
this.logPath = logPath;
this.tablePath = tablePath;
this.logPath = new Path(tablePath, "_delta_log");
}

private static final Logger logger = LoggerFactory.getLogger(SnapshotManager.class);
Expand Down Expand Up @@ -102,7 +91,8 @@ public Snapshot buildLatestSnapshot(Engine engine, SnapshotQueryContext snapshot
* @throws TableNotFoundException if the table does not exist
* @throws InvalidTableException if the table is in an invalid state
*/
public Snapshot getSnapshotAt(Engine engine, long version, SnapshotQueryContext snapshotContext)
public SnapshotImpl getSnapshotAt(
Engine engine, long version, SnapshotQueryContext snapshotContext)
throws TableNotFoundException {
final LogSegment logSegment =
getLogSegmentForVersion(engine, Optional.of(version) /* versionToLoadOpt */);
Expand Down Expand Up @@ -148,65 +138,6 @@ public Snapshot getSnapshotForTimestamp(
return getSnapshotAt(engine, versionToRead, snapshotContext);
}

public void checkpoint(Engine engine, Clock clock, long version)
throws TableNotFoundException, IOException {
logger.info("{}: Starting checkpoint for version: {}", tablePath, version);
// Get the snapshot corresponding the version
SnapshotImpl snapshot =
(SnapshotImpl)
getSnapshotAt(
engine,
version,
SnapshotQueryContext.forVersionSnapshot(tablePath.toString(), version));

// Check if writing to the given table protocol version/features is supported in Kernel
validateWriteSupportedTable(
snapshot.getProtocol(), snapshot.getMetadata(), snapshot.getSchema(), tablePath.toString());

Path checkpointPath = FileNames.checkpointFileSingular(logPath, version);

long numberOfAddFiles = 0;
try (CreateCheckpointIterator checkpointDataIter =
snapshot.getCreateCheckpointIterator(engine)) {
// Write the iterator actions to the checkpoint using the Parquet handler
wrapEngineExceptionThrowsIO(
() -> {
engine
.getParquetHandler()
.writeParquetFileAtomically(checkpointPath.toString(), checkpointDataIter);
return null;
},
"Writing checkpoint file %s",
checkpointPath.toString());

logger.info("{}: Checkpoint file is written for version: {}", tablePath, version);

// Get the metadata of the checkpoint file
numberOfAddFiles = checkpointDataIter.getNumberOfAddActions();
} catch (FileAlreadyExistsException faee) {
throw new CheckpointAlreadyExistsException(version);
}

CheckpointMetaData checkpointMetaData =
new CheckpointMetaData(version, numberOfAddFiles, Optional.empty());

Checkpointer checkpointer = new Checkpointer(logPath);
checkpointer.writeLastCheckpointFile(engine, checkpointMetaData);

logger.info("{}: Last checkpoint metadata file is written for version: {}", tablePath, version);

logger.info("{}: Finished checkpoint for version: {}", tablePath, version);

// Clean up delta log files if enabled.
Metadata metadata = snapshot.getMetadata();
if (EXPIRED_LOG_CLEANUP_ENABLED.fromMetadata(metadata)) {
cleanupExpiredLogs(engine, clock, tablePath, LOG_RETENTION.fromMetadata(metadata));
} else {
logger.info(
"{}: Log cleanup is disabled. Skipping the deletion of expired log files", tablePath);
}
}

////////////////////
// Helper Methods //
////////////////////
Expand Down Expand Up @@ -602,10 +533,7 @@ private Optional<Long> getStartCheckpointVersion(Engine engine, Optional<Long> v
});
})
.orElseGet(
() -> {
logger.info("Loading last checkpoint from the _last_checkpoint file");
return new Checkpointer(logPath).readLastCheckpointFile(engine).map(x -> x.version);
});
() -> new Checkpointer(logPath).readLastCheckpointFile(engine).map(x -> x.version));
}

private void logDebugFileStatuses(String varName, List<FileStatus> fileStatuses) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils {
// getLogSegmentForVersion tests
//////////////////////////////////////////////////////////////////////////////////

private val snapshotManager = new SnapshotManager(logPath, dataPath)
private val snapshotManager = new SnapshotManager(dataPath)

/* ------------------HELPER METHODS------------------ */

Expand Down

0 comments on commit b02edc8

Please sign in to comment.