[SPARK-48370][SPARK-48258][CONNECT][PYTHON][FOLLOW-UP] Refactor local and eager required fields in CheckpointCommand #46712

@@ -3481,7 +3481,7 @@ class Dataset[T] private[sql] (
     sparkSession.newDataset(agnosticEncoder) { builder =>
       val command = sparkSession.newCommand { builder =>
         builder.getCheckpointCommandBuilder
-          .setLocal(reliableCheckpoint)
+          .setLocal(!reliableCheckpoint)
           .setEager(eager)
           .setRelation(this.plan.getRoot)
       }
@@ -497,10 +497,10 @@ message CheckpointCommand {
   // (Required) The logical plan to checkpoint.
   Relation relation = 1;

-  // (Optional) Locally checkpoint using a local temporary
+  // (Required) Locally checkpoint using a local temporary
   // directory in Spark Connect server (Spark Driver)
-  optional bool local = 2;
+  bool local = 2;

-  // (Optional) Whether to checkpoint this dataframe immediately.
-  optional bool eager = 3;
+  // (Required) Whether to checkpoint this dataframe immediately.
+  bool eager = 3;
 }
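Because local and eager are now plain proto3 bools rather than optional fields, they always read back a definite value (False by default) and clients are expected to set both explicitly. A minimal construction sketch in Python, using the regenerated commands_pb2 stubs and an empty Relation purely as a placeholder:

    from pyspark.sql.connect.proto import commands_pb2, relations_pb2

    # Build a CheckpointCommand the way a client would after this change.
    # The empty Relation is only a placeholder for illustration.
    cmd = commands_pb2.CheckpointCommand(
        relation=relations_pb2.Relation(),
        local=False,  # reliable checkpoint in the server's checkpoint directory
        eager=True,   # materialize the checkpoint immediately
    )

    # Plain proto3 scalars always read back a definite value, with no presence bit.
    roundtrip = commands_pb2.CheckpointCommand.FromString(cmd.SerializeToString())
    print(roundtrip.local, roundtrip.eager)  # False True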
@@ -3523,15 +3523,9 @@ class SparkConnectPlanner(
       responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit = {
     val target = Dataset
       .ofRows(session, transformRelation(checkpointCommand.getRelation))
-    val checkpointed = if (checkpointCommand.hasLocal && checkpointCommand.hasEager) {
-      target.localCheckpoint(eager = checkpointCommand.getEager)
-    } else if (checkpointCommand.hasLocal) {
-      target.localCheckpoint()
-    } else if (checkpointCommand.hasEager) {
-      target.checkpoint(eager = checkpointCommand.getEager)
-    } else {
-      target.checkpoint()
-    }
+    val checkpointed = target.checkpoint(
+      eager = checkpointCommand.getEager,
+      reliableCheckpoint = !checkpointCommand.getLocal)

     val dfId = UUID.randomUUID().toString
     logInfo(log"Caching DataFrame with id ${MDC(DATAFRAME_ID, dfId)}")
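The four presence-based branches collapse into a single call because both flags are now always populated by the client. Below is a purely illustrative Python transliteration of the old and new dispatch logic; cmd and target are stand-ins for the command message and the server-side Dataset, not real API objects:

    def dispatch_old(cmd, target):
        # Old shape: `local` and `eager` were optional, so their *presence*
        # selected among four different call forms.
        if cmd.HasField("local") and cmd.HasField("eager"):
            return target.localCheckpoint(eager=cmd.eager)
        elif cmd.HasField("local"):
            return target.localCheckpoint()
        elif cmd.HasField("eager"):
            return target.checkpoint(eager=cmd.eager)
        else:
            return target.checkpoint()

    def dispatch_new(cmd, target):
        # New shape: both flags always carry a value, so the decision reduces
        # to the value of `local`; the Scala above expresses this as a single
        # checkpoint(eager, reliableCheckpoint = !local) call.
        if cmd.local:
            return target.localCheckpoint(eager=cmd.eager)
        return target.checkpoint(eager=cmd.eager)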
2 changes: 1 addition & 1 deletion python/pyspark/sql/connect/dataframe.py
@@ -2096,7 +2096,7 @@ def offset(self, n: int) -> ParentDataFrame:
         return DataFrame(plan.Offset(child=self._plan, offset=n), session=self._session)

     def checkpoint(self, eager: bool = True) -> "DataFrame":
-        cmd = plan.Checkpoint(child=self._plan, local=True, eager=eager)
+        cmd = plan.Checkpoint(child=self._plan, local=False, eager=eager)
         _, properties = self._session.client.execute_command(cmd.command(self._session.client))
         assert "checkpoint_command_result" in properties
         checkpointed = properties["checkpoint_command_result"]
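With this change, DataFrame.checkpoint() on the Python Connect client requests a reliable checkpoint (local=False) instead of a local one. A short usage sketch, assuming a reachable Spark Connect server (the address below is illustrative) and that DataFrame.localCheckpoint() sends local=True:

    from pyspark.sql import SparkSession

    # Connect to a Spark Connect server; the URL is an assumption for this sketch.
    spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

    df = spark.range(10)

    # Sends CheckpointCommand(local=False, eager=True): a reliable checkpoint,
    # which assumes the server has a checkpoint directory configured.
    reliable = df.checkpoint()

    # Sends CheckpointCommand(local=True, eager=True): a local checkpoint kept
    # in the server-side caching layer instead of the checkpoint directory.
    local = df.localCheckpoint()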
10 changes: 5 additions & 5 deletions python/pyspark/sql/connect/proto/commands_pb2.py

Large diffs are not rendered by default.

41 changes: 6 additions & 35 deletions python/pyspark/sql/connect/proto/commands_pb2.pyi
@@ -2174,55 +2174,26 @@ class CheckpointCommand(google.protobuf.message.Message):
     def relation(self) -> pyspark.sql.connect.proto.relations_pb2.Relation:
         """(Required) The logical plan to checkpoint."""
     local: builtins.bool
-    """(Optional) Locally checkpoint using a local temporary
+    """(Required) Locally checkpoint using a local temporary
     directory in Spark Connect server (Spark Driver)
     """
     eager: builtins.bool
-    """(Optional) Whether to checkpoint this dataframe immediately."""
+    """(Required) Whether to checkpoint this dataframe immediately."""
     def __init__(
         self,
         *,
         relation: pyspark.sql.connect.proto.relations_pb2.Relation | None = ...,
-        local: builtins.bool | None = ...,
-        eager: builtins.bool | None = ...,
+        local: builtins.bool = ...,
+        eager: builtins.bool = ...,
     ) -> None: ...
     def HasField(
-        self,
-        field_name: typing_extensions.Literal[
-            "_eager",
-            b"_eager",
-            "_local",
-            b"_local",
-            "eager",
-            b"eager",
-            "local",
-            b"local",
-            "relation",
-            b"relation",
-        ],
+        self, field_name: typing_extensions.Literal["relation", b"relation"]
     ) -> builtins.bool: ...
     def ClearField(
         self,
         field_name: typing_extensions.Literal[
-            "_eager",
-            b"_eager",
-            "_local",
-            b"_local",
-            "eager",
-            b"eager",
-            "local",
-            b"local",
-            "relation",
-            b"relation",
+            "eager", b"eager", "local", b"local", "relation", b"relation"
         ],
     ) -> None: ...
-    @typing.overload
-    def WhichOneof(
-        self, oneof_group: typing_extensions.Literal["_eager", b"_eager"]
-    ) -> typing_extensions.Literal["eager"] | None: ...
-    @typing.overload
-    def WhichOneof(
-        self, oneof_group: typing_extensions.Literal["_local", b"_local"]
-    ) -> typing_extensions.Literal["local"] | None: ...

 global___CheckpointCommand = CheckpointCommand
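Dropping optional also shrinks the generated presence API: HasField and WhichOneof no longer apply to local and eager, only to the relation message field. A quick sketch of the resulting runtime behavior, assuming the regenerated stubs:

    from pyspark.sql.connect.proto import commands_pb2

    cmd = commands_pb2.CheckpointCommand()

    # Presence can still be checked for the message-typed field.
    print(cmd.HasField("relation"))  # False

    # local and eager are plain proto3 scalars now: they always read a value...
    print(cmd.local, cmd.eager)      # False False

    # ...and asking about their presence raises, since there is no presence bit.
    try:
        cmd.HasField("local")
    except ValueError as e:
        print("HasField not supported for plain scalar fields:", e)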
2 changes: 1 addition & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -754,7 +754,7 @@ class Dataset[T] private[sql](
    * checkpoint directory. If false creates a local checkpoint using
    * the caching subsystem
    */
-  private def checkpoint(eager: Boolean, reliableCheckpoint: Boolean): Dataset[T] = {
+  private[sql] def checkpoint(eager: Boolean, reliableCheckpoint: Boolean): Dataset[T] = {
     val actionName = if (reliableCheckpoint) "checkpoint" else "localCheckpoint"
     withAction(actionName, queryExecution) { physicalPlan =>
       val internalRdd = physicalPlan.execute().map(_.copy())