
[SPARK-48589][SQL][SS] Add option snapshotStartBatchId and snapshotPartitionId to state data source #46944

Closed

Conversation

eason-yuchen-liu (Contributor)

@eason-yuchen-liu eason-yuchen-liu commented Jun 11, 2024

What changes were proposed in this pull request?

This PR defines two new options, snapshotStartBatchId and snapshotPartitionId, for the existing state reader. Both of them must be provided at the same time.

  1. When there is no snapshot file for snapshotStartBatchId (note there is an off-by-one difference between version and batch id), throw an exception.
  2. Otherwise, the reader rebuilds the state by reading delta files only, ignoring all later snapshot files.
  3. If a batchId option is also specified, that batchId is the ending batch, so the replay ends there.
  4. This feature supports state generated by the HDFS state store provider and by the RocksDB state store provider with changelog checkpointing enabled. It does not support RocksDB with changelog checkpointing disabled, which is the default for RocksDB.
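As a rough sketch of how the two options are meant to be used together (the checkpoint path and batch numbers below are hypothetical, not from this PR):

```scala
// Rebuild partition 0 of the state starting from the snapshot taken for
// batch 5, applying only delta files up to batch 10 and ignoring any
// snapshot files after the starting one. Remember the off-by-one note:
// the state version for batch N is N + 1.
val stateDf = spark.read
  .format("statestore")
  .option("path", "/tmp/checkpoint")   // hypothetical checkpoint location
  .option("snapshotStartBatchId", 5)   // batch whose snapshot we start from
  .option("snapshotPartitionId", 0)    // only this state partition is replayed
  .option("batchId", 10)               // optional: the ending batch
  .load()
```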

Why are the changes needed?

Sometimes when a snapshot is corrupted, users want to bypass it when reading a later state. This PR gives users the ability to specify the starting snapshot version and partition, which can be useful for debugging.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Created test cases covering edge cases in the input of the new options. Created a test for the new public function replayReadStateFromSnapshot. Created integration tests for the new options against four stateful operators: limit, aggregation, deduplication, and stream-stream join. Instead of generating state within the tests, which is unstable, I prepared golden files for the integration tests.

Was this patch authored or co-authored using generative AI tooling?

No.

@eason-yuchen-liu eason-yuchen-liu changed the title [SPARK-48588][SS] Add option snapshotStartBatchId and snapshotPartitionId to state data source [SPARK-48589][SS] Add option snapshotStartBatchId and snapshotPartitionId to state data source Jun 11, 2024
@eason-yuchen-liu eason-yuchen-liu changed the title [SPARK-48589][SS] Add option snapshotStartBatchId and snapshotPartitionId to state data source [SPARK-48589][SQL][SS] Add option snapshotStartBatchId and snapshotPartitionId to state data source Jun 11, 2024
@eason-yuchen-liu eason-yuchen-liu marked this pull request as ready for review June 12, 2024 20:23

eason-yuchen-liu commented Jun 12, 2024

Is it necessary to add an end-to-end test for the options? If so, I can create another PR. It would probably be constructed by sleeping long enough for the maintenance task to run. @anishshri-db @HeartSaVioR

throw QueryExecutionErrors.failedToReadSnapshotFileNotExistsError(
snapshotFile(startVersion), toString(), null)
}
synchronized { putStateIntoStateCacheMap(startVersion, startVersionMap.get) }
Contributor

is it possible to refactor this with the existing loadMap function? Or add a helper function for the shared logic?

Contributor Author

For HDFS it is hard because the common part is really small, but for RocksDB there is room for refactoring. For example, PR #46927 tests whether we can extract a common part of both load functions.

* @param endVersion checkpoint version to end with
*/
def getStore(startVersion: Long, endVersion: Long): StateStore =
throw new SparkUnsupportedOperationException("getStore with startVersion and endVersion " +
Contributor

can we just put nothing here? like

def getStore(version: Long): StateStore

Contributor Author

It seems that we cannot: to make this method optional, it has to have a default implementation; otherwise a build error is thrown.

Contributor

Hmm, what error do you see here? Can you paste it please?

Contributor Author

Building on the assumption that users create a custom state store provider and do not implement this method because it is optional, they will see errors like

Missing implementation for member of trait StateStoreProvider
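The constraint being discussed can be sketched as follows: giving the new overload a concrete (throwing) default body keeps third-party providers that only implement the original abstract method compiling, while leaving it abstract would produce the build error quoted above. This trait is a simplified stand-in for Spark's actual StateStoreProvider, not the merged code:

```scala
// Simplified sketch; StateStore stands in for the real Spark trait.
trait StateStoreProvider {
  def getStore(version: Long): StateStore   // existing abstract method

  // New overload. A concrete default body makes it effectively optional:
  // providers written before this PR still compile, and only callers that
  // actually need fine-grained replay hit the exception.
  def getStore(startVersion: Long, endVersion: Long): StateStore =
    throw new SparkUnsupportedOperationException(
      "getStore with startVersion and endVersion is not supported by " +
        this.getClass.getName)
}
```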

import org.apache.spark.io.CompressionCodec
import org.apache.spark.sql.{AnalysisException, DataFrame, Encoders, Row}
import org.apache.spark.sql.catalyst.expressions.{BoundReference, GenericInternalRow}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream, OffsetSeqLog}
import org.apache.spark.sql.execution.streaming.state.{HDFSBackedStateStoreProvider, RocksDBStateStoreProvider, StateStore}
import org.apache.spark.sql.execution.streaming.state._
Contributor

is this because these three are everything in that pkg?

Contributor Author

No. The reason is that I use three new classes from this pkg, and I think it would be too long to list them all. What do you think?

Contributor

Yea this should be good


WweiL commented Jun 13, 2024

Tagging myself so it shows on my dashboard


case Some(snapshotStartBatchId) =>
if (!provider.isInstanceOf[SupportsFineGrainedReplayFromSnapshot]) {
StateStoreErrors.stateStoreProviderDoesNotSupportFineGrainedReplay(
Contributor

I guess we've usually thrown the exception explicitly at the call site rather than having the method throw it?

eason-yuchen-liu (Contributor Author), Jun 27, 2024

This error is thrown in two places (the other is in JoinStateManager), so I created a function for it. This way the error also gets its own error class.

* @param endVersion checkpoint version to end with
* @return [[HDFSBackedStateStore]]
*/
override def replayStateFromSnapshot(startVersion: Long, endVersion: Long): StateStore = {
Contributor

Please apply the same for all methods in this PR: if the meaning of startVersion is actually the snapshot version to begin with, let's use snapshotVersion, which is clearer about the intention.

if (startVersion < 1) {
throw QueryExecutionErrors.unexpectedStateStoreVersion(startVersion)
}
if (endVersion < startVersion || endVersion < 0) {
Contributor

I guess we'd like to give a different error for an invalid value (negative) vs. the criteria (endVersion has to be equal to or later than startVersion). The error message wouldn't give context on why it failed. Users could check the option value themselves, but ideally we should tell them.

Contributor

Also, the former check already covers the latter one: startVersion has to be at least 1, so endVersion also has to be at least 1. The latter check is only needed if we want to produce a different error for different patterns of invalid values.

eason-yuchen-liu (Contributor Author), Jun 27, 2024

I guess we'd like to give a different error for an invalid value (negative) vs. the criteria (endVersion has to be equal to or later than startVersion). The error message wouldn't give context on why it failed. Users could check the option value themselves, but ideally we should tell them.

There is a better error message in StateDataSource, where the users' input is verified, and that is the only usage of this function. I think the error message here will not matter too much, since I would not expect users to call this method directly.

if (endVersion < startVersion) {
throw QueryExecutionErrors.unexpectedStateStoreVersion(endVersion)
}
rocksDB.loadFromSnapshot(startVersion, endVersion)
Contributor

just to double check, readOnly flag is not needed unlike the path of load(), do I understand correctly? If then shall we just implement one of two and call other to reduce redundant code?

Contributor Author

Yes. Good catch.

*/
trait SupportsFineGrainedReplayFromSnapshot {
/**
* Used by snapshotStartBatchId option when reading state generated by join operation as data
Contributor

Let's not couple too much with implementation details, especially the current implementation of the Spark codebase. A third-party state store provider does not need to know about this. If we think this note is needed to help them implement the two different methods, let's just leave this method at the interface level and wrap the state store as read-only on the caller side.

* @param snapshotVersion checkpoint version of the snapshot to start with
* @param endVersion checkpoint version to end with
*/
def replayReadStateFromSnapshot(snapshotVersion: Long, endVersion: Long): ReadStateStore
Contributor

Shall we just provide a default implementation that wraps the read-write state store as read-only? Then state store providers wouldn't need to implement this except when they can optimize specifically for the read-only case.

Contributor

You can update the comment to explain why a provider may want to implement this or just leave the default, instead of describing where this method will be called.
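The default implementation the reviewer suggests could be sketched like this. It assumes Spark's WrappedReadStateStore adapter (which wraps a read-write store behind the read-only interface); this is the reviewer's proposal, not necessarily the merged code:

```scala
// Simplified sketch of the trait with the suggested default.
trait SupportsFineGrainedReplayFromSnapshot {
  // Required: rebuild a read-write store from a snapshot plus delta files.
  def replayStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore

  // Default: reuse the read-write replay and wrap the result read-only.
  // A provider overrides this only if it has a cheaper read-only path.
  def replayReadStateFromSnapshot(
      snapshotVersion: Long,
      endVersion: Long): ReadStateStore =
    new WrappedReadStateStore(replayStateFromSnapshot(snapshotVersion, endVersion))
}
```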

@@ -264,3 +296,8 @@ class StateStoreValueRowFormatValidationFailure(errorMsg: String)
extends SparkRuntimeException(
errorClass = "STATE_STORE_VALUE_ROW_FORMAT_VALIDATION_FAILURE",
messageParameters = Map("errorMsg" -> errorMsg))

class StateStoreProviderDoesNotSupportFineGrainedReplay(inputClass: String)
extends SparkUnsupportedOperationException(
Contributor

nit: one more space

Contributor Author

Where to insert?

Contributor

before e, it's only one space.

stateStoreProvider.getStore(stateInfo.get.storeVersion)
if (snapshotStartVersion.isDefined) {
if (!stateStoreProvider.isInstanceOf[SupportsFineGrainedReplayFromSnapshot]) {
StateStoreErrors.stateStoreProviderDoesNotSupportFineGrainedReplay(
Contributor

ditto

@@ -796,4 +973,141 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
testForSide("right")
}
}

protected def testSnapshotNotFound(): Unit = {
withTempDir(tempDir => {
Contributor

nit: according to the Databricks Scala style, this should be withTempDir { tempDir =>, which saves one level of indentation (curly brace)
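For contrast, the two styles side by side (both compile; the second is the style being requested):

```scala
// Parentheses around a brace-block lambda: works, but adds an extra level.
withTempDir(tempDir => {
  // test body
})

// Preferred: pass the lambda as a single curly-brace block.
withTempDir { tempDir =>
  // test body
}
```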

provider.asInstanceOf[SupportsFineGrainedReplayFromSnapshot]
.replayReadStateFromSnapshot(1, 2)
}
checkError(exc, "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED")
Contributor

It would be nice if we could give users a better error message, e.g. "snapshot file does not exist", but I'm OK with addressing this later.

Contributor Author

Let's address it later along with the changelog-file-not-found exception.

}

protected def testGetReadStoreWithStartVersion(): Unit = {
withTempDir(tempDir => {
Contributor

ditto

}

protected def testSnapshotPartitionId(): Unit = {
withTempDir(tempDir => {
Contributor

ditto

.option(StateSourceOptions.SNAPSHOT_START_BATCH_ID, 0)
.option(
StateSourceOptions.SNAPSHOT_PARTITION_ID,
spark.sessionState.conf.numShufflePartitions)
Contributor

just need to be > 0

Contributor Author

I see, it is because of limit operator.

})
}

// Todo: Should also test against state generated by 3.5
Contributor

Is this a remaining TODO, or does it not need to be done at all? If not, let's remove the golden files for 3.5; I guess it's not intentional to test cross-version compatibility.

checkAnswer(stateSnapshotDf, stateDf)
}

protected def testSnapshotOnLimitState(providerName: String): Unit = {
Contributor

General comment for tests using golden files: please leave a comment showing how you built each golden file (the query you used), so that others can re-build it if needed.

}

/**
* Consturct the state at endVersion from snapshot from snapshotVersion.
Contributor

nit: Construct the state at

@@ -367,6 +368,22 @@ private[sql] class RocksDBStateStoreProvider
private def verify(condition: => Boolean, msg: String): Unit = {
if (!condition) { throw new IllegalStateException(msg) }
}

override def replayStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore = {
Contributor

Can you add a small function comment here ?

errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_MISSING_SNAPSHOT_FILE",
messageParameters = Map(
"fileToRead" -> fileToRead,
"clazz" -> clazz))
Contributor

is this a common convention for the parameter naming ? this will be visible in the error message that is thrown, correct ?

eason-yuchen-liu (Contributor Author), Jun 28, 2024


protected def testSnapshotOnDeduplicateState(providerName: String): Unit = {
/** The golden files are generated by:
withSQLConf({
Contributor

nit: indent seems odd in these places, but maybe not a big deal for such comments

Contributor Author

Will move one tab right.

}
*/
val resourceUri = this.getClass.getResource(
s"/structured-streaming/checkpoint-version-4.0.0/$providerName/limit/"
Contributor

I thought we were going to run against 3.5.1 and then run the query once to generate the operator metadata. Did we decide against that ?

Contributor

Strictly speaking, the test that a checkpoint with no operator metadata can create operator metadata should have been done in the state metadata testing. If we don't have one, we'd better add one, but there's no need to couple it with this PR.

@anishshri-db (Contributor) left a comment

lgtm - pending some minor comments

@HeartSaVioR (Contributor) left a comment

Only nits and minors. Thanks for the patience!

@@ -900,7 +900,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
*/
override def replayStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore = {
val newMap = replayLoadedMapForStoreFromSnapshot(snapshotVersion, endVersion)
logInfo(log"Retrieved version ${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} to " +
logInfo(log"Retrieved snapshot at version " +
log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version" +
Contributor

nit: space after version, as the next string does not start with space.

@@ -917,9 +918,10 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
override def replayReadStateFromSnapshot(snapshotVersion: Long, endVersion: Long):
ReadStateStore = {
val newMap = replayLoadedMapForStoreFromSnapshot(snapshotVersion, endVersion)
logInfo(log"Retrieved version ${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} to " +
logInfo(log"Retrieved snapshot at version " +
log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version" +
Contributor

nit: same here


@eason-yuchen-liu (Contributor Author)

Thanks for all the careful checks by @HeartSaVioR @anishshri-db @WweiL. This PR is ready to merge.

@HeartSaVioR (Contributor) left a comment

+1

@HeartSaVioR (Contributor)

Thanks! Merging to master.

HeartSaVioR pushed a commit that referenced this pull request Jul 11, 2024
…to State Data Source

### What changes were proposed in this pull request?

In #46944 and #47188, we introduced some new options to the State Data Source. This PR aims to explain these new features in the documentation.

### Why are the changes needed?

It is necessary to reflect the latest change in the documentation website.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

The API Doc website can be rendered correctly.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47274 from eason-yuchen-liu/snapshot-doc.

Authored-by: Yuchen Liu <yuchen.liu@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
jingz-db pushed a commit to jingz-db/spark that referenced this pull request Jul 22, 2024
attilapiros pushed a commit to attilapiros/spark that referenced this pull request Oct 4, 2024
attilapiros pushed a commit to attilapiros/spark that referenced this pull request Oct 4, 2024