ignore id column (#1732)
Signed-off-by: Peng Huo <penghuo@gmail.com>
penghuo committed Jun 26, 2023
1 parent ae3389d commit 102c8c8
Showing 6 changed files with 80 additions and 31 deletions.
1 change: 1 addition & 0 deletions flint/docs/index.md
@@ -213,6 +213,7 @@ In the index mapping, the `_meta` and `properties` field stores meta and schema info
- `spark.datasource.flint.auth`: default is false. valid values [false, sigv4]
- `spark.datasource.flint.region`: default is us-west-2. only been used when auth=sigv4
- `spark.datasource.flint.write.id_name`: no default value.
- `spark.datasource.flint.ignore.id_column`: default value is true.
- `spark.datasource.flint.write.batch_size`: default value is 1000.
- `spark.datasource.flint.write.refresh_policy`: default value is false. valid values [NONE(false),
IMMEDIATE(true), WAIT_UNTIL(wait_for)]
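
As a rough sketch of how these settings are passed when writing (this mirrors the option-map pattern used in the integration tests further down; the DataFrame `df` and the index name `"my_index"` are hypothetical, not part of this commit):

import org.apache.spark.sql.flint.config.FlintSparkConf

df.write
  .format("flint")
  .option(FlintSparkConf.DOC_ID_COLUMN_NAME.optionKey, "aInt")    // write.id_name
  .option(FlintSparkConf.IGNORE_DOC_ID_COLUMN.optionKey, "true")  // ignore.id_column
  .option(FlintSparkConf.REFRESH_POLICY.optionKey, "wait_for")    // write.refresh_policy
  .mode("overwrite")
  .save("my_index")

With ignore.id_column at its default of true, the `aInt` value is used only for the document id and is dropped from the stored document, as the new test in FlintDataSourceV2ITSuite below verifies.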
@@ -34,11 +34,16 @@ case class FlintPartitionWriter(
private lazy val jsonOptions = {
new JSONOptions(CaseInsensitiveMap(DATE_FORMAT_PARAMETERS), options.timeZone, "")
}
private lazy val gen = FlintJacksonGenerator(dataSchema, flintWriter, jsonOptions)
private lazy val gen =
FlintJacksonGenerator(dataSchema, flintWriter, jsonOptions, ignoredFieldName)

private lazy val idOrdinal = options
.docIdColumnName()
.flatMap(fieldName => dataSchema.getFieldIndex(fieldName))
private lazy val idFieldName = options.docIdColumnName()

private lazy val idOrdinal =
idFieldName.flatMap(fieldName => dataSchema.getFieldIndex(fieldName))

private lazy val ignoredFieldName: Option[String] =
idFieldName.filter(_ => options.ignoreIdColumn())
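
// Editorial note, not part of this commit: a hedged reading of how the three values
// relate. Assuming write.id_name = "aInt", ignore.id_column = true, and a row schema
// of (aInt INT, aString STRING):
//   idFieldName      == Some("aInt")  // the configured doc-id column
//   idOrdinal        == Some(0)       // its ordinal, used to read the id from each row
//   ignoredFieldName == Some("aInt")  // handed to FlintJacksonGenerator so the column
//                                     // is left out of the generated JSON document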

/**
* total write doc count.
@@ -66,6 +66,11 @@ object FlintSparkConf {
"flint. if not provided, use system generated random id")
.createOptional()

val IGNORE_DOC_ID_COLUMN = FlintConfig("spark.datasource.flint.ignore.id_column")
.datasourceOption()
.doc("Enable spark write task ignore doc_id column. the default value is ture")
.createWithDefault("true")

val BATCH_SIZE = FlintConfig("spark.datasource.flint.write.batch_size")
.datasourceOption()
.doc(
@@ -100,6 +105,8 @@ class FlintSparkConf(properties: JMap[String, String]) extends Serializable {

def docIdColumnName(): Option[String] = DOC_ID_COLUMN_NAME.readFrom(reader)

def ignoreIdColumn(): Boolean = IGNORE_DOC_ID_COLUMN.readFrom(reader).toBoolean
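
// Editorial sketch, not part of this commit. Assumes the properties map is keyed by
// the short datasource option key, as the integration tests below suggest:
//   val conf = new FlintSparkConf(
//     java.util.Collections.singletonMap(IGNORE_DOC_ID_COLUMN.optionKey, "false"))
//   conf.ignoreIdColumn()  // false: an explicit "false" overrides the "true" default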

def tableName(): String = {
if (properties.containsKey("path")) properties.get("path")
else throw new NoSuchElementException("index or path not found")
@@ -20,8 +20,13 @@ import org.apache.spark.sql.types._

/**
* copy from spark {@link JacksonGenerator}.
* 1. Add ignoredFieldName; the column is ignored when writing.
*/
case class FlintJacksonGenerator(dataType: DataType, writer: Writer, options: JSONOptions) {
case class FlintJacksonGenerator(
dataType: DataType,
writer: Writer,
options: JSONOptions,
ignoredFieldName: Option[String] = None) {
// A `ValueWriter` is responsible for writing a field of an `InternalRow` to appropriate
// JSON data. Here we are using `SpecializedGetters` rather than `InternalRow` so that
// we can directly access data in `ArrayData` without the help of `SpecificMutableRow`.
@@ -202,12 +207,14 @@ case class FlintJacksonGenerator(dataType: DataType, writer: Writer, options: JS
var i = 0
while (i < row.numFields) {
val field = schema(i)
if (!row.isNullAt(i)) {
gen.writeFieldName(field.name)
fieldWriters(i).apply(row, i)
} else if (!options.ignoreNullFields) {
gen.writeFieldName(field.name)
gen.writeNull()
if (!ignoredFieldName.contains(field.name)) {
if (!row.isNullAt(i)) {
gen.writeFieldName(field.name)
fieldWriters(i).apply(row, i)
} else if (!options.ignoreNullFields) {
gen.writeFieldName(field.name)
gen.writeNull()
}
}
i += 1
}
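
// Editorial illustration, not part of this commit: with ignoredFieldName = Some("aInt"),
// the row (aInt = 1, aString = "string1") is serialized as
//   {"aString":"string1"}
// while FlintPartitionWriter.idOrdinal still reads aInt = 1 from the row so the value
// can serve as the document id (assumed; the bulk write path is not shown in this diff).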
@@ -14,7 +14,8 @@ import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.functions.{asc, current_date, current_timestamp, to_date, to_timestamp}
import org.apache.spark.sql.flint.config.FlintSparkConf.{DOC_ID_COLUMN_NAME, IGNORE_DOC_ID_COLUMN}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamingQuery, StreamTest}
import org.apache.spark.util.Utils

@@ -131,9 +132,7 @@ class FlintDataSourceV2ITSuite
| }
| }
|}""".stripMargin
val options =
openSearchOptions + (s"${FlintSparkConf.REFRESH_POLICY.optionKey}" -> "wait_for",
s"${FlintSparkConf.DOC_ID_COLUMN_NAME.optionKey}" -> "aInt")
val options = openSearchOptions + (s"${DOC_ID_COLUMN_NAME.optionKey}" -> "aInt")
Seq(Seq.empty, 1 to 14).foreach(data => {
withIndexName(indexName) {
index(indexName, oneNodeSetting, mappings, Seq.empty)
@@ -165,11 +164,42 @@ })
})
}

test("write dataframe to flint ignore id column") {
val indexName = "t0002"
val mappings =
"""{
| "properties": {
| "aString": {
| "type": "keyword"
| }
| }
|}""".stripMargin
val options =
openSearchOptions + (s"${DOC_ID_COLUMN_NAME.optionKey}" -> "aInt",
s"${IGNORE_DOC_ID_COLUMN.optionKey}" -> "true")
withIndexName(indexName) {
index(indexName, oneNodeSetting, mappings, Seq.empty)

val df = spark.createDataFrame(Seq((1, "string1"), (2, "string2"))).toDF("aInt", "aString")

df.coalesce(1)
.write
.format("flint")
.options(options)
.mode("overwrite")
.save(indexName)

val dfResult1 = spark.sqlContext.read
.format("flint")
.options(options)
.load(indexName)
checkAnswer(dfResult1, df.drop("aInt"))
}
}

test("write dataframe to flint with batch size configuration") {
val indexName = "t0004"
val options =
openSearchOptions + (s"${FlintSparkConf.REFRESH_POLICY.optionKey}" -> "wait_for",
s"${FlintSparkConf.DOC_ID_COLUMN_NAME.optionKey}" -> "aInt")
val options = openSearchOptions + (s"${DOC_ID_COLUMN_NAME.optionKey}" -> "aInt")
Seq(0, 1).foreach(batchSize => {
withIndexName(indexName) {
val mappings =
@@ -224,8 +254,7 @@ class FlintDataSourceV2ITSuite
.option("checkpointLocation", checkpointDir)
.format("flint")
.options(openSearchOptions)
.option(s"${FlintSparkConf.REFRESH_POLICY.optionKey}", "wait_for")
.option(s"${FlintSparkConf.DOC_ID_COLUMN_NAME.optionKey}", "aInt")
.option(s"${DOC_ID_COLUMN_NAME.optionKey}", "aInt")
.start(indexName)

inputData.addData(1, 2, 3)
@@ -288,8 +317,6 @@ class FlintDataSourceV2ITSuite

test("load and save date and timestamp type field") {
val indexName = "t0001"
val options =
openSearchOptions + (s"${FlintSparkConf.REFRESH_POLICY.optionKey}" -> "wait_for")
Seq(
"""{
| "properties": {
@@ -325,13 +352,13 @@ class FlintDataSourceV2ITSuite
df.coalesce(1)
.write
.format("flint")
.options(options)
.options(openSearchOptions)
.mode("overwrite")
.save(indexName)

val dfResult1 = spark.sqlContext.read
.format("flint")
.options(options)
.options(openSearchOptions)
.load(indexName)
checkAnswer(dfResult1, df)
}
@@ -340,8 +367,6 @@

test("load timestamp field in epoch format") {
val indexName = "t0001"
val options =
openSearchOptions + (s"${FlintSparkConf.REFRESH_POLICY.optionKey}" -> "wait_for")
Seq(
"""{
| "properties": {
@@ -366,7 +391,7 @@

val df = spark.sqlContext.read
.format("flint")
.options(options)
.options(openSearchOptions)
.load(indexName)
checkAnswer(df, Row(Timestamp.valueOf("2014-12-31 16:00:00")))
}
@@ -418,8 +443,6 @@ class FlintDataSourceV2ITSuite

test("scan with date filter push-down") {
val indexName = "t0001"
val options =
openSearchOptions + (s"${FlintSparkConf.REFRESH_POLICY.optionKey}" -> "wait_for")
withIndexName(indexName) {
val mappings = """{
| "properties": {
@@ -447,7 +470,7 @@ class FlintDataSourceV2ITSuite
df.coalesce(1)
.write
.format("flint")
.options(options)
.options(openSearchOptions)
.mode("overwrite")
.save(indexName)

@@ -16,6 +16,8 @@ import org.opensearch.common.xcontent.XContentType
import org.opensearch.testcontainers.OpenSearchContainer
import org.scalatest.{BeforeAndAfterAll, Suite}

import org.apache.spark.sql.flint.config.FlintSparkConf.{HOST_ENDPOINT, HOST_PORT, IGNORE_DOC_ID_COLUMN, REFRESH_POLICY}

/**
* Test required OpenSearch domain should extend OpenSearchSuite.
*/
@@ -32,7 +34,11 @@ trait OpenSearchSuite extends BeforeAndAfterAll {
RestClient.builder(new HttpHost(openSearchHost, openSearchPort, "http")))

protected lazy val openSearchOptions =
Map("host" -> openSearchHost, "port" -> s"$openSearchPort")
Map(
s"${HOST_ENDPOINT.optionKey}" -> openSearchHost,
s"${HOST_PORT.optionKey}" -> s"$openSearchPort",
s"${REFRESH_POLICY.optionKey}" -> "wait_for",
s"${IGNORE_DOC_ID_COLUMN.optionKey}" -> "false")

override def beforeAll(): Unit = {
container.start()
