enhance flint spark configuration module (#1727)
Signed-off-by: Peng Huo <penghuo@gmail.com>
penghuo committed Jun 19, 2023
1 parent 43d3566 commit 255f040
Showing 11 changed files with 157 additions and 84 deletions.
flint/docs/index.md (24 changes: 16 additions & 8 deletions)
@@ -274,18 +274,26 @@ trait FlintSparkSkippingStrategy {
Here is an example for read index data from AWS OpenSearch domain.

```scala
- val aos = Map(
- "host" -> "yourdomain.us-west-2.es.amazonaws.com",
- "port" -> "-1",
- "scheme" -> "https",
- "auth" -> "sigv4",
- "region" -> "us-west-2")

spark.conf.set("spark.datasource.flint.host", "yourdomain.us-west-2.es.amazonaws.com")
spark.conf.set("spark.datasource.flint.port", "-1")
spark.conf.set("spark.datasource.flint.scheme", "https")
spark.conf.set("spark.datasource.flint.auth", "sigv4")
spark.conf.set("spark.datasource.flint.region", "us-west-2")
spark.conf.set("spark.datasource.flint.refresh_policy", "wait_for")

+ val df = spark.range(15).toDF("aInt")

+ val re = df.coalesce(1)
+ .write
+ .format("flint")
+ .mode("overwrite")
+ .save("t001")

val df = new SQLContext(sc).read
.format("flint")
- .options(aos)
.schema("aInt int")
.load("t001")

```

## Benchmarks
@@ -30,7 +30,7 @@ class FlintPartitionReader(reader: FlintReader, schema: StructType, options: Fli

lazy val parser = new FlintJacksonParser(
schema,
- new JSONOptionsInRead(CaseInsensitiveMap(DATE_FORMAT_PARAMETERS), options.timeZone(), ""),
+ new JSONOptionsInRead(CaseInsensitiveMap(DATE_FORMAT_PARAMETERS), options.timeZone, ""),
allowArrayAsStructs = true)
lazy val stringParser: (JsonFactory, String) => JsonParser =
CreateJacksonParser.string(_: JsonFactory, _: String)
@@ -32,7 +32,7 @@ case class FlintPartitionWriter(
with Logging {

private lazy val jsonOptions = {
- new JSONOptions(CaseInsensitiveMap(DATE_FORMAT_PARAMETERS), options.timeZone(), "")
+ new JSONOptions(CaseInsensitiveMap(DATE_FORMAT_PARAMETERS), options.timeZone, "")
}
private lazy val gen = FlintJacksonGenerator(dataSchema, flintWriter, jsonOptions)

@@ -6,6 +6,7 @@
package org.apache.spark.sql.flint.config

import org.apache.spark.internal.config.ConfigReader
+ import org.apache.spark.sql.internal.SQLConf

/**
* Similar to SPARK ConfigEntry. ConfigEntry register the configuration which can not been
@@ -15,44 +16,87 @@ private case class FlintConfig(key: String) {

private var doc = ""

+ private var dataSourcePrefix: Option[String] = None

+ val DATASOURCE_FLINT_PREFIX = "spark.datasource.flint."

def doc(s: String): FlintConfig = {
doc = s
this
}

+ /**
+ * if the configuration is datasource option also. which means user could define it using
+ * dataframe.option() interface. for example, sql.read.format("flint").options(Map("host" ->
+ * "localhost"))
+ * @return
+ */
+ def datasourceOption(): FlintConfig = {
+ dataSourcePrefix = Some(DATASOURCE_FLINT_PREFIX)
+ this
+ }

def createWithDefault(defaultValue: String): FlintConfigEntry[String] = {
- new FlintConfigEntryWithDefault(key, defaultValue, doc)
+ new FlintConfigEntryWithDefault(key, defaultValue, doc, dataSourcePrefix)
}

def createOptional(): FlintConfigEntry[Option[String]] = {
- new FlintOptionalConfigEntry(key, doc)
+ new FlintOptionalConfigEntry(key, doc, dataSourcePrefix)
}
}

- abstract class FlintConfigEntry[T](val key: String, val doc: String) {
- protected def readString(reader: ConfigReader): Option[String] = {
- reader.get(key)
- }
+ abstract class FlintConfigEntry[T](val key: String, val doc: String, val prefix: Option[String]) {

+ protected val DATASOURCE_FLINT_PREFIX = "spark.datasource.flint."

+ protected def readOptionKeyString(reader: ConfigReader): Option[String] = reader.get(optionKey)

+ /**
+ * Get configuration from {@link ConfigReader}
+ */
def readFrom(reader: ConfigReader): T

- def defaultValue: Option[String] = None
+ /**
+ * Get configuration defined by key from {@link SQLConf}.
+ */
+ protected def readFromConf(): Option[String] = {
+ if (SQLConf.get.contains(key)) {
+ Some(SQLConf.get.getConfString(key))
+ } else {
+ None
+ }
+ }

+ def defaultValue: Option[T] = None

+ /**
+ * DataSource option key. for example, the key = spark.datasource.flint.host, prefix = spark
+ * .datasource.flint. the optionKey is host.
+ */
+ def optionKey: String = prefix.map(key.stripPrefix(_)).getOrElse(key)
}

- private class FlintConfigEntryWithDefault(key: String, defaultValue: String, doc: String)
- extends FlintConfigEntry[String](key, doc) {
+ private class FlintConfigEntryWithDefault(
+ key: String,
+ defaultValue: String,
+ doc: String,
+ prefix: Option[String])
+ extends FlintConfigEntry[String](key, doc, prefix) {

override def defaultValue: Option[String] = Some(defaultValue)

- def readFrom(reader: ConfigReader): String = {
- readString(reader).getOrElse(defaultValue)
+ override def readFrom(reader: ConfigReader): String = {
+ readOptionKeyString(reader)
+ .orElse(readFromConf())
+ .getOrElse(defaultValue)
}
}

- private class FlintOptionalConfigEntry(key: String, doc: String)
- extends FlintConfigEntry[Option[String]](key, doc) {
+ private class FlintOptionalConfigEntry(key: String, doc: String, prefix: Option[String])
+ extends FlintConfigEntry[Option[String]](key, doc, prefix) {

- def readFrom(reader: ConfigReader): Option[String] = {
- readString(reader)
+ override def readFrom(reader: ConfigReader): Option[String] = {
+ readOptionKeyString(reader)
+ .orElse(readFromConf())
}
}
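The net effect of this file's changes is a three-step lookup for every Flint setting: the datasource option (short key) is consulted first, then SQLConf (full key), then the declared default. Below is a minimal, self-contained Scala sketch of that precedence; the `resolve` helper and the plain `Map`s standing in for `ConfigReader` and `SQLConf` are illustrative assumptions, not the real Spark APIs.

```scala
// Hypothetical stand-ins: `options` plays the role of the per-query ConfigReader
// (datasource options with the "spark.datasource.flint." prefix stripped) and
// `sqlConf` plays the role of SQLConf keyed by the full configuration name.
object ConfigResolutionSketch {
  val prefix = "spark.datasource.flint."

  def resolve(
      key: String,                      // full key, e.g. spark.datasource.flint.host
      options: Map[String, String],     // DataFrame .options(...) entries (short keys)
      sqlConf: Map[String, String],     // spark.conf.set(...) entries (full keys)
      default: Option[String]): Option[String] = {
    val optionKey = key.stripPrefix(prefix) // mirrors FlintConfigEntry.optionKey
    options.get(optionKey)                  // 1. datasource option wins
      .orElse(sqlConf.get(key))             // 2. then SQLConf
      .orElse(default)                      // 3. then the declared default
  }

  def main(args: Array[String]): Unit = {
    val sqlConf = Map("spark.datasource.flint.host" -> "opensearch.example.com")
    // No datasource option given, so the SQLConf value is used.
    println(resolve("spark.datasource.flint.host", Map.empty, sqlConf, Some("localhost")))
    // A datasource option overrides SQLConf.
    println(resolve("spark.datasource.flint.host", Map("host" -> "other-host"), sqlConf, Some("localhost")))
  }
}
```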
@@ -5,14 +5,14 @@

package org.apache.spark.sql.flint.config

+ import java.util
import java.util.{Map => JMap, NoSuchElementException}

import scala.collection.JavaConverters._

import org.opensearch.flint.core.FlintOptions

import org.apache.spark.internal.config.ConfigReader
- import org.apache.spark.sql.RuntimeConfig
import org.apache.spark.sql.flint.config.FlintSparkConf._
import org.apache.spark.sql.internal.SQLConf

@@ -27,57 +27,60 @@ import org.apache.spark.sql.internal.SQLConf
*/
object FlintSparkConf {

- val PREFIX = "spark.datasource.flint."

- def apply(conf: JMap[String, String]): FlintSparkConf = new FlintSparkConf(conf)
+ val DATASOURCE_FLINT_PREFIX = "spark.datasource.flint."

/**
- * Helper class, create {@link FlintOptions} from spark conf.
+ * Create FlintSparkConf from Datasource options. if no options provided, FlintSparkConf will
+ * read configuraiton from SQLConf.
*/
- def apply(sparkConf: RuntimeConfig): FlintOptions = new FlintOptions(
- Seq(HOST_ENDPOINT, HOST_PORT, REFRESH_POLICY, SCROLL_SIZE, SCHEME, AUTH, REGION)
- .map(conf => (conf.key, sparkConf.get(PREFIX + conf.key, conf.defaultValue.get)))
- .toMap
- .asJava)

- def sparkConf(key: String): String = PREFIX + key
+ def apply(options: JMap[String, String] = new util.HashMap[String, String]()): FlintSparkConf =
+ new FlintSparkConf(options)

val HOST_ENDPOINT = FlintConfig("host")
val HOST_ENDPOINT = FlintConfig("spark.datasource.flint.host")
.datasourceOption()
.createWithDefault("localhost")

val HOST_PORT = FlintConfig("port")
val HOST_PORT = FlintConfig("spark.datasource.flint.port")
.datasourceOption()
.createWithDefault("9200")

val SCHEME = FlintConfig("scheme")
val SCHEME = FlintConfig("spark.datasource.flint.scheme")
.datasourceOption()
.doc("http or https")
.createWithDefault("http")

val AUTH = FlintConfig("auth")
val AUTH = FlintConfig("spark.datasource.flint.auth")
.datasourceOption()
.doc("authentication type. supported value: NONE_AUTH(false), SIGV4_AUTH(sigv4)")
.createWithDefault(FlintOptions.NONE_AUTH)

val REGION = FlintConfig("region")
val REGION = FlintConfig("spark.datasource.flint.region")
.datasourceOption()
.doc("AWS service region")
.createWithDefault(FlintOptions.DEFAULT_REGION)

val DOC_ID_COLUMN_NAME = FlintConfig("write.id_name")
val DOC_ID_COLUMN_NAME = FlintConfig("spark.datasource.flint.write.id_name")
.datasourceOption()
.doc(
"spark write task use spark.flint.write.id.name defined column as doc id when write to " +
"flint. if not provided, use system generated random id")
.createOptional()

val BATCH_SIZE = FlintConfig("write.batch_size")
val BATCH_SIZE = FlintConfig("spark.datasource.flint.write.batch_size")
.datasourceOption()
.doc(
"The number of documents written to Flint in a single batch request is determined by the " +
"overall size of the HTTP request, which should not exceed 100MB. The actual number of " +
"documents will vary depending on the individual size of each document.")
.createWithDefault("1000")

val REFRESH_POLICY = FlintConfig("write.refresh_policy")
val REFRESH_POLICY = FlintConfig("spark.datasource.flint.write.refresh_policy")
.datasourceOption()
.doc("refresh_policy, possible value are NONE(false), IMMEDIATE(true), WAIT_UNTIL(wait_for)")
.createWithDefault("false")

val SCROLL_SIZE = FlintConfig("read.scroll_size")
val SCROLL_SIZE = FlintConfig("spark.datasource.flint.read.scroll_size")
.datasourceOption()
.doc("scroll read size")
.createWithDefault("100")

@@ -86,6 +89,9 @@ object FlintSparkConf {
.createWithDefault("true")
}

+ /**
+ * if no options provided, FlintSparkConf read configuration from SQLConf.
+ */
class FlintSparkConf(properties: JMap[String, String]) extends Serializable {

@transient lazy val reader = new ConfigReader(properties)
@@ -94,22 +100,25 @@ class FlintSparkConf(properties: JMap[String, String]) extends Serializable {

def docIdColumnName(): Option[String] = DOC_ID_COLUMN_NAME.readFrom(reader)

- def timeZone(): String = SQLConf.get.sessionLocalTimeZone

def tableName(): String = {
if (properties.containsKey("path")) properties.get("path")
else throw new NoSuchElementException("index or path not found")
}

def isOptimizerEnabled: Boolean = OPTIMIZER_RULE_ENABLED.readFrom(reader).toBoolean

+ /**
+ * spark.sql.session.timeZone
+ */
+ def timeZone: String = SQLConf.get.sessionLocalTimeZone

/**
* Helper class, create {@link FlintOptions}.
*/
def flintOptions(): FlintOptions = {
new FlintOptions(
Seq(HOST_ENDPOINT, HOST_PORT, REFRESH_POLICY, SCROLL_SIZE, SCHEME, AUTH, REGION)
- .map(conf => (conf.key, conf.readFrom(reader)))
+ .map(conf => (conf.optionKey, conf.readFrom(reader)))
.toMap
.asJava)
}
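As a usage sketch of the two entry points left in place here, the snippet below builds one `FlintSparkConf` from explicit datasource options and another that falls back to SQLConf; the host and port values are placeholders, and the SQLConf fallback assumes an active SparkSession.

```scala
import java.util.{HashMap => JHashMap}

import org.apache.spark.sql.flint.config.FlintSparkConf

// Explicit datasource options (short keys), e.g. what the reader/writer receives.
val options = new JHashMap[String, String]()
options.put("host", "localhost")
options.put("port", "9200")

val fromOptions = FlintSparkConf(options)
val flintOptions = fromOptions.flintOptions() // FlintOptions for the low-level FlintClient

// No options: every entry is resolved from SQLConf (spark.conf.set(...)) or its default.
val fromSqlConf = FlintSparkConf()
```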
@@ -32,7 +32,7 @@ import org.apache.spark.sql.streaming.OutputMode.Append
class FlintSpark(val spark: SparkSession) {

/** Flint client for low-level index operation */
- private val flintClient: FlintClient = FlintClientBuilder.build(FlintSparkConf(spark.conf))
+ private val flintClient: FlintClient = FlintClientBuilder.build(FlintSparkConf().flintOptions())

/** Required by json4s parse function */
implicit val formats: Formats = Serialization.formats(NoTypeHints) + SkippingKindSerializer
@@ -185,9 +185,8 @@ object FlintSpark {
object FlintSpark {

/**
- * Index refresh mode:
- * FULL: refresh on current source data in batch style at one shot
- * INCREMENTAL: auto refresh on new data in continuous streaming style
+ * Index refresh mode: FULL: refresh on current source data in batch style at one shot
+ * INCREMENTAL: auto refresh on new data in continuous streaming style
*/
object RefreshMode extends Enumeration {
type RefreshMode = Value
@@ -5,8 +5,6 @@

package org.opensearch.flint.spark

- import scala.collection.JavaConverters._

import org.opensearch.flint.spark.skipping.ApplyFlintSparkSkippingIndex

import org.apache.spark.sql.SparkSession
@@ -36,7 +34,6 @@ class FlintSparkOptimizer(spark: SparkSession) extends Rule[LogicalPlan] {
}

private def isOptimizerEnabled: Boolean = {
- val flintConf = new FlintSparkConf(spark.conf.getAll.asJava)
- flintConf.isOptimizerEnabled
+ FlintSparkConf().isOptimizerEnabled
}
}
@@ -9,7 +9,7 @@ import org.opensearch.flint.spark.FlintSparkExtensions

import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
- import org.apache.spark.sql.flint.config.{FlintConfigEntry, FlintSparkConf}
+ import org.apache.spark.sql.flint.config.FlintConfigEntry
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession

@@ -32,6 +32,6 @@
* Set Flint Spark configuration. (Generic "value: T" has problem with FlintConfigEntry[Any])
*/
protected def setFlintSparkConf[T](config: FlintConfigEntry[T], value: Any): Unit = {
- spark.conf.set(FlintSparkConf.sparkConf(config.key), value.toString)
+ spark.conf.set(config.key, value.toString)
}
}
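For context on how the simplified helper is used now that each config entry already carries its full `spark.datasource.flint.*` key, here is a hedged example of a test; the suite name and assertion are illustrative, not taken from this commit.

```scala
import org.apache.spark.sql.flint.config.FlintSparkConf.HOST_ENDPOINT

// Hypothetical suite, assumed to live next to FlintSuite so the helper is in scope.
class FlintSparkConfSuite extends FlintSuite {
  test("Flint host set through helper is visible in SQLConf") {
    // The helper now writes the full key (spark.datasource.flint.host) straight to spark.conf.
    setFlintSparkConf(HOST_ENDPOINT, "localhost")
    assert(spark.conf.get(HOST_ENDPOINT.key) == "localhost")
  }
}
```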