[SPARK-25267][SQL][TEST] Disable ConvertToLocalRelation in the test cases of sql/core and sql/hive #22270

Closed · wants to merge 3 commits
10 changes: 9 additions & 1 deletion mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala
@@ -21,12 +21,13 @@ import java.io.File

import org.scalatest.Suite

import org.apache.spark.SparkContext
import org.apache.spark.{DebugFilesystem, SparkConf, SparkContext}
import org.apache.spark.ml.{PredictionModel, Transformer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.sql.test.TestSparkSession
import org.apache.spark.util.Utils
@@ -36,6 +37,13 @@ trait MLTest extends StreamTest with TempDirectory { self: Suite =>
@transient var sc: SparkContext = _
@transient var checkpointDir: String = _

protected override def sparkConf = {
new SparkConf()
.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
.set("spark.unsafe.exceptionOnMemoryLeak", "true")
.set(SQLConf.CODEGEN_FALLBACK.key, "false")
}

protected override def createSparkSession: TestSparkSession = {
new TestSparkSession(new SparkContext("local[2]", "MLlibUnitTest", sparkConf))
}
@@ -38,7 +38,9 @@ select a, b, sum(b) from data group by 3;
select a, b, sum(b) + 2 from data group by 3;

-- negative case: nondeterministic expression
select a, rand(0), sum(b) from data group by a, 2;
select a, rand(0), sum(b)
from
(select /*+ REPARTITION(1) */ a, b from data) group by a, 2;

-- negative case: star
select * from data group by a, b, 1;
@@ -135,7 +135,9 @@ aggregate functions are not allowed in GROUP BY, but found (sum(CAST(data.`b` AS


-- !query 13
select a, rand(0), sum(b) from data group by a, 2
select a, rand(0), sum(b)
from
(select /*+ REPARTITION(1) */ a, b from data) group by a, 2
-- !query 13 schema
struct<a:int,rand(0):double,sum(b):bigint>
-- !query 13 output
@@ -558,7 +558,7 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {

test("SPARK-18004 limit + aggregates") {
withSQLConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT.key -> "true") {
val df = Seq(("a", 1), ("b", 2), ("c", 1), ("d", 5)).toDF("id", "value")
val df = Seq(("a", 1), ("b", 2), ("c", 1), ("d", 5)).toDF("id", "value").repartition(1)
val limit2Df = df.limit(2)
checkAnswer(
limit2Df.groupBy("id").count().select($"id"),
@@ -85,14 +85,16 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
}

val df5 = Seq((Seq("a", null), Seq(1, 2))).toDF("k", "v")
intercept[RuntimeException] {
val msg1 = intercept[Exception] {
Member

re: #22270 (comment)

Didn't we disable the local relation rule? Why don't we catch an explicit SparkException?

Contributor Author

@HyukjinKwon Yeah... we could have caught SparkException here. My intention was to have this test case pass both when the local relation optimization is on and when it is off. That's why I changed it to a generic exception along with verifying the error text.
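
A minimal sketch of the pattern discussed above, mirroring the assertion in this diff (it assumes it runs inside a suite with SharedSQLContext and testImplicits in scope):

```scala
// Sketch only: catch a broad Exception so the assertion holds whether the failure surfaces
// as a plain RuntimeException (LocalRelation path) or wrapped in a SparkException
// (executed-plan path), and verify the error text instead of the exact exception type.
import org.apache.spark.sql.functions.map_from_arrays

val df5 = Seq((Seq("a", null), Seq(1, 2))).toDF("k", "v")
val msg = intercept[Exception] {
  df5.select(map_from_arrays($"k", $"v")).collect()
}.getMessage
assert(msg.contains("Cannot use null as map key"))
```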

df5.select(map_from_arrays($"k", $"v")).collect
}
}.getMessage
assert(msg1.contains("Cannot use null as map key!"))

val df6 = Seq((Seq(1, 2), Seq("a"))).toDF("k", "v")
intercept[RuntimeException] {
val msg2 = intercept[Exception] {
df6.select(map_from_arrays($"k", $"v")).collect
}
}.getMessage
assert(msg2.contains("The given two arrays should have the same length"))
}

test("struct with column name") {
@@ -2377,7 +2379,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
assert(ex2.getMessage.contains(
"The number of lambda function arguments '3' does not match"))

val ex3 = intercept[RuntimeException] {
val ex3 = intercept[Exception] {
dfExample1.selectExpr("transform_keys(i, (k, v) -> v)").show()
}
assert(ex3.getMessage.contains("Cannot use null as map key!"))
@@ -2697,7 +2699,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {

test("SPARK-24734: Fix containsNull of Concat for array type") {
val df = Seq((Seq(1), Seq[Integer](null), Seq("a", "b"))).toDF("k1", "k2", "v")
val ex = intercept[RuntimeException] {
val ex = intercept[Exception] {
df.select(map_from_arrays(concat($"k1", $"k2"), $"v")).show()
}
assert(ex.getMessage.contains("Cannot use null as map key"))
@@ -40,6 +40,7 @@ import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSQLContex
import org.apache.spark.sql.test.SQLTestData.{NullInts, NullStrings, TestData2}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
import org.apache.spark.util.random.XORShiftRandom

class DataFrameSuite extends QueryTest with SharedSQLContext {
import testImplicits._
@@ -1729,10 +1730,8 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
}

test("SPARK-9083: sort with non-deterministic expressions") {
import org.apache.spark.util.random.XORShiftRandom

val seed = 33
val df = (1 to 100).map(Tuple1.apply).toDF("i")
val df = (1 to 100).map(Tuple1.apply).toDF("i").repartition(1)
Contributor

Sorry I didn't follow this thread closely. Why do we need these repartition(1) changes?

Contributor Author

@cloud-fan I was just trying to get this test case to pass when ConvertToLocalRelation is enabled as well as when it is disabled. When this rule is active, I saw that all the calls to random.nextXXX happen in one thread. When this rule is disabled, the random values get evaluated under the project operator and get called from multiple threads. That's why I am repartitioning the DataFrame to enforce single-threaded execution. Is this not the right thing to do? Please let me know.
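
For illustration, a sketch of the point above, taken from the test in this diff: with repartition(1) the rand(seed) column is evaluated by a single task in row order, so the expected ordering can be rebuilt on the driver with the same XORShiftRandom seed.

```scala
// Sketch only (assumes testImplicits and org.apache.spark.sql.functions.rand in scope):
// a single partition means one task evaluates rand(seed), so its values line up with a
// driver-side XORShiftRandom(seed) and the sorted order becomes reproducible.
import org.apache.spark.util.random.XORShiftRandom

val seed = 33
val df = (1 to 100).map(Tuple1.apply).toDF("i").repartition(1)
val random = new XORShiftRandom(seed)
val expected = (1 to 100).map(_ -> random.nextDouble()).sortBy(_._2).map(_._1)
val actual = df.sort(rand(seed)).collect().map(_.getInt(0))
assert(expected === actual)
```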

Member (@HyukjinKwon, Sep 10, 2018)

BTW, do we still test the local relation conversion, which might be more common to users as well?

Contributor Author

@HyukjinKwon We are leaving this optimization on for MLTest as of now. Should we open it up for TestHive and keep it disabled for SharedSparkSession? cc @gatorsmile

Member

I agree with this change; it's okay. I was just wondering whether we actually lower the coverage for local relations specifically, or whether some other tests should be added.
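
One way to keep that coverage would be a test that re-enables the rule locally; a hypothetical sketch, using the withSQLConf helper and the conf key already appearing in this diff:

```scala
// Hypothetical sketch (assumes a suite mixing in SharedSQLContext): clear the excluded-rules
// conf for one test so ConvertToLocalRelation runs again, then check that a projection over
// local data collapses back into a LocalRelation in the optimized plan.
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.internal.SQLConf

test("ConvertToLocalRelation is still exercised when re-enabled") {
  withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> "") {
    val df = Seq((1, "a"), (2, "b")).toDF("id", "value").select($"id")
    assert(df.queryExecution.optimizedPlan.isInstanceOf[LocalRelation])
  }
}
```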

val random = new XORShiftRandom(seed)
val expected = (1 to 100).map(_ -> random.nextDouble()).sortBy(_._2).map(_._1)
val actual = df.sort(rand(seed)).collect().map(_.getInt(0))
@@ -24,6 +24,7 @@ import org.scalatest.concurrent.Eventually

import org.apache.spark.{DebugFilesystem, SparkConf}
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
import org.apache.spark.sql.internal.SQLConf

/**
@@ -39,6 +40,11 @@ trait SharedSparkSession
.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
.set("spark.unsafe.exceptionOnMemoryLeak", "true")
.set(SQLConf.CODEGEN_FALLBACK.key, "false")
// Disable ConvertToLocalRelation for better test coverage. Test cases built on
// LocalRelation will exercise the optimization rules better by disabling it as
// this rule may potentially block testing of other optimization rules such as
// ConstantPropagation etc.
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
}
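
For reference, a sketch of the same exclusion applied to a plain SparkSession builder; the literal key is an assumption of what SQLConf.OPTIMIZER_EXCLUDED_RULES resolves to, and ConvertToLocalRelation.ruleName is the rule's fully qualified class name.

```scala
// Illustrative only: exclude ConvertToLocalRelation outside the test harness.
// Assumes SQLConf.OPTIMIZER_EXCLUDED_RULES maps to "spark.sql.optimizer.excludedRules".
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("no-convert-to-local-relation")
  .config("spark.sql.optimizer.excludedRules",
    "org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation")
  .getOrCreate()
```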

/**
@@ -36,6 +36,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, ExternalCatalogWithListener}
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
import org.apache.spark.sql.execution.command.CacheTableCommand
@@ -59,7 +60,12 @@ object TestHive
.set("spark.sql.warehouse.dir", TestHiveContext.makeWarehouseDir().toURI.getPath)
// SPARK-8910
.set("spark.ui.enabled", "false")
.set("spark.unsafe.exceptionOnMemoryLeak", "true")))
.set("spark.unsafe.exceptionOnMemoryLeak", "true")
// Disable ConvertToLocalRelation for better test coverage. Test cases built on
// LocalRelation will exercise the optimization rules better by disabling it as
// this rule may potentially block testing of other optimization rules such as
// ConstantPropagation etc.
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)))


case class TestHiveVersion(hiveClient: HiveClient)