[NSE-667] backport patches to 1.3 branch (#680)
* Use Hadoop 3.2 as default hadoop.version (#652)

* [NSE-661] Add left/right trim in WSCG

* [NSE-675] Add instr expression support (#676)

* Initial commit

* Add the support in wscg

* [NSE-674] Add translate expression support (#672)

* Initial commit

* Add StringTranslate for subquery checking

* Code refactor

* Change arrow branch for unit test [will revert at last]

* Revert "Change arrow branch for unit test [will revert at last]"

This reverts commit bf74356.

* Port the function to wscg

* Change arrow branch for unit test [will revert at last]

* Format native code

* Fix a bug

* Revert "Change arrow branch for unit test [will revert at last]"

This reverts commit 3a53fa2.

* [NSE-681] Add floor & ceil expression support (#682)

* Initial commit

* Add ceil expression support

* Change arrow branch for unit test [will revert at last]

* Revert "Change arrow branch for unit test [will revert at last]"

This reverts commit 5fb2f4b.

* [NSE-647] Leverage buffered write in shuffle  (#648)

Closes #647

* [NSE-400] Native Arrow Row to columnar support (#637)

* Support ArrowRowToColumnar Optimization

* Replace expired code

* Add the code to convert recordbatch to columnarBatch

* Add unit test on java size

* Update the unit tests

* Fix the bug when reading decimal values from UnsafeRow

* Use ArrowRowToColumnarExec instead of RowToArrowColumnarExec

* Use clang-format to standardize the CPP code format

* Enable ArrowRowToColumnarExec

* Add the metrics for ArrowRowToColumnarExec

* Add the metrics for ArrowRowToColumnarExec and mark Codegen as unsupported

* Add parameter 'spark.oap.sql.columnar.rowtocolumnar' to control ArrowRowToColumnarExec

* Remove useless code

* Release the ArrowBuf after returning the record batch

* Fix the processTime metric for ArrowRowToColumnarExec

* Refine the code of ArrowRowToColumnar operator

* Add more metrics to measure the elapsed time of each action

* Small fix about allocating buffer for unsafeRow

* Remove useless code

* Remove useless metrics for ArrowRowToColumnarExec

* Fall back to the Java RowToColumnarExec when the row is not an UnsafeRow

* Fix the bug for decimal format

* fix format

Co-authored-by: Yuan Zhou <yuan.zhou@intel.com>

* fix leakage in rowtocolumn (#683)

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

Co-authored-by: Wei-Ting Chen <weiting.chen@intel.com>
Co-authored-by: PHILO-HE <feilong.he@intel.com>
Co-authored-by: Hongze Zhang <hongze.zhang@intel.com>
Co-authored-by: haojinIntel <hao.jin@intel.com>
5 people authored on Jan 9, 2022 · 1 parent da597b8 · commit 3033f84
Showing 19 changed files with 1,970 additions and 13 deletions.
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.oap.vectorized;

import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;

public class ArrowRowToColumnarJniWrapper {
  public ArrowRowToColumnarJniWrapper() throws Exception {
    JniUtils.getInstance();
  }

  public native byte[] nativeConvertRowToColumnar(
      byte[] schema, long[] rowLength,
      long bufferAddress, long memoryPollID) throws RuntimeException;
}
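For context, a minimal sketch of how this JNI wrapper is driven from the JVM side, mirroring the call pattern used by ArrowRowToColumnarExec later in this commit; the helper name convertBufferedRows is illustrative only, everything else is taken from this patch.

import com.intel.oap.expression.ConverterUtils
import com.intel.oap.vectorized.ArrowRowToColumnarJniWrapper
import org.apache.arrow.dataset.jni.UnsafeRecordBatchSerializer
import org.apache.arrow.memory.{ArrowBuf, BufferAllocator}
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch
import org.apache.arrow.vector.types.pojo.Schema
import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils

// Sketch: convert UnsafeRows that have already been copied into `arrowBuf`
// (with their byte lengths collected in `rowLengths`) into an ArrowRecordBatch.
def convertBufferedRows(
    arrowSchema: Schema,
    rowLengths: Array[Long],
    arrowBuf: ArrowBuf,
    allocator: BufferAllocator): ArrowRecordBatch = {
  val jniWrapper = new ArrowRowToColumnarJniWrapper()
  val schemaBytes = ConverterUtils.getSchemaBytesBuf(arrowSchema)
  val serialized = jniWrapper.nativeConvertRowToColumnar(
    schemaBytes,
    rowLengths,
    arrowBuf.memoryAddress(),
    SparkMemoryUtils.contextMemoryPool().getNativeInstanceId)
  UnsafeRecordBatchSerializer.deserializeUnsafe(allocator, serialized)
}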
@@ -80,6 +80,9 @@ class GazellePluginConfig(conf: SQLConf) extends Logging {
  val enableArrowColumnarToRow: Boolean =
    conf.getConfString("spark.oap.sql.columnar.columnartorow", "true").toBoolean && enableCpu

  val enableArrowRowToColumnar: Boolean =
    conf.getConfString("spark.oap.sql.columnar.rowtocolumnar", "true").toBoolean && enableCpu

  val forceShuffledHashJoin: Boolean =
    conf.getConfString("spark.oap.sql.columnar.forceshuffledhashjoin", "false").toBoolean &&
      enableCpu
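For reference, a small usage sketch of the new flag: the config key and its default of true come from this patch, while the SparkSession settings around it are assumptions for illustration.

import org.apache.spark.sql.SparkSession

// Sketch: toggling the native row-to-columnar conversion added in this patch.
val spark = SparkSession.builder()
  .appName("arrow-row-to-columnar-demo")
  .master("local[*]")
  .config("spark.oap.sql.columnar.rowtocolumnar", "true")
  .getOrCreate()

// The flag can also be flipped at runtime before running a query:
spark.conf.set("spark.oap.sql.columnar.rowtocolumnar", "false")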
@@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.oap.execution

import java.util.concurrent.TimeUnit._

import scala.collection.mutable.ListBuffer
import com.intel.oap.expression.ConverterUtils
import com.intel.oap.sql.execution.RowToColumnConverter
import com.intel.oap.vectorized.{ArrowRowToColumnarJniWrapper, ArrowWritableColumnVector, CloseableColumnBatchIterator}
import org.apache.arrow.dataset.jni.UnsafeRecordBatchSerializer
import org.apache.arrow.memory.ArrowBuf
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch
import org.apache.arrow.vector.types.pojo.Schema
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.{RowToColumnarExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.v2.arrow.{SparkMemoryUtils, SparkSchemaUtils}
import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils.UnsafeItr
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.vectorized.WritableColumnVector
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
import org.apache.spark.TaskContext
import org.apache.spark.unsafe.Platform


class ArrowRowToColumnarExec(child: SparkPlan) extends RowToColumnarExec(child = child) {
  override def nodeName: String = "ArrowRowToColumnarExec"

  buildCheck()

  def buildCheck(): Unit = {
    val schema = child.schema
    for (field <- schema.fields) {
      field.dataType match {
        case d: BooleanType =>
        case d: ByteType =>
        case d: ShortType =>
        case d: IntegerType =>
        case d: LongType =>
        case d: FloatType =>
        case d: DoubleType =>
        case d: StringType =>
        case d: DateType =>
        case d: DecimalType =>
        case d: TimestampType =>
        case d: BinaryType =>
        case _ =>
          throw new UnsupportedOperationException(s"${field.dataType} " +
            s"is not supported in ArrowRowToColumnarExec.")
      }
    }
  }

  override lazy val metrics: Map[String, SQLMetric] = Map(
    "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
    "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"),
    "processTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to convert")
  )

  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
    val numInputRows = longMetric("numInputRows")
    val numOutputBatches = longMetric("numOutputBatches")
    val processTime = longMetric("processTime")
    // Instead of creating a new config we are reusing columnBatchSize. In the future, if we
    // combine this with some of the Arrow conversion tools, we will need to unify some of the
    // configs.
    val numRows = conf.columnBatchSize
    // This avoids calling `schema` in the RDD closure, so that we don't need to include the
    // entire plan (this) in the closure.
    val localSchema = this.schema
    child.execute().mapPartitions { rowIterator =>

      val jniWrapper = new ArrowRowToColumnarJniWrapper()
      val timeZoneId = SparkSchemaUtils.getLocalTimezoneID()
      val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
      var schemaBytes: Array[Byte] = null

      if (rowIterator.hasNext) {
        val res = new Iterator[ColumnarBatch] {
          private val converters = new RowToColumnConverter(localSchema)
          private var last_cb: ColumnarBatch = null
          private var elapse: Long = 0
          // Allocate a large buffer to store up to numRows rows.
          val bufferSize = 134217728 // 128 MB; the size could instead be estimated from the data types
          val allocator = SparkMemoryUtils.contextAllocator()
          val arrowBuf: ArrowBuf = allocator.buffer(bufferSize)

          override def hasNext: Boolean = {
            rowIterator.hasNext
          }

          TaskContext.get().addTaskCompletionListener[Unit] { _ =>
            arrowBuf.close()
          }

          override def next(): ColumnarBatch = {
            var isUnsafeRow = true
            var firstRow = InternalRow.apply()
            var hasNextRow = false
            if (rowIterator.hasNext) {
              firstRow = rowIterator.next()
              hasNextRow = true
            }
            if (!firstRow.isInstanceOf[UnsafeRow]) {
              isUnsafeRow = false
            }

            if (arrowBuf != null && isUnsafeRow) {
              val rowLength = new ListBuffer[Long]()
              var rowCount = 0
              var offset = 0
              val start = System.nanoTime()

              assert(firstRow.isInstanceOf[UnsafeRow])
              val unsafeRow = firstRow.asInstanceOf[UnsafeRow]
              val sizeInBytes = unsafeRow.getSizeInBytes
              Platform.copyMemory(unsafeRow.getBaseObject, unsafeRow.getBaseOffset,
                null, arrowBuf.memoryAddress() + offset, sizeInBytes)
              offset += sizeInBytes
              rowLength += sizeInBytes.toLong
              rowCount += 1

              while (rowCount < numRows && rowIterator.hasNext) {
                val row = rowIterator.next() // UnsafeRow
                assert(row.isInstanceOf[UnsafeRow])
                val unsafeRow = row.asInstanceOf[UnsafeRow]
                val sizeInBytes = unsafeRow.getSizeInBytes
                Platform.copyMemory(unsafeRow.getBaseObject, unsafeRow.getBaseOffset,
                  null, arrowBuf.memoryAddress() + offset, sizeInBytes)
                offset += sizeInBytes
                rowLength += sizeInBytes.toLong
                rowCount += 1
              }
              if (schemaBytes == null) {
                schemaBytes = ConverterUtils.getSchemaBytesBuf(arrowSchema)
              }
              val serializedRecordBatch = jniWrapper.nativeConvertRowToColumnar(
                schemaBytes, rowLength.toArray,
                arrowBuf.memoryAddress(), SparkMemoryUtils.contextMemoryPool().getNativeInstanceId)
              numInputRows += rowCount
              numOutputBatches += 1
              val rb = UnsafeRecordBatchSerializer.deserializeUnsafe(allocator, serializedRecordBatch)
              val output = ConverterUtils.fromArrowRecordBatch(arrowSchema, rb)
              val outputNumRows = rb.getLength
              ConverterUtils.releaseArrowRecordBatch(rb)
              last_cb = new ColumnarBatch(output.map(v => v.asInstanceOf[ColumnVector]).toArray, outputNumRows)
              elapse = System.nanoTime() - start
              processTime.set(NANOSECONDS.toMillis(elapse))
              last_cb
            } else {
              logInfo("The buffer was not allocated or the rows are not UnsafeRow; " +
                "falling back to the non-Arrow row-to-columnar conversion.")
              val vectors: Seq[WritableColumnVector] =
                ArrowWritableColumnVector.allocateColumns(numRows, schema)
              var rowCount = 0

              val start = System.nanoTime()
              converters.convert(firstRow, vectors.toArray)
              elapse += System.nanoTime() - start
              rowCount += 1

              while (rowCount < numRows && rowIterator.hasNext) {
                val row = rowIterator.next()
                val start = System.nanoTime()
                converters.convert(row, vectors.toArray)
                elapse += System.nanoTime() - start
                rowCount += 1
              }
              vectors.foreach(v => v.asInstanceOf[ArrowWritableColumnVector].setValueCount(rowCount))
              processTime.set(NANOSECONDS.toMillis(elapse))
              numInputRows += rowCount
              numOutputBatches += 1
              last_cb = new ColumnarBatch(vectors.toArray, rowCount)
              last_cb
            }
          }
        }
        new CloseableColumnBatchIterator(res)
      } else {
        Iterator.empty
      }
    }
  }

  override def canEqual(other: Any): Boolean = other.isInstanceOf[ArrowRowToColumnarExec]

  override def equals(other: Any): Boolean = other match {
    case that: ArrowRowToColumnarExec =>
      (that canEqual this) && super.equals(that)
    case _ => false
  }

}
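As a quick sanity check, a hedged sketch for confirming the operator is actually picked up in a physical plan; it assumes the plugin is installed and spark.oap.sql.columnar.rowtocolumnar is enabled, and the sample data is purely illustrative.

import org.apache.spark.sql.SparkSession

// Sketch: returns true if ArrowRowToColumnarExec appears in the executed plan.
def usesArrowRowToColumnar(spark: SparkSession): Boolean = {
  import spark.implicits._
  val df = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "name").repartition(2)
  df.queryExecution.executedPlan.collect {
    case plan if plan.nodeName == "ArrowRowToColumnarExec" => plan
  }.nonEmpty
}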
@@ -95,6 +95,22 @@ class ColumnarGetJsonObject(left: Expression, right: Expression, original: GetJs
}
}

class ColumnarStringInstr(left: Expression, right: Expression, original: StringInstr)
    extends StringInstr(original.str, original.substr) with ColumnarExpression with Logging {

  override def doColumnarCodeGen(args: Object): (TreeNode, ArrowType) = {
    val (left_node, _): (TreeNode, ArrowType) =
      left.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
    val (right_node, _): (TreeNode, ArrowType) =
      right.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
    val resultType = CodeGeneration.getResultType(dataType)
    // Be careful about the argument order.
    val funcNode = TreeBuilder.makeFunction("locate",
      Lists.newArrayList(right_node, left_node,
        TreeBuilder.makeLiteral(1.asInstanceOf[java.lang.Integer])), resultType)
    (funcNode, resultType)
  }
}

object ColumnarBinaryExpression {

@@ -116,6 +132,8 @@ object ColumnarBinaryExpression {
      new ColumnarDateSub(left, right)
    case g: GetJsonObject =>
      new ColumnarGetJsonObject(left, right, g)
    case instr: StringInstr =>
      new ColumnarStringInstr(left, right, instr)
    case other =>
      throw new UnsupportedOperationException(s"not currently supported: $other.")
  }
@@ -275,6 +275,17 @@ object ColumnarExpressionConverter extends Logging {
          ss.len,
          convertBoundRefToAttrRef = convertBoundRefToAttrRef),
        expr)
      case st: StringTranslate =>
        logInfo(s"${expr.getClass} ${expr} is supported, no_cal is $check_if_no_calculation.")
        ColumnarTernaryOperator.create(
          replaceWithColumnarExpression(st.srcExpr, attributeSeq,
            convertBoundRefToAttrRef = convertBoundRefToAttrRef),
          replaceWithColumnarExpression(st.matchingExpr, attributeSeq,
            convertBoundRefToAttrRef = convertBoundRefToAttrRef),
          replaceWithColumnarExpression(st.replaceExpr, attributeSeq,
            convertBoundRefToAttrRef = convertBoundRefToAttrRef),
          expr
        )
      case u: UnaryExpression =>
        logInfo(s"${expr.getClass} ${expr} is supported, no_cal is $check_if_no_calculation.")
        if (!u.isInstanceOf[CheckOverflow] || !u.child.isInstanceOf[Divide]) {
@@ -384,8 +395,11 @@ object ColumnarExpressionConverter extends Logging {
        containsSubquery(b.left) || containsSubquery(b.right)
      case s: String2TrimExpression =>
        s.children.map(containsSubquery).exists(_ == true)
      case st: StringTranslate =>
        st.children.map(containsSubquery).exists(_ == true)
      case regexp: RegExpReplace =>
        containsSubquery(regexp.subject) || containsSubquery(
          regexp.regexp) || containsSubquery(regexp.rep) || containsSubquery(regexp.pos)
      case expr =>
        throw new UnsupportedOperationException(
          s" --> ${expr.getClass} | ${expr} is not currently supported.")
@@ -110,15 +110,43 @@ class ColumnarStringSplit(child: Expression, regex: Expression,
}
}

class ColumnarStringTranslate(src: Expression, matchingExpr: Expression,
    replaceExpr: Expression, original: Expression)
  extends StringTranslate(src, matchingExpr, replaceExpr) with ColumnarExpression {
  buildCheck

  def buildCheck: Unit = {
    val supportedTypes = List(StringType)
    if (supportedTypes.indexOf(src.dataType) == -1) {
      throw new UnsupportedOperationException(s"${src.dataType}" +
        s" is not supported in ColumnarStringTranslate!")
    }
  }

  override def doColumnarCodeGen(args: java.lang.Object): (TreeNode, ArrowType) = {
    val (str_node, _): (TreeNode, ArrowType) =
      src.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
    val (matchingExpr_node, _): (TreeNode, ArrowType) =
      matchingExpr.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
    val (replaceExpr_node, _): (TreeNode, ArrowType) =
      replaceExpr.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
    val resultType = new ArrowType.Utf8()
    (TreeBuilder.makeFunction("translate",
      Lists.newArrayList(str_node, matchingExpr_node, replaceExpr_node), resultType), resultType)
  }
}

object ColumnarTernaryOperator {

  def create(src: Expression, arg1: Expression, arg2: Expression,
      original: Expression): Expression = original match {
    case ss: Substring =>
      new ColumnarSubString(src, arg1, arg2, ss)
    // Currently not supported.
    // case a: StringSplit =>
    //   new ColumnarStringSplit(str, a.regex, a.limit, a)
    case st: StringTranslate =>
      new ColumnarStringTranslate(src, arg1, arg2, st)
    case other =>
      throw new UnsupportedOperationException(s"not currently supported: $other.")
  }
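And a corresponding sketch for translate, the per-character mapping that ColumnarStringTranslate offloads to the native translate function; it again assumes spark is an active SparkSession with the plugin enabled.

// Sketch: each character of 'abc' found in the input is replaced by the
// character at the same position in '123'.
spark.sql("SELECT translate('AaBbCc', 'abc', '123')").show()   // A1B2C3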
