Skip to content

Commit

Permalink
[GLUTEN-7313][VL] Explicit Arrow transitions, part 4: explicit Arrow-…
Browse files Browse the repository at this point in the history
…to-Velox transition (#7392)
  • Loading branch information
zhztheplayer authored Oct 8, 2024
1 parent 3dceeb8 commit 8cb3e70
Show file tree
Hide file tree
Showing 30 changed files with 453 additions and 305 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.columnarbatch;

import org.apache.gluten.runtime.Runtime;
import org.apache.gluten.runtime.RuntimeAware;

/**
 * JNI binding surface for Velox-specific columnar batch operations.
 *
 * <p>An instance is bound to the {@link Runtime} passed to {@link #create(Runtime)} and exposes
 * that runtime's handle through {@link #rtHandle()}.
 */
public class VeloxColumnarBatchJniWrapper implements RuntimeAware {
  private final Runtime runtime;

  // Instances are only handed out through create(Runtime).
  private VeloxColumnarBatchJniWrapper(Runtime boundRuntime) {
    runtime = boundRuntime;
  }

  /**
   * Creates a wrapper bound to the given runtime.
   *
   * @param runtime runtime whose handle this wrapper reports from {@link #rtHandle()}
   * @return a new wrapper instance
   */
  public static VeloxColumnarBatchJniWrapper create(Runtime runtime) {
    return new VeloxColumnarBatchJniWrapper(runtime);
  }

  /** Returns the handle of the runtime this wrapper was created with. */
  @Override
  public long rtHandle() {
    return runtime.getHandle();
  }

  /**
   * Native method taking a single batch handle and returning a {@code long}.
   *
   * <p>NOTE(review): presumably the result is the handle of a Velox batch derived from {@code
   * batch}; the native implementation is not visible here, confirm there.
   */
  public native long from(long batch);

  /**
   * Native method taking an array of batch handles and returning a {@code long}.
   *
   * <p>NOTE(review): presumably the result is the handle of a batch composed from {@code
   * batches}; the native implementation is not visible here, confirm there.
   */
  public native long compose(long[] batches);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.columnarbatch;

import org.apache.gluten.runtime.Runtime;
import org.apache.gluten.runtime.Runtimes;

import com.google.common.base.Preconditions;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.sql.vectorized.SparkColumnarBatchUtil;

import java.util.Arrays;
import java.util.Objects;

/**
 * Static helpers for Velox-typed columnar batches: comprehensive-type checks, conversion of a
 * non-Velox batch into a Velox batch, and horizontal composition of batches.
 */
public final class VeloxColumnarBatches {
  /** Comprehensive batch type string that identifies a Velox batch. */
  public static final String COMPREHENSIVE_TYPE_VELOX = "velox";

  /**
   * Verifies that the comprehensive type reported for {@code batch} is {@link
   * #COMPREHENSIVE_TYPE_VELOX}.
   *
   * @param batch batch to inspect
   * @throws IllegalArgumentException if the batch reports any other comprehensive type
   */
  public static void checkVeloxBatch(ColumnarBatch batch) {
    final String comprehensiveType = ColumnarBatches.getComprehensiveLightBatchType(batch);
    // Pass the template and its arguments separately so the message is only formatted when the
    // check fails, instead of on every call.
    Preconditions.checkArgument(
        Objects.equals(comprehensiveType, COMPREHENSIVE_TYPE_VELOX),
        "Expected comprehensive batch type %s, but got %s",
        COMPREHENSIVE_TYPE_VELOX,
        comprehensiveType);
  }

  /**
   * Verifies that the comprehensive type reported for {@code batch} is not {@link
   * #COMPREHENSIVE_TYPE_VELOX}.
   *
   * @param batch batch to inspect
   * @throws IllegalArgumentException if the batch already reports the Velox type
   */
  public static void checkNonVeloxBatch(ColumnarBatch batch) {
    final String comprehensiveType = ColumnarBatches.getComprehensiveLightBatchType(batch);
    // Lazily formatted for the same reason as in checkVeloxBatch.
    Preconditions.checkArgument(
        !Objects.equals(comprehensiveType, COMPREHENSIVE_TYPE_VELOX),
        "Comprehensive batch type is already %s",
        COMPREHENSIVE_TYPE_VELOX);
  }

  /**
   * Converts a non-Velox batch into a Velox batch, reusing the {@code input} object.
   *
   * <p>The native handle of {@code input} is passed to {@link VeloxColumnarBatchJniWrapper#from},
   * a new batch is created from the returned handle, and the new batch's vectors are finally
   * transferred into {@code input}. In between, the new batch is retained {@code refCnt - 1}
   * times and {@code input} is closed {@code refCnt} times, where {@code refCnt} is the reference
   * count read from {@code input} before any close.
   *
   * @param input batch to convert; must not already be of Velox type
   * @return the same {@code input} instance, after the new vectors were transferred into it
   * @throws IllegalArgumentException if {@code input} already reports the Velox type
   */
  public static ColumnarBatch toVeloxBatch(ColumnarBatch input) {
    checkNonVeloxBatch(input);
    final Runtime runtime = Runtimes.contextInstance("VeloxColumnarBatches#toVeloxBatch");
    final long handle = ColumnarBatches.getNativeHandle(input);
    final long outHandle = VeloxColumnarBatchJniWrapper.create(runtime).from(handle);
    final ColumnarBatch output = ColumnarBatches.create(outHandle);

    // Follow input's reference count. This might be optimized using
    // automatic clean-up or once the extensibility of ColumnarBatch is enriched.
    // The count must be read before input is closed below.
    // NOTE(review): retaining (refCnt - 1) times assumes a freshly created batch starts with a
    // single reference - confirm against ColumnarBatches.create.
    final long refCnt = ColumnarBatches.getRefCntLight(input);
    final IndicatorVector giv = (IndicatorVector) output.column(0);
    for (long i = 0; i < (refCnt - 1); i++) {
      giv.retain();
    }

    // Close the input once per reference it held.
    for (long i = 0; i < refCnt; i++) {
      input.close();
    }

    // Populate new vectors to input.
    SparkColumnarBatchUtil.transferVectors(output, input);

    return input;
  }

  /**
   * Combine multiple columnar batches horizontally, assuming each of them is already offloaded.
   * Otherwise {@link UnsupportedOperationException} will be thrown.
   *
   * @param batches batches to combine; the native handle of each one is passed, in order, to
   *     {@link VeloxColumnarBatchJniWrapper#compose}
   * @return a batch created from the handle returned by the native compose call
   */
  public static ColumnarBatch compose(ColumnarBatch... batches) {
    final Runtime runtime = Runtimes.contextInstance("VeloxColumnarBatches#compose");
    final long[] handles =
        Arrays.stream(batches).mapToLong(ColumnarBatches::getNativeHandle).toArray();
    final long handle = VeloxColumnarBatchJniWrapper.create(runtime).compose(handles);
    return ColumnarBatches.create(handle);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@
*/
package org.apache.gluten.columnarbatch

import org.apache.gluten.execution.{RowToVeloxColumnarExec, VeloxColumnarToRowExec}
import org.apache.gluten.execution.{ArrowColumnarToVeloxColumnarExec, RowToVeloxColumnarExec, VeloxColumnarToRowExec}
import org.apache.gluten.extension.columnar.transition.{Convention, Transition}

// Batch type object for Velox batches. Its body calls fromRow / toRow / fromBatch / toBatch to
// declare the converters associated with this batch type.
// NOTE(review): this listing looks like a rendered diff with removed and added lines merged -
// the TODO below and the `Transition.empty` fromBatch registration appear to be the pre-change
// lines superseded by the ArrowColumnarToVeloxColumnarExec registration; confirm against the
// actual file before relying on both being present.
object VeloxBatch extends Convention.BatchType {
// Row <-> Velox conversions.
fromRow(RowToVeloxColumnarExec.apply)
toRow(VeloxColumnarToRowExec.apply)
// TODO: Add explicit transitions between Arrow native batch and Velox batch.
// See https://github.com/apache/incubator-gluten/issues/7313.
fromBatch(ArrowBatches.ArrowNativeBatch, Transition.empty)
// Arrow native -> Velox uses an explicit operator.
fromBatch(ArrowBatches.ArrowNativeBatch, ArrowColumnarToVeloxColumnarExec.apply)
// Velox -> Arrow native is registered with Transition.empty.
toBatch(ArrowBatches.ArrowNativeBatch, Transition.empty)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.gluten.columnarbatch.{VeloxBatch, VeloxColumnarBatches}
import org.apache.gluten.columnarbatch.ArrowBatches.ArrowNativeBatch

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
 * Columnar-to-columnar operator declared with [[ArrowNativeBatch]] as its source batch type and
 * [[VeloxBatch]] as its target batch type.
 *
 * @param child plan producing the batches to convert
 */
case class ArrowColumnarToVeloxColumnarExec(override val child: SparkPlan)
  extends ColumnarToColumnarExec(ArrowNativeBatch, VeloxBatch) {

  /** Passes every incoming batch through `VeloxColumnarBatches.toVeloxBatch`. */
  override protected def mapIterator(in: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] =
    in.map(batch => VeloxColumnarBatches.toVeloxBatch(batch))

  /** Rebuilds this operator on top of a replacement child. */
  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
    copy(child = newChild)
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.execution.utils

import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxColumnarBatches}
import org.apache.gluten.iterator.Iterators
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.runtime.Runtimes
Expand Down Expand Up @@ -145,13 +145,14 @@ object ExecUtil {
val pid = rangePartitioner.get.getPartition(partitionKeyExtractor(row))
pidVec.putInt(i, pid)
}
val pidBatch = ColumnarBatches.offload(
ArrowBufferAllocators.contextInstance(),
new ColumnarBatch(Array[ColumnVector](pidVec), cb.numRows))
val newHandle = ColumnarBatches.compose(pidBatch, cb)
val pidBatch = VeloxColumnarBatches.toVeloxBatch(
ColumnarBatches.offload(
ArrowBufferAllocators.contextInstance(),
new ColumnarBatch(Array[ColumnVector](pidVec), cb.numRows)))
val newBatch = VeloxColumnarBatches.compose(pidBatch, cb)
// Composed batch already hold pidBatch's shared ref, so close is safe.
ColumnarBatches.forceClose(pidBatch)
(0, ColumnarBatches.create(newHandle))
(0, newBatch)
})
.recyclePayload(p => ColumnarBatches.forceClose(p._2)) // FIXME why force close?
.create()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.gluten.test.VeloxBackendTestBase;
import org.apache.gluten.vectorized.ArrowWritableColumnVector;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;
Expand Down Expand Up @@ -128,6 +129,40 @@ public void testOffloadAndLoadReadRow() {
});
}

@Test
public void testCompose() {
  TaskResources$.MODULE$.runUnsafe(
      () -> {
        final int rowCount = 20;

        // Two Arrow batches holding the same data with the column order swapped.
        final ColumnarBatch left = newArrowBatch("a boolean, b int", rowCount);
        final ColumnarBatch right = newArrowBatch("b int, a boolean", rowCount);
        final ArrowWritableColumnVector leftBool = (ArrowWritableColumnVector) left.column(0);
        final ArrowWritableColumnVector leftInt = (ArrowWritableColumnVector) left.column(1);
        final ArrowWritableColumnVector rightInt = (ArrowWritableColumnVector) right.column(0);
        final ArrowWritableColumnVector rightBool = (ArrowWritableColumnVector) right.column(1);
        for (int row = 0; row < rowCount; row++) {
          final boolean isEven = row % 2 == 0;
          final int value = 15 - row;
          leftBool.putBoolean(row, isEven);
          leftInt.putInt(row, value);
          rightInt.putInt(row, value);
          rightBool.putBoolean(row, isEven);
        }

        // Offload both batches, convert them to Velox batches, then compose them.
        ColumnarBatches.offload(ArrowBufferAllocators.contextInstance(), left);
        ColumnarBatches.offload(ArrowBufferAllocators.contextInstance(), right);
        VeloxColumnarBatches.toVeloxBatch(left);
        VeloxColumnarBatches.toVeloxBatch(right);
        final ColumnarBatch composed = VeloxColumnarBatches.compose(left, right);

        // The composed batch must be Velox-typed and carry all four columns.
        Assert.assertEquals(
            VeloxColumnarBatches.COMPREHENSIVE_TYPE_VELOX,
            ColumnarBatches.getComprehensiveLightBatchType(composed));
        Assert.assertEquals(rowCount, composed.numRows());
        Assert.assertEquals(4, composed.numCols());

        // Rows 1 and 2: the left batch's columns come first, followed by the right batch's.
        Assert.assertEquals(
            "[false,14,14,false]\n[true,13,13,true]", ColumnarBatches.toString(composed, 1, 2));
        return null;
      });
}

@Test
public void testToString() {
TaskResources$.MODULE$.runUnsafe(
Expand All @@ -146,7 +181,9 @@ public void testToString() {
structType = structType.add("b", DataTypes.IntegerType, true);
ColumnarBatch veloxBatch =
RowToVeloxColumnarExec.toColumnarBatchIterator(
JavaConverters.asScalaIterator(batch.rowIterator()), structType, numRows)
JavaConverters.<InternalRow>asScalaIterator(batch.rowIterator()),
structType,
numRows)
.next();
Assert.assertEquals("[true,15]\n[false,14]", ColumnarBatches.toString(veloxBatch, 0, 2));
Assert.assertEquals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.gluten.extension.columnar.transition
import org.apache.gluten.backendsapi.velox.VeloxListenerApi
import org.apache.gluten.columnarbatch.ArrowBatches.{ArrowJavaBatch, ArrowNativeBatch}
import org.apache.gluten.columnarbatch.VeloxBatch
import org.apache.gluten.execution.{LoadArrowDataExec, OffloadArrowDataExec, RowToVeloxColumnarExec, VeloxColumnarToRowExec}
import org.apache.gluten.execution.{ArrowColumnarToVeloxColumnarExec, LoadArrowDataExec, OffloadArrowDataExec, RowToVeloxColumnarExec, VeloxColumnarToRowExec}
import org.apache.gluten.extension.columnar.transition.Convention.BatchType.VanillaBatch
import org.apache.gluten.test.MockVeloxBackend

Expand Down Expand Up @@ -52,21 +52,21 @@ class VeloxTransitionSuite extends SharedSparkSession {
test("ArrowNative C2R - outputs row") {
val in = BatchLeaf(ArrowNativeBatch)
val out = Transitions.insertTransitions(in, outputsColumnar = false)
assert(out == VeloxColumnarToRowExec(BatchLeaf(ArrowNativeBatch)))
assert(out == ColumnarToRowExec(LoadArrowDataExec(BatchLeaf(ArrowNativeBatch))))
}

test("ArrowNative C2R - requires row input") {
val in = RowUnary(BatchLeaf(ArrowNativeBatch))
val out = Transitions.insertTransitions(in, outputsColumnar = false)
assert(out == RowUnary(VeloxColumnarToRowExec(BatchLeaf(ArrowNativeBatch))))
assert(out == RowUnary(ColumnarToRowExec(LoadArrowDataExec(BatchLeaf(ArrowNativeBatch)))))
}

test("ArrowNative R2C - requires Arrow input") {
val in = BatchUnary(ArrowNativeBatch, RowLeaf())
val out = Transitions.insertTransitions(in, outputsColumnar = false)
assert(
out == VeloxColumnarToRowExec(
BatchUnary(ArrowNativeBatch, RowToVeloxColumnarExec(RowLeaf()))))
out == ColumnarToRowExec(
LoadArrowDataExec(BatchUnary(ArrowNativeBatch, RowToVeloxColumnarExec(RowLeaf())))))
}

test("ArrowNative-to-Velox C2C") {
Expand All @@ -75,23 +75,27 @@ class VeloxTransitionSuite extends SharedSparkSession {
// An explicit ArrowColumnarToVeloxColumnarExec transition is inserted for ArrowNative-to-Velox.
// See https://github.com/apache/incubator-gluten/issues/7313.
assert(out == VeloxColumnarToRowExec(BatchUnary(VeloxBatch, BatchLeaf(ArrowNativeBatch))))
assert(
out == VeloxColumnarToRowExec(
BatchUnary(VeloxBatch, ArrowColumnarToVeloxColumnarExec(BatchLeaf(ArrowNativeBatch)))))
}

test("Velox-to-ArrowNative C2C") {
val in = BatchUnary(ArrowNativeBatch, BatchLeaf(VeloxBatch))
val out = Transitions.insertTransitions(in, outputsColumnar = false)
assert(out == VeloxColumnarToRowExec(BatchUnary(ArrowNativeBatch, BatchLeaf(VeloxBatch))))
assert(
out == ColumnarToRowExec(
LoadArrowDataExec(BatchUnary(ArrowNativeBatch, BatchLeaf(VeloxBatch)))))
}

test("Vanilla-to-ArrowNative C2C") {
val in = BatchUnary(ArrowNativeBatch, BatchLeaf(VanillaBatch))
val out = Transitions.insertTransitions(in, outputsColumnar = false)
assert(
out == VeloxColumnarToRowExec(
BatchUnary(
out == ColumnarToRowExec(
LoadArrowDataExec(BatchUnary(
ArrowNativeBatch,
RowToVeloxColumnarExec(ColumnarToRowExec(BatchLeaf(VanillaBatch))))))
RowToVeloxColumnarExec(ColumnarToRowExec(BatchLeaf(VanillaBatch)))))))
}

test("ArrowNative-to-Vanilla C2C") {
Expand Down Expand Up @@ -127,7 +131,9 @@ class VeloxTransitionSuite extends SharedSparkSession {
val out = Transitions.insertTransitions(in, outputsColumnar = false)
assert(
out == VeloxColumnarToRowExec(
BatchUnary(VeloxBatch, OffloadArrowDataExec(BatchLeaf(ArrowJavaBatch)))))
BatchUnary(
VeloxBatch,
ArrowColumnarToVeloxColumnarExec(OffloadArrowDataExec(BatchLeaf(ArrowJavaBatch))))))
}

test("Velox-to-ArrowJava C2C") {
Expand Down
2 changes: 1 addition & 1 deletion cpp/core/compute/Runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class Runtime : public std::enable_shared_from_this<Runtime> {

virtual std::shared_ptr<ColumnarBatch> createOrGetEmptySchemaBatch(int32_t numRows) = 0;

virtual std::shared_ptr<ColumnarBatch> select(std::shared_ptr<ColumnarBatch>, std::vector<int32_t>) = 0;
virtual std::shared_ptr<ColumnarBatch> select(std::shared_ptr<ColumnarBatch>, const std::vector<int32_t>&) = 0;

virtual MemoryManager* memoryManager() {
return memoryManager_.get();
Expand Down
21 changes: 0 additions & 21 deletions cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -667,27 +667,6 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWra
JNI_METHOD_END(kInvalidObjectHandle)
}

// JNI entry point for ColumnarBatchJniWrapper.compose: looks up the batch behind every handle in
// `batchHandles`, builds a CompositeColumnarBatch from them (in array order) and returns the
// handle under which the runtime saved the result.
// NOTE(review): kInvalidObjectHandle is presumably what the JNI_METHOD_START/END macros return
// on the error path - confirm against the macro definitions.
JNIEXPORT jlong JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_compose( // NOLINT
    JNIEnv* env,
    jobject wrapper,
    jlongArray batchHandles) {
  JNI_METHOD_START
  auto runtime = gluten::getRuntime(env, wrapper);

  int numHandles = env->GetArrayLength(batchHandles);
  auto handleArray = gluten::getLongArrayElementsSafe(env, batchHandles);

  // Resolve each handle to its batch, preserving the order of the Java array.
  std::vector<std::shared_ptr<ColumnarBatch>> resolved;
  for (int idx = 0; idx < numHandles; ++idx) {
    int64_t batchHandle = handleArray.elems()[idx];
    resolved.push_back(ObjectStore::retrieve<ColumnarBatch>(batchHandle));
  }

  auto composed = CompositeColumnarBatch::create(std::move(resolved));
  return runtime->saveObject(composed);
  JNI_METHOD_END(kInvalidObjectHandle)
}

JNIEXPORT void JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_exportToArrow( // NOLINT
JNIEnv* env,
jobject wrapper,
Expand Down
Loading

0 comments on commit 8cb3e70

Please sign in to comment.