[SPARK-48850][DOCS][SS][SQL] Add documentation for new options added to State Data Source #47274

Closed
wants to merge 4 commits into from
Closed
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 112 additions & 4 deletions docs/structured-streaming-state-data-source.md
Users can read an instance of state store, which is matched to a single stateful operator.
Note that there could be an exception, e.g. stream-stream join, which leverages multiple state store instances internally. The data source abstracts the internal representation away from users and
provides a user-friendly approach to read the state. See the section for stream-stream join for more details.

### Creating a state store for batch queries (all defaults)

<div class="codetabs">

<div data-lang="python" markdown="1">
{% highlight python %}

df = spark \
.read \
.format("statestore") \
The following configurations are optional:
<td>(none)</td>
<td>Represents the target side to read from. This option is used when users want to read the state from stream-stream join.</td>
</tr>
<tr>
<td>snapshotStartBatchId</td>
<td>numeric value</td>
<td></td>
<td>If specified, the data source will read the snapshot at this batch ID and then replay changelogs until 'batchId' or its default. Note that the snapshot batch ID starts at 0 and equals the snapshot version ID minus 1. This option must be used together with 'snapshotPartitionId'.</td>
</tr>
<tr>
<td>snapshotPartitionId</td>
<td>numeric value</td>
<td></td>
<td>If specified, only this specific partition will be read. Note that the partition ID starts at 0. This option must be used together with 'snapshotStartBatchId'.</td>
</tr>
<tr>
<td>readChangeFeed</td>
<td>boolean</td>
<td>false</td>
<td>If set to true, the data source will read the changes of state over microbatches. The output table schema also differs; see the 'Reading state change over microbatches' section below. The option 'changeStartBatchId' must be specified together with this option. The options 'batchId', 'joinSide', 'snapshotStartBatchId' and 'snapshotPartitionId' cannot be used with this option.</td>
</tr>
<tr>
<td>changeStartBatchId</td>
<td>numeric value</td>
<td></td>
<td>Represents the first batch to read in the read change feed mode. This option requires 'readChangeFeed' to be set to true.</td>
</tr>
<tr>
<td>changeEndBatchId</td>
<td>numeric value</td>
<td>latest committed batch ID</td>
<td>Represents the last batch to read in the read change feed mode. This option requires 'readChangeFeed' to be set to true.</td>
</tr>
</table>
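The snapshot-related options above must be supplied as a consistent pair. As an illustrative sketch (not part of Spark's API — the helper name and the checkpoint path are hypothetical), a small function can compose and sanity-check the option map before handing it to the reader:

```python
def snapshot_read_options(snapshot_start_batch_id, snapshot_partition_id, batch_id=None):
    """Compose options for a partition-level snapshot read.

    Mirrors the documented constraints: 'snapshotStartBatchId' and
    'snapshotPartitionId' must be used together, and both IDs start at 0.
    """
    if snapshot_start_batch_id < 0 or snapshot_partition_id < 0:
        raise ValueError("snapshot batch ID and partition ID start at 0")
    opts = {
        "snapshotStartBatchId": str(snapshot_start_batch_id),
        "snapshotPartitionId": str(snapshot_partition_id),
    }
    if batch_id is not None:
        # Changelogs are replayed from the snapshot up to this batch.
        opts["batchId"] = str(batch_id)
    return opts

# With a real SparkSession, the options would be applied as:
# df = spark.read.format("statestore") \
#     .options(**snapshot_read_options(3, 0, batch_id=5)) \
#     .load("<checkpointLocation>")
```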

### Reading state for stream-stream join

Structured Streaming implements the stream-stream join feature by leveraging multiple instances of state store internally.
These instances logically compose buffers that store the input rows for the left and right sides of the join.

Since it is easier for users to reason about the join sides than about the internal store layout, the data source provides the option 'joinSide' to read the buffered input for a specific side of the join.
To enable reading an internal state store instance directly, we also allow specifying the option 'storeName', with the restriction that 'storeName' and 'joinSide' cannot be specified together.
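The mutual exclusion between 'joinSide' and 'storeName' can be sketched as follows. This is an illustrative helper, not part of the data source API, and the 'left'/'right' values shown for 'joinSide' are an assumption here:

```python
def join_state_read_options(join_side=None, store_name=None):
    """Compose options for reading stream-stream join state.

    Enforces the documented restriction that 'joinSide' and 'storeName'
    cannot be specified together.
    """
    if join_side is not None and store_name is not None:
        raise ValueError("'joinSide' and 'storeName' cannot be specified together")
    opts = {}
    if join_side is not None:
        opts["joinSide"] = join_side  # e.g. "left" or "right" (assumed values)
    if store_name is not None:
        opts["storeName"] = store_name
    return opts

# e.g. spark.read.format("statestore") \
#     .options(**join_state_read_options(join_side="left")) \
#     .load("<checkpointLocation>")
```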

### Reading state change over microbatches

To understand the changes to a state store over microbatches, rather than the whole state store at a particular microbatch, use the 'readChangeFeed' option.
For example, the following code reads the changes of state from batch 2 up to the latest committed batch.

<div class="codetabs">

<div data-lang="python" markdown="1">
{% highlight python %}

df = spark \
.read \
.format("statestore") \
  .option("readChangeFeed", "true") \
.option("changeStartBatchId", 2) \
.load("<checkpointLocation>")

{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}

val df = spark
.read
.format("statestore")
.option("readChangeFeed", true)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")

{% endhighlight %}
</div>

<div data-lang="java" markdown="1">
{% highlight java %}

Dataset<Row> df = spark
.read()
.format("statestore")
.option("readChangeFeed", true)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>");

{% endhighlight %}
</div>

</div>

In this mode, the output schema differs from the normal output.

<table>
<thead><tr><th>Column</th><th>Type</th><th>Note</th></tr></thead>
<tr>
<td>batch_id</td>
<td>long</td>
<td></td>
</tr>
<tr>
<td>change_type</td>
<td>string</td>
<td>There are two possible values: 'update' and 'delete'. 'update' represents either inserting a new key-value pair or updating an existing key with a new value; 'delete' represents removing a key. The 'value' column will be null for delete records.</td>
</tr>
<tr>
<td>key</td>
<td>struct (depends on the type for state key)</td>
<td></td>
</tr>
<tr>
<td>value</td>
<td>struct (depends on the type for state value)</td>
<td></td>
</tr>
<tr>
<td>partition_id</td>
<td>int</td>
<td></td>
</tr>
</table>
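To illustrate the 'change_type' semantics, the sketch below replays collected change-feed rows in batch order to reconstruct the final key-value state. This is plain Python over sample tuples, not a Spark API; in practice the rows would come from the loaded DataFrame (e.g. via `collect()`):

```python
def replay_change_feed(rows):
    """Fold change-feed rows (batch_id, change_type, key, value) into the
    final state, applying updates and deletes in batch order."""
    state = {}
    for batch_id, change_type, key, value in sorted(rows, key=lambda r: r[0]):
        if change_type == "update":
            # 'update' covers both inserts of new keys and updates of existing keys.
            state[key] = value
        elif change_type == "delete":
            # 'value' is null for delete records, so only the key matters here.
            state.pop(key, None)
        else:
            raise ValueError("unknown change_type: %s" % change_type)
    return state

# Hypothetical sample rows spanning batches 0..2:
rows = [
    (0, "update", "a", 1),
    (1, "update", "b", 2),
    (2, "delete", "a", None),
    (2, "update", "b", 3),
]
```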

## State metadata source

Before querying the state from an existing checkpoint via the state data source, users may want to understand the information in the checkpoint, especially about the state operators. This includes which operators and state store instances are available in the checkpoint, the available range of batch IDs, etc.
df = spark \

<div data-lang="scala" markdown="1">
{% highlight scala %}

val df = spark
.read
.format("state-metadata")