Fixed tests for Spark 2.x
Alexey Kudinkin committed Mar 4, 2022
1 parent 62edf7d commit e3902bf
Showing 1 changed file with 79 additions and 31 deletions.
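
The diff applies one pattern throughout: each expected read-size fixture is selected at runtime according to the Spark version reported by HoodieSparkUtils, and any other version fails the test outright. Below is a minimal, standalone sketch of that pattern; the expectedStats helper is hypothetical (the commit keeps every fixture inline), and the import path for HoodieSparkUtils is assumed from the Hudi codebase.

import org.apache.hudi.HoodieSparkUtils
import org.junit.jupiter.api.Assertions.fail

object VersionedFixtureSketch {
  // Hypothetical helper: pick the (projected columns -> bytes read) fixture that
  // matches the Spark version the suite is running against. JUnit's fail(...)
  // throws, so the last branch never yields a value at runtime.
  def expectedStats(spark3Stats: Array[(String, Long)],
                    spark2Stats: Array[(String, Long)]): Array[(String, Long)] =
    if (HoodieSparkUtils.isSpark3) spark3Stats
    else if (HoodieSparkUtils.isSpark2) spark2Stats
    else fail("Only Spark 3 and Spark 2 are currently supported")
}

This compiles even though fail never returns: JUnit 5 declares Assertions.fail as a generic <V> V fail(String), so Scala infers V from the expected result type, which is what lets the unsupported-version branch sit inside the fixture expression in the test.
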
@@ -29,7 +29,7 @@ import org.apache.parquet.hadoop.util.counters.BenchmarkCounter
import org.apache.spark.HoodieUnsafeRDDUtils
import org.apache.spark.sql.{Dataset, Row, SaveMode}
import org.apache.spark.sql.catalyst.InternalRow
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.{assertEquals, fail}
import org.junit.jupiter.api.{Tag, Test}

import scala.collection.JavaConverters._
@@ -62,11 +62,19 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness {

// Stats for the reads fetching only _projected_ columns (note how amount of bytes read
// increases along w/ the # of columns)
-val projectedColumnsReadStats: Array[(String, Long)] = Array(
-  ("rider", 2452),
-  ("rider,driver", 2552),
-  ("rider,driver,tip_history", 3517)
-)
+val projectedColumnsReadStats: Array[(String, Long)] =
+  if (HoodieSparkUtils.isSpark3)
+    Array(
+      ("rider", 2452),
+      ("rider,driver", 2552),
+      ("rider,driver,tip_history", 3517))
+  else if (HoodieSparkUtils.isSpark2)
+    Array(
+      ("rider", 2595),
+      ("rider,driver", 2735),
+      ("rider,driver,tip_history", 3750))
+  else
+    fail("Only Spark 3 and Spark 2 are currently supported")

// Test COW / Snapshot
runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, "", projectedColumnsReadStats)
@@ -83,19 +91,35 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness {

// Stats for the reads fetching only _projected_ columns (note how amount of bytes read
// increases along w/ the # of columns)
-val projectedColumnsReadStats: Array[(String, Long)] = Array(
-  ("rider", 2452),
-  ("rider,driver", 2552),
-  ("rider,driver,tip_history", 3517)
-)
+val projectedColumnsReadStats: Array[(String, Long)] =
+  if (HoodieSparkUtils.isSpark3)
+    Array(
+      ("rider", 2452),
+      ("rider,driver", 2552),
+      ("rider,driver,tip_history", 3517))
+  else if (HoodieSparkUtils.isSpark2)
+    Array(
+      ("rider", 2595),
+      ("rider,driver", 2735),
+      ("rider,driver,tip_history", 3750))
+  else
+    fail("Only Spark 3 and Spark 2 are currently supported")

// Stats for the reads fetching _all_ columns (note, how amount of bytes read
// is invariant of the # of columns)
-val fullColumnsReadStats: Array[(String, Long)] = Array(
-  ("rider", 14665),
-  ("rider,driver", 14665),
-  ("rider,driver,tip_history", 14665)
-)
+val fullColumnsReadStats: Array[(String, Long)] =
+  if (HoodieSparkUtils.isSpark3)
+    Array(
+      ("rider", 14665),
+      ("rider,driver", 14665),
+      ("rider,driver,tip_history", 14665))
+  else if (HoodieSparkUtils.isSpark2)
+    Array(
+      ("rider", 15336),
+      ("rider,driver", 15336),
+      ("rider,driver,tip_history", 15336))
+  else
+    fail("Only Spark 3 and Spark 2 are currently supported")

// Test MOR / Snapshot / Skip-merge
runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL, projectedColumnsReadStats)
@@ -122,11 +146,19 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness {

// Stats for the reads fetching only _projected_ columns (note how amount of bytes read
// increases along w/ the # of columns)
-val projectedColumnsReadStats: Array[(String, Long)] = Array(
-  ("rider", 2452),
-  ("rider,driver", 2552),
-  ("rider,driver,tip_history", 3517)
-)
+val projectedColumnsReadStats: Array[(String, Long)] =
+  if (HoodieSparkUtils.isSpark3)
+    Array(
+      ("rider", 2452),
+      ("rider,driver", 2552),
+      ("rider,driver,tip_history", 3517))
+  else if (HoodieSparkUtils.isSpark2)
+    Array(
+      ("rider", 2595),
+      ("rider,driver", 2735),
+      ("rider,driver,tip_history", 3750))
+  else
+    fail("Only Spark 3 and Spark 2 are currently supported")

// Test MOR / Snapshot / Skip-merge
runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL, projectedColumnsReadStats)
@@ -149,19 +181,35 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness {

// Stats for the reads fetching only _projected_ columns (note how amount of bytes read
// increases along w/ the # of columns)
-val projectedColumnsReadStats: Array[(String, Long)] = Array(
-  ("rider", 2560),
-  ("rider,driver", 2660),
-  ("rider,driver,tip_history", 3625)
-)
+val projectedColumnsReadStats: Array[(String, Long)] =
+  if (HoodieSparkUtils.isSpark3)
+    Array(
+      ("rider", 2560),
+      ("rider,driver", 2660),
+      ("rider,driver,tip_history", 3625))
+  else if (HoodieSparkUtils.isSpark2)
+    Array(
+      ("rider", 2775),
+      ("rider,driver", 2915),
+      ("rider,driver,tip_history", 3930))
+  else
+    fail("Only Spark 3 and Spark 2 are currently supported")

// Stats for the reads fetching _all_ columns (note, how amount of bytes read
// is invariant of the # of columns)
-val fullColumnsReadStats: Array[(String, Long)] = Array(
-  ("rider", 14667),
-  ("rider,driver", 14667),
-  ("rider,driver,tip_history", 14667)
-)
+val fullColumnsReadStats: Array[(String, Long)] =
+  if (HoodieSparkUtils.isSpark3)
+    Array(
+      ("rider", 14667),
+      ("rider,driver", 14667),
+      ("rider,driver,tip_history", 14667))
+  else if (HoodieSparkUtils.isSpark2)
+    Array(
+      ("rider", 15338),
+      ("rider,driver", 15338),
+      ("rider,driver,tip_history", 15338))
+  else
+    fail("Only Spark 3 and Spark 2 are currently supported")

val incrementalOpts: Map[String, String] = Map(
DataSourceReadOptions.BEGIN_INSTANTTIME.key -> "001"