[NSE-186] backport 1.1 batch2 (#214)
* [NSE-185] Avoid unnecessary copying when simply projecting on fields (#187)

* [NSE-185] Avoid unnecessary copying when simply projecting on fields

* Avoid sharing buffers in output

* [NSE-211] IndexOutOfBoundsException when running TPC-DS Q2 (#212)

Closes #211

* fix missing decimal check (#219)

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* [NSE-227] fix issues from code scan (#225)

* fix code scan

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* fix code scan

* fix format

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* fix wrong patch

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* Klocwork issues fix

* fix format

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* more fixes

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

Co-authored-by: Rui Mo <rui.mo@intel.com>
Co-authored-by: Hongze Zhang <hongze.zhang@intel.com>

* allow configuring the batch size in hashagg and wscg (#222)

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

Co-authored-by: Hongze Zhang <hongze.zhang@intel.com>
Co-authored-by: Rui Mo <rui.mo@intel.com>
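
Note: a hedged illustration of the batch-size item above (#222). The exact configuration key is an assumption (it is not named in this commit message), so treat this as a sketch of where such a setting would be applied rather than the definitive option name.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

object BatchSizeExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("batchsize-example")
      // Assumed key: the plugin's columnar batch size used by hash aggregation and
      // whole-stage codegen; verify the exact name against the project's configuration docs.
      .config("spark.oap.sql.columnar.batchsize", "4096")
      .getOrCreate()

    // A small hash aggregation to exercise the setting.
    spark.range(0, 100000).groupBy(expr("id % 10").as("k")).count().show()
    spark.stop()
  }
}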
3 people authored Apr 2, 2021
1 parent 692d574 commit 198629c
Showing 29 changed files with 459 additions and 141 deletions.
@@ -94,6 +94,7 @@ object ArrowUtils {
val partitionColumns = ArrowWritableColumnVector.allocateColumns(rowCount, partitionSchema)
(0 until partitionColumns.length).foreach(i => {
ColumnVectorUtils.populate(partitionColumns(i), partitionValues, i)
partitionColumns(i).setValueCount(rowCount)
partitionColumns(i).setIsConstant()
})

@@ -119,6 +119,9 @@ public ArrowRecordBatch readNext() throws IOException {
ArrowRecordBatchBuilderImpl recordBatchBuilderImpl =
new ArrowRecordBatchBuilderImpl(recordBatchBuilder);
ArrowRecordBatch batch = recordBatchBuilderImpl.build();
if (batch == null) {
throw new IllegalArgumentException("failed to build record batch");
}
this.lastReadLength = batch.getLength();
return batch;
}
@@ -183,11 +183,10 @@ private static File moveFileFromJarToTemp(String tmpDir, String libraryToLoad) t
try (final InputStream is = JniUtils.class.getClassLoader().getResourceAsStream(libraryToLoad)) {
if (is == null) {
throw new FileNotFoundException(libraryToLoad);
} else {
try {
Files.copy(is, temp.toPath());
} catch (Exception e) {
}
}
try {
Files.copy(is, temp.toPath());
} catch (Exception e) {
}
}
return temp;
@@ -17,23 +17,24 @@

package com.intel.oap.expression

import java.util.Collections
import java.util
import java.util.Objects
import java.util.concurrent.TimeUnit

import com.google.common.collect.Lists
import com.intel.oap.expression.ColumnarConditionProjector.{FieldOptimizedProjector, FilterProjector, ProjectorWrapper}
import com.intel.oap.vectorized.ArrowWritableColumnVector

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

import org.apache.arrow.gandiva.evaluator._
import org.apache.arrow.gandiva.exceptions.GandivaException
import org.apache.arrow.gandiva.expression._
import org.apache.arrow.gandiva.ipc.GandivaTypes
import org.apache.arrow.gandiva.ipc.GandivaTypes.SelectionVectorType
import org.apache.arrow.memory.BufferAllocator
import org.apache.arrow.memory.RootAllocator
@@ -42,9 +43,13 @@ import org.apache.arrow.vector.ipc.message.ArrowRecordBatch
import org.apache.arrow.vector.types.pojo.Schema
import org.apache.arrow.vector.types.pojo.Field
import org.apache.arrow.vector.types.pojo.ArrowType

import org.apache.arrow.memory.ArrowBuf
import org.apache.arrow.util.AutoCloseables
import org.apache.arrow.vector.ValueVector

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import scala.util.control.Breaks._

class ColumnarConditionProjector(
condPrepareList: (TreeNode, ArrowType),
@@ -115,7 +120,7 @@ class ColumnarConditionProjector(
false
}
val projector = if (skip == false) {
createProjector(projectionArrowSchema, projPrepareList, withCond)
createProjector(projectionArrowSchema, resultArrowSchema, projPrepareList, withCond)
} else {
null
}
@@ -134,24 +139,25 @@ class ColumnarConditionProjector(
}

def createProjector(
arrowSchema: Schema,
projectionSchema: Schema,
resultSchema: Schema,
prepareList: Seq[(ExpressionTree, ArrowType)],
withCond: Boolean): Projector = synchronized {
withCond: Boolean): ProjectorWrapper = synchronized {
if (projector != null) {
return projector
}
val fieldNodesList = prepareList.map(_._1).toList.asJava
try {
if (withCond) {
Projector.make(arrowSchema, fieldNodesList, SelectionVectorType.SV_INT16)
new FilterProjector(projectionSchema, resultSchema, fieldNodesList, SelectionVectorType.SV_INT16)
} else {
Projector.make(arrowSchema, fieldNodesList)
new FieldOptimizedProjector(projectionSchema, resultSchema, fieldNodesList)
}
} catch {
case e =>
logError(
s"\noriginalInputAttributes is ${originalInputAttributes} ${originalInputAttributes.map(
_.dataType)}, \narrowSchema is ${arrowSchema}, \nProjection is ${prepareList.map(_._1.toProtobuf)}")
_.dataType)}, \nprojectionSchema is ${projectionSchema}, \nresultSchema is ${resultSchema}, \nProjection is ${prepareList.map(_._1.toProtobuf)}")
throw e
}
}
@@ -258,28 +264,19 @@ class ColumnarConditionProjector(
// For now, we either filter a ColumnarBatch that has valid rows, or we only need to project;
// in either scenario we output one ColumnarBatch.
beforeEval = System.nanoTime()
val resultColumnVectors =
ArrowWritableColumnVector.allocateColumns(numRows, resultSchema).toArray
val outputVectors = resultColumnVectors
.map(columnVector => {
columnVector.getValueVector()
})
.toList
.asJava

val cols = projectOrdinalList.map(i => {
columnarBatch.column(i).asInstanceOf[ArrowWritableColumnVector].getValueVector()
})
input = ConverterUtils.createArrowRecordBatch(columnarBatch.numRows, cols)
if (conditioner != null) {
projector.evaluate(input, selectionVector, outputVectors);
val outputBatch = if (conditioner != null) {
projector.evaluate(input, numRows, selectionVector);
} else {
projector.evaluate(input, outputVectors);
projector.evaluate(input);
}

ConverterUtils.releaseArrowRecordBatch(input)
val outputBatch =
new ColumnarBatch(resultColumnVectors.map(_.asInstanceOf[ColumnVector]), numRows)

proc_time += ((System.nanoTime() - beforeEval) / (1000 * 1000))
resColumnarBatch = outputBatch
true
@@ -448,4 +445,190 @@ object ColumnarConditionProjector extends Logging {
numOutputRows,
procTime)
}

trait ProjectorWrapper {
def evaluate(recordBatch: ArrowRecordBatch): ColumnarBatch = {
throw new UnsupportedOperationException
}

def evaluate(recordBatch: ArrowRecordBatch, numRows: Int, selectionVector: SelectionVector): ColumnarBatch = {
throw new UnsupportedOperationException
}

def close(): Unit
}

/**
* Proxy projector that is optimized for field projections.
*/
class FieldOptimizedProjector(projectionSchema: Schema, resultSchema: Schema,
exprs: java.util.List[ExpressionTree]) extends ProjectorWrapper {

val fieldExprs = ListBuffer[(ExpressionTree, Int)]()
val fieldExprNames = new util.HashSet[String]()

/**
* nonFieldExprs may include fields that already appear in the projection list.
* This avoids sharing the same buffers across output columns.
*/
val nonFieldExprs = ListBuffer[(ExpressionTree, Int)]()

exprs.asScala.zipWithIndex.foreach {
case (expr, i) =>
val root = getRoot(expr)
if (fieldClazz.isInstance(root) && !fieldExprNames.contains(getField(root).getName)) {
fieldExprs.append((expr, i))
fieldExprNames.add(getField(root).getName)
} else {
nonFieldExprs.append((expr, i))
}
}

val fieldResultSchema = new Schema(
fieldExprs.map {
case (_, i) =>
resultSchema.getFields.get(i)
}.asJava)

val nonFieldResultSchema = new Schema(
nonFieldExprs.map {
case (_, i) =>
resultSchema.getFields.get(i)
}.asJava)

val nonFieldProjector: Option[Projector] =
if (nonFieldExprs.isEmpty) {
None
} else {
Some(
Projector.make(
projectionSchema, nonFieldExprs.map {
case (e, _) => e
}.toList.asJava))
}

override def evaluate(recordBatch: ArrowRecordBatch): ColumnarBatch = {
val numRows = recordBatch.getLength
val projectedAVs = new Array[ArrowWritableColumnVector](exprs.size())

// Execute expression-based projections
val nonFieldResultColumnVectors =
ArrowWritableColumnVector.allocateColumns(numRows,
ArrowUtils.fromArrowSchema(nonFieldResultSchema))

val outputVectors = nonFieldResultColumnVectors
.map(columnVector => {
columnVector.getValueVector
})
.toList
.asJava

nonFieldProjector.foreach {
_.evaluate(recordBatch, outputVectors)
}

var k: Int = 0
nonFieldExprs.foreach {
case (_, i) =>
projectedAVs(i) = nonFieldResultColumnVectors(k)
k += 1
}

val inAVs = ArrowWritableColumnVector.loadColumns(numRows, projectionSchema, recordBatch)

fieldExprs.foreach {
case (fieldExpr, i) =>
val field = getField(getRoot(fieldExpr))
var found = false
breakable {
for (j <- 0 until projectionSchema.getFields.size()) {
val projField = projectionSchema.getFields.get(j)
if (Objects.equals(field.getName, projField.getName)) {
// Found field in input schema
if (projectedAVs(i) != null) {
throw new IllegalStateException()
}
val vector = inAVs(j)
projectedAVs(i) = vector
vector.retain()
found = true
break
}
}
}
if (!found) {
throw new IllegalArgumentException("Field not found for projection: " + field.getName)
}
}

inAVs.foreach(_.close())

// Sanity check: every output slot must have received a projected vector
projectedAVs.foreach {
arrowVector =>
if (arrowVector == null) {
throw new IllegalStateException()
}
}

val outputBatch =
new ColumnarBatch(projectedAVs.map(_.asInstanceOf[ColumnVector]), numRows)

outputBatch
}

override def close() = {
nonFieldProjector.foreach(_.close())
}
}

class FilterProjector(projectionSchema: Schema, resultSchema: Schema,
exprs: java.util.List[ExpressionTree],
selectionVectorType: GandivaTypes.SelectionVectorType) extends ProjectorWrapper {
val projector = Projector.make(projectionSchema, exprs, selectionVectorType)

override def evaluate(recordBatch: ArrowRecordBatch, numRows: Int,
selectionVector: SelectionVector): ColumnarBatch = {
val resultColumnVectors =
ArrowWritableColumnVector.allocateColumns(numRows, ArrowUtils.fromArrowSchema(resultSchema))

val outputVectors = resultColumnVectors
.map(columnVector => {
columnVector.getValueVector
})
.toList
.asJava

projector.evaluate(recordBatch, selectionVector, outputVectors)

val outputBatch =
new ColumnarBatch(resultColumnVectors.map(_.asInstanceOf[ColumnVector]), numRows)

outputBatch
}

override def close(): Unit = {
projector.close()
}
}

val treeClazz = classOf[ExpressionTree]
val rootField = treeClazz.getDeclaredField("root")
val fieldClazz = Class.forName("org.apache.arrow.gandiva.expression.FieldNode")
val fieldField = fieldClazz.getDeclaredField("field")

rootField.setAccessible(true)
fieldField.setAccessible(true)

def getRoot(expressionTree: ExpressionTree): TreeNode = {
rootField.get(expressionTree).asInstanceOf[TreeNode]
}

def getField(fieldNode: Any): Field = {
if (!fieldClazz.isInstance(fieldNode)) {
throw new IllegalArgumentException
}
fieldField.get(fieldNode).asInstanceOf[Field]

}
}
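
For background on the "Avoid sharing buffers in output" follow-up above, the sketch below uses only Arrow's public Java memory API (not this project's wrapper classes) to show the reference-counting contract that makes the zero-copy field projection safe: the projector retains the input buffers, so the input and output batches can be released independently.

import org.apache.arrow.memory.RootAllocator

object RetainReleaseSketch {
  def main(args: Array[String]): Unit = {
    val allocator = new RootAllocator(Long.MaxValue)
    val buf = allocator.buffer(64)        // reference count 1: owned by the "input" side
    buf.getReferenceManager.retain()      // reference count 2: the "output" side now co-owns it
    buf.getReferenceManager.release()     // input side done; memory still live for the output
    buf.getReferenceManager.release()     // output side done; the buffer is actually freed
    allocator.close()
  }
}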
@@ -35,10 +35,10 @@ class TPCDSSuite extends QueryTest with SharedSparkSession {
val conf = super.sparkConf
conf.set("spark.memory.offHeap.size", String.valueOf(MAX_DIRECT_MEMORY))
.set("spark.sql.extensions", "com.intel.oap.ColumnarPlugin")
.set("spark.sql.codegen.wholeStage", "false")
.set("spark.sql.codegen.wholeStage", "true")
.set("spark.sql.sources.useV1SourceList", "")
.set("spark.sql.columnar.tmp_dir", "/tmp/")
.set("spark.sql.adaptive.enabled", "false")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.sql.columnar.sort.broadcastJoin", "true")
.set("spark.storage.blockManagerSlaveTimeoutMs", "3600000")
.set("spark.executor.heartbeatInterval", "3600000")
@@ -50,6 +50,7 @@ class TPCDSSuite extends QueryTest with SharedSparkSession {
.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
.set("spark.unsafe.exceptionOnMemoryLeak", "false")
.set("spark.network.io.preferDirectBufs", "false")
.set("spark.sql.sources.useV1SourceList", "arrow,parquet")
return conf
}

@@ -63,10 +63,16 @@ class TPCDSTableGen(val spark: SparkSession, scale: Double, path: String)
case "web_page" => webPageSchema
case "web_site" => webSiteSchema
}
writeParquetTable(name, rows, schema)
val partitionBy: List[String] = name match {
case "catalog_sales" => List("cs_sold_date_sk")
case "web_sales" => List("ws_sold_date_sk")
case _ => List[String]()
}
writeParquetTable(name, rows, schema, partitionBy)
}

private def writeParquetTable(tableName: String, rows: List[Row], schema: StructType): Unit = {
private def writeParquetTable(tableName: String, rows: List[Row], schema: StructType,
partitionBy: List[String]): Unit = {
if (rows.isEmpty) {
return
}
@@ -87,10 +93,11 @@
}

convertedData.coalesce(1)
.write
.format("parquet")
.mode("overwrite")
.save(path + File.separator + tableName)
.write
.format("parquet")
.mode("overwrite")
.partitionBy(partitionBy.toArray: _*)
.save(path + File.separator + tableName)
}

override def gen(): Unit = {
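
As a hedged aside on the partitioning change above, here is a self-contained sketch of the same DataFrameWriter.partitionBy pattern on toy data; the table name, output path, and row values are illustrative only.

import org.apache.spark.sql.SparkSession

object PartitionedWriteExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("partitioned-write").getOrCreate()
    import spark.implicits._

    // Toy rows standing in for catalog_sales; only the partition column matters here.
    val df = Seq(
      (2450815, 1, 10.0),
      (2450816, 2, 20.0)
    ).toDF("cs_sold_date_sk", "cs_item_sk", "cs_sales_price")

    df.coalesce(1)
      .write
      .format("parquet")
      .mode("overwrite")
      .partitionBy("cs_sold_date_sk") // one sub-directory per sold-date key, as in the generator
      .save("/tmp/catalog_sales_example")

    spark.stop()
  }
}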