From 8826848720f0a349461da6d502c50e739a771dce Mon Sep 17 00:00:00 2001 From: Yuchen Liu Date: Tue, 9 Jul 2024 14:12:53 -0700 Subject: [PATCH 1/4] doc --- docs/structured-streaming-state-data-source.md | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/docs/structured-streaming-state-data-source.md b/docs/structured-streaming-state-data-source.md index 9866991306690..812b5f1026fae 100644 --- a/docs/structured-streaming-state-data-source.md +++ b/docs/structured-streaming-state-data-source.md @@ -42,7 +42,7 @@ Users can read an instance of state store, which is matched to a single stateful Note that there could be an exception, e.g. stream-stream join, which leverages multiple state store instances internally. The data source abstracts the internal representation away from users and provides a user-friendly approach to read the state. See the section for stream-stream join for more details. -### Creating a State store for Batch Queries (all defaults) +### Creating a state store for batch queries (all defaults)
@@ -144,9 +144,22 @@ The following configurations are optional: (none) Represents the target side to read from. This option is used when users want to read the state from stream-stream join. + + snapshotStartBatchId + numeric value + + If specified, forces reading the snapshot at this batch ID; changelogs are then replayed until 'batchId' or its default. Note that the snapshot batch ID starts at 0 and equals the snapshot version ID minus 1. This option must be used together with 'snapshotPartitionId'. + + + snapshotPartitionId + numeric value + + If specified, only this specific partition will be read. Note that partition ID starts with 0. This option must be used together with 'snapshotStartBatchId'. + -### Reading state for Stream-stream join + +### Reading state for stream-stream join Structured Streaming implements the stream-stream join feature via leveraging multiple instances of state store internally. These instances logically compose buffers to store the input rows for left and right. @@ -179,7 +192,6 @@ df = spark \
{% highlight scala %} - val df = spark .read .format("state-metadata") From ccd00b6ca8fd9c5f96145f02ec941f72eeafeeea Mon Sep 17 00:00:00 2001 From: Yuchen Liu Date: Tue, 9 Jul 2024 14:17:05 -0700 Subject: [PATCH 2/4] minor --- docs/structured-streaming-state-data-source.md | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/structured-streaming-state-data-source.md b/docs/structured-streaming-state-data-source.md index 812b5f1026fae..34c2de8884adc 100644 --- a/docs/structured-streaming-state-data-source.md +++ b/docs/structured-streaming-state-data-source.md @@ -158,7 +158,6 @@ The following configurations are optional: - ### Reading state for stream-stream join Structured Streaming implements the stream-stream join feature via leveraging multiple instances of state store internally. From 0b104cb99889e3fe34bff41a3e54e5e5cb397caf Mon Sep 17 00:00:00 2001 From: Yuchen Liu Date: Wed, 10 Jul 2024 14:45:42 -0700 Subject: [PATCH 3/4] readChangeFeed options --- .../structured-streaming-state-data-source.md | 99 ++++++++++++++++++- 1 file changed, 98 insertions(+), 1 deletion(-) diff --git a/docs/structured-streaming-state-data-source.md b/docs/structured-streaming-state-data-source.md index 34c2de8884adc..f0bfb51e06107 100644 --- a/docs/structured-streaming-state-data-source.md +++ b/docs/structured-streaming-state-data-source.md @@ -48,7 +48,6 @@ provides a user-friendly approach to read the state. See the section for stream-
{% highlight python %} - df = spark \ .read \ .format("statestore") \ @@ -156,8 +155,27 @@ The following configurations are optional: If specified, only this specific partition will be read. Note that partition ID starts with 0. This option must be used together with 'snapshotStartBatchId'. + + readChangeFeed + boolean + false + If set to true, will read the change of state over microbatches. The output table schema will also change. Two columns 'batch_id'(long) and 'change_type'(string) will be appended to the front. Option 'changeStartBatchId' must be specified with this option. Option 'batchId', 'joinSide', 'snapshotStartBatchId', 'snapshotPartitionId' is conflict with this option. An example usage of this option can be found below. + + + changeStartBatchId + numeric value + + Represents the first batch to read in the read change feed mode. This option requires 'readChangeFeed' to be set to true. + + + changeEndBatchId + numeric value + latest committed batchId + Represents the last batch to read in the read change feed mode. This option requires 'readChangeFeed' to be set to true. + + ### Reading state for stream-stream join Structured Streaming implements the stream-stream join feature via leveraging multiple instances of state store internally. @@ -166,6 +184,85 @@ These instances logically compose buffers to store the input rows for left and r Since it is more obvious to users to reason about, the data source provides the option 'joinSide' to read the buffered input for specific side of the join. To enable the functionality to read the internal state store instance directly, we also allow specifying the option 'storeName', with restriction that 'storeName' and 'joinSide' cannot be specified together. +### Reading state change over microbatches + +If we want to understand the change of state store over microbatches instead of the whole state store at a particular microbatch, 'readChangeFeed' is the option to use. 
+For example, this is the code to read the change of state from batch 2 to the latest committed batch. + +
+ +
+{% highlight python %}
+
+df = spark \
+.read \
+.format("statestore") \
+.option("readChangeFeed", True) \
+.option("changeStartBatchId", 2) \
+.load("")
+
+{% endhighlight %}
+
+ +
+{% highlight scala %}
+
+val df = spark
+.read
+.format("statestore")
+.option("readChangeFeed", true)
+.option("changeStartBatchId", 2)
+.load("")
+
+{% endhighlight %}
+
+ +
+{% highlight java %}
+
+Dataset<Row> df = spark
+.read()
+.format("statestore")
+.option("readChangeFeed", true)
+.option("changeStartBatchId", 2)
+.load("");
+
+{% endhighlight %}
+
+ +
+ +The output schema will also be different from the normal output. + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
| Column | Type | Note |
| --- | --- | --- |
| batch_id | long | |
| change_type | string | There are two possible values: 'update' and 'delete'. Update represents either inserting a non-existing key-value pair or updating an existing key with new value. The 'value' field will be null for delete records. |
| key | struct (depends on the type for state key) | |
| value | struct (depends on the type for state value) | |
| partition_id | int | |
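
To make the 'update' and 'delete' semantics in the table above concrete, here is a minimal sketch in plain Python (no Spark required) that folds change feed rows into a key-value view. The function name `replay_change_feed`, the use of plain dicts for rows, and the use of simple hashable keys in place of the real key struct are assumptions made for illustration only; this is not how the data source itself is implemented.

```python
def replay_change_feed(rows, initial_state=None):
    """Fold change feed rows into a {key: value} view.

    Hypothetical helper for illustration. Each row is a dict with the
    columns listed in the table: batch_id, change_type, key, value,
    partition_id. Keys must be hashable here (an assumption; the real
    column is a struct).
    """
    state = dict(initial_state or {})
    # Apply changes in batch order. sorted() is stable, so rows within
    # the same batch keep the order in which they were supplied.
    for row in sorted(rows, key=lambda r: r["batch_id"]):
        if row["change_type"] == "update":
            # Covers both inserting a new key and overwriting an existing one.
            state[row["key"]] = row["value"]
        elif row["change_type"] == "delete":
            # The 'value' field is null for delete records, so only the key is used.
            state.pop(row["key"], None)
        else:
            raise ValueError("unexpected change_type: %r" % (row["change_type"],))
    return state


rows = [
    {"batch_id": 3, "change_type": "delete", "key": "a", "value": None, "partition_id": 0},
    {"batch_id": 2, "change_type": "update", "key": "a", "value": 1, "partition_id": 0},
    {"batch_id": 2, "change_type": "update", "key": "b", "value": 5, "partition_id": 1},
    {"batch_id": 3, "change_type": "update", "key": "b", "value": 7, "partition_id": 1},
]
result = replay_change_feed(rows)
```

Because the feed only starts at 'changeStartBatchId', the result reflects the full state only if `initial_state` holds the state as of the preceding batch; otherwise it shows just the keys touched in the range that was read.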
+ ## State metadata source Before querying the state from existing checkpoint via state data source, users would like to understand the information for the checkpoint, especially about state operator. This includes which operators and state store instances are available in the checkpoint, available range of batch IDs, etc. From 1ce86d9ee0e24e74f2ea76be8c4dd9ee5f40835f Mon Sep 17 00:00:00 2001 From: Yuchen Liu Date: Thu, 11 Jul 2024 11:02:21 -0700 Subject: [PATCH 4/4] address comments from Jungtaek --- docs/structured-streaming-state-data-source.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/structured-streaming-state-data-source.md b/docs/structured-streaming-state-data-source.md index f0bfb51e06107..1fac39566c1da 100644 --- a/docs/structured-streaming-state-data-source.md +++ b/docs/structured-streaming-state-data-source.md @@ -48,6 +48,7 @@ provides a user-friendly approach to read the state. See the section for stream-
{% highlight python %} + df = spark \ .read \ .format("statestore") \ @@ -159,7 +160,7 @@ The following configurations are optional: readChangeFeed boolean false - If set to true, will read the change of state over microbatches. The output table schema will also change. Two columns 'batch_id'(long) and 'change_type'(string) will be appended to the front. Option 'changeStartBatchId' must be specified with this option. Option 'batchId', 'joinSide', 'snapshotStartBatchId', 'snapshotPartitionId' is conflict with this option. An example usage of this option can be found below. + If set to true, will read the change of state over microbatches. The output table schema will also differ. Details can be found in section "Reading state changes over microbatches". Option 'changeStartBatchId' must be specified with this option. Option 'batchId', 'joinSide', 'snapshotStartBatchId' and 'snapshotPartitionId' cannot be used together with this option. changeStartBatchId @@ -184,7 +185,7 @@ These instances logically compose buffers to store the input rows for left and r Since it is more obvious to users to reason about, the data source provides the option 'joinSide' to read the buffered input for specific side of the join. To enable the functionality to read the internal state store instance directly, we also allow specifying the option 'storeName', with restriction that 'storeName' and 'joinSide' cannot be specified together. -### Reading state change over microbatches +### Reading state changes over microbatches If we want to understand the change of state store over microbatches instead of the whole state store at a particular microbatch, 'readChangeFeed' is the option to use. For example, this is the code to read the change of state from batch 2 to the latest committed batch.
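
The option constraints this patch documents (which options must appear together and which cannot be combined with 'readChangeFeed') can be summarized as a small check. The sketch below is plain Python and only restates the rules from the configuration table. The function `validate_state_reader_options` is hypothetical and is not part of Spark; the data source may report violations differently.

```python
def validate_state_reader_options(options):
    """Return a list of rule violations for a dict of reader options.

    Hypothetical helper that mirrors the documented rules only.
    Assumes 'readChangeFeed' is given as a Python bool, not a string.
    """
    errors = []

    # snapshotStartBatchId and snapshotPartitionId must be used together.
    has_start = "snapshotStartBatchId" in options
    has_partition = "snapshotPartitionId" in options
    if has_start != has_partition:
        errors.append(
            "snapshotStartBatchId and snapshotPartitionId must be used together"
        )

    if bool(options.get("readChangeFeed", False)):
        # changeStartBatchId is mandatory in read change feed mode.
        if "changeStartBatchId" not in options:
            errors.append("readChangeFeed requires changeStartBatchId")
        # These options cannot be combined with readChangeFeed.
        for name in ("batchId", "joinSide",
                     "snapshotStartBatchId", "snapshotPartitionId"):
            if name in options:
                errors.append(name + " cannot be used together with readChangeFeed")
    else:
        # The change* options only make sense in read change feed mode.
        for name in ("changeStartBatchId", "changeEndBatchId"):
            if name in options:
                errors.append(name + " requires readChangeFeed to be set to true")

    return errors


ok = validate_state_reader_options({"readChangeFeed": True, "changeStartBatchId": 2})
bad = validate_state_reader_options({"readChangeFeed": True, "batchId": 3})
```

Running such a check before building the reader is one way to surface a misconfiguration early; whether that is preferable to relying on the data source's own error messages is a judgment call.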