Skip to content

Commit

Permalink
full implementation
Browse files — browse the repository at this point in the history
  • Loading branch information
advancedxy committed Mar 15, 2024
1 parent d88c7cb commit 47cb668
Show file tree
Hide file tree
Showing 11 changed files with 628 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ protected static CometVector getVector(
}
}

public static CometVector getVector(ValueVector vector, boolean useDecimal128) {
protected static CometVector getVector(ValueVector vector, boolean useDecimal128) {
return getVector(vector, useDecimal128, null);
}
}
10 changes: 10 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,16 @@ object CometConf {
"Comet will convert row-based data to columnar format before processing.")
.booleanConf
.createWithDefault(false)

val COMET_ROW_TO_COLUMNAR_SOURCE_NODE_LIST: ConfigEntry[Seq[String]] =
  conf("spark.comet.rowToColumnar.sourceNodeList")
    .doc(
      "A comma-separated list of row-based data sources that will be converted to columnar " +
        "format when 'spark.comet.rowToColumnar.enabled' is true")
    .stringConf
    .toSequence
    // The default must already be split into individual node names: `.toSequence` only splits
    // user-supplied string values on commas, so a single element "Range,InMemoryTableScan"
    // would never match either "Range" or "InMemoryTableScan".
    .createWithDefault(Seq("Range", "InMemoryTableScan"))

}

object ConfigHelpers {
Expand Down
16 changes: 16 additions & 0 deletions common/src/main/scala/org/apache/comet/vector/NativeUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -210,4 +210,20 @@ class NativeUtil {
case _ => throw new SparkException(s"Unsupported Arrow Vector: ${valueVector.getClass}")
}
}

}

object NativeUtil {

  /**
   * Wraps the field vectors of `arrowRoot` into a [[ColumnarBatch]] without a dictionary
   * provider.
   */
  def rootAsBatch(arrowRoot: VectorSchemaRoot): ColumnarBatch = rootAsBatch(arrowRoot, null)

  /**
   * Wraps the field vectors of `arrowRoot` into a [[ColumnarBatch]].
   *
   * @param arrowRoot
   *   the Arrow root whose field vectors back the returned batch
   * @param provider
   *   dictionary provider used to resolve dictionary-encoded vectors; may be null
   */
  def rootAsBatch(arrowRoot: VectorSchemaRoot, provider: DictionaryProvider): ColumnarBatch = {
    val fieldVectors = arrowRoot.getFieldVectors
    val columns = Array.tabulate(fieldVectors.size()) { idx =>
      // Native shuffle always uses decimal128.
      CometVector.getVector(fieldVectors.get(idx), true, provider)
    }
    new ColumnarBatch(columns, arrowRoot.getRowCount)
  }
}
12 changes: 2 additions & 10 deletions common/src/main/scala/org/apache/comet/vector/StreamReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@ package org.apache.comet.vector

import java.nio.channels.ReadableByteChannel

import scala.collection.JavaConverters.collectionAsScalaIterableConverter

import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.ipc.{ArrowStreamReader, ReadChannel}
import org.apache.arrow.vector.ipc.message.MessageChannelReader
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
* A reader that consumes Arrow data from an input channel, and produces Comet batches.
Expand All @@ -47,13 +45,7 @@ case class StreamReader(channel: ReadableByteChannel) extends AutoCloseable {
}

/**
 * Converts the reader's current Arrow root into a Comet [[ColumnarBatch]].
 *
 * This span previously contained both the deleted inline implementation and the new
 * delegation (diff residue); only the current delegating body is kept. The reader itself
 * is passed as the dictionary provider for dictionary-encoded vectors — presumably
 * `arrowReader` implements `DictionaryProvider`; confirm against its declaration.
 */
private def rootAsBatch(root: VectorSchemaRoot): ColumnarBatch = {
  NativeUtil.rootAsBatch(root, arrowReader)
}

override def close(): Unit = {
Expand Down
Loading

0 comments on commit 47cb668

Please sign in to comment.