From ef4dfc2d7f2946d6ab03eaef61badc8d6595b107 Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Thu, 2 Nov 2023 09:52:25 -0700 Subject: [PATCH] Implement minor PR feedback --- .../kernel/internal/replay/LogReplay.java | 35 +++++++++++++------ 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java index 4590649c6c..1e5401d86d 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java @@ -109,9 +109,11 @@ public LogReplay( this.dataPath = dataPath; this.logSegment = logSegment; this.tableClient = tableClient; - this.protocolAndMetadata = new Lazy<>( - () -> this.loadTableProtocolAndMetadata(snapshotHint) - ); + this.protocolAndMetadata = new Lazy<>(() -> { + final Tuple2 res = loadTableProtocolAndMetadata(snapshotHint); + validateSupportedTable(res._1, res._2); + return res; + }); } ///////////////// @@ -139,6 +141,14 @@ public CloseableIterator getAddFilesAsColumnarBatches() { // Helper Methods // //////////////////// + /** + * Returns the latest Protocol and Metadata from the delta files in the `logSegment`. + * Does *not* validate that this delta-kernel connector understands the table at that protocol. + * + * Uses the `snapshotHint` to bound how many delta files it reads. i.e. we only need to read + * delta files newer than the hint to search for any new P & M. If we don't find them, we can + * just use the P and/or M from the hint. + */ private Tuple2 loadTableProtocolAndMetadata( Optional snapshotHint) { Protocol protocol = null; @@ -151,13 +161,13 @@ private Tuple2 loadTableProtocolAndMetadata( PROTOCOL_METADATA_READ_SCHEMA)) { while (reverseIter.hasNext()) { final ActionIterElem nextElem = reverseIter.next(); - final ColumnarBatch columnarBatch = nextElem.getFileDataReadResult().getData(); final long version = nextElem.getVersion(); - assert(columnarBatch.getSchema().equals(PROTOCOL_METADATA_READ_SCHEMA)); + // Load this lazily (as needed). We may be able to just use the hint. + ColumnarBatch columnarBatch = null; if (protocol == null) { - if (snapshotHint.isPresent() && version <= snapshotHint.get().getVersion()) { + if (snapshotHint.isPresent() && version == snapshotHint.get().getVersion()) { // Our snapshot hint already tells us the latest Protocol at that hint's // table version. If we haven't yet found a newer protocol, then we can // short circuit and use the hint's protocol. @@ -165,10 +175,12 @@ private Tuple2 loadTableProtocolAndMetadata( if (metadata != null) { // Stop since we have found the latest Protocol and Metadata. - validateSupportedTable(protocol, metadata); return new Tuple2<>(protocol, metadata); } } else { + columnarBatch = nextElem.getFileDataReadResult().getData(); + assert(columnarBatch.getSchema().equals(PROTOCOL_METADATA_READ_SCHEMA)); + final ColumnVector protocolVector = columnarBatch.getColumnVector(0); for (int i = 0; i < protocolVector.getSize(); i++) { @@ -177,11 +189,10 @@ private Tuple2 loadTableProtocolAndMetadata( if (metadata != null) { // Stop since we have found the latest Protocol and Metadata. - validateSupportedTable(protocol, metadata); return new Tuple2<>(protocol, metadata); } - break; // We already found the protocol, exit this for-loop + break; // We just found the protocol, exit this for-loop } } } @@ -199,6 +210,10 @@ private Tuple2 loadTableProtocolAndMetadata( return new Tuple2<>(protocol, metadata); } } else { + if (columnarBatch == null) { + columnarBatch = nextElem.getFileDataReadResult().getData(); + assert(columnarBatch.getSchema().equals(PROTOCOL_METADATA_READ_SCHEMA)); + } final ColumnVector metadataVector = columnarBatch.getColumnVector(1); for (int i = 0; i < metadataVector.getSize(); i++) { @@ -212,7 +227,7 @@ private Tuple2 loadTableProtocolAndMetadata( return new Tuple2<>(protocol, metadata); } - break; // We already found the metadata, exit this for-loop + break; // We just found the metadata, exit this for-loop } } }