Skip to content

Commit

Permalink
[SPARK-48370][SPARK-48258][CONNECT][PYTHON][FOLLOW-UP] Refactor local…
Browse files Browse the repository at this point in the history
… and eager required fields in CheckpointCommand

### What changes were proposed in this pull request?

This PR is a follow-up to #46683 and #46570 that refactors `local` and `eager` in `CheckpointCommand` from optional fields into required fields.

### Why are the changes needed?

To make the code easier to maintain.

### Does this PR introduce _any_ user-facing change?

No, the main change has not been released yet.

### How was this patch tested?

Manually tested.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46712 from HyukjinKwon/SPARK-48370-SPARK-48258-followup.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
  • Loading branch information
HyukjinKwon committed May 23, 2024
1 parent a393d6c commit e8f58a9
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3481,7 +3481,7 @@ class Dataset[T] private[sql] (
sparkSession.newDataset(agnosticEncoder) { builder =>
val command = sparkSession.newCommand { builder =>
builder.getCheckpointCommandBuilder
.setLocal(reliableCheckpoint)
.setLocal(!reliableCheckpoint)
.setEager(eager)
.setRelation(this.plan.getRoot)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,10 +497,10 @@ message CheckpointCommand {
// (Required) The logical plan to checkpoint.
Relation relation = 1;

// (Optional) Locally checkpoint using a local temporary
// (Required) Locally checkpoint using a local temporary
// directory in Spark Connect server (Spark Driver)
optional bool local = 2;
bool local = 2;

// (Optional) Whether to checkpoint this dataframe immediately.
optional bool eager = 3;
// (Required) Whether to checkpoint this dataframe immediately.
bool eager = 3;
}
Original file line number Diff line number Diff line change
Expand Up @@ -3523,15 +3523,9 @@ class SparkConnectPlanner(
responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit = {
val target = Dataset
.ofRows(session, transformRelation(checkpointCommand.getRelation))
val checkpointed = if (checkpointCommand.hasLocal && checkpointCommand.hasEager) {
target.localCheckpoint(eager = checkpointCommand.getEager)
} else if (checkpointCommand.hasLocal) {
target.localCheckpoint()
} else if (checkpointCommand.hasEager) {
target.checkpoint(eager = checkpointCommand.getEager)
} else {
target.checkpoint()
}
val checkpointed = target.checkpoint(
eager = checkpointCommand.getEager,
reliableCheckpoint = !checkpointCommand.getLocal)

val dfId = UUID.randomUUID().toString
logInfo(log"Caching DataFrame with id ${MDC(DATAFRAME_ID, dfId)}")
Expand Down
2 changes: 1 addition & 1 deletion python/pyspark/sql/connect/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2096,7 +2096,7 @@ def offset(self, n: int) -> ParentDataFrame:
return DataFrame(plan.Offset(child=self._plan, offset=n), session=self._session)

def checkpoint(self, eager: bool = True) -> "DataFrame":
cmd = plan.Checkpoint(child=self._plan, local=True, eager=eager)
cmd = plan.Checkpoint(child=self._plan, local=False, eager=eager)
_, properties = self._session.client.execute_command(cmd.command(self._session.client))
assert "checkpoint_command_result" in properties
checkpointed = properties["checkpoint_command_result"]
Expand Down
10 changes: 5 additions & 5 deletions python/pyspark/sql/connect/proto/commands_pb2.py

Large diffs are not rendered by default.

41 changes: 6 additions & 35 deletions python/pyspark/sql/connect/proto/commands_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -2174,55 +2174,26 @@ class CheckpointCommand(google.protobuf.message.Message):
def relation(self) -> pyspark.sql.connect.proto.relations_pb2.Relation:
"""(Required) The logical plan to checkpoint."""
local: builtins.bool
"""(Optional) Locally checkpoint using a local temporary
"""(Required) Locally checkpoint using a local temporary
directory in Spark Connect server (Spark Driver)
"""
eager: builtins.bool
"""(Optional) Whether to checkpoint this dataframe immediately."""
"""(Required) Whether to checkpoint this dataframe immediately."""
def __init__(
self,
*,
relation: pyspark.sql.connect.proto.relations_pb2.Relation | None = ...,
local: builtins.bool | None = ...,
eager: builtins.bool | None = ...,
local: builtins.bool = ...,
eager: builtins.bool = ...,
) -> None: ...
def HasField(
self,
field_name: typing_extensions.Literal[
"_eager",
b"_eager",
"_local",
b"_local",
"eager",
b"eager",
"local",
b"local",
"relation",
b"relation",
],
self, field_name: typing_extensions.Literal["relation", b"relation"]
) -> builtins.bool: ...
def ClearField(
self,
field_name: typing_extensions.Literal[
"_eager",
b"_eager",
"_local",
b"_local",
"eager",
b"eager",
"local",
b"local",
"relation",
b"relation",
"eager", b"eager", "local", b"local", "relation", b"relation"
],
) -> None: ...
@typing.overload
def WhichOneof(
self, oneof_group: typing_extensions.Literal["_eager", b"_eager"]
) -> typing_extensions.Literal["eager"] | None: ...
@typing.overload
def WhichOneof(
self, oneof_group: typing_extensions.Literal["_local", b"_local"]
) -> typing_extensions.Literal["local"] | None: ...

global___CheckpointCommand = CheckpointCommand
2 changes: 1 addition & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ class Dataset[T] private[sql](
* checkpoint directory. If false creates a local checkpoint using
* the caching subsystem
*/
private def checkpoint(eager: Boolean, reliableCheckpoint: Boolean): Dataset[T] = {
private[sql] def checkpoint(eager: Boolean, reliableCheckpoint: Boolean): Dataset[T] = {
val actionName = if (reliableCheckpoint) "checkpoint" else "localCheckpoint"
withAction(actionName, queryExecution) { physicalPlan =>
val internalRdd = physicalPlan.execute().map(_.copy())
Expand Down

0 comments on commit e8f58a9

Please sign in to comment.