Skip to content

Commit

Permalink
resolve all conflicts
Browse the repository at this point in the history
  • Loading branch information
jingz-db committed Jul 11, 2024
1 parent c8b5b70 commit 61cf525
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class StateDataSource extends TableProvider with DataSourceRegister {

// if the operator is transformWithState, but the operator properties are empty, then
// the user has not defined any state variables for the operator
val operatorProperties = opMetadata.operatorProperties
val operatorProperties = opMetadata.operatorPropertiesJson
if (operatorProperties.isEmpty) {
throw StateDataSourceErrors.invalidOptionValue(STATE_VAR_NAME,
"No state variable names are defined for the transformWithState operator")
Expand Down Expand Up @@ -210,9 +210,6 @@ object StateSourceOptions extends DataSourceOptions {
val JOIN_SIDE = newOption("joinSide")
val SNAPSHOT_START_BATCH_ID = newOption("snapshotStartBatchId")
val SNAPSHOT_PARTITION_ID = newOption("snapshotPartitionId")
val READ_CHANGE_FEED = newOption("readChangeFeed")
val CHANGE_START_BATCH_ID = newOption("changeStartBatchId")
val CHANGE_END_BATCH_ID = newOption("changeEndBatchId")
val STATE_VAR_NAME = newOption("stateVarName")
val READ_CHANGE_FEED = newOption("readChangeFeed")
val CHANGE_START_BATCH_ID = newOption("changeStartBatchId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ abstract class StatePartitionReaderBase(
} else {
require(stateStoreMetadata.length == 1)
require(stateStoreMetadata.head.version == 1)
stateStoreMetadata.head.numColsPrefixKey.get
stateStoreMetadata.head.numColsPrefixKey
}

// TODO: currently we don't support RangeKeyScanStateEncoderSpec. Support for this will be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.datasources.v2.state.metadata

import java.util

import scala.jdk.CollectionConverters._

import org.apache.hadoop.conf.Configuration
Expand Down Expand Up @@ -225,7 +226,8 @@ class StateMetadataPartitionReader(
allOperatorStateMetadata.flatMap { operatorStateMetadata =>
operatorStateMetadata._1.version match {
case 1 =>
val operatorStateMetadataV1 = operatorStateMetadata.asInstanceOf[OperatorStateMetadataV1]
val operatorStateMetadataV1 =
operatorStateMetadata._1.asInstanceOf[OperatorStateMetadataV1]
operatorStateMetadataV1.stateStoreInfo.map { stateStoreMetadata =>
StateMetadataTableEntry(
operatorStateMetadata._1.version,
Expand All @@ -241,7 +243,8 @@ class StateMetadataPartitionReader(
}

case 2 =>
val operatorStateMetadataV2 = operatorStateMetadata.asInstanceOf[OperatorStateMetadataV2]
val operatorStateMetadataV2 =
operatorStateMetadata._1.asInstanceOf[OperatorStateMetadataV2]
operatorStateMetadataV2.stateStoreInfo.map { stateStoreMetadata =>
StateMetadataTableEntry(
operatorStateMetadata._1.version,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,6 @@ object OperatorStateMetadataUtils {
Serialization.read[OperatorStateMetadataV1](in)
case 2 =>
Serialization.read[OperatorStateMetadataV2](in)

case 2 =>
Serialization.read[OperatorStateMetadataV2](in)

case _ =>
throw new IllegalArgumentException(s"Failed to deserialize operator metadata with " +
s"version=$version")
Expand Down

0 comments on commit 61cf525

Please sign in to comment.