From b9eb0ea042584dc03701e7e3ec67bc257fb832ee Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Tue, 11 Jul 2023 14:31:31 -0700
Subject: [PATCH] Generate doc ID in build index job for idempotency (#1803)

* Add doc id to DF by Spark built-in sha1 function

Signed-off-by: Chen Dai

* Add UT

Signed-off-by: Chen Dai

* Fix broken IT

Signed-off-by: Chen Dai

* Change id column name to avoid conflict with user data

Signed-off-by: Chen Dai

---------

Signed-off-by: Chen Dai
---
 flint/build.sbt                                    |  3 +-
 .../opensearch/flint/spark/FlintSpark.scala        | 22 +++++++---
 .../flint/spark/FlintSparkIndex.scala              |  8 ++++
 .../skipping/FlintSparkSkippingIndex.scala         |  6 ++-
 .../FlintSparkSkippingIndexSuite.scala             | 42 +++++++++++++++++++
 ...a => FlintSparkSkippingIndexITSuite.scala}      |  4 +-
 6 files changed, 76 insertions(+), 9 deletions(-)
 create mode 100644 flint/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala
 rename flint/integ-test/src/test/scala/org/opensearch/flint/spark/{FlintSparkSkippingIndexSuite.scala => FlintSparkSkippingIndexITSuite.scala} (99%)

diff --git a/flint/build.sbt b/flint/build.sbt
index e0f5ffa185..6a8af2cd22 100644
--- a/flint/build.sbt
+++ b/flint/build.sbt
@@ -50,7 +50,8 @@ lazy val flintCore = (project in file("flint-core"))
     scalaVersion := scala212,
     libraryDependencies ++= Seq(
       "org.opensearch.client" % "opensearch-rest-client" % opensearchVersion,
-      "org.opensearch.client" % "opensearch-rest-high-level-client" % opensearchVersion,
+      "org.opensearch.client" % "opensearch-rest-high-level-client" % opensearchVersion
+        exclude("org.apache.logging.log4j", "log4j-api"),
       "com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided"
         exclude("com.fasterxml.jackson.core", "jackson-databind")
     ))
diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index 7ba05290db..635cbae50a 100644
--- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -5,6 +5,8 @@
 
 package org.opensearch.flint.spark
 
+import scala.collection.JavaConverters._
+
 import org.json4s.{Formats, JArray, NoTypeHints}
 import org.json4s.native.JsonMethods.parse
 import org.json4s.native.Serialization
@@ -12,6 +14,7 @@ import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
 import org.opensearch.flint.core.metadata.FlintMetadata
 import org.opensearch.flint.spark.FlintSpark._
 import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL, RefreshMode}
+import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
 import org.opensearch.flint.spark.skipping.{FlintSparkSkippingIndex, FlintSparkSkippingStrategy}
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.SKIPPING_INDEX_TYPE
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.{SkippingKind, SkippingKindSerializer}
@@ -25,6 +28,7 @@ import org.apache.spark.sql.SaveMode._
 import org.apache.spark.sql.catalog.Column
 import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
 import org.apache.spark.sql.flint.config.FlintSparkConf
+import org.apache.spark.sql.flint.config.FlintSparkConf.{DOC_ID_COLUMN_NAME, IGNORE_DOC_ID_COLUMN}
 import org.apache.spark.sql.streaming.OutputMode.Append
 
 /**
@@ -32,8 +36,16 @@ import org.apache.spark.sql.streaming.OutputMode.Append
  */
 class FlintSpark(val spark: SparkSession) {
 
+  /** Flint spark configuration */
+  private val flintSparkConf: FlintSparkConf =
+    FlintSparkConf(
+      Map(
+        DOC_ID_COLUMN_NAME.key -> ID_COLUMN,
+        IGNORE_DOC_ID_COLUMN.key -> "true"
+      ).asJava)
+
   /** Flint client for low-level index operation */
-  private val flintClient: FlintClient = FlintClientBuilder.build(FlintSparkConf().flintOptions())
+  private val flintClient: FlintClient = FlintClientBuilder.build(flintSparkConf.flintOptions())
 
   /** Required by json4s parse function */
   implicit val formats: Formats = Serialization.formats(NoTypeHints) + SkippingKindSerializer
@@ -254,8 +266,7 @@ object FlintSpark {
       require(tableName.nonEmpty, "table name cannot be empty")
 
       val col = findColumn(colName)
-      addIndexedColumn(
-        ValueSetSkippingStrategy(columnName = col.name, columnType = col.dataType))
+      addIndexedColumn(ValueSetSkippingStrategy(columnName = col.name, columnType = col.dataType))
       this
     }
 
@@ -269,9 +280,8 @@ object FlintSpark {
      */
     def addMinMax(colName: String): IndexBuilder = {
       val col = findColumn(colName)
-      indexedColumns = indexedColumns :+ MinMaxSkippingStrategy(
-        columnName = col.name,
-        columnType = col.dataType)
+      indexedColumns =
+        indexedColumns :+ MinMaxSkippingStrategy(columnName = col.name, columnType = col.dataType)
       this
     }
 
diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
index af0f38d787..bbfa4c4bae 100644
--- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
+++ b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
@@ -42,3 +42,11 @@ trait FlintSparkIndex {
    */
   def build(df: DataFrame): DataFrame
 }
+
+object FlintSparkIndex {
+
+  /**
+   * ID column name.
+   */
+  val ID_COLUMN: String = "__id__"
+}
diff --git a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
index b1dc4ce550..829a143555 100644
--- a/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
+++ b/flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
@@ -10,13 +10,14 @@ import org.json4s.native.JsonMethods._
 import org.json4s.native.Serialization
 import org.opensearch.flint.core.metadata.FlintMetadata
 import org.opensearch.flint.spark.FlintSparkIndex
+import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE}
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer
 
 import org.apache.spark.sql.{Column, DataFrame}
 import org.apache.spark.sql.catalyst.dsl.expressions.DslExpression
 import org.apache.spark.sql.flint.datatype.FlintDataType
-import org.apache.spark.sql.functions.input_file_name
+import org.apache.spark.sql.functions.{col, input_file_name, sha1}
 import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}
 
 /**
@@ -30,6 +31,8 @@ class FlintSparkSkippingIndex(
     val indexedColumns: Seq[FlintSparkSkippingStrategy])
     extends FlintSparkIndex {
 
+  require(indexedColumns.nonEmpty, "indexed columns must not be empty")
+
   /** Required by json4s write function */
   implicit val formats: Formats = Serialization.formats(NoTypeHints) + SkippingKindSerializer
 
@@ -64,6 +67,7 @@ class FlintSparkSkippingIndex(
 
     df.groupBy(input_file_name().as(FILE_PATH_COLUMN))
       .agg(namedAggFuncs.head, namedAggFuncs.tail: _*)
+      .withColumn(ID_COLUMN, sha1(col(FILE_PATH_COLUMN)))
   }
 
   private def getMetaInfo: String = {
diff --git a/flint/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala b/flint/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala
new file mode 100644
index 0000000000..509c11236d
--- /dev/null
+++ b/flint/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala
@@ -0,0 +1,42 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark.skipping
+
+import org.mockito.Mockito.when
+import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.FILE_PATH_COLUMN
+import org.scalatest.matchers.must.Matchers.contain
+import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
+import org.scalatestplus.mockito.MockitoSugar.mock
+
+import org.apache.spark.FlintSuite
+import org.apache.spark.sql.catalyst.expressions.aggregate.CollectSet
+import org.apache.spark.sql.functions.col
+
+class FlintSparkSkippingIndexSuite extends FlintSuite {
+
+  test("get skipping index name") {
+    val index = new FlintSparkSkippingIndex("test", Seq(mock[FlintSparkSkippingStrategy]))
+    index.name() shouldBe "flint_test_skipping_index"
+  }
+
+  test("can build index building job with unique ID column") {
+    val indexCol = mock[FlintSparkSkippingStrategy]
+    when(indexCol.outputSchema()).thenReturn(Map("name" -> "string"))
+    when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("name").expr)))
+    val index = new FlintSparkSkippingIndex("test", Seq(indexCol))
+
+    val df = spark.createDataFrame(Seq(("hello", 20))).toDF("name", "age")
+    val indexDf = index.build(df)
+    indexDf.schema.fieldNames should contain only ("name", FILE_PATH_COLUMN, ID_COLUMN)
+  }
+
+  test("should fail if no indexed column given") {
+    assertThrows[IllegalArgumentException] {
+      new FlintSparkSkippingIndex("test", Seq.empty)
+    }
+  }
+}
diff --git a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSuite.scala b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
similarity index 99%
rename from flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSuite.scala
rename to flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
index cc63b55423..068100e814 100644
--- a/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSuite.scala
+++ b/flint/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.flint.config.FlintSparkConf._
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.streaming.StreamTest
 
-class FlintSparkSkippingIndexSuite
+class FlintSparkSkippingIndexITSuite
     extends QueryTest
     with FlintSuite
     with OpenSearchSuite
@@ -205,12 +205,14 @@
     flint
       .skippingIndex()
      .onTable(testTable)
+      .addPartitions("year")
       .create()
 
     assertThrows[IllegalStateException] {
       flint
         .skippingIndex()
         .onTable(testTable)
+        .addPartitions("year")
         .create()
     }
   }
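
A quick illustration for reviewers of the idea behind this change (not part of the patch; the object name, local session setup, and sample paths below are assumptions for the demo): sha1 is deterministic over the file path, so rebuilding the skipping index for the same source files regenerates identical __id__ values. In the patch, FlintSpark pairs this with DOC_ID_COLUMN_NAME -> __id__ and IGNORE_DOC_ID_COLUMN -> "true", so the generated column drives the OpenSearch document ID rather than landing in the document body.

    /*
     * Minimal sketch of the doc ID generation used by the build job.
     * Runnable standalone with only Spark on the classpath.
     */
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, sha1}

    object DocIdSketch {
      def main(args: Array[String]): Unit = {
        val spark =
          SparkSession.builder().master("local[1]").appName("doc-id-sketch").getOrCreate()
        import spark.implicits._

        // Stand-in for input_file_name(): one row per source file of the table.
        val files = Seq(
          "s3://my-bucket/data/part-0000.parquet",
          "s3://my-bucket/data/part-0001.parquet").toDF("file_path")

        // sha1() depends only on the file path, so every refresh of the same
        // files yields the same "__id__" values and the index write overwrites
        // existing documents instead of appending duplicates.
        val withId = files.withColumn("__id__", sha1(col("file_path")))
        withId.show(truncate = false)

        spark.stop()
      }
    }

Running the sketch twice prints byte-identical __id__ values, which is the idempotency property the build index job relies on.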