
Commit

[NSE-512] Add strategy to force use of SHJ (#513)
Closes #512
zhztheplayer authored Oct 8, 2021
1 parent 7597fb1 commit e88fc25
Showing 30 changed files with 299 additions and 106 deletions.
4 changes: 2 additions & 2 deletions docs/Configuration.md
@@ -13,7 +13,7 @@ You can add these configuration into spark-defaults.conf to enable or disable th
| spark.memory.offHeap.size| To set how much memory to use for Java OffHeap.<br /> Please note that Gazelle Plugin will leverage this setting to allocate memory space for native usage even if offHeap is disabled. <br /> The value depends on your system; it is recommended to set it larger if you are facing out-of-memory issues in Gazelle Plugin | 30G |
| spark.sql.sources.useV1SourceList | Choose to use V1 source | avro |
| spark.sql.join.preferSortMergeJoin | To turn off preferSortMergeJoin in Spark | false |
| spark.sql.extensions | To turn on Gazelle Plugin | com.intel.oap.ColumnarPlugin |
| spark.plugins | To turn on Gazelle Plugin | com.intel.oap.GazellePlugin |
| spark.shuffle.manager | To turn on Gazelle Columnar Shuffle Plugin | org.apache.spark.shuffle.sort.ColumnarShuffleManager |
| spark.oap.sql.columnar.batchscan | Enable or Disable Columnar Batchscan, default is true | true |
| spark.oap.sql.columnar.hashagg | Enable or Disable Columnar Hash Aggregate, default is true | true |
@@ -46,7 +46,7 @@ Below is an example for spark-default.conf, if you are using conda to install OA
spark.sql.sources.useV1SourceList avro
spark.sql.join.preferSortMergeJoin false
spark.sql.extensions com.intel.oap.ColumnarPlugin
spark.plugins com.intel.oap.GazellePlugin
spark.shuffle.manager org.apache.spark.shuffle.sort.ColumnarShuffleManager
# note Gazelle Plugin depends on arrow data source
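This commit also introduces a new switch, spark.oap.sql.columnar.forceshuffledhashjoin (default false), defined in GazellePluginConfig later in this diff. Below is a minimal sketch of turning it on from application code, assuming an otherwise standard SparkSession setup; the builder boilerplate is illustrative and not part of this diff.

import org.apache.spark.sql.SparkSession

// Sketch: enable Gazelle and force shuffled hash join via session configs.
// The config keys come from this commit; the surrounding setup is assumed.
val spark = SparkSession.builder()
  .appName("gazelle-force-shj-example")
  .config("spark.plugins", "com.intel.oap.GazellePlugin")
  .config("spark.shuffle.manager",
    "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
  .config("spark.sql.join.preferSortMergeJoin", "false")
  .config("spark.oap.sql.columnar.forceshuffledhashjoin", "true")
  .getOrCreate()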
@@ -17,7 +17,7 @@

package com.intel.oap.vectorized;

import com.intel.oap.ColumnarPluginConfig;
import com.intel.oap.GazellePluginConfig;
import com.intel.oap.execution.ColumnarNativeIterator;
import com.intel.oap.spark.sql.execution.datasources.v2.arrow.Spiller;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
@@ -51,17 +51,17 @@ public ExpressionEvaluator() throws IOException, IllegalAccessException, Illegal
}

public ExpressionEvaluator(List<String> listJars) throws IOException, IllegalAccessException, IllegalStateException {
String tmp_dir = ColumnarPluginConfig.getTempFile();
String tmp_dir = GazellePluginConfig.getTempFile();
if (tmp_dir == null) {
tmp_dir = System.getProperty("java.io.tmpdir");
}
jniWrapper = new ExpressionEvaluatorJniWrapper(tmp_dir, listJars);
jniWrapper.nativeSetJavaTmpDir(jniWrapper.tmp_dir_path);
jniWrapper.nativeSetBatchSize(ColumnarPluginConfig.getBatchSize());
if (ColumnarPluginConfig.getSpillThreshold() != -1)
jniWrapper.nativeSetSortSpillThreshold(ColumnarPluginConfig.getSpillThreshold());
jniWrapper.nativeSetMetricsTime(ColumnarPluginConfig.getEnableMetricsTime());
ColumnarPluginConfig.setRandomTempDir(jniWrapper.tmp_dir_path);
jniWrapper.nativeSetBatchSize(GazellePluginConfig.getBatchSize());
if (GazellePluginConfig.getSpillThreshold() != -1)
jniWrapper.nativeSetSortSpillThreshold(GazellePluginConfig.getSpillThreshold());
jniWrapper.nativeSetMetricsTime(GazellePluginConfig.getEnableMetricsTime());
GazellePluginConfig.setRandomTempDir(jniWrapper.tmp_dir_path);
}

long getInstanceId() {
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.oap

import java.util
import java.util.Collections
import java.util.Objects

import scala.language.implicitConversions

import com.intel.oap.GazellePlugin.GAZELLE_SESSION_EXTENSION_NAME
import com.intel.oap.GazellePlugin.SPARK_SESSION_EXTS_KEY
import com.intel.oap.extension.StrategyOverrides

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.api.plugin.DriverPlugin
import org.apache.spark.api.plugin.ExecutorPlugin
import org.apache.spark.api.plugin.PluginContext
import org.apache.spark.api.plugin.SparkPlugin
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.internal.StaticSQLConf

class GazellePlugin extends SparkPlugin {
override def driverPlugin(): DriverPlugin = {
new GazelleDriverPlugin()
}

override def executorPlugin(): ExecutorPlugin = {
new GazelleExecutorPlugin()
}
}

private[oap] class GazelleDriverPlugin extends DriverPlugin {
override def init(sc: SparkContext, pluginContext: PluginContext): util.Map[String, String] = {
val conf = pluginContext.conf()
setPredefinedConfigs(conf)
Collections.emptyMap()
}

def setPredefinedConfigs(conf: SparkConf): Unit = {
if (conf.contains(SPARK_SESSION_EXTS_KEY)) {
throw new IllegalArgumentException("Spark extensions are already specified before " +
"enabling Gazelle plugin: " + conf.get(GazellePlugin.SPARK_SESSION_EXTS_KEY))
}
conf.set(SPARK_SESSION_EXTS_KEY, GAZELLE_SESSION_EXTENSION_NAME)
}
}

private[oap] class GazelleExecutorPlugin extends ExecutorPlugin {
// N/A
}

private[oap] class GazelleSessionsExtensions extends (SparkSessionExtensions => Unit) {
override def apply(exts: SparkSessionExtensions): Unit = {
GazellePlugin.DEFAULT_INJECTORS.foreach(injector => injector.inject(exts))
}
}

private[oap] class SparkConfImplicits(conf: SparkConf) {
def enableGazellePlugin(): SparkConf = {
if (conf.contains(GazellePlugin.SPARK_SQL_PLUGINS_KEY)) {
throw new IllegalArgumentException("A Spark plugin is already specified before enabling " +
"Gazelle plugin: " + conf.get(GazellePlugin.SPARK_SQL_PLUGINS_KEY))
}
conf.set(GazellePlugin.SPARK_SQL_PLUGINS_KEY, GazellePlugin.GAZELLE_PLUGIN_NAME)
}
}

private[oap] trait GazelleSparkExtensionsInjector {
def inject(extensions: SparkSessionExtensions)
}

private[oap] object GazellePlugin {
// To enable GazellePlugin in production, set "spark.plugins=com.intel.oap.GazellePlugin"
val SPARK_SQL_PLUGINS_KEY: String = "spark.plugins"
val GAZELLE_PLUGIN_NAME: String = Objects.requireNonNull(classOf[GazellePlugin]
.getCanonicalName)
val SPARK_SESSION_EXTS_KEY: String = StaticSQLConf.SPARK_SESSION_EXTENSIONS.key
val GAZELLE_SESSION_EXTENSION_NAME: String = Objects.requireNonNull(
classOf[GazelleSessionsExtensions].getCanonicalName)

/**
* Specify all injectors that Gazelle is using in following list.
*/
val DEFAULT_INJECTORS: List[GazelleSparkExtensionsInjector] = List(
ColumnarOverrides,
StrategyOverrides
)

implicit def sparkConfImplicit(conf: SparkConf): SparkConfImplicits = {
new SparkConfImplicits(conf)
}
}
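The new GazellePlugin entry point above replaces the old spark.sql.extensions-based activation: GazelleDriverPlugin writes GazelleSessionsExtensions into spark.sql.extensions itself, so users only configure spark.plugins. A minimal sketch of the driver-side setup from application code follows; class and key names are taken from this diff, the surrounding application is assumed.

import org.apache.spark.SparkConf

// Sketch: activate Gazelle the way GazelleDriverPlugin expects.
// The driver plugin later sets spark.sql.extensions to
// GazelleSessionsExtensions and throws if extensions are already set.
val conf = new SparkConf()
  .setAppName("gazelle-example")
  .set("spark.plugins", "com.intel.oap.GazellePlugin")

// Inside the com.intel.oap package (e.g. in tests), the package-private
// SparkConfImplicits helper above provides the same thing via
// conf.enableGazellePlugin().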
@@ -21,18 +21,19 @@ import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.internal.SQLConf

case class ColumnarNumaBindingInfo(
case class GazelleNumaBindingInfo(
enableNumaBinding: Boolean,
totalCoreRange: Array[String] = null,
numCoresPerExecutor: Int = -1) {}

class ColumnarPluginConfig(conf: SQLConf) extends Logging {
class GazellePluginConfig(conf: SQLConf) extends Logging {
def getCpu(): Boolean = {
val source = scala.io.Source.fromFile("/proc/cpuinfo")
val lines =
try source.mkString
finally source.close()
//TODO(): check CPU flags to enable/disable AVX512
return true
if (lines.contains("GenuineIntel")) {
return true
} else {
@@ -78,6 +79,10 @@ class ColumnarPluginConfig(conf: SQLConf) extends Logging {
val enableArrowColumnarToRow: Boolean =
conf.getConfString("spark.oap.sql.columnar.columnartorow", "true").toBoolean && enableCpu

val forceShuffledHashJoin: Boolean =
conf.getConfString("spark.oap.sql.columnar.forceshuffledhashjoin", "false").toBoolean &&
enableCpu

// enable or disable columnar sortmergejoin
// this should be set with preferSortMergeJoin=false
val enableColumnarSortMergeJoin: Boolean =
@@ -161,42 +166,42 @@ class ColumnarPluginConfig(conf: SQLConf) extends Logging {
val columnarShuffleUseCustomizedCompressionCodec: String =
conf.getConfString("spark.oap.sql.columnar.shuffle.customizedCompression.codec", "lz4")

val numaBindingInfo: ColumnarNumaBindingInfo = {
val numaBindingInfo: GazelleNumaBindingInfo = {
val enableNumaBinding: Boolean =
conf.getConfString("spark.oap.sql.columnar.numaBinding", "false").toBoolean
if (!enableNumaBinding) {
ColumnarNumaBindingInfo(false)
GazelleNumaBindingInfo(false)
} else {
val tmp = conf.getConfString("spark.oap.sql.columnar.coreRange", null)
if (tmp == null) {
ColumnarNumaBindingInfo(false)
GazelleNumaBindingInfo(false)
} else {
val numCores = conf.getConfString("spark.executor.cores", "1").toInt
val coreRangeList: Array[String] = tmp.split('|').map(_.trim)
ColumnarNumaBindingInfo(true, coreRangeList, numCores)
GazelleNumaBindingInfo(true, coreRangeList, numCores)
}

}
}
}

object ColumnarPluginConfig {
var ins: ColumnarPluginConfig = null
object GazellePluginConfig {
var ins: GazellePluginConfig = null
var random_temp_dir_path: String = null

/**
* @deprecated We should avoid caching this value in entire JVM. us
*/
@Deprecated
def getConf: ColumnarPluginConfig = synchronized {
def getConf: GazellePluginConfig = synchronized {
if (ins == null) {
ins = getSessionConf
}
ins
}

def getSessionConf: ColumnarPluginConfig = {
new ColumnarPluginConfig(SQLConf.get)
def getSessionConf: GazellePluginConfig = {
new GazellePluginConfig(SQLConf.get)
}

def getBatchSize: Int = synchronized {
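The new forceShuffledHashJoin flag above is what the StrategyOverrides injector registered in GazellePlugin.DEFAULT_INJECTORS is meant to act on; that file is part of this commit but not shown in this excerpt. Below is a rough, hypothetical sketch of a planner strategy gated on the flag. Apart from the config accessor and GazellePluginConfig, the names and structure here are illustrative and may differ from the real StrategyOverrides.

import com.intel.oap.GazellePluginConfig
import org.apache.spark.sql.{SparkSessionExtensions, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical injector shaped like GazelleSparkExtensionsInjector.
object ForceShuffledHashJoinSketch {
  def inject(extensions: SparkSessionExtensions): Unit = {
    extensions.injectPlannerStrategy(_ => new Strategy {
      override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
        case j: Join if GazellePluginConfig.getSessionConf.forceShuffledHashJoin =>
          // The real strategy would plan j as a shuffled hash join here
          // (e.g. ShuffledHashJoinExec), pre-empting Spark's default
          // JoinSelection, which may otherwise pick sort-merge join.
          Nil
        case _ => Nil
      }
    })
  }
}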
@@ -17,7 +17,7 @@

package com.intel.oap.execution

import com.intel.oap.ColumnarPluginConfig
import com.intel.oap.GazellePluginConfig
import com.intel.oap.expression._
import com.intel.oap.vectorized._
import org.apache.spark.sql.catalyst.InternalRow
@@ -35,7 +35,7 @@ import org.apache.spark.{SparkConf, TaskContext}
import org.apache.arrow.gandiva.expression._
import org.apache.arrow.vector.types.pojo.ArrowType
import com.google.common.collect.Lists
import com.intel.oap.ColumnarPluginConfig
import com.intel.oap.GazellePluginConfig
import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils;

case class ColumnarConditionProjectExec(
@@ -48,7 +48,7 @@ case class ColumnarConditionProjectExec(
with AliasAwareOutputPartitioning
with Logging {

val numaBindingInfo = ColumnarPluginConfig.getConf.numaBindingInfo
val numaBindingInfo = GazellePluginConfig.getConf.numaBindingInfo

val sparkConf: SparkConf = sparkContext.getConf

@@ -249,7 +249,7 @@ case class ColumnarConditionProjectExec(
numInputBatches.set(0)

child.executeColumnar().mapPartitions { iter =>
ColumnarPluginConfig.getConf
GazellePluginConfig.getConf
ExecutorManager.tryTaskSet(numaBindingInfo)
val condProj = ColumnarConditionProjector.create(
condition,
@@ -17,7 +17,7 @@

package com.intel.oap.execution

import com.intel.oap.ColumnarPluginConfig
import com.intel.oap.GazellePluginConfig
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan}
Expand All @@ -27,7 +27,7 @@ import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

class ColumnarBatchScanExec(output: Seq[AttributeReference], @transient scan: Scan)
extends BatchScanExec(output, scan) {
val tmpDir: String = ColumnarPluginConfig.getConf.tmpFile
val tmpDir: String = GazellePluginConfig.getConf.tmpFile
override def supportsColumnar(): Boolean = true
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
@@ -18,7 +18,7 @@
package com.intel.oap.execution

import com.google.common.collect.Lists
import com.intel.oap.ColumnarPluginConfig
import com.intel.oap.GazellePluginConfig
import com.intel.oap.expression._
import com.intel.oap.vectorized.{ExpressionEvaluator, _}
import org.apache.arrow.gandiva.expression._
@@ -62,7 +62,7 @@ case class ColumnarBroadcastHashJoinExec(
with ShuffledJoin {

val sparkConf = sparkContext.getConf
val numaBindingInfo = ColumnarPluginConfig.getConf.numaBindingInfo
val numaBindingInfo = GazellePluginConfig.getConf.numaBindingInfo
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"),
@@ -495,7 +495,7 @@ case class ColumnarBroadcastHashJoinExec(
def uploadAndListJars(signature: String): Seq[String] =
if (signature != "") {
if (sparkContext.listJars.filter(path => path.contains(s"${signature}.jar")).isEmpty) {
val tempDir = ColumnarPluginConfig.getRandomTempDir
val tempDir = GazellePluginConfig.getRandomTempDir
val jarFileName =
s"${tempDir}/tmp/spark-columnar-plugin-codegen-precompile-${signature}.jar"
sparkContext.addJar(jarFileName)
@@ -525,8 +525,8 @@

streamedPlan.executeColumnar().mapPartitions { streamIter =>
ExecutorManager.tryTaskSet(numaBindingInfo)
ColumnarPluginConfig.getConf
val execTempDir = ColumnarPluginConfig.getTempFile
GazellePluginConfig.getConf
val execTempDir = GazellePluginConfig.getTempFile
val jarList = listJars.map(jarUrl => {
logWarning(s"Get Codegened library Jar ${jarUrl}")
UserAddedJarUtils.fetchJarFromSpark(
@@ -17,7 +17,7 @@

package com.intel.oap.execution

import com.intel.oap.ColumnarPluginConfig
import com.intel.oap.GazellePluginConfig
import com.intel.oap.vectorized._
import org.apache.spark._
import org.apache.spark.rdd.RDD
@@ -47,7 +47,7 @@ class ColumnarDataSourceRDD(
inputSize: SQLMetric,
tmp_dir: String)
extends RDD[ColumnarBatch](sc, Nil) {
val numaBindingInfo = ColumnarPluginConfig.getConf.numaBindingInfo
val numaBindingInfo = GazellePluginConfig.getConf.numaBindingInfo

override protected def getPartitions: Array[Partition] = {
inputPartitions.zipWithIndex.map {
@@ -17,7 +17,7 @@

package com.intel.oap.execution

import com.intel.oap.ColumnarPluginConfig
import com.intel.oap.GazellePluginConfig
import com.intel.oap.expression._
import com.intel.oap.vectorized._
import com.google.common.collect.Lists
@@ -73,7 +73,7 @@ case class ColumnarHashAggregateExec(
with AliasAwareOutputPartitioning {

val sparkConf = sparkContext.getConf
val numaBindingInfo = ColumnarPluginConfig.getConf.numaBindingInfo
val numaBindingInfo = GazellePluginConfig.getConf.numaBindingInfo
override def supportsColumnar = true

var resAttributes: Seq[Attribute] = resultExpressions.map(_.toAttribute)
