Skip to content

Commit

Permalink
Create SnapshotHint; use it to bound P & M loading
Browse files Browse the repository at this point in the history
  • Loading branch information
scottsand-db committed Nov 1, 2023
1 parent 7b5761d commit af4ca35
Show file tree
Hide file tree
Showing 8 changed files with 240 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.delta.kernel.internal;

import java.util.Optional;

import io.delta.kernel.ScanBuilder;
import io.delta.kernel.Snapshot;
import io.delta.kernel.client.TableClient;
Expand All @@ -26,6 +28,8 @@
import io.delta.kernel.internal.lang.Lazy;
import io.delta.kernel.internal.replay.LogReplay;
import io.delta.kernel.internal.snapshot.LogSegment;
import io.delta.kernel.internal.snapshot.SnapshotHint;
import io.delta.kernel.internal.snapshot.SnapshotManager;
import io.delta.kernel.internal.util.Tuple2;

/**
Expand All @@ -36,23 +40,33 @@ public class SnapshotImpl implements Snapshot {
private final long version;
private final LogReplay logReplay;
private final Lazy<Tuple2<Protocol, Metadata>> protocolAndMetadata;
private final Optional<SnapshotHint> snapshotHint;

public SnapshotImpl(
Path logPath,
Path dataPath,
long version,
LogSegment logSegment,
TableClient tableClient,
long timestamp) {
Path logPath,
Path dataPath,
long version,
LogSegment logSegment,
TableClient tableClient,
long timestamp,
SnapshotManager snapshotManager,
Optional<SnapshotHint> snapshotHint) {
this.dataPath = dataPath;
this.version = version;

this.logReplay = new LogReplay(
logPath,
dataPath,
tableClient,
logSegment);
this.protocolAndMetadata = new Lazy<>(logReplay::loadProtocolAndMetadata);
logSegment,
snapshotHint);
this.protocolAndMetadata = new Lazy<>(() -> {
// Construct the SnapshotHint lazily. i.e. do not eagerly load the protocol and metadata
final Tuple2<Protocol, Metadata> pAndM = logReplay.loadProtocolAndMetadata();
final SnapshotHint hint = new SnapshotHint(version, pAndM._1, pAndM._2);
snapshotManager.registerHint(hint);
return pAndM;
});
this.snapshotHint = snapshotHint;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


package io.delta.kernel.internal.replay;

import io.delta.kernel.data.FileDataReadResult;

class ActionIterElem {
private final FileDataReadResult fileDataReadResult;
private final boolean isFromCheckpoint;
private final long version;

ActionIterElem(FileDataReadResult data, boolean isFromCheckpoint, long version) {
this.fileDataReadResult = data;
this.isFromCheckpoint = isFromCheckpoint;
this.version = version;
}

public FileDataReadResult getFileDataReadResult() {
return fileDataReadResult;
}

public boolean isFromCheckpoint() {
return isFromCheckpoint;
}

public long getVersion() {
return version;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import static io.delta.kernel.expressions.AlwaysTrue.ALWAYS_TRUE;

import io.delta.kernel.internal.InternalScanFileUtils;
import io.delta.kernel.internal.util.Tuple2;
import io.delta.kernel.internal.util.FileNames;
import io.delta.kernel.internal.util.Utils;

/**
Expand All @@ -46,7 +46,7 @@
* <p>
* Users must pass in a `readSchema` to select which actions and sub-fields they want to consume.
*/
class ActionsIterator implements CloseableIterator<Tuple2<FileDataReadResult, Boolean>> {
class ActionsIterator implements CloseableIterator<ActionIterElem> {
private final TableClient tableClient;

/**
Expand All @@ -64,14 +64,14 @@ class ActionsIterator implements CloseableIterator<Tuple2<FileDataReadResult, Bo
* <p>
* If it is ever empty, that means there are no more batches to produce.
*/
private Optional<CloseableIterator<Tuple2<FileDataReadResult, Boolean>>> actionsIter;
private Optional<CloseableIterator<ActionIterElem>> actionsIter;

private boolean closed;

ActionsIterator(
TableClient tableClient,
List<FileStatus> files,
StructType readSchema) {
TableClient tableClient,
List<FileStatus> files,
StructType readSchema) {
this.tableClient = tableClient;
this.filesIter = files.iterator();
this.readSchema = readSchema;
Expand All @@ -97,7 +97,7 @@ public boolean hasNext() {
* to the instance {@link #readSchema}.
*/
@Override
public Tuple2<FileDataReadResult, Boolean> next() {
public ActionIterElem next() {
if (closed) {
throw new IllegalStateException("Can't call `next` on a closed iterator.");
}
Expand Down Expand Up @@ -160,7 +160,7 @@ private void tryEnsureNextActionsIterIsReady() {
* <p>
* Requires that `filesIter.hasNext` is true.
*/
private CloseableIterator<Tuple2<FileDataReadResult, Boolean>> getNextActionsIter() {
private CloseableIterator<ActionIterElem> getNextActionsIter() {
final FileStatus nextFile = filesIter.next();
final Closeable[] iteratorsToClose = new Closeable[2];

Expand All @@ -170,6 +170,7 @@ private CloseableIterator<Tuple2<FileDataReadResult, Boolean>> getNextActionsIte
try {
if (nextFile.getPath().endsWith(".json")) {
final JsonHandler jsonHandler = tableClient.getJsonHandler();
final long fileVersion = FileNames.deltaVersion(nextFile.getPath());

// Convert the `nextFile` FileStatus into an internal ScanFile Row and then
// allow the connector to contextualize it (i.e. perform any splitting)
Expand All @@ -193,9 +194,10 @@ private CloseableIterator<Tuple2<FileDataReadResult, Boolean>> getNextActionsIte

iteratorsToClose[1] = fileReadDataIter;

return combine(fileReadDataIter, false /* isFromCheckpoint */);
return combine(fileReadDataIter, false /* isFromCheckpoint */, fileVersion);
} else if (nextFile.getPath().endsWith(".parquet")) {
final ParquetHandler parquetHandler = tableClient.getParquetHandler();
final long fileVersion = FileNames.checkpointVersion(nextFile.getPath());

// Convert the `nextFile` FileStatus into an internal ScanFile Row and then
// allow the connector to contextualize it (i.e. perform any splitting)
Expand All @@ -218,7 +220,7 @@ private CloseableIterator<Tuple2<FileDataReadResult, Boolean>> getNextActionsIte

iteratorsToClose[1] = fileReadDataIter;

return combine(fileReadDataIter, true /* isFromCheckpoint */);
return combine(fileReadDataIter, true /* isFromCheckpoint */, fileVersion);
} else {
throw new IllegalStateException(
String.format("Unexpected log file path: %s", nextFile.getPath())
Expand All @@ -234,18 +236,19 @@ private CloseableIterator<Tuple2<FileDataReadResult, Boolean>> getNextActionsIte
/**
* Take input (iterator<T>, boolean) and produce an iterator<T, boolean>.
*/
private CloseableIterator<Tuple2<FileDataReadResult, Boolean>> combine(
CloseableIterator<FileDataReadResult> fileReadDataIter,
boolean isFromCheckpoint) {
return new CloseableIterator<Tuple2<FileDataReadResult, Boolean>>() {
private CloseableIterator<ActionIterElem> combine(
CloseableIterator<FileDataReadResult> fileReadDataIter,
boolean isFromCheckpoint,
long version) {
return new CloseableIterator<ActionIterElem>() {
@Override
public boolean hasNext() {
return fileReadDataIter.hasNext();
}

@Override
public Tuple2<FileDataReadResult, Boolean> next() {
return new Tuple2<>(fileReadDataIter.next(), isFromCheckpoint);
public ActionIterElem next() {
return new ActionIterElem(fileReadDataIter.next(), isFromCheckpoint, version);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private static class UniqueFileActionTuple extends Tuple2<URI, Optional<String>>

private final TableClient tableClient;
private final Path tableRoot;
private final CloseableIterator<Tuple2<FileDataReadResult, Boolean>> iter;
private final CloseableIterator<ActionIterElem> iter;
private final Set<UniqueFileActionTuple> tombstonesFromJson;
private final Set<UniqueFileActionTuple> addFilesFromJson;

Expand All @@ -71,9 +71,9 @@ private static class UniqueFileActionTuple extends Tuple2<URI, Optional<String>>
private boolean closed;

ActiveAddFilesIterator(
TableClient tableClient,
CloseableIterator<Tuple2<FileDataReadResult, Boolean>> iter,
Path tableRoot) {
TableClient tableClient,
CloseableIterator<ActionIterElem> iter,
Path tableRoot) {
this.tableClient = tableClient;
this.tableRoot = tableRoot;
this.iter = iter;
Expand Down Expand Up @@ -144,9 +144,9 @@ private void prepareNext() {
return; // no next result, and no batches to read
}

final Tuple2<FileDataReadResult, Boolean> _next = iter.next();
final FileDataReadResult fileDataReadResult = _next._1;
final boolean isFromCheckpoint = _next._2;
final ActionIterElem _next = iter.next();
final FileDataReadResult fileDataReadResult = _next.getFileDataReadResult();
final boolean isFromCheckpoint = _next.isFromCheckpoint();
final ColumnarBatch addRemoveColumnarBatch = fileDataReadResult.getData();

assert (addRemoveColumnarBatch.getSchema().equals(LogReplay.ADD_REMOVE_READ_SCHEMA));
Expand Down
Loading

0 comments on commit af4ca35

Please sign in to comment.