Skip to content

Commit

Permalink
[SPARK-48567][SS][FOLLOWUP] StreamingQuery.lastProgress should return…
Browse files Browse the repository at this point in the history
… the actual StreamingQueryProgress

This reverts commit d067fc6, which reverted 042804a, essentially brings it back. 042804a failed the 3.5 client <> 4.0 server test, but the test was decided to turned off for cross-version test in #47468

### What changes were proposed in this pull request?

This PR is created after discussion in this closed one: #46886
I was trying to fix a bug (in connect, query.lastProgress doesn't have `numInputRows`, `inputRowsPerSecond`, and `processedRowsPerSecond`), and we reached the conclusion that what purposed in this PR should be the ultimate fix.

In python, for both classic spark and spark connect, the return type of `lastProgress` is `Dict` (and `recentProgress` is `List[Dict]`), but in scala it's the actual `StreamingQueryProgress` object:
https://github.com/apache/spark/blob/1a5d22aa2ffe769435be4aa6102ef961c55b9593/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala#L94-L101

This API discrepancy brings some confusion, like in Scala, users can do `query.lastProgress.batchId`, while in Python they have to do `query.lastProgress["batchId"]`.

This PR makes `StreamingQuery.lastProgress` to return the actual `StreamingQueryProgress` (and `StreamingQuery.recentProgress` to return `List[StreamingQueryProgress]`).

To prevent breaking change, we extend `StreamingQueryProgress` to be a subclass of `dict`, so existing code accessing using dictionary method (e.g. `query.lastProgress["id"]`) is still functional.

### Why are the changes needed?

API parity

### Does this PR introduce _any_ user-facing change?

Yes, now `StreamingQuery.lastProgress` returns the actual `StreamingQueryProgress` (and `StreamingQuery.recentProgress` returns `List[StreamingQueryProgress]`).

### How was this patch tested?

Added unit test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #47470 from WweiL/bring-back-lastProgress.

Authored-by: Wei Liu <wei.liu@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
  • Loading branch information
WweiL authored and HyukjinKwon committed Jul 24, 2024
1 parent 239d77b commit 22eb6c4
Show file tree
Hide file tree
Showing 5 changed files with 227 additions and 99 deletions.
9 changes: 5 additions & 4 deletions python/pyspark/sql/connect/streaming/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
QueryProgressEvent,
QueryIdleEvent,
QueryTerminatedEvent,
StreamingQueryProgress,
)
from pyspark.sql.streaming.query import (
StreamingQuery as PySparkStreamingQuery,
Expand Down Expand Up @@ -110,21 +111,21 @@ def status(self) -> Dict[str, Any]:
status.__doc__ = PySparkStreamingQuery.status.__doc__

@property
def recentProgress(self) -> List[Dict[str, Any]]:
def recentProgress(self) -> List[StreamingQueryProgress]:
cmd = pb2.StreamingQueryCommand()
cmd.recent_progress = True
progress = self._execute_streaming_query_cmd(cmd).recent_progress.recent_progress_json
return [json.loads(p) for p in progress]
return [StreamingQueryProgress.fromJson(json.loads(p)) for p in progress]

recentProgress.__doc__ = PySparkStreamingQuery.recentProgress.__doc__

@property
def lastProgress(self) -> Optional[Dict[str, Any]]:
def lastProgress(self) -> Optional[StreamingQueryProgress]:
cmd = pb2.StreamingQueryCommand()
cmd.last_progress = True
progress = self._execute_streaming_query_cmd(cmd).recent_progress.recent_progress_json
if len(progress) > 0:
return json.loads(progress[-1])
return StreamingQueryProgress.fromJson(json.loads(progress[-1]))
else:
return None

Expand Down
Loading

0 comments on commit 22eb6c4

Please sign in to comment.