Skip to content

Commit

Permalink
[Kernel] [3.3] Remove Coordinated Commits from public API (#3938) (#3973
Browse files Browse the repository at this point in the history
)

Cherry-pick fc81d12 to `branch-3.3`
  • Loading branch information
scottsand-db authored Dec 13, 2024
1 parent f29aea0 commit 9c967b9
Show file tree
Hide file tree
Showing 26 changed files with 36 additions and 1,500 deletions.
22 changes: 0 additions & 22 deletions kernel/kernel-api/src/main/java/io/delta/kernel/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package io.delta.kernel.engine;

import io.delta.kernel.annotation.Evolving;
import java.util.Map;

/**
* Interface encapsulating all clients needed by the Delta Kernel in order to read the Delta table.
Expand Down Expand Up @@ -55,25 +54,4 @@ public interface Engine {
* @return An implementation of {@link ParquetHandler}.
*/
ParquetHandler getParquetHandler();

/**
* Retrieves a {@link CommitCoordinatorClientHandler} for the specified commit coordinator client.
*
* <p>{@link CommitCoordinatorClientHandler} helps Kernel perform commits to a table which is
* owned by a commit coordinator.
*
* @see <a
* href="https://github.com/delta-io/delta/blob/master/protocol_rfcs/managed-commits.md#sample-commit-owner-api">Coordinated
* commit protocol table feature</a>.
* <p>This method creates and returns an implementation of {@link
* CommitCoordinatorClientHandler} based on the provided name and configuration of the
* underlying commit coordinator client.
* @param name The identifier or name of the underlying commit coordinator client
* @param conf The configuration settings for the underlying commit coordinator client
* @return An implementation of {@link CommitCoordinatorClientHandler} configured for the
* specified client
* @since 3.3.0
*/
CommitCoordinatorClientHandler getCommitCoordinatorClientHandler(
String name, Map<String, String> conf);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import io.delta.kernel.ScanBuilder;
import io.delta.kernel.Snapshot;
import io.delta.kernel.engine.CommitCoordinatorClientHandler;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.actions.CommitInfo;
import io.delta.kernel.internal.actions.DomainMetadata;
Expand Down Expand Up @@ -180,18 +179,6 @@ public Optional<Long> getLatestTransactionVersion(Engine engine, String applicat
*/
public Optional<TableCommitCoordinatorClientHandler> getTableCommitCoordinatorClientHandlerOpt(
Engine engine) {
return COORDINATED_COMMITS_COORDINATOR_NAME
.fromMetadata(metadata)
.map(
commitCoordinatorStr -> {
CommitCoordinatorClientHandler handler =
engine.getCommitCoordinatorClientHandler(
commitCoordinatorStr,
COORDINATED_COMMITS_COORDINATOR_CONF.fromMetadata(metadata));
return new TableCommitCoordinatorClientHandler(
handler,
logPath.toString(),
COORDINATED_COMMITS_TABLE_CONF.fromMetadata(metadata));
});
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@
* limitations under the License.
*/

package io.delta.kernel.engine.coordinatedcommits;
package io.delta.kernel.internal.coordinatedcommits;

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.utils.FileStatus;

/**
* Representation of a commit file. It contains the version of the commit, the file status of the
* commit, and the timestamp of the commit. This is used when we want to get the commit information
* from the {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#commit} and {@link
* io.delta.kernel.engine.CommitCoordinatorClientHandler#getCommits} APIs.
* from the {@link CommitCoordinatorClientHandler#commit} and {@link
* CommitCoordinatorClientHandler#getCommits} APIs.
*
* @since 3.3.0
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@
* limitations under the License.
*/

package io.delta.kernel.engine;
package io.delta.kernel.internal.coordinatedcommits;

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.coordinatedcommits.*;
import io.delta.kernel.engine.coordinatedcommits.actions.AbstractMetadata;
import io.delta.kernel.engine.coordinatedcommits.actions.AbstractProtocol;
import io.delta.kernel.internal.coordinatedcommits.actions.AbstractMetadata;
import io.delta.kernel.internal.coordinatedcommits.actions.AbstractProtocol;
import io.delta.kernel.utils.CloseableIterator;
import java.io.IOException;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
* limitations under the License.
*/

package io.delta.kernel.engine.coordinatedcommits;
package io.delta.kernel.internal.coordinatedcommits;

import io.delta.kernel.annotation.Evolving;

/**
* Exception raised by {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#commit}
* Exception raised by {@link CommitCoordinatorClientHandler#commit}
*
* <pre>
* | retryable | conflict | meaning |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
* limitations under the License.
*/

package io.delta.kernel.engine.coordinatedcommits;
package io.delta.kernel.internal.coordinatedcommits;

import io.delta.kernel.annotation.Evolving;

/**
* Response container for {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#commit}.
* Response container for {@link CommitCoordinatorClientHandler#commit}.
*
* @since 3.3.0
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,15 @@
* limitations under the License.
*/

package io.delta.kernel.engine.coordinatedcommits;
package io.delta.kernel.internal.coordinatedcommits;

import io.delta.kernel.annotation.Evolving;
import java.util.List;
import java.util.Map;

/**
* Response container for {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#getCommits(
* String, Map, Long, Long)}. Holds all the commits that have not been backfilled as per the commit
* coordinator.
* Response container for {@link CommitCoordinatorClientHandler#getCommits( String, Map, Long,
* Long)}. Holds all the commits that have not been backfilled as per the commit coordinator.
*
* @since 3.3.0
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@
* limitations under the License.
*/

package io.delta.kernel.engine.coordinatedcommits;
package io.delta.kernel.internal.coordinatedcommits;

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.engine.coordinatedcommits.actions.AbstractCommitInfo;
import io.delta.kernel.engine.coordinatedcommits.actions.AbstractMetadata;
import io.delta.kernel.engine.coordinatedcommits.actions.AbstractProtocol;
import io.delta.kernel.internal.coordinatedcommits.actions.AbstractCommitInfo;
import io.delta.kernel.internal.coordinatedcommits.actions.AbstractMetadata;
import io.delta.kernel.internal.coordinatedcommits.actions.AbstractProtocol;

/**
* A container class to inform the {@link io.delta.kernel.engine.CommitCoordinatorClientHandler}
* about any changes in Protocol/Metadata
* A container class to inform the {@link CommitCoordinatorClientHandler} about any changes in
* Protocol/Metadata
*
* @since 3.3.0
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package io.delta.kernel.engine.coordinatedcommits.actions;
package io.delta.kernel.internal.coordinatedcommits.actions;

import io.delta.kernel.annotation.Evolving;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package io.delta.kernel.engine.coordinatedcommits.actions;
package io.delta.kernel.internal.coordinatedcommits.actions;

import io.delta.kernel.annotation.Evolving;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package io.delta.kernel.engine.coordinatedcommits.actions;
package io.delta.kernel.internal.coordinatedcommits.actions;

import io.delta.kernel.annotation.Evolving;
import java.util.Set;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@

import io.delta.kernel.*;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.engine.coordinatedcommits.Commit;
import io.delta.kernel.exceptions.CheckpointAlreadyExistsException;
import io.delta.kernel.exceptions.InvalidTableException;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.*;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.checkpoints.*;
import io.delta.kernel.internal.coordinatedcommits.Commit;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.lang.ListUtils;
import io.delta.kernel.internal.replay.CreateCheckpointIterator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
package io.delta.kernel.internal.snapshot;

import io.delta.kernel.data.Row;
import io.delta.kernel.engine.CommitCoordinatorClientHandler;
import io.delta.kernel.engine.coordinatedcommits.CommitFailedException;
import io.delta.kernel.engine.coordinatedcommits.CommitResponse;
import io.delta.kernel.engine.coordinatedcommits.GetCommitsResponse;
import io.delta.kernel.engine.coordinatedcommits.UpdatedActions;
import io.delta.kernel.internal.coordinatedcommits.CommitCoordinatorClientHandler;
import io.delta.kernel.internal.coordinatedcommits.CommitFailedException;
import io.delta.kernel.internal.coordinatedcommits.CommitResponse;
import io.delta.kernel.internal.coordinatedcommits.GetCommitsResponse;
import io.delta.kernel.internal.coordinatedcommits.UpdatedActions;
import io.delta.kernel.utils.CloseableIterator;
import java.io.IOException;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
*/
package io.delta.kernel.internal.util;

import io.delta.kernel.engine.coordinatedcommits.actions.AbstractCommitInfo;
import io.delta.kernel.engine.coordinatedcommits.actions.AbstractMetadata;
import io.delta.kernel.engine.coordinatedcommits.actions.AbstractProtocol;
import io.delta.kernel.internal.actions.CommitInfo;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.coordinatedcommits.actions.AbstractCommitInfo;
import io.delta.kernel.internal.coordinatedcommits.actions.AbstractMetadata;
import io.delta.kernel.internal.coordinatedcommits.actions.AbstractProtocol;
import java.util.*;

public class CoordinatedCommitsUtils {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ import java.{lang => javaLang}
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import io.delta.kernel.data.{ColumnarBatch, ColumnVector}
import io.delta.kernel.engine.CommitCoordinatorClientHandler
import io.delta.kernel.engine.coordinatedcommits.{Commit, CommitResponse, GetCommitsResponse}
import io.delta.kernel.exceptions.InvalidTableException
import io.delta.kernel.expressions.Predicate
import io.delta.kernel.internal.actions.CommitInfo
import io.delta.kernel.internal.checkpoints.{CheckpointInstance, SidecarFile}
import io.delta.kernel.internal.coordinatedcommits.{Commit, CommitCoordinatorClientHandler, CommitResponse, GetCommitsResponse}
import io.delta.kernel.internal.fs.Path
import io.delta.kernel.internal.snapshot.{LogSegment, SnapshotManager, TableCommitCoordinatorClientHandler}
import io.delta.kernel.internal.util.{FileNames, Utils}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package io.delta.kernel.test
import io.delta.kernel.engine._
import io.delta.kernel.data.{ColumnVector, ColumnarBatch, FilteredColumnarBatch, Row}
import io.delta.kernel.expressions.{Column, Expression, ExpressionEvaluator, Predicate, PredicateEvaluator}
import io.delta.kernel.internal.coordinatedcommits.CommitCoordinatorClientHandler
import io.delta.kernel.types.{DataType, StructType}
import io.delta.kernel.utils.{CloseableIterator, DataFileStatus, FileStatus}

Expand Down Expand Up @@ -70,11 +71,6 @@ trait MockEngineUtils {
override def getParquetHandler: ParquetHandler =
Option(parquetHandler).getOrElse(
throw new UnsupportedOperationException("not supported in this test suite"))

override def getCommitCoordinatorClientHandler(name: String, conf: util.Map[String, String]):
CommitCoordinatorClientHandler =
Option(commitCoordinatorClientHandler).getOrElse(
throw new UnsupportedOperationException("not supported in this test suite"))
}
}
}
Expand Down
Loading

0 comments on commit 9c967b9

Please sign in to comment.