diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala index b4358607d324b..12d418ceeb7b9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala @@ -27,9 +27,6 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType - - - class HDFSBackedStateDataSourceReadCDCSuite extends StateDataSourceChangeDataReadSuite { override protected def newStateStoreProvider(): HDFSBackedStateStoreProvider = new HDFSBackedStateStoreProvider @@ -93,34 +90,27 @@ abstract class StateDataSourceChangeDataReadSuite extends StateDataSourceTestBas } test("new getChangeDataReader API of state store provider") { - def withNewStateStore (provider: StateStoreProvider, version: Int, f: StateStore => Unit): + def withNewStateStore(provider: StateStoreProvider, version: Int)(f: StateStore => Unit): Unit = { val stateStore = provider.getStore(version) f(stateStore) stateStore.commit() } + withTempDir { tempDir => val provider = getNewStateStoreProvider(tempDir.getAbsolutePath) - withNewStateStore(provider, 0, stateStore => - put(stateStore, "a", 1, 1) - ) - withNewStateStore(provider, 1, stateStore => - put(stateStore, "b", 2, 2) - ) - withNewStateStore(provider, 2, stateStore => - stateStore.remove(dataToKeyRow("a", 1)) - ) - withNewStateStore(provider, 3, stateStore => - stateStore.remove(dataToKeyRow("b", 2)) - ) + withNewStateStore(provider, 0) { stateStore => + put(stateStore, "a", 1, 1) } + withNewStateStore(provider, 1) { stateStore => + put(stateStore, "b", 2, 2) } + withNewStateStore(provider, 2) { stateStore => + stateStore.remove(dataToKeyRow("a", 1)) } + withNewStateStore(provider, 3) { stateStore => + stateStore.remove(dataToKeyRow("b", 2)) } val reader = provider.asInstanceOf[SupportsFineGrainedReplay].getStateStoreChangeDataReader(1, 4) -// assert(reader.getNext() === (RecordType.PUT_RECORD, Row())) -// println(reader.next()) -// println(reader.next()) -// println(reader.next()) -// println(reader.next()) + assert(reader.next() === (RecordType.PUT_RECORD, dataToKeyRow("a", 1), dataToValueRow(1), 0)) assert(reader.next() === (RecordType.PUT_RECORD, dataToKeyRow("b", 2), dataToValueRow(2), 1)) assert(reader.next() === @@ -128,7 +118,6 @@ abstract class StateDataSourceChangeDataReadSuite extends StateDataSourceTestBas assert(reader.next() === (RecordType.DELETE_RECORD, dataToKeyRow("b", 2), null, 3)) } - } test("cdc read limit state") { @@ -150,12 +139,11 @@ abstract class StateDataSourceChangeDataReadSuite extends StateDataSourceTestBas .option(StateSourceOptions.CDC_START_BATCH_ID, 0) .option(StateSourceOptions.CDC_END_BATCH_ID, 2) .load(tempDir.getAbsolutePath) - stateDf.show() val expectedDf = Seq( Row(Row(null), Row(4), "PUT", 0, 0), Row(Row(null), Row(8), "PUT", 1, 0), - Row(Row(null), Row(10), "PUT", 2, 0), + Row(Row(null), Row(10), "PUT", 2, 0) ) checkAnswer(stateDf, expectedDf) @@ -241,8 +229,8 @@ abstract class StateDataSourceChangeDataReadSuite extends StateDataSourceTestBas inputData.toDF().select(col("_1").as("leftKey"), col("_2").as("leftValue")) val rightDf = inputData.toDF().select((col("_1") * 2).as("rightKey"), col("_2").as("rightValue")) -// val df = getStreamStreamJoinQuery(inputData) val df = leftDf.join(rightDf).where("leftKey == rightKey") + testStream(df)( StartStream(checkpointLocation = tempDir.getAbsolutePath), AddData(inputData, (1, 1L), (2, 2L)), @@ -257,7 +245,6 @@ abstract class StateDataSourceChangeDataReadSuite extends StateDataSourceTestBas .option(StateSourceOptions.CDC_START_BATCH_ID, 0) .option(StateSourceOptions.CDC_END_BATCH_ID, 1) .load(tempDir.getAbsolutePath) - keyWithIndexToValueDf.show() val keyWithIndexToValueExpectedDf = Seq( Row(Row(3, 0), Row(3, 3, false), "PUT", 1, 1), @@ -275,7 +262,6 @@ abstract class StateDataSourceChangeDataReadSuite extends StateDataSourceTestBas .option(StateSourceOptions.CDC_START_BATCH_ID, 0) .option(StateSourceOptions.CDC_END_BATCH_ID, 1) .load(tempDir.getAbsolutePath) - keyToNumValuesDf.show() val keyToNumValuesDfExpectedDf = Seq( Row(Row(3), Row(1), "PUT", 1, 1),