diff --git a/docs/Configuration.md b/docs/Configuration.md index 688f83226..277243d94 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -13,7 +13,7 @@ You can add these configurations to spark-defaults.conf to enable or disable th | spark.memory.offHeap.size | The amount of memory to use for Java off-heap allocation.
Note that Gazelle Plugin leverages this setting to allocate memory for native code even when off-heap memory is disabled.
The value depends on your system; it is recommended to increase it if you encounter out-of-memory issues in Gazelle Plugin | 30G | | spark.sql.sources.useV1SourceList | Choose to use V1 source | avro | | spark.sql.join.preferSortMergeJoin | To turn off preferSortMergeJoin in Spark | false | -| spark.sql.extensions | To turn on Gazelle Plugin | com.intel.oap.ColumnarPlugin | +| spark.plugins | To turn on Gazelle Plugin | com.intel.oap.GazellePlugin | | spark.shuffle.manager | To turn on Gazelle Columnar Shuffle Plugin | org.apache.spark.shuffle.sort.ColumnarShuffleManager | | spark.oap.sql.columnar.batchscan | Enable or Disable Columnar Batchscan, default is true | true | | spark.oap.sql.columnar.hashagg | Enable or Disable Columnar Hash Aggregate, default is true | true | @@ -46,7 +46,7 @@ Below is an example for spark-default.conf, if you are using conda to install OA spark.sql.sources.useV1SourceList avro spark.sql.join.preferSortMergeJoin false -spark.sql.extensions com.intel.oap.ColumnarPlugin +spark.plugins com.intel.oap.GazellePlugin spark.shuffle.manager org.apache.spark.shuffle.sort.ColumnarShuffleManager # note: Gazelle Plugin depends on the Arrow data source diff --git a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ExpressionEvaluator.java b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ExpressionEvaluator.java index c1efeadc2..f4bb82f0e 100644 --- a/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ExpressionEvaluator.java +++ b/native-sql-engine/core/src/main/java/com/intel/oap/vectorized/ExpressionEvaluator.java @@ -17,7 +17,7 @@ package com.intel.oap.vectorized; -import com.intel.oap.ColumnarPluginConfig; +import com.intel.oap.GazellePluginConfig; import com.intel.oap.execution.ColumnarNativeIterator; import com.intel.oap.spark.sql.execution.datasources.v2.arrow.Spiller; import org.apache.arrow.dataset.jni.NativeMemoryPool; @@ -51,17 +51,17 @@ public ExpressionEvaluator() throws IOException, IllegalAccessException, Illegal } public ExpressionEvaluator(List listJars) throws IOException, IllegalAccessException, IllegalStateException { - String tmp_dir = ColumnarPluginConfig.getTempFile(); + String tmp_dir = GazellePluginConfig.getTempFile(); if (tmp_dir == null) { tmp_dir = System.getProperty("java.io.tmpdir"); } jniWrapper = new ExpressionEvaluatorJniWrapper(tmp_dir, listJars); jniWrapper.nativeSetJavaTmpDir(jniWrapper.tmp_dir_path); - jniWrapper.nativeSetBatchSize(ColumnarPluginConfig.getBatchSize()); - if (ColumnarPluginConfig.getSpillThreshold() != -1) - jniWrapper.nativeSetSortSpillThreshold(ColumnarPluginConfig.getSpillThreshold()); - jniWrapper.nativeSetMetricsTime(ColumnarPluginConfig.getEnableMetricsTime()); - ColumnarPluginConfig.setRandomTempDir(jniWrapper.tmp_dir_path); + jniWrapper.nativeSetBatchSize(GazellePluginConfig.getBatchSize()); + if (GazellePluginConfig.getSpillThreshold() != -1) + jniWrapper.nativeSetSortSpillThreshold(GazellePluginConfig.getSpillThreshold()); + jniWrapper.nativeSetMetricsTime(GazellePluginConfig.getEnableMetricsTime()); + GazellePluginConfig.setRandomTempDir(jniWrapper.tmp_dir_path); } long getInstanceId() { diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePlugin.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePlugin.scala new file mode 100644 index 000000000..f81eefec1 --- /dev/null +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePlugin.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.intel.oap + +import java.util +import java.util.Collections +import java.util.Objects + +import scala.language.implicitConversions + +import com.intel.oap.GazellePlugin.GAZELLE_SESSION_EXTENSION_NAME +import com.intel.oap.GazellePlugin.SPARK_SESSION_EXTS_KEY +import com.intel.oap.extension.StrategyOverrides + +import org.apache.spark.SparkConf +import org.apache.spark.SparkContext +import org.apache.spark.api.plugin.DriverPlugin +import org.apache.spark.api.plugin.ExecutorPlugin +import org.apache.spark.api.plugin.PluginContext +import org.apache.spark.api.plugin.SparkPlugin +import org.apache.spark.sql.SparkSessionExtensions +import org.apache.spark.sql.internal.StaticSQLConf + +class GazellePlugin extends SparkPlugin { + override def driverPlugin(): DriverPlugin = { + new GazelleDriverPlugin() + } + + override def executorPlugin(): ExecutorPlugin = { + new GazelleExecutorPlugin() + } +} + +private[oap] class GazelleDriverPlugin extends DriverPlugin { + override def init(sc: SparkContext, pluginContext: PluginContext): util.Map[String, String] = { + val conf = pluginContext.conf() + setPredefinedConfigs(conf) + Collections.emptyMap() + } + + def setPredefinedConfigs(conf: SparkConf): Unit = { + if (conf.contains(SPARK_SESSION_EXTS_KEY)) { + throw new IllegalArgumentException("Spark extensions are already specified before " + + "enabling Gazelle plugin: " + conf.get(GazellePlugin.SPARK_SESSION_EXTS_KEY)) + } + conf.set(SPARK_SESSION_EXTS_KEY, GAZELLE_SESSION_EXTENSION_NAME) + } +} + +private[oap] class GazelleExecutorPlugin extends ExecutorPlugin { + // N/A +} + +private[oap] class GazelleSessionsExtensions extends (SparkSessionExtensions => Unit) { + override def apply(exts: SparkSessionExtensions): Unit = { + GazellePlugin.DEFAULT_INJECTORS.foreach(injector => injector.inject(exts)) + } +} + +private[oap] class SparkConfImplicits(conf: SparkConf) { + def enableGazellePlugin(): SparkConf = { + if (conf.contains(GazellePlugin.SPARK_SQL_PLUGINS_KEY)) { + throw new IllegalArgumentException("A Spark plugin is already specified before enabling " + + "Gazelle plugin: " + conf.get(GazellePlugin.SPARK_SQL_PLUGINS_KEY)) + } + conf.set(GazellePlugin.SPARK_SQL_PLUGINS_KEY, GazellePlugin.GAZELLE_PLUGIN_NAME) + } +} + +private[oap] trait GazelleSparkExtensionsInjector { + def inject(extensions: SparkSessionExtensions) +} + +private[oap] object GazellePlugin { + // To enable GazellePlugin in production, set "spark.plugins=com.intel.oap.GazellePlugin" + val SPARK_SQL_PLUGINS_KEY: String = "spark.plugins" + val GAZELLE_PLUGIN_NAME: String = Objects.requireNonNull(classOf[GazellePlugin] + .getCanonicalName) + val SPARK_SESSION_EXTS_KEY: String = StaticSQLConf.SPARK_SESSION_EXTENSIONS.key + val GAZELLE_SESSION_EXTENSION_NAME: String = Objects.requireNonNull( + 
classOf[GazelleSessionsExtensions].getCanonicalName) + + /** + * Specify all injectors that Gazelle is using in the following list. + */ + val DEFAULT_INJECTORS: List[GazelleSparkExtensionsInjector] = List( + ColumnarOverrides, + StrategyOverrides + ) + + implicit def sparkConfImplicit(conf: SparkConf): SparkConfImplicits = { + new SparkConfImplicits(conf) + } +} diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala similarity index 92% rename from native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala rename to native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala index 383ee2a61..68e1c7373 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/GazellePluginConfig.scala @@ -21,18 +21,19 @@ import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.sql.internal.SQLConf -case class ColumnarNumaBindingInfo( +case class GazelleNumaBindingInfo( enableNumaBinding: Boolean, totalCoreRange: Array[String] = null, numCoresPerExecutor: Int = -1) {} -class ColumnarPluginConfig(conf: SQLConf) extends Logging { +class GazellePluginConfig(conf: SQLConf) extends Logging { def getCpu(): Boolean = { val source = scala.io.Source.fromFile("/proc/cpuinfo") val lines = try source.mkString finally source.close() //TODO(): check CPU flags to enable/disable AVX512 + // Assume a supported CPU for now; the vendor check below is not reached. + return true if (lines.contains("GenuineIntel")) { return true } else { @@ -78,6 +79,10 @@ class ColumnarPluginConfig(conf: SQLConf) extends Logging { val enableArrowColumnarToRow: Boolean = conf.getConfString("spark.oap.sql.columnar.columnartorow", "true").toBoolean && enableCpu + val forceShuffledHashJoin: Boolean = + conf.getConfString("spark.oap.sql.columnar.forceshuffledhashjoin", "false").toBoolean && + enableCpu + // enable or disable columnar sortmergejoin // this should be set with preferSortMergeJoin=false val enableColumnarSortMergeJoin: Boolean = @@ -161,42 +166,42 @@ class ColumnarPluginConfig(conf: SQLConf) extends Logging { val columnarShuffleUseCustomizedCompressionCodec: String = conf.getConfString("spark.oap.sql.columnar.shuffle.customizedCompression.codec", "lz4") - val numaBindingInfo: ColumnarNumaBindingInfo = { + val numaBindingInfo: GazelleNumaBindingInfo = { val enableNumaBinding: Boolean = conf.getConfString("spark.oap.sql.columnar.numaBinding", "false").toBoolean if (!enableNumaBinding) { - ColumnarNumaBindingInfo(false) + GazelleNumaBindingInfo(false) } else { val tmp = conf.getConfString("spark.oap.sql.columnar.coreRange", null) if (tmp == null) { - ColumnarNumaBindingInfo(false) + GazelleNumaBindingInfo(false) } else { val numCores = conf.getConfString("spark.executor.cores", "1").toInt val coreRangeList: Array[String] = tmp.split('|').map(_.trim) - ColumnarNumaBindingInfo(true, coreRangeList, numCores) + GazelleNumaBindingInfo(true, coreRangeList, numCores) } } } } -object ColumnarPluginConfig { - var ins: ColumnarPluginConfig = null +object GazellePluginConfig { + var ins: GazellePluginConfig = null var random_temp_dir_path: String = null /** * @deprecated We should avoid caching this value in the entire JVM.
Use getSessionConf instead. */ @Deprecated - def getConf: ColumnarPluginConfig = synchronized { + def getConf: GazellePluginConfig = synchronized { if (ins == null) { ins = getSessionConf } ins } - def getSessionConf: ColumnarPluginConfig = { - new ColumnarPluginConfig(SQLConf.get) + def getSessionConf: GazellePluginConfig = { + new GazellePluginConfig(SQLConf.get) } def getBatchSize: Int = synchronized { diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBasicPhysicalOperators.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBasicPhysicalOperators.scala index 15325e111..8b72fcf08 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBasicPhysicalOperators.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBasicPhysicalOperators.scala @@ -17,7 +17,7 @@ package com.intel.oap.execution -import com.intel.oap.ColumnarPluginConfig +import com.intel.oap.GazellePluginConfig import com.intel.oap.expression._ import com.intel.oap.vectorized._ import org.apache.spark.sql.catalyst.InternalRow @@ -35,7 +35,7 @@ import org.apache.spark.{SparkConf, TaskContext} import org.apache.arrow.gandiva.expression._ import org.apache.arrow.vector.types.pojo.ArrowType import com.google.common.collect.Lists -import com.intel.oap.ColumnarPluginConfig +import com.intel.oap.GazellePluginConfig import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils; case class ColumnarConditionProjectExec( condition: Expression, projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryExecNode with ColumnarCodegenSupport with AliasAwareOutputPartitioning with Logging { - val numaBindingInfo = ColumnarPluginConfig.getConf.numaBindingInfo + val numaBindingInfo = GazellePluginConfig.getConf.numaBindingInfo val sparkConf: SparkConf = sparkContext.getConf @@ -249,7 +249,7 @@ case class ColumnarConditionProjectExec( numInputBatches.set(0) child.executeColumnar().mapPartitions { iter => - ColumnarPluginConfig.getConf + GazellePluginConfig.getConf ExecutorManager.tryTaskSet(numaBindingInfo) val condProj = ColumnarConditionProjector.create( condition, diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBatchScanExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBatchScanExec.scala index ac2deaa87..ed8be5ad8 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBatchScanExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBatchScanExec.scala @@ -17,7 +17,7 @@ package com.intel.oap.execution -import com.intel.oap.ColumnarPluginConfig +import com.intel.oap.GazellePluginConfig import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan} @@ -27,7 +27,7 @@ import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} class ColumnarBatchScanExec(output: Seq[AttributeReference], @transient scan: Scan) extends BatchScanExec(output, scan) { - val tmpDir: String = ColumnarPluginConfig.getConf.tmpFile + val tmpDir: String = GazellePluginConfig.getConf.tmpFile override def supportsColumnar(): Boolean = true override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBroadcastHashJoinExec.scala
b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBroadcastHashJoinExec.scala index ac0df8cac..2f050fc78 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBroadcastHashJoinExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarBroadcastHashJoinExec.scala @@ -18,7 +18,7 @@ package com.intel.oap.execution import com.google.common.collect.Lists -import com.intel.oap.ColumnarPluginConfig +import com.intel.oap.GazellePluginConfig import com.intel.oap.expression._ import com.intel.oap.vectorized.{ExpressionEvaluator, _} import org.apache.arrow.gandiva.expression._ @@ -62,7 +62,7 @@ case class ColumnarBroadcastHashJoinExec( with ShuffledJoin { val sparkConf = sparkContext.getConf - val numaBindingInfo = ColumnarPluginConfig.getConf.numaBindingInfo + val numaBindingInfo = GazellePluginConfig.getConf.numaBindingInfo override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"), @@ -495,7 +495,7 @@ case class ColumnarBroadcastHashJoinExec( def uploadAndListJars(signature: String): Seq[String] = if (signature != "") { if (sparkContext.listJars.filter(path => path.contains(s"${signature}.jar")).isEmpty) { - val tempDir = ColumnarPluginConfig.getRandomTempDir + val tempDir = GazellePluginConfig.getRandomTempDir val jarFileName = s"${tempDir}/tmp/spark-columnar-plugin-codegen-precompile-${signature}.jar" sparkContext.addJar(jarFileName) @@ -525,8 +525,8 @@ case class ColumnarBroadcastHashJoinExec( streamedPlan.executeColumnar().mapPartitions { streamIter => ExecutorManager.tryTaskSet(numaBindingInfo) - ColumnarPluginConfig.getConf - val execTempDir = ColumnarPluginConfig.getTempFile + GazellePluginConfig.getConf + val execTempDir = GazellePluginConfig.getTempFile val jarList = listJars.map(jarUrl => { logWarning(s"Get Codegened library Jar ${jarUrl}") UserAddedJarUtils.fetchJarFromSpark( diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarDataSourceRDD.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarDataSourceRDD.scala index 8b63fa897..0187e7577 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarDataSourceRDD.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarDataSourceRDD.scala @@ -17,7 +17,7 @@ package com.intel.oap.execution -import com.intel.oap.ColumnarPluginConfig +import com.intel.oap.GazellePluginConfig import com.intel.oap.vectorized._ import org.apache.spark._ import org.apache.spark.rdd.RDD @@ -47,7 +47,7 @@ class ColumnarDataSourceRDD( inputSize: SQLMetric, tmp_dir: String) extends RDD[ColumnarBatch](sc, Nil) { - val numaBindingInfo = ColumnarPluginConfig.getConf.numaBindingInfo + val numaBindingInfo = GazellePluginConfig.getConf.numaBindingInfo override protected def getPartitions: Array[Partition] = { inputPartitions.zipWithIndex.map { diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarHashAggregateExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarHashAggregateExec.scala index eff751347..1fd9e8734 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarHashAggregateExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarHashAggregateExec.scala @@ -17,7 +17,7 @@ package com.intel.oap.execution -import 
com.intel.oap.ColumnarPluginConfig +import com.intel.oap.GazellePluginConfig import com.intel.oap.expression._ import com.intel.oap.vectorized._ import com.google.common.collect.Lists @@ -73,7 +73,7 @@ case class ColumnarHashAggregateExec( with AliasAwareOutputPartitioning { val sparkConf = sparkContext.getConf - val numaBindingInfo = ColumnarPluginConfig.getConf.numaBindingInfo + val numaBindingInfo = GazellePluginConfig.getConf.numaBindingInfo override def supportsColumnar = true var resAttributes: Seq[Attribute] = resultExpressions.map(_.toAttribute) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarShuffledHashJoinExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarShuffledHashJoinExec.scala index dc2b6083b..d78156982 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarShuffledHashJoinExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarShuffledHashJoinExec.scala @@ -20,7 +20,7 @@ package com.intel.oap.execution import java.util.concurrent.TimeUnit._ import com.intel.oap.vectorized._ -import com.intel.oap.ColumnarPluginConfig +import com.intel.oap.GazellePluginConfig import org.apache.spark.TaskContext import org.apache.spark.rdd.RDD import org.apache.spark.util.{UserAddedJarUtils, Utils, ExecutorManager} @@ -72,7 +72,7 @@ case class ColumnarShuffledHashJoinExec( with ShuffledJoin { val sparkConf = sparkContext.getConf - val numaBindingInfo = ColumnarPluginConfig.getConf.numaBindingInfo + val numaBindingInfo = GazellePluginConfig.getConf.numaBindingInfo override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"), @@ -407,7 +407,7 @@ case class ColumnarShuffledHashJoinExec( def uploadAndListJars(signature: String): Seq[String] = if (signature != "") { if (sparkContext.listJars.filter(path => path.contains(s"${signature}.jar")).isEmpty) { - val tempDir = ColumnarPluginConfig.getRandomTempDir + val tempDir = GazellePluginConfig.getRandomTempDir val jarFileName = s"${tempDir}/tmp/spark-columnar-plugin-codegen-precompile-${signature}.jar" sparkContext.addJar(jarFileName) @@ -476,8 +476,8 @@ case class ColumnarShuffledHashJoinExec( streamedPlan.executeColumnar().zipPartitions(buildPlan.executeColumnar()) { (streamIter, buildIter) => ExecutorManager.tryTaskSet(numaBindingInfo) - ColumnarPluginConfig.getConf - val execTempDir = ColumnarPluginConfig.getTempFile + GazellePluginConfig.getConf + val execTempDir = GazellePluginConfig.getTempFile val jarList = listJars.map(jarUrl => { logWarning(s"Get Codegened library Jar ${jarUrl}") UserAddedJarUtils.fetchJarFromSpark( diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarSortExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarSortExec.scala index 6b33b0fbd..61dca8a01 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarSortExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarSortExec.scala @@ -17,7 +17,7 @@ package com.intel.oap.execution -import com.intel.oap.ColumnarPluginConfig +import com.intel.oap.GazellePluginConfig import com.intel.oap.expression._ import com.intel.oap.vectorized._ import com.google.common.collect.Lists @@ -60,7 +60,7 @@ case class ColumnarSortExec( with ColumnarCodegenSupport { val sparkConf = sparkContext.getConf - val 
numaBindingInfo = ColumnarPluginConfig.getConf.numaBindingInfo + val numaBindingInfo = GazellePluginConfig.getConf.numaBindingInfo override def supportsColumnar = true override protected def doExecute(): RDD[InternalRow] = { throw new UnsupportedOperationException(s"ColumnarSortExec doesn't support doExecute") @@ -187,7 +187,7 @@ case class ColumnarSortExec( def uploadAndListJars(signature: String): Seq[String] = if (signature != "") { if (sparkContext.listJars.filter(path => path.contains(s"${signature}.jar")).isEmpty) { - val tempDir = ColumnarPluginConfig.getRandomTempDir + val tempDir = GazellePluginConfig.getRandomTempDir val jarFileName = s"${tempDir}/tmp/spark-columnar-plugin-codegen-precompile-${signature}.jar" sparkContext.addJar(jarFileName) @@ -210,8 +210,8 @@ case class ColumnarSortExec( // If sortOrder are all Literal, no need to do sorting. new CloseableColumnBatchIterator(iter) } else { - ColumnarPluginConfig.getConf - val execTempDir = ColumnarPluginConfig.getTempFile + GazellePluginConfig.getConf + val execTempDir = GazellePluginConfig.getTempFile val jarList = listJars.map(jarUrl => { logWarning(s"Get Codegened library Jar ${jarUrl}") UserAddedJarUtils.fetchJarFromSpark( diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarSortMergeJoinExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarSortMergeJoinExec.scala index 90475407b..8c1f669b5 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarSortMergeJoinExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarSortMergeJoinExec.scala @@ -20,7 +20,7 @@ package com.intel.oap.execution import java.util.concurrent.TimeUnit._ import com.intel.oap.vectorized._ -import com.intel.oap.ColumnarPluginConfig +import com.intel.oap.GazellePluginConfig import org.apache.spark.TaskContext import org.apache.spark.rdd.RDD import org.apache.spark.util.{UserAddedJarUtils, Utils} @@ -410,7 +410,7 @@ case class ColumnarSortMergeJoinExec( def uploadAndListJars(signature: String) = if (signature != "") { if (sparkContext.listJars.filter(path => path.contains(s"${signature}.jar")).isEmpty) { - val tempDir = ColumnarPluginConfig.getRandomTempDir + val tempDir = GazellePluginConfig.getRandomTempDir val jarFileName = s"${tempDir}/tmp/spark-columnar-plugin-codegen-precompile-${signature}.jar" sparkContext.addJar(jarFileName) @@ -424,8 +424,8 @@ case class ColumnarSortMergeJoinExec( val signature = getCodeGenSignature val listJars = uploadAndListJars(signature) right.executeColumnar().zipPartitions(left.executeColumnar()) { (streamIter, buildIter) => - ColumnarPluginConfig.getConf - val execTempDir = ColumnarPluginConfig.getTempFile + GazellePluginConfig.getConf + val execTempDir = GazellePluginConfig.getTempFile val jarList = listJars.map(jarUrl => { logWarning(s"Get Codegened library Jar ${jarUrl}") UserAddedJarUtils.fetchJarFromSpark( diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWholeStageCodegenExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWholeStageCodegenExec.scala index fcfcf9776..fae78cb5a 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWholeStageCodegenExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWholeStageCodegenExec.scala @@ -20,7 +20,7 @@ package com.intel.oap.execution import java.util.concurrent.TimeUnit.NANOSECONDS import com.google.common.collect.Lists -import 
com.intel.oap.ColumnarPluginConfig +import com.intel.oap.GazellePluginConfig import com.intel.oap.expression._ import com.intel.oap.vectorized.{BatchIterator, ExpressionEvaluator, _} import org.apache.arrow.gandiva.expression._ @@ -74,8 +74,8 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I with ColumnarCodegenSupport { val sparkConf = sparkContext.getConf - val numaBindingInfo = ColumnarPluginConfig.getConf.numaBindingInfo - val enableColumnarSortMergeJoinLazyRead = ColumnarPluginConfig.getConf.enableColumnarSortMergeJoinLazyRead + val numaBindingInfo = GazellePluginConfig.getConf.numaBindingInfo + val enableColumnarSortMergeJoinLazyRead = GazellePluginConfig.getConf.enableColumnarSortMergeJoinLazyRead override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), @@ -123,7 +123,7 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I def uploadAndListJars(signature: String): Seq[String] = if (signature != "") { if (sparkContext.listJars.filter(path => path.contains(s"${signature}.jar")).isEmpty) { - val tempDir = ColumnarPluginConfig.getRandomTempDir + val tempDir = GazellePluginConfig.getRandomTempDir val jarFileName = s"${tempDir}/tmp/spark-columnar-plugin-codegen-precompile-${signature}.jar" sparkContext.addJar(jarFileName) @@ -304,7 +304,7 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I val buildPlan = p.getBuildPlan val buildInputByteBuf = buildPlan.executeBroadcast[ColumnarHashedRelation]() curRDD.mapPartitions { iter => - ColumnarPluginConfig.getConf + GazellePluginConfig.getConf ExecutorManager.tryTaskSet(numaBindingInfo) // received broadcast value contain a hashmap and raw recordBatch val beforeFetch = System.nanoTime() @@ -442,8 +442,8 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I curRDD.mapPartitions { iter => ExecutorManager.tryTaskSet(numaBindingInfo) - ColumnarPluginConfig.getConf - val execTempDir = ColumnarPluginConfig.getTempFile + GazellePluginConfig.getConf + val execTempDir = GazellePluginConfig.getTempFile val jarList = listJars.map(jarUrl => { logWarning(s"Get Codegened library Jar ${jarUrl}") UserAddedJarUtils.fetchJarFromSpark( diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala index fb6a24ff3..9975ded0a 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala @@ -20,7 +20,7 @@ package com.intel.oap.execution import java.util.concurrent.TimeUnit import com.google.flatbuffers.FlatBufferBuilder -import com.intel.oap.ColumnarPluginConfig +import com.intel.oap.GazellePluginConfig import com.intel.oap.expression.{CodeGeneration, ConverterUtils} import com.intel.oap.vectorized.{ArrowWritableColumnVector, CloseableColumnBatchIterator, ExpressionEvaluator} import org.apache.arrow.gandiva.expression.TreeBuilder @@ -92,7 +92,7 @@ case class ColumnarWindowExec(windowExpression: Seq[NamedExpression], val totalTime = longMetric("totalTime") val sparkConf = sparkContext.getConf - val numaBindingInfo = ColumnarPluginConfig.getConf.numaBindingInfo + val numaBindingInfo = GazellePluginConfig.getConf.numaBindingInfo def buildCheck(): Unit = { var allLiteral = true diff --git 
a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/DataToArrowColumnarExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/DataToArrowColumnarExec.scala index 2f21b2e94..5ecb4cfcb 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/DataToArrowColumnarExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/DataToArrowColumnarExec.scala @@ -21,7 +21,7 @@ import java.nio.ByteBuffer import java.util.concurrent.TimeUnit._ import com.intel.oap.vectorized._ -import com.intel.oap.ColumnarPluginConfig +import com.intel.oap.GazellePluginConfig import org.apache.spark.{broadcast, TaskContext} import org.apache.spark.rdd.RDD diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarBinaryOperator.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarBinaryOperator.scala index 7f8feb213..b0787993e 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarBinaryOperator.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarBinaryOperator.scala @@ -18,7 +18,7 @@ package com.intel.oap.expression import com.google.common.collect.Lists -import com.intel.oap.ColumnarPluginConfig +import com.intel.oap.GazellePluginConfig import org.apache.arrow.gandiva.evaluator._ import org.apache.arrow.gandiva.exceptions.GandivaException import org.apache.arrow.gandiva.expression._ @@ -176,7 +176,7 @@ class ColumnarEqualTo(left: Expression, right: Expression, original: Expression) } var function = "equal" - val nanCheck = ColumnarPluginConfig.getConf.enableColumnarNaNCheck + val nanCheck = GazellePluginConfig.getConf.enableColumnarNaNCheck if (nanCheck) { unifiedType match { case t: ArrowType.FloatingPoint => @@ -227,7 +227,7 @@ class ColumnarEqualNull(left: Expression, right: Expression, original: Expressio val falseNode = TreeBuilder.makeLiteral(false.asInstanceOf[java.lang.Boolean]) var function = "equal" - val nanCheck = ColumnarPluginConfig.getConf.enableColumnarNaNCheck + val nanCheck = GazellePluginConfig.getConf.enableColumnarNaNCheck if (nanCheck) { unifiedType match { case t: ArrowType.FloatingPoint => @@ -274,7 +274,7 @@ class ColumnarLessThan(left: Expression, right: Expression, original: Expression } var function = "less_than" - val nanCheck = ColumnarPluginConfig.getConf.enableColumnarNaNCheck + val nanCheck = GazellePluginConfig.getConf.enableColumnarNaNCheck if (nanCheck) { unifiedType match { case t: ArrowType.FloatingPoint => @@ -316,7 +316,7 @@ class ColumnarLessThanOrEqual(left: Expression, right: Expression, original: Exp } var function = "less_than_or_equal_to" - val nanCheck = ColumnarPluginConfig.getConf.enableColumnarNaNCheck + val nanCheck = GazellePluginConfig.getConf.enableColumnarNaNCheck if (nanCheck) { unifiedType match { case t: ArrowType.FloatingPoint => @@ -360,7 +360,7 @@ class ColumnarGreaterThan(left: Expression, right: Expression, original: Express } var function = "greater_than" - val nanCheck = ColumnarPluginConfig.getConf.enableColumnarNaNCheck + val nanCheck = GazellePluginConfig.getConf.enableColumnarNaNCheck if (nanCheck) { unifiedType match { case t: ArrowType.FloatingPoint => @@ -404,7 +404,7 @@ class ColumnarGreaterThanOrEqual(left: Expression, right: Expression, original: } var function = "greater_than_or_equal_to" - val nanCheck = ColumnarPluginConfig.getConf.enableColumnarNaNCheck + val nanCheck = GazellePluginConfig.getConf.enableColumnarNaNCheck if (nanCheck) { unifiedType match { case t: 
ArrowType.FloatingPoint => diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarConditionedProbeJoin.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarConditionedProbeJoin.scala index 56297d2ea..8233c2ffa 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarConditionedProbeJoin.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarConditionedProbeJoin.scala @@ -19,7 +19,7 @@ package com.intel.oap.expression import java.util.concurrent.TimeUnit._ -import com.intel.oap.ColumnarPluginConfig +import com.intel.oap.GazellePluginConfig import com.intel.oap.vectorized.ArrowWritableColumnVector import org.apache.spark.TaskContext import org.apache.spark.rdd.RDD diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarHashAggregation.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarHashAggregation.scala index 31e9786ea..7231498d4 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarHashAggregation.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarHashAggregation.scala @@ -23,7 +23,7 @@ import java.util.Collections import java.util.concurrent.TimeUnit._ import util.control.Breaks._ -import com.intel.oap.ColumnarPluginConfig +import com.intel.oap.GazellePluginConfig import com.intel.oap.vectorized.ArrowWritableColumnVector import org.apache.spark.sql.util.ArrowUtils import com.intel.oap.vectorized.ExpressionEvaluator @@ -393,7 +393,7 @@ class ColumnarHashAggregation( def prepareKernelFunction: TreeNode = { // build gandiva projection here. - ColumnarPluginConfig.getConf + GazellePluginConfig.getConf val mode = if (aggregateExpressions.size > 0) { aggregateExpressions(0).mode diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarSortMergeJoin.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarSortMergeJoin.scala index 74d10afad..579648f68 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarSortMergeJoin.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarSortMergeJoin.scala @@ -19,7 +19,7 @@ package com.intel.oap.expression import java.util.concurrent.TimeUnit._ -import com.intel.oap.ColumnarPluginConfig +import com.intel.oap.GazellePluginConfig import com.intel.oap.vectorized.ArrowWritableColumnVector import org.apache.spark.TaskContext import org.apache.spark.rdd.RDD @@ -79,7 +79,7 @@ class ColumnarSortMergeJoin( streamProjector: ColumnarProjection, streamKeyProjectOrdinalList: List[Int]) extends Logging { - ColumnarPluginConfig.getConf + GazellePluginConfig.getConf var probe_iterator: BatchIterator = _ var build_cb: ColumnarBatch = null var last_cb: ColumnarBatch = null @@ -279,7 +279,7 @@ object ColumnarSortMergeJoin extends Logging { val totaltime_sortmergejoin = _totaltime_sortmergejoin val numOutputRows = _numOutputRows val sparkConf = _sparkConf - ColumnarPluginConfig.getConf + GazellePluginConfig.getConf // TODO val l_input_schema: List[Attribute] = left.output.toList val r_input_schema: List[Attribute] = right.output.toList diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarSorter.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarSorter.scala index 1531083de..cf0850e16 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarSorter.scala +++ 
b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ColumnarSorter.scala @@ -21,7 +21,7 @@ import java.nio.ByteBuffer import java.util.concurrent.TimeUnit._ import com.google.common.collect.Lists -import com.intel.oap.ColumnarPluginConfig +import com.intel.oap.GazellePluginConfig import com.intel.oap.vectorized.ArrowWritableColumnVector import com.intel.oap.vectorized.ExpressionEvaluator import com.intel.oap.vectorized.BatchIterator @@ -233,8 +233,8 @@ object ColumnarSorter extends Logging { result_type: Int = 0): TreeNode = { logInfo(s"ColumnarSorter sortOrder is ${sortOrder}, outputAttributes is ${outputAttributes}") checkIfKeyFound(sortOrder, outputAttributes) - val NaNCheck = ColumnarPluginConfig.getConf.enableColumnarNaNCheck - val codegen = ColumnarPluginConfig.getConf.enableColumnarCodegenSort + val NaNCheck = GazellePluginConfig.getConf.enableColumnarNaNCheck + val codegen = GazellePluginConfig.getConf.enableColumnarCodegenSort /////////////// Prepare ColumnarSorter ////////////// val keyFieldList: List[Field] = sortOrder.toList.map(sort => { val attr = ConverterUtils.getAttrFromExpr(sort.child) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala similarity index 96% rename from native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala rename to native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala index 3778e1726..08d0902ea 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala @@ -18,7 +18,10 @@ package com.intel.oap import com.intel.oap.execution._ +import com.intel.oap.extension.columnar.ColumnarGuardRule +import com.intel.oap.extension.columnar.RowGuard import com.intel.oap.sql.execution.RowToArrowColumnarExec + import org.apache.spark.internal.config._ import org.apache.spark.internal.Logging import org.apache.spark.sql.{SparkSession, SparkSessionExtensions} @@ -37,7 +40,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.CalendarIntervalType case class ColumnarPreOverrides() extends Rule[SparkPlan] { - val columnarConf: ColumnarPluginConfig = ColumnarPluginConfig.getSessionConf + val columnarConf: GazellePluginConfig = GazellePluginConfig.getSessionConf var isSupportAdaptive: Boolean = true def replaceWithColumnarPlan(plan: SparkPlan): SparkPlan = plan match { @@ -284,7 +287,7 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] { } case class ColumnarPostOverrides() extends Rule[SparkPlan] { - val columnarConf = ColumnarPluginConfig.getSessionConf + val columnarConf = GazellePluginConfig.getSessionConf var isSupportAdaptive: Boolean = true def replaceWithColumnarPlan(plan: SparkPlan): SparkPlan = plan match { @@ -396,19 +399,10 @@ case class ColumnarOverrideRules(session: SparkSession) extends ColumnarRule wit plan } } - } -/** - * Extension point to enable columnar processing. - * - * To run with columnar set spark.sql.extensions to com.intel.oap.ColumnarPlugin - */ -class ColumnarPlugin extends Function1[SparkSessionExtensions, Unit] with Logging { - override def apply(extensions: SparkSessionExtensions): Unit = { - logDebug( - "Installing extensions to enable columnar CPU support." 
+ - " To disable this set `org.apache.spark.example.columnar.enabled` to false") - extensions.injectColumnar((session) => ColumnarOverrideRules(session)) +object ColumnarOverrides extends GazelleSparkExtensionsInjector { + override def inject(extensions: SparkSessionExtensions): Unit = { + extensions.injectColumnar(ColumnarOverrideRules) } } diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/DriverOverrides.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/DriverOverrides.scala new file mode 100644 index 000000000..58d9d0828 --- /dev/null +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/DriverOverrides.scala @@ -0,0 +1,5 @@ +package com.intel.oap.extension + +class DriverOverrides { + +} diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/StrategyOverrides.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/StrategyOverrides.scala new file mode 100644 index 000000000..634ad615f --- /dev/null +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/StrategyOverrides.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.intel.oap.extension + +import com.intel.oap.GazellePlugin +import com.intel.oap.GazellePluginConfig +import com.intel.oap.GazelleSparkExtensionsInjector + +import org.apache.spark.sql.SparkSessionExtensions +import org.apache.spark.sql.Strategy +import org.apache.spark.sql.catalyst.SQLConfHelper +import org.apache.spark.sql.catalyst.optimizer.JoinSelectionHelper +import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.joins + + +object JoinSelectionOverrides extends Strategy with JoinSelectionHelper with SQLConfHelper { + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + // targeting equi-joins only + case j @ ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, nonEquiCond, left, right, hint) => + if (getBroadcastBuildSide(left, right, joinType, hint, hintOnly = false, conf).isDefined) { + return Nil + } + + if (GazellePluginConfig.getSessionConf.forceShuffledHashJoin) { + // Force use of ShuffledHashJoin in preference to SortMergeJoin. With no respect to + // conf setting "spark.sql.join.preferSortMergeJoin". 
+ return Option(getSmallerSide(left, right)).map { + buildSide => + Seq(joins.ShuffledHashJoinExec( + leftKeys, + rightKeys, + joinType, + buildSide, + nonEquiCond, + planLater(left), + planLater(right))) + }.getOrElse(Nil) + } + + Nil + case _ => Nil + } +} + +object StrategyOverrides extends GazelleSparkExtensionsInjector { + override def inject(extensions: SparkSessionExtensions): Unit = { + extensions.injectPlannerStrategy(_ => JoinSelectionOverrides) + } +} diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala similarity index 96% rename from native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala rename to native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala index 7a4092435..ee02e165c 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala @@ -15,17 +15,16 @@ * limitations under the License. */ -package com.intel.oap +package com.intel.oap.extension.columnar +import com.intel.oap.GazellePluginConfig import com.intel.oap.execution._ -import org.apache.spark.SparkConf -import org.apache.spark.internal.Logging + import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{SparkSession, SparkSessionExtensions} -import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.FullOuter +import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ import org.apache.spark.sql.execution.aggregate.HashAggregateExec @@ -33,9 +32,9 @@ import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.joins._ -import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec, ColumnarArrowEvalPythonExec} +import org.apache.spark.sql.execution.python.ArrowEvalPythonExec +import org.apache.spark.sql.execution.python.ColumnarArrowEvalPythonExec import org.apache.spark.sql.execution.window.WindowExec -import org.apache.spark.sql.internal.SQLConf case class RowGuard(child: SparkPlan) extends SparkPlan { def output: Seq[Attribute] = child.output @@ -46,7 +45,7 @@ case class RowGuard(child: SparkPlan) extends SparkPlan { } case class ColumnarGuardRule() extends Rule[SparkPlan] { - val columnarConf = ColumnarPluginConfig.getSessionConf + val columnarConf = GazellePluginConfig.getSessionConf val preferColumnar = columnarConf.enablePreferColumnar val optimizeLevel = columnarConf.joinOptimizationThrottle val enableColumnarShuffle = columnarConf.enableColumnarShuffle diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala index a3c1f8a68..dbb895474 100644 --- a/native-sql-engine/core/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala +++ b/native-sql-engine/core/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala @@ -20,7 +20,7 @@ package org.apache.spark.shuffle import java.io.IOException import 
com.google.common.annotations.VisibleForTesting -import com.intel.oap.ColumnarPluginConfig +import com.intel.oap.GazellePluginConfig import com.intel.oap.expression.ConverterUtils import com.intel.oap.spark.sql.execution.datasources.v2.arrow.Spiller import com.intel.oap.vectorized.{ArrowWritableColumnVector, ShuffleSplitterJniWrapper, SplitResult} @@ -62,13 +62,13 @@ class ColumnarShuffleWriter[K, V]( conf.getInt("spark.sql.execution.arrow.maxRecordsPerBatch", 4096) private val customizedCompressCodec = - ColumnarPluginConfig.getConf.columnarShuffleUseCustomizedCompressionCodec + GazellePluginConfig.getConf.columnarShuffleUseCustomizedCompressionCodec private val defaultCompressionCodec = if (conf.getBoolean("spark.shuffle.compress", true)) { conf.get("spark.io.compression.codec", "lz4") } else { "uncompressed" } - private val preferSpill = ColumnarPluginConfig.getConf.columnarShufflePreferSpill + private val preferSpill = GazellePluginConfig.getConf.columnarShufflePreferSpill private val jniWrapper = new ShuffleSplitterJniWrapper() diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/util/ExecutorManager.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/util/ExecutorManager.scala index ce0262c78..4b40519a8 100644 --- a/native-sql-engine/core/src/main/scala/org/apache/spark/util/ExecutorManager.scala +++ b/native-sql-engine/core/src/main/scala/org/apache/spark/util/ExecutorManager.scala @@ -30,7 +30,7 @@ import com.intel.oap._ object ExecutorManager { def getExecutorIds(sc: SparkContext): Seq[String] = sc.getExecutorIds var isTaskSet: Boolean = false - def tryTaskSet(numaInfo: ColumnarNumaBindingInfo) = synchronized { + def tryTaskSet(numaInfo: GazelleNumaBindingInfo) = synchronized { if (numaInfo.enableNumaBinding && !isTaskSet) { val cmd_output = Utils.executeAndGetOutput( diff --git a/native-sql-engine/core/src/test/scala/com/intel/oap/misc/DateTimeSuite.scala b/native-sql-engine/core/src/test/scala/com/intel/oap/misc/DateTimeSuite.scala index 9ade2b69d..abe99bb3b 100644 --- a/native-sql-engine/core/src/test/scala/com/intel/oap/misc/DateTimeSuite.scala +++ b/native-sql-engine/core/src/test/scala/com/intel/oap/misc/DateTimeSuite.scala @@ -47,7 +47,7 @@ class DateTimeSuite extends QueryTest with SharedSparkSession { val conf = super.sparkConf conf.set("spark.memory.offHeap.size", String.valueOf(MAX_DIRECT_MEMORY)) - .set("spark.sql.extensions", "com.intel.oap.ColumnarPlugin") + .set("spark.plugins", "com.intel.oap.GazellePlugin") .set("spark.sql.codegen.wholeStage", "false") .set("spark.sql.sources.useV1SourceList", "") .set("spark.sql.columnar.tmp_dir", "/tmp/") diff --git a/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala b/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala index f90b4b936..0c023194f 100644 --- a/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala +++ b/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala @@ -34,7 +34,7 @@ class TPCDSSuite extends QueryTest with SharedSparkSession { override protected def sparkConf: SparkConf = { val conf = super.sparkConf conf.set("spark.memory.offHeap.size", String.valueOf(MAX_DIRECT_MEMORY)) - .set("spark.sql.extensions", "com.intel.oap.ColumnarPlugin") + .set("spark.plugins", "com.intel.oap.GazellePlugin") .set("spark.sql.codegen.wholeStage", "true") .set("spark.sql.sources.useV1SourceList", "") .set("spark.oap.sql.columnar.tmp_dir", "/tmp/") @@ -106,10 +106,24 @@ class TPCDSSuite extends QueryTest with 
SharedSparkSession { runner.runTPCQuery("q95", 1, true) } + test("q2") { + runner.runTPCQuery("q2", 1, true) + } + + test("q2 - shj") { + withSQLConf(("spark.oap.sql.columnar.forceshuffledhashjoin", "true")) { + runner.runTPCQuery("q2", 1, true) + } + } + test("q47") { runner.runTPCQuery("q47", 1, true) } + test("q59") { + runner.runTPCQuery("q59", 1, true) + } + test("window function with non-decimal input") { val df = spark.sql("SELECT i_item_sk, i_class_id, SUM(i_category_id)" + " OVER (PARTITION BY i_class_id) FROM item LIMIT 1000") diff --git a/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/h/TPCHSuite.scala b/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/h/TPCHSuite.scala index 72b96179d..7aab58b8d 100644 --- a/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/h/TPCHSuite.scala +++ b/native-sql-engine/core/src/test/scala/com/intel/oap/tpc/h/TPCHSuite.scala @@ -49,7 +49,7 @@ class TPCHSuite extends QueryTest with SharedSparkSession { override protected def sparkConf: SparkConf = { val conf = super.sparkConf conf.set("spark.memory.offHeap.size", String.valueOf(MAX_DIRECT_MEMORY)) - .set("spark.sql.extensions", "com.intel.oap.ColumnarPlugin") + .set("spark.plugins", "com.intel.oap.GazellePlugin") .set("spark.sql.codegen.wholeStage", "false") .set("spark.sql.sources.useV1SourceList", "") .set("spark.oap.sql.columnar.tmp_dir", "/tmp/") diff --git a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala index e608df4c1..34df53ec2 100644 --- a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala +++ b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala @@ -78,7 +78,7 @@ trait SharedSparkSessionBase .setAppName("test") .set("spark.sql.parquet.columnarReaderBatchSize", "4096") .set("spark.sql.sources.useV1SourceList", "avro") - .set("spark.sql.extensions", "com.intel.oap.ColumnarPlugin") + .set("spark.plugins", "com.intel.oap.GazellePlugin") .set("spark.sql.execution.arrow.maxRecordsPerBatch", "4096") // .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") .set("spark.memory.offHeap.enabled", "true")
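
A note on the new activation path for anyone trying this patch out: Gazelle now hooks in through Spark's plugin framework (`spark.plugins`) rather than `spark.sql.extensions`, and `GazelleDriverPlugin.setPredefinedConfigs` injects the session extensions itself, refusing to start if `spark.sql.extensions` is already set. Below is a minimal sketch of programmatic enablement using the `SparkConfImplicits` helper added in GazellePlugin.scala; the application scaffolding around it (object name, master, memory size, sample query) is illustrative, not part of this patch:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Brings the implicit SparkConf => SparkConfImplicits conversion into scope.
import com.intel.oap.GazellePlugin.sparkConfImplicit

object GazelleQuickStart {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("gazelle-quickstart")
      // Gazelle allocates native memory out of this budget even when
      // spark.memory.offHeap.enabled is false (see docs/Configuration.md).
      .set("spark.memory.offHeap.size", "30g")
      .set("spark.shuffle.manager",
        "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
      // Sets spark.plugins=com.intel.oap.GazellePlugin, or throws
      // IllegalArgumentException if another plugin is already configured.
      .enableGazellePlugin()

    val spark = SparkSession.builder().master("local[*]").config(conf).getOrCreate()
    spark.range(0, 1000).selectExpr("sum(id)").show()
    spark.stop()
  }
}
```

Do not also set `spark.sql.extensions` by hand: the driver plugin owns that key now and will reject a pre-existing value.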
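Similarly, the new `spark.oap.sql.columnar.forceshuffledhashjoin` flag read by `JoinSelectionOverrides` is resolved from the session's SQLConf (via `GazellePluginConfig.getSessionConf`), so it can be flipped at runtime; the added TPCDS test `q2 - shj` does exactly that through `withSQLConf`. A sketch of the same toggle outside the test harness, e.g. pasted into spark-shell; the helper name is mine, not part of the patch:

```scala
import org.apache.spark.sql.SparkSession

// Runs `body` with ShuffledHashJoin forced for equi-joins, then restores the
// previous session setting. Assumes GazellePlugin is already enabled.
def withForcedShuffledHashJoin[T](spark: SparkSession)(body: => T): T = {
  val key = "spark.oap.sql.columnar.forceshuffledhashjoin"
  val previous = spark.conf.getOption(key)
  spark.conf.set(key, "true")
  try body
  finally previous match {
    case Some(value) => spark.conf.set(key, value)
    case None => spark.conf.unset(key)
  }
}
```

Note that broadcast-eligible joins still take precedence: `JoinSelectionOverrides` returns `Nil` whenever `getBroadcastBuildSide` finds a broadcastable side, so only non-broadcast equi-joins are planned as `ShuffledHashJoinExec`.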