Skip to content

Commit

Permalink
Implement minor PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
scottsand-db committed Nov 2, 2023
1 parent f5865b7 commit ef4dfc2
Showing 1 changed file with 25 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,11 @@ public LogReplay(
this.dataPath = dataPath;
this.logSegment = logSegment;
this.tableClient = tableClient;
this.protocolAndMetadata = new Lazy<>(
() -> this.loadTableProtocolAndMetadata(snapshotHint)
);
this.protocolAndMetadata = new Lazy<>(() -> {
final Tuple2<Protocol, Metadata> res = loadTableProtocolAndMetadata(snapshotHint);
validateSupportedTable(res._1, res._2);
return res;
});
}

/////////////////
Expand Down Expand Up @@ -139,6 +141,14 @@ public CloseableIterator<FilteredColumnarBatch> getAddFilesAsColumnarBatches() {
// Helper Methods //
////////////////////

/**
* Returns the latest Protocol and Metadata from the delta files in the `logSegment`.
* Does *not* validate that this delta-kernel connector understands the table at that protocol.
*
* Uses the `snapshotHint` to bound how many delta files it reads. i.e. we only need to read
* delta files newer than the hint to search for any new P & M. If we don't find them, we can
* just use the P and/or M from the hint.
*/
private Tuple2<Protocol, Metadata> loadTableProtocolAndMetadata(
Optional<SnapshotHint> snapshotHint) {
Protocol protocol = null;
Expand All @@ -151,24 +161,26 @@ private Tuple2<Protocol, Metadata> loadTableProtocolAndMetadata(
PROTOCOL_METADATA_READ_SCHEMA)) {
while (reverseIter.hasNext()) {
final ActionIterElem nextElem = reverseIter.next();
final ColumnarBatch columnarBatch = nextElem.getFileDataReadResult().getData();
final long version = nextElem.getVersion();

assert(columnarBatch.getSchema().equals(PROTOCOL_METADATA_READ_SCHEMA));
// Load this lazily (as needed). We may be able to just use the hint.
ColumnarBatch columnarBatch = null;

if (protocol == null) {
if (snapshotHint.isPresent() && version <= snapshotHint.get().getVersion()) {
if (snapshotHint.isPresent() && version == snapshotHint.get().getVersion()) {
// Our snapshot hint already tells us the latest Protocol at that hint's
// table version. If we haven't yet found a newer protocol, then we can
// short circuit and use the hint's protocol.
protocol = snapshotHint.get().getProtocol();

if (metadata != null) {
// Stop since we have found the latest Protocol and Metadata.
validateSupportedTable(protocol, metadata);
return new Tuple2<>(protocol, metadata);
}
} else {
columnarBatch = nextElem.getFileDataReadResult().getData();
assert(columnarBatch.getSchema().equals(PROTOCOL_METADATA_READ_SCHEMA));

final ColumnVector protocolVector = columnarBatch.getColumnVector(0);

for (int i = 0; i < protocolVector.getSize(); i++) {
Expand All @@ -177,11 +189,10 @@ private Tuple2<Protocol, Metadata> loadTableProtocolAndMetadata(

if (metadata != null) {
// Stop since we have found the latest Protocol and Metadata.
validateSupportedTable(protocol, metadata);
return new Tuple2<>(protocol, metadata);
}

break; // We already found the protocol, exit this for-loop
break; // We just found the protocol, exit this for-loop
}
}
}
Expand All @@ -199,6 +210,10 @@ private Tuple2<Protocol, Metadata> loadTableProtocolAndMetadata(
return new Tuple2<>(protocol, metadata);
}
} else {
if (columnarBatch == null) {
columnarBatch = nextElem.getFileDataReadResult().getData();
assert(columnarBatch.getSchema().equals(PROTOCOL_METADATA_READ_SCHEMA));
}
final ColumnVector metadataVector = columnarBatch.getColumnVector(1);

for (int i = 0; i < metadataVector.getSize(); i++) {
Expand All @@ -212,7 +227,7 @@ private Tuple2<Protocol, Metadata> loadTableProtocolAndMetadata(
return new Tuple2<>(protocol, metadata);
}

break; // We already found the metadata, exit this for-loop
break; // We just found the metadata, exit this for-loop
}
}
}
Expand Down

0 comments on commit ef4dfc2

Please sign in to comment.