Skip to content

Commit

Permalink
solve format issue
Browse files (browse the repository at this point in the history)
  • Loading branch information
eason-yuchen-liu committed Jun 29, 2024
1 parent 15a8316 commit 3834cc9
Showing 1 changed file with 13 additions and 27 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -27,9 +27,6 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType




class HDFSBackedStateDataSourceReadCDCSuite extends StateDataSourceChangeDataReadSuite {
override protected def newStateStoreProvider(): HDFSBackedStateStoreProvider =
new HDFSBackedStateStoreProvider
Expand Down Expand Up @@ -93,42 +90,34 @@ abstract class StateDataSourceChangeDataReadSuite extends StateDataSourceTestBas
}

test("new getChangeDataReader API of state store provider") {
def withNewStateStore (provider: StateStoreProvider, version: Int, f: StateStore => Unit):
def withNewStateStore(provider: StateStoreProvider, version: Int)(f: StateStore => Unit):
Unit = {
val stateStore = provider.getStore(version)
f(stateStore)
stateStore.commit()
}

withTempDir { tempDir =>
val provider = getNewStateStoreProvider(tempDir.getAbsolutePath)
withNewStateStore(provider, 0, stateStore =>
put(stateStore, "a", 1, 1)
)
withNewStateStore(provider, 1, stateStore =>
put(stateStore, "b", 2, 2)
)
withNewStateStore(provider, 2, stateStore =>
stateStore.remove(dataToKeyRow("a", 1))
)
withNewStateStore(provider, 3, stateStore =>
stateStore.remove(dataToKeyRow("b", 2))
)
withNewStateStore(provider, 0) { stateStore =>
put(stateStore, "a", 1, 1) }
withNewStateStore(provider, 1) { stateStore =>
put(stateStore, "b", 2, 2) }
withNewStateStore(provider, 2) { stateStore =>
stateStore.remove(dataToKeyRow("a", 1)) }
withNewStateStore(provider, 3) { stateStore =>
stateStore.remove(dataToKeyRow("b", 2)) }

val reader =
provider.asInstanceOf[SupportsFineGrainedReplay].getStateStoreChangeDataReader(1, 4)
// assert(reader.getNext() === (RecordType.PUT_RECORD, Row()))
// println(reader.next())
// println(reader.next())
// println(reader.next())
// println(reader.next())

assert(reader.next() === (RecordType.PUT_RECORD, dataToKeyRow("a", 1), dataToValueRow(1), 0))
assert(reader.next() === (RecordType.PUT_RECORD, dataToKeyRow("b", 2), dataToValueRow(2), 1))
assert(reader.next() ===
(RecordType.DELETE_RECORD, dataToKeyRow("a", 1), null, 2))
assert(reader.next() ===
(RecordType.DELETE_RECORD, dataToKeyRow("b", 2), null, 3))
}

}

test("cdc read limit state") {
Expand All @@ -150,12 +139,11 @@ abstract class StateDataSourceChangeDataReadSuite extends StateDataSourceTestBas
.option(StateSourceOptions.CDC_START_BATCH_ID, 0)
.option(StateSourceOptions.CDC_END_BATCH_ID, 2)
.load(tempDir.getAbsolutePath)
stateDf.show()

val expectedDf = Seq(
Row(Row(null), Row(4), "PUT", 0, 0),
Row(Row(null), Row(8), "PUT", 1, 0),
Row(Row(null), Row(10), "PUT", 2, 0),
Row(Row(null), Row(10), "PUT", 2, 0)
)

checkAnswer(stateDf, expectedDf)
Expand Down Expand Up @@ -241,8 +229,8 @@ abstract class StateDataSourceChangeDataReadSuite extends StateDataSourceTestBas
inputData.toDF().select(col("_1").as("leftKey"), col("_2").as("leftValue"))
val rightDf =
inputData.toDF().select((col("_1") * 2).as("rightKey"), col("_2").as("rightValue"))
// val df = getStreamStreamJoinQuery(inputData)
val df = leftDf.join(rightDf).where("leftKey == rightKey")

testStream(df)(
StartStream(checkpointLocation = tempDir.getAbsolutePath),
AddData(inputData, (1, 1L), (2, 2L)),
Expand All @@ -257,7 +245,6 @@ abstract class StateDataSourceChangeDataReadSuite extends StateDataSourceTestBas
.option(StateSourceOptions.CDC_START_BATCH_ID, 0)
.option(StateSourceOptions.CDC_END_BATCH_ID, 1)
.load(tempDir.getAbsolutePath)
keyWithIndexToValueDf.show()

val keyWithIndexToValueExpectedDf = Seq(
Row(Row(3, 0), Row(3, 3, false), "PUT", 1, 1),
Expand All @@ -275,7 +262,6 @@ abstract class StateDataSourceChangeDataReadSuite extends StateDataSourceTestBas
.option(StateSourceOptions.CDC_START_BATCH_ID, 0)
.option(StateSourceOptions.CDC_END_BATCH_ID, 1)
.load(tempDir.getAbsolutePath)
keyToNumValuesDf.show()

val keyToNumValuesDfExpectedDf = Seq(
Row(Row(3), Row(1), "PUT", 1, 1),
Expand Down

0 comments on commit 3834cc9

Please sign in to comment.