
Commit 95d5b2a: tests pass

ericm-db committed Jun 14, 2024 (1 parent: 45e82c9)
Showing 7 changed files with 57 additions and 8 deletions.
ListStateImpl.scala

@@ -23,6 +23,16 @@ import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSch
 import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, StateStore, StateStoreErrors}
 import org.apache.spark.sql.streaming.ListState
 
+object ListStateImpl {
+  def columnFamilySchema(stateName: String): ColumnFamilySchemaV1 = {
+    new ColumnFamilySchemaV1(
+      stateName,
+      KEY_ROW_SCHEMA,
+      VALUE_ROW_SCHEMA,
+      NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA),
+      true)
+  }
+}
 /**
  * Provides concrete implementation for list of values associated with a state variable
  * used in the streaming transformWithState operator.
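The new companion object makes the column-family schema derivable from the state name alone, without instantiating ListStateImpl (which needs a live StateStore). A minimal sketch of how a caller might use it, assuming the imports shown in the hunk above; the state names and buffer are illustrative, not from this commit:

// Illustrative only: collect schemas for several list-state variables by name.
// ListStateImpl.columnFamilySchema and ColumnFamilySchemaV1 are from the diff
// above; the state names here are hypothetical.
import scala.collection.mutable

val schemas = mutable.ListBuffer[ColumnFamilySchemaV1]()
Seq("clicks", "impressions").foreach { stateName =>
  schemas += ListStateImpl.columnFamilySchema(stateName)
}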
ListStateImplWithTTL.scala

@@ -23,6 +23,16 @@ import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchemaV1, NoP
 import org.apache.spark.sql.streaming.{ListState, TTLConfig}
 import org.apache.spark.util.NextIterator
 
+object ListStateImplWithTTL {
+  def columnFamilySchema(stateName: String): ColumnFamilySchemaV1 = {
+    new ColumnFamilySchemaV1(
+      stateName,
+      KEY_ROW_SCHEMA,
+      VALUE_ROW_SCHEMA_WITH_TTL,
+      NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA),
+      true)
+  }
+}
 /**
  * Class that provides a concrete implementation for a list state associated with state
  * variables (with ttl expiration support) used in the streaming transformWithState operator.
MapStateImpl.scala

@@ -23,6 +23,16 @@ import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSch
 import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchemaV1, PrefixKeyScanStateEncoderSpec, StateStore, StateStoreErrors, UnsafeRowPair}
 import org.apache.spark.sql.streaming.MapState
 
+object MapStateImpl {
+  def columnFamilySchema(stateName: String): ColumnFamilySchemaV1 = {
+    new ColumnFamilySchemaV1(
+      stateName,
+      COMPOSITE_KEY_ROW_SCHEMA,
+      VALUE_ROW_SCHEMA,
+      PrefixKeyScanStateEncoderSpec(COMPOSITE_KEY_ROW_SCHEMA, 1), false)
+  }
+}
 
 class MapStateImpl[K, V](
     store: StateStore,
     stateName: String,
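Note the two differences from the list and value cases: map state keys use COMPOSITE_KEY_ROW_SCHEMA (grouping key plus user key) with a PrefixKeyScanStateEncoderSpec whose second argument, 1, marks how many leading key columns form the scan prefix, and the trailing boolean is false rather than true. The diff does not name that boolean; it plausibly flags whether the column family stores multiple values per key, which only list state needs here. A sketch contrasting the two encoder specs, using only names imported in these files:

// Flat key: the whole grouping key is the lookup key; no range scans needed.
val valueAndListSpec = NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA)

// Composite key: the first column (the grouping key) becomes a scan prefix,
// so every user key under one grouping key can be iterated in a single scan.
val mapSpec = PrefixKeyScanStateEncoderSpec(COMPOSITE_KEY_ROW_SCHEMA, 1)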
MapStateImplWithTTL.scala

@@ -24,6 +24,15 @@ import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchemaV1, Pre
 import org.apache.spark.sql.streaming.{MapState, TTLConfig}
 import org.apache.spark.util.NextIterator
 
+object MapStateImplWithTTL {
+  def columnFamilySchema(stateName: String): ColumnFamilySchemaV1 = {
+    new ColumnFamilySchemaV1(
+      stateName,
+      COMPOSITE_KEY_ROW_SCHEMA,
+      VALUE_ROW_SCHEMA_WITH_TTL,
+      PrefixKeyScanStateEncoderSpec(COMPOSITE_KEY_ROW_SCHEMA, 1), false)
+  }
+}
 /**
  * Class that provides a concrete implementation for map state associated with state
  * variables (with ttl expiration support) used in the streaming transformWithState operator.
StatefulProcessorHandleImpl.scala

@@ -142,7 +142,7 @@ class StatefulProcessorHandleImpl(
         new ValueStateImpl[T](store, stateName, keyEncoder, valEncoder)
       case None =>
         stateVariables.add(new StateVariableInfo(stateName, ValueState, false))
-        val colFamilySchema = resultState.columnFamilySchema
+        val colFamilySchema = ValueStateImpl.columnFamilySchema(stateName)
         columnFamilySchemas.add(colFamilySchema)
         null
     }
@@ -163,7 +163,7 @@
         valueStateWithTTL
       case None =>
         stateVariables.add(new StateVariableInfo(stateName, ValueState, true))
-        val colFamilySchema = resultState.columnFamilySchema
+        val colFamilySchema = ValueStateImplWithTTL.columnFamilySchema(stateName)
         columnFamilySchemas.add(colFamilySchema)
         null
     }
@@ -268,6 +268,8 @@ class StatefulProcessorHandleImpl(
         new ListStateImpl[T](store, stateName, keyEncoder, valEncoder)
      case None =>
         stateVariables.add(new StateVariableInfo(stateName, ListState, false))
+        val colFamilySchema = ListStateImpl.columnFamilySchema(stateName)
+        columnFamilySchemas.add(colFamilySchema)
         null
     }
   }
@@ -303,7 +305,7 @@
         listStateWithTTL
       case None =>
         stateVariables.add(new StateVariableInfo(stateName, ListState, true))
-        val colFamilySchema = resultState.columnFamilySchema
+        val colFamilySchema = ListStateImplWithTTL.columnFamilySchema(stateName)
         columnFamilySchemas.add(colFamilySchema)
         null
     }
@@ -320,7 +322,7 @@
         new MapStateImpl[K, V](store, stateName, keyEncoder, userKeyEnc, valEncoder)
       case None =>
         stateVariables.add(new StateVariableInfo(stateName, ValueState, false))
-        val colFamilySchema = resultState.columnFamilySchema
+        val colFamilySchema = MapStateImpl.columnFamilySchema(stateName)
         columnFamilySchemas.add(colFamilySchema)
         null
     }
@@ -342,7 +344,7 @@
         mapStateWithTTL
       case None =>
         stateVariables.add(new StateVariableInfo(stateName, MapState, true))
-        val colFamilySchema = resultState.columnFamilySchema
+        val colFamilySchema = MapStateImplWithTTL.columnFamilySchema(stateName)
         columnFamilySchemas.add(colFamilySchema)
         null
     }
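Every accessor in this file now follows the same two-branch shape, and the commit replaces the old per-instance resultState.columnFamilySchema lookup with the static companion factories, so the None branch (a schema-collection pass with no live store) can record a schema without building the state implementation. A condensed sketch of that pattern with a simplified signature; the real methods also take encoders and, for TTL variants, a TTLConfig:

// Sketch of the repeated pattern; `store`, `stateVariables`, and
// `columnFamilySchemas` mirror the fields used in the hunks above.
def getListStateSketch[T](stateName: String): ListState[T] = store match {
  case Some(s) =>
    // Normal execution: a store exists, so build the real state variable.
    new ListStateImpl[T](s, stateName, keyEncoder, valEncoder)
  case None =>
    // Schema-collection pass: record what the variable needs, create nothing.
    stateVariables.add(new StateVariableInfo(stateName, ListState, false))
    columnFamilySchemas.add(ListStateImpl.columnFamilySchema(stateName))
    null
}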
ValueStateImplWithTTL.scala

@@ -22,6 +22,17 @@ import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSch
 import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, StateStore}
 import org.apache.spark.sql.streaming.{TTLConfig, ValueState}
 
+object ValueStateImplWithTTL {
+  def columnFamilySchema(stateName: String): ColumnFamilySchemaV1 = {
+    new ColumnFamilySchemaV1(
+      stateName,
+      KEY_ROW_SCHEMA,
+      VALUE_ROW_SCHEMA_WITH_TTL,
+      NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA),
+      false)
+  }
+}
+
 /**
  * Class that provides a concrete implementation for a single value state associated with state
  * variables (with ttl expiration support) used in the streaming transformWithState operator.
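Placed side by side, a TTL variant differs from its plain counterpart only in the value schema (VALUE_ROW_SCHEMA_WITH_TTL instead of VALUE_ROW_SCHEMA); the key schema and encoder spec are identical. A quick usage sketch with a hypothetical state name:

// Same state name, two possible schemas, depending on whether TTL is enabled.
val plain = ValueStateImpl.columnFamilySchema("lastSeen")
val ttl = ValueStateImplWithTTL.columnFamilySchema("lastSeen")
// plain uses VALUE_ROW_SCHEMA; ttl uses VALUE_ROW_SCHEMA_WITH_TTL.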
OperatorStateMetadata.scala

@@ -25,13 +25,10 @@ import scala.reflect.ClassTag
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FSDataOutputStream, Path}
 import org.json4s.{Formats, NoTypeHints}
-import org.json4s.JsonAST.JValue
 import org.json4s.jackson.Serialization
 
-import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, MetadataVersionUtil}
-import org.apache.spark.util.AccumulatorV2
 
 /**
  * Metadata for a state store instance.
Expand Down
