enhance flint spark configuration #1727

Merged
24 changes: 16 additions & 8 deletions flint/docs/index.md
@@ -269,18 +269,26 @@ trait FlintSparkSkippingStrategy {
Here is an example of reading index data from an AWS OpenSearch domain.

```scala
val aos = Map(
"host" -> "yourdomain.us-west-2.es.amazonaws.com",
"port" -> "-1",
"scheme" -> "https",
"auth" -> "sigv4",
"region" -> "us-west-2")

spark.conf.set("spark.datasource.flint.host", "yourdomain.us-west-2.es.amazonaws.com")
spark.conf.set("spark.datasource.flint.port", "-1")
spark.conf.set("spark.datasource.flint.scheme", "https")
spark.conf.set("spark.datasource.flint.auth", "sigv4")
spark.conf.set("spark.datasource.flint.region", "us-west-2")
spark.conf.set("spark.datasource.flint.refresh_policy", "wait_for")

val df = spark.range(15).toDF("aInt")

val re = df.coalesce(1)
.write
.format("flint")
.mode("overwrite")
.save("t001")

val df = new SQLContext(sc).read
.format("flint")
.options(aos)
.schema("aInt int")
.load("t001")

```
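
The same settings can also be passed per query as DataFrame options, since each connection entry is registered with datasourceOption() and is looked up by its key with the spark.datasource.flint. prefix stripped. A minimal sketch of that style, reusing the values above (not part of the original doc):

```scala
// Sketch: pass connection settings per read instead of via spark.conf.
// Option names are the config keys minus the "spark.datasource.flint." prefix.
val df2 = spark.read
  .format("flint")
  .options(Map(
    "host" -> "yourdomain.us-west-2.es.amazonaws.com",
    "port" -> "-1",
    "scheme" -> "https",
    "auth" -> "sigv4",
    "region" -> "us-west-2"))
  .schema("aInt int")
  .load("t001")
```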

## Benchmarks
@@ -30,7 +30,7 @@ class FlintPartitionReader(reader: FlintReader, schema: StructType, options: Fli

lazy val parser = new FlintJacksonParser(
schema,
new JSONOptionsInRead(CaseInsensitiveMap(DATE_FORMAT_PARAMETERS), options.timeZone(), ""),
new JSONOptionsInRead(CaseInsensitiveMap(DATE_FORMAT_PARAMETERS), options.timeZone, ""),
allowArrayAsStructs = true)
lazy val stringParser: (JsonFactory, String) => JsonParser =
CreateJacksonParser.string(_: JsonFactory, _: String)
@@ -32,7 +32,7 @@ case class FlintPartitionWriter(
with Logging {

private lazy val jsonOptions = {
new JSONOptions(CaseInsensitiveMap(DATE_FORMAT_PARAMETERS), options.timeZone(), "")
new JSONOptions(CaseInsensitiveMap(DATE_FORMAT_PARAMETERS), options.timeZone, "")
}
private lazy val gen = FlintJacksonGenerator(dataSchema, flintWriter, jsonOptions)

@@ -6,6 +6,7 @@
package org.apache.spark.sql.flint.config

import org.apache.spark.internal.config.ConfigReader
import org.apache.spark.sql.internal.SQLConf

/**
* Similar to Spark ConfigEntry. ConfigEntry registers the configuration which cannot be
@@ -15,44 +16,87 @@ private case class FlintConfig(key: String) {

private var doc = ""

private var dataSourcePrefix: Option[String] = None

val DATASOURCE_FLINT_PREFIX = "spark.datasource.flint."

def doc(s: String): FlintConfig = {
doc = s
this
}

/**
* If the configuration is also a datasource option, the user can set it through the
* DataFrame option() interface, for example spark.read.format("flint").options(Map("host" ->
* "localhost")).
* @return
*/
def datasourceOption(): FlintConfig = {
dataSourcePrefix = Some(DATASOURCE_FLINT_PREFIX)
this
}

def createWithDefault(defaultValue: String): FlintConfigEntry[String] = {
new FlintConfigEntryWithDefault(key, defaultValue, doc)
new FlintConfigEntryWithDefault(key, defaultValue, doc, dataSourcePrefix)
}

def createOptional(): FlintConfigEntry[Option[String]] = {
new FlintOptionalConfigEntry(key, doc)
new FlintOptionalConfigEntry(key, doc, dataSourcePrefix)
}
}

abstract class FlintConfigEntry[T](val key: String, val doc: String) {
protected def readString(reader: ConfigReader): Option[String] = {
reader.get(key)
}
abstract class FlintConfigEntry[T](val key: String, val doc: String, val prefix: Option[String]) {

protected val DATASOURCE_FLINT_PREFIX = "spark.datasource.flint."

protected def readOptionKeyString(reader: ConfigReader): Option[String] = reader.get(optionKey)

/**
* Get configuration from {@link ConfigReader}
*/
def readFrom(reader: ConfigReader): T

def defaultValue: Option[String] = None
/**
* Get configuration defined by key from {@link SQLConf}.
*/
protected def readFromConf(): Option[String] = {
if (SQLConf.get.contains(key)) {
Some(SQLConf.get.getConfString(key))
} else {
None
}
}

def defaultValue: Option[T] = None

/**
* DataSource option key. For example, with key = spark.datasource.flint.host and prefix =
* spark.datasource.flint., the optionKey is host.
*/
def optionKey: String = prefix.map(key.stripPrefix(_)).getOrElse(key)
}

private class FlintConfigEntryWithDefault(key: String, defaultValue: String, doc: String)
extends FlintConfigEntry[String](key, doc) {
private class FlintConfigEntryWithDefault(
key: String,
defaultValue: String,
doc: String,
prefix: Option[String])
extends FlintConfigEntry[String](key, doc, prefix) {

override def defaultValue: Option[String] = Some(defaultValue)

def readFrom(reader: ConfigReader): String = {
readString(reader).getOrElse(defaultValue)
override def readFrom(reader: ConfigReader): String = {
readOptionKeyString(reader)
.orElse(readFromConf())
.getOrElse(defaultValue)
}
}

private class FlintOptionalConfigEntry(key: String, doc: String)
extends FlintConfigEntry[Option[String]](key, doc) {
private class FlintOptionalConfigEntry(key: String, doc: String, prefix: Option[String])
extends FlintConfigEntry[Option[String]](key, doc, prefix) {

def readFrom(reader: ConfigReader): Option[String] = {
readString(reader)
override def readFrom(reader: ConfigReader): Option[String] = {
readOptionKeyString(reader)
.orElse(readFromConf())
}
}
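
With this change an entry registered as a datasource option resolves in three steps: the per-query option (looked up by optionKey through the ConfigReader), then the session SQLConf value under the full key, then the declared default. A minimal sketch of that precedence for the host entry, assuming an active SparkSession named spark (the behaviour follows readFrom above):

```scala
import java.util.{HashMap => JHashMap}

import org.apache.spark.sql.flint.config.FlintSparkConf

// 1. Nothing set anywhere: the declared default is used.
FlintSparkConf().flintOptions()        // host -> "localhost"

// 2. Only the session conf is set: readFromConf() picks it up.
spark.conf.set("spark.datasource.flint.host", "opensearch-session")
FlintSparkConf().flintOptions()        // host -> "opensearch-session"

// 3. An explicit datasource option (prefix already stripped) wins over both.
val options = new JHashMap[String, String]()
options.put("host", "opensearch-option")
FlintSparkConf(options).flintOptions() // host -> "opensearch-option"
```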
@@ -5,14 +5,14 @@

package org.apache.spark.sql.flint.config

import java.util
import java.util.{Map => JMap, NoSuchElementException}

import scala.collection.JavaConverters._

import org.opensearch.flint.core.FlintOptions

import org.apache.spark.internal.config.ConfigReader
import org.apache.spark.sql.RuntimeConfig
import org.apache.spark.sql.flint.config.FlintSparkConf._
import org.apache.spark.sql.internal.SQLConf

@@ -27,57 +27,60 @@ import org.apache.spark.sql.internal.SQLConf
*/
object FlintSparkConf {

val PREFIX = "spark.datasource.flint."

def apply(conf: JMap[String, String]): FlintSparkConf = new FlintSparkConf(conf)
val DATASOURCE_FLINT_PREFIX = "spark.datasource.flint."

/**
* Helper class, create {@link FlintOptions} from spark conf.
* Create FlintSparkConf from datasource options. If no options are provided, FlintSparkConf
* will read configuration from SQLConf.
*/
def apply(sparkConf: RuntimeConfig): FlintOptions = new FlintOptions(
Seq(HOST_ENDPOINT, HOST_PORT, REFRESH_POLICY, SCROLL_SIZE, SCHEME, AUTH, REGION)
.map(conf => (conf.key, sparkConf.get(PREFIX + conf.key, conf.defaultValue.get)))
.toMap
.asJava)

def sparkConf(key: String): String = PREFIX + key
def apply(options: JMap[String, String] = new util.HashMap[String, String]()): FlintSparkConf =
new FlintSparkConf(options)

val HOST_ENDPOINT = FlintConfig("host")
val HOST_ENDPOINT = FlintConfig("spark.datasource.flint.host")
.datasourceOption()
.createWithDefault("localhost")

val HOST_PORT = FlintConfig("port")
val HOST_PORT = FlintConfig("spark.datasource.flint.port")
.datasourceOption()
.createWithDefault("9200")

val SCHEME = FlintConfig("scheme")
val SCHEME = FlintConfig("spark.datasource.flint.scheme")
.datasourceOption()
.doc("http or https")
.createWithDefault("http")

val AUTH = FlintConfig("auth")
val AUTH = FlintConfig("spark.datasource.flint.auth")
.datasourceOption()
.doc("authentication type. supported value: NONE_AUTH(false), SIGV4_AUTH(sigv4)")
.createWithDefault(FlintOptions.NONE_AUTH)

val REGION = FlintConfig("region")
val REGION = FlintConfig("spark.datasource.flint.region")
.datasourceOption()
.doc("AWS service region")
.createWithDefault(FlintOptions.DEFAULT_REGION)

val DOC_ID_COLUMN_NAME = FlintConfig("write.id_name")
val DOC_ID_COLUMN_NAME = FlintConfig("spark.datasource.flint.write.id_name")
.datasourceOption()
.doc(
"spark write task use spark.flint.write.id.name defined column as doc id when write to " +
"flint. if not provided, use system generated random id")
.createOptional()

val BATCH_SIZE = FlintConfig("write.batch_size")
val BATCH_SIZE = FlintConfig("spark.datasource.flint.write.batch_size")
.datasourceOption()
.doc(
"The number of documents written to Flint in a single batch request is determined by the " +
"overall size of the HTTP request, which should not exceed 100MB. The actual number of " +
"documents will vary depending on the individual size of each document.")
.createWithDefault("1000")

val REFRESH_POLICY = FlintConfig("write.refresh_policy")
val REFRESH_POLICY = FlintConfig("spark.datasource.flint.write.refresh_policy")
.datasourceOption()
.doc("refresh_policy, possible value are NONE(false), IMMEDIATE(true), WAIT_UNTIL(wait_for)")
.createWithDefault("false")

val SCROLL_SIZE = FlintConfig("read.scroll_size")
val SCROLL_SIZE = FlintConfig("spark.datasource.flint.read.scroll_size")
.datasourceOption()
.doc("scroll read size")
.createWithDefault("100")

@@ -86,6 +89,9 @@ object FlintSparkConf {
.createWithDefault("true")
}

/**
* If no options are provided, FlintSparkConf reads configuration from SQLConf.
*/
class FlintSparkConf(properties: JMap[String, String]) extends Serializable {

@transient lazy val reader = new ConfigReader(properties)
@@ -94,22 +100,25 @@ class FlintSparkConf(properties: JMap[String, String]) extends Serializable {

def docIdColumnName(): Option[String] = DOC_ID_COLUMN_NAME.readFrom(reader)

def timeZone(): String = SQLConf.get.sessionLocalTimeZone

def tableName(): String = {
if (properties.containsKey("path")) properties.get("path")
else throw new NoSuchElementException("index or path not found")
}

def isOptimizerEnabled: Boolean = OPTIMIZER_RULE_ENABLED.readFrom(reader).toBoolean

/**
* spark.sql.session.timeZone
*/
def timeZone: String = SQLConf.get.sessionLocalTimeZone

/**
* Helper class, create {@link FlintOptions}.
*/
def flintOptions(): FlintOptions = {
new FlintOptions(
Seq(HOST_ENDPOINT, HOST_PORT, REFRESH_POLICY, SCROLL_SIZE, SCHEME, AUTH, REGION)
.map(conf => (conf.key, conf.readFrom(reader)))
.map(conf => (conf.optionKey, conf.readFrom(reader)))
.toMap
.asJava)
}
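
On the datasource side, the properties handed to FlintSparkConf are just the DataFrame options plus the "path" entry Spark adds for save()/load(). A hypothetical sketch of the accessors shown above against such a map (the values are illustrative):

```scala
import scala.collection.JavaConverters._

import org.apache.spark.sql.flint.config.FlintSparkConf

// Illustrative properties: DataFrame options plus Spark's "path" entry.
val props = Map(
  "path" -> "t001",
  "write.id_name" -> "docId").asJava

val conf = FlintSparkConf(props)
conf.tableName()       // "t001", taken from "path"
conf.docIdColumnName() // Some("docId")
conf.flintOptions()    // FlintOptions handed to the low-level Flint client
```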
@@ -29,7 +29,7 @@ import org.apache.spark.sql.streaming.OutputMode.Append
class FlintSpark(val spark: SparkSession) {

/** Flint client for low-level index operation */
private val flintClient: FlintClient = FlintClientBuilder.build(FlintSparkConf(spark.conf))
private val flintClient: FlintClient = FlintClientBuilder.build(FlintSparkConf().flintOptions())

/** Required by json4s parse function */
implicit val formats: Formats = Serialization.formats(NoTypeHints)
@@ -182,9 +182,8 @@ class FlintSpark(val spark: SparkSession) {
object FlintSpark {

/**
* Index refresh mode:
* FULL: refresh on current source data in batch style at one shot
* INCREMENTAL: auto refresh on new data in continuous streaming style
* Index refresh mode: FULL: refresh on current source data in batch style at one shot
* INCREMENTAL: auto refresh on new data in continuous streaming style
*/
object RefreshMode extends Enumeration {
type RefreshMode = Value
@@ -5,8 +5,6 @@

package org.opensearch.flint.spark

import scala.collection.JavaConverters._

import org.opensearch.flint.spark.skipping.ApplyFlintSparkSkippingIndex

import org.apache.spark.sql.SparkSession
@@ -36,7 +34,6 @@ class FlintSparkOptimizer(spark: SparkSession) extends Rule[LogicalPlan] {
}

private def isOptimizerEnabled: Boolean = {
val flintConf = new FlintSparkConf(spark.conf.getAll.asJava)
flintConf.isOptimizerEnabled
FlintSparkConf().isOptimizerEnabled
}
}
@@ -9,7 +9,7 @@ import org.opensearch.flint.spark.FlintSparkExtensions

import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
import org.apache.spark.sql.flint.config.{FlintConfigEntry, FlintSparkConf}
import org.apache.spark.sql.flint.config.FlintConfigEntry
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession

@@ -32,6 +32,6 @@ trait FlintSuite extends SharedSparkSession {
* Set a Flint Spark configuration. (A generic "value: T" has problems with FlintConfigEntry[Any].)
*/
protected def setFlintSparkConf[T](config: FlintConfigEntry[T], value: Any): Unit = {
spark.conf.set(FlintSparkConf.sparkConf(config.key), value.toString)
spark.conf.set(config.key, value.toString)
}
}
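
Because config.key is now the fully prefixed Spark key, a test can set it directly on the session and read it back through an argument-less FlintSparkConf(). A hypothetical usage inside a suite extending FlintSuite (the suite and test below are illustrative, not part of this PR):

```scala
import org.apache.spark.sql.flint.config.FlintSparkConf

class FlintConfigSuite extends FlintSuite {
  test("optimizer rule can be disabled via session conf") {
    // setFlintSparkConf writes the fully prefixed key into the session conf;
    // FlintSparkConf() with no options falls back to SQLConf and sees it.
    setFlintSparkConf(FlintSparkConf.OPTIMIZER_RULE_ENABLED, "false")
    assert(!FlintSparkConf().isOptimizerEnabled)
  }
}
```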