Disable Spark DS tests w/ Orc for Spark 3.x
Alexey Kudinkin committed Aug 3, 2022
1 parent ddfe97d commit 3f0c39b
Showing 3 changed files with 33 additions and 8 deletions.
@@ -471,10 +471,12 @@ private Writer createLogWriter(Option&lt;FileSlice&gt; fileSlice, String baseCommitTime
 
     return HoodieLogFormat.newWriterBuilder()
         .onParentPath(FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(), partitionPath))
-        .withFileId(fileId).overBaseCommit(baseCommitTime)
+        .withFileId(fileId)
+        .overBaseCommit(baseCommitTime)
         .withLogVersion(latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
         .withFileSize(latestLogFile.map(HoodieLogFile::getFileSize).orElse(0L))
-        .withSizeThreshold(config.getLogFileMaxSize()).withFs(fs)
+        .withSizeThreshold(config.getLogFileMaxSize())
+        .withFs(fs)
         .withRolloverLogWriteToken(writeToken)
         .withLogWriteToken(latestLogFile.map(x -> FSUtils.getWriteTokenFromLogPath(x.getPath())).orElse(writeToken))
         .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
@@ -54,6 +54,7 @@ private[hudi] trait SparkVersionsSupport {
   def isSpark3_2: Boolean = getSparkVersion.startsWith("3.2")
   def isSpark3_3: Boolean = getSparkVersion.startsWith("3.3")
 
+  def gteqSpark3_0: Boolean = getSparkVersion >= "3.0"
   def gteqSpark3_1: Boolean = getSparkVersion >= "3.1"
   def gteqSpark3_1_3: Boolean = getSparkVersion >= "3.1.3"
   def gteqSpark3_2: Boolean = getSparkVersion >= "3.2"
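Aside (an illustrative sketch, not part of this commit): the gteqSpark* helpers rely on Scala's lexicographic String comparison, so `getSparkVersion >= "3.0"` is an ordinary character-by-character check. A minimal standalone sketch, where `sparkVersion` is a hypothetical stub rather than Hudi's actual getSparkVersion:

// Standalone sketch; `sparkVersion` is a stand-in for HoodieSparkUtils.getSparkVersion.
object VersionOrderingSketch extends App {
  val sparkVersion: String = "3.2.1"

  // Same shape as the trait's helpers: plain lexicographic String comparison.
  def gteqSpark3_0: Boolean = sparkVersion >= "3.0"

  println(gteqSpark3_0)    // true: "3.2.1" >= "3.0" character by character
  println("3.10" >= "3.2") // false: '1' < '2' at the third character, so the
                           // scheme assumes single-digit minor versions
}
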
@@ -22,6 +22,7 @@ import java.time.Instant
 import java.util.{Collections, Date, UUID}
 import org.apache.commons.io.FileUtils
 import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieSparkUtils.gteqSpark3_0
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.common.model._
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
@@ -41,7 +42,8 @@ import org.apache.spark.{SparkConf, SparkContext}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue, fail}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
+import org.junit.jupiter.params.provider.Arguments.arguments
+import org.junit.jupiter.params.provider.{Arguments, CsvSource, EnumSource, MethodSource, ValueSource}
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{spy, times, verify}
 import org.scalatest.Assertions.assertThrows
@@ -485,11 +487,8 @@ class TestHoodieSparkSqlWriter {
    * @param populateMetaFields Flag for populating meta fields
    */
   @ParameterizedTest
-  @CsvSource(
-    Array("COPY_ON_WRITE,parquet,true", "COPY_ON_WRITE,parquet,false", "MERGE_ON_READ,parquet,true", "MERGE_ON_READ,parquet,false",
-      "COPY_ON_WRITE,orc,true", "COPY_ON_WRITE,orc,false", "MERGE_ON_READ,orc,true", "MERGE_ON_READ,orc,false"
-    ))
-  def testDatasourceInsertForTableTypeBaseFileMetaFields(tableType: String, baseFileFormat: String, populateMetaFields: Boolean): Unit = {
+  @MethodSource(Array("testDatasourceInsert"))
+  def testDatasourceInsertForTableTypeBaseFileMetaFields(tableType: String, populateMetaFields: Boolean, baseFileFormat: String): Unit = {
     val hoodieFooTableName = "hoodie_foo_tbl"
     val fooTableModifier = Map("path" -> tempBasePath,
       HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
@@ -1069,3 +1068,26 @@ class TestHoodieSparkSqlWriter {
     assertTrue(kg2 == classOf[SimpleKeyGenerator].getName)
   }
 }
+
+object TestHoodieSparkSqlWriter {
+  def testDatasourceInsert: java.util.stream.Stream[Arguments] = {
+    val scenarios = Array(
+      Seq("COPY_ON_WRITE", true),
+      Seq("COPY_ON_WRITE", false),
+      Seq("MERGE_ON_READ", true),
+      Seq("MERGE_ON_READ", false)
+    )
+
+    val parquetScenarios = scenarios.map { _ :+ "parquet" }
+    val orcScenarios = scenarios.map { _ :+ "orc" }
+
+    // TODO(HUDI-4496) Fix Orc support in Spark 3.x
+    val targetScenarios = if (gteqSpark3_0) {
+      parquetScenarios
+    } else {
+      parquetScenarios ++ orcScenarios
+    }
+
+    java.util.Arrays.stream(targetScenarios.map(as => arguments(as.map(_.asInstanceOf[AnyRef]):_*)))
+  }
+}
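For readers unfamiliar with the @MethodSource wiring above, here is a minimal standalone sketch (class and method names are hypothetical, not Hudi code). JUnit 5 resolves @MethodSource("name") to a static no-argument factory returning a Stream of Arguments; a method on a Scala companion object compiles to such a static forwarder, which is why the factory lives in object TestHoodieSparkSqlWriter. Each arguments(...) row binds positionally to the test method's parameters, matching the reordered signature (tableType, populateMetaFields, baseFileFormat):

import java.util.stream.Stream

import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments.arguments
import org.junit.jupiter.params.provider.{Arguments, MethodSource}

class FormatMatrixSketchTest {
  @ParameterizedTest
  @MethodSource(Array("formats")) // resolved via the static forwarder emitted for the companion object
  def readsEveryFormat(tableType: String, populateMetaFields: Boolean, format: String): Unit = {
    // Each arguments(...) row arrives positionally: (tableType, populateMetaFields, format).
    assert(Set("parquet", "orc").contains(format))
  }
}

object FormatMatrixSketchTest {
  def formats: Stream[Arguments] = Stream.of(
    arguments("COPY_ON_WRITE", Boolean.box(true), "parquet"),
    arguments("MERGE_ON_READ", Boolean.box(false), "orc")
  )
}

Unlike the @CsvSource literal it replaces, the factory can branch at runtime, which is what lets this commit drop the Orc rows whenever gteqSpark3_0 holds.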
