Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GLUTEN-7313][VL] Explicit Arrow transitions, part 4: explicit Arrow-to-Velox transition #7392

Merged
merged 19 commits into from
Oct 8, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.columnarbatch;

import org.apache.gluten.runtime.Runtime;
import org.apache.gluten.runtime.RuntimeAware;

public class VeloxColumnarBatchJniWrapper implements RuntimeAware {
private final Runtime runtime;

private VeloxColumnarBatchJniWrapper(Runtime runtime) {
this.runtime = runtime;
}

public static VeloxColumnarBatchJniWrapper create(Runtime runtime) {
return new VeloxColumnarBatchJniWrapper(runtime);
}

public native long from(long batch);

public native long compose(long[] batches);

@Override
public long rtHandle() {
return runtime.getHandle();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.columnarbatch;

import org.apache.gluten.runtime.Runtime;
import org.apache.gluten.runtime.Runtimes;

import com.google.common.base.Preconditions;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.sql.vectorized.SparkColumnarBatchUtil;

import java.util.Arrays;
import java.util.Objects;

public final class VeloxColumnarBatches {
public static final String COMPREHENSIVE_TYPE_VELOX = "velox";

public static void checkVeloxBatch(ColumnarBatch batch) {
final String comprehensiveType = ColumnarBatches.getComprehensiveLightBatchType(batch);
Preconditions.checkArgument(
Objects.equals(comprehensiveType, COMPREHENSIVE_TYPE_VELOX),
String.format(
"Expected comprehensive batch type %s, but got %s",
COMPREHENSIVE_TYPE_VELOX, comprehensiveType));
}

public static void checkNonVeloxBatch(ColumnarBatch batch) {
final String comprehensiveType = ColumnarBatches.getComprehensiveLightBatchType(batch);
Preconditions.checkArgument(
!Objects.equals(comprehensiveType, COMPREHENSIVE_TYPE_VELOX),
String.format("Comprehensive batch type is already %s", COMPREHENSIVE_TYPE_VELOX));
}

public static ColumnarBatch toVeloxBatch(ColumnarBatch input) {
checkNonVeloxBatch(input);
final Runtime runtime = Runtimes.contextInstance("VeloxColumnarBatches#toVeloxBatch");
final long handle = ColumnarBatches.getNativeHandle(input);
final long outHandle = VeloxColumnarBatchJniWrapper.create(runtime).from(handle);
final ColumnarBatch output = ColumnarBatches.create(outHandle);

// Follow input's reference count. This might be optimized using
// automatic clean-up or once the extensibility of ColumnarBatch is enriched
final long refCnt = ColumnarBatches.getRefCntLight(input);
final IndicatorVector giv = (IndicatorVector) output.column(0);
for (long i = 0; i < (refCnt - 1); i++) {
giv.retain();
}

// close the input one
for (long i = 0; i < refCnt; i++) {
input.close();
}

// Populate new vectors to input.
SparkColumnarBatchUtil.transferVectors(output, input);

return input;
}

/**
* Combine multiple columnar batches horizontally, assuming each of them is already offloaded.
* Otherwise {@link UnsupportedOperationException} will be thrown.
*/
public static ColumnarBatch compose(ColumnarBatch... batches) {
final Runtime runtime = Runtimes.contextInstance("VeloxColumnarBatches#compose");
final long[] handles =
Arrays.stream(batches).mapToLong(ColumnarBatches::getNativeHandle).toArray();
final long handle = VeloxColumnarBatchJniWrapper.create(runtime).compose(handles);
return ColumnarBatches.create(handle);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@
*/
package org.apache.gluten.columnarbatch

import org.apache.gluten.execution.{RowToVeloxColumnarExec, VeloxColumnarToRowExec}
import org.apache.gluten.execution.{ArrowColumnarToVeloxColumnarExec, RowToVeloxColumnarExec, VeloxColumnarToRowExec}
import org.apache.gluten.extension.columnar.transition.{Convention, Transition}

object VeloxBatch extends Convention.BatchType {
fromRow(RowToVeloxColumnarExec.apply)
toRow(VeloxColumnarToRowExec.apply)
// TODO: Add explicit transitions between Arrow native batch and Velox batch.
// See https://github.com/apache/incubator-gluten/issues/7313.
fromBatch(ArrowBatches.ArrowNativeBatch, Transition.empty)
fromBatch(ArrowBatches.ArrowNativeBatch, ArrowColumnarToVeloxColumnarExec.apply)
toBatch(ArrowBatches.ArrowNativeBatch, Transition.empty)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.gluten.columnarbatch.{VeloxBatch, VeloxColumnarBatches}
import org.apache.gluten.columnarbatch.ArrowBatches.ArrowNativeBatch

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.vectorized.ColumnarBatch

case class ArrowColumnarToVeloxColumnarExec(override val child: SparkPlan)
extends ColumnarToColumnarExec(ArrowNativeBatch, VeloxBatch) {
override protected def mapIterator(in: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] = {
in.map {
b =>
val out = VeloxColumnarBatches.toVeloxBatch(b)
out
}
}
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
ArrowColumnarToVeloxColumnarExec(child = newChild)
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.execution.utils

import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxColumnarBatches}
import org.apache.gluten.iterator.Iterators
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.runtime.Runtimes
Expand Down Expand Up @@ -145,13 +145,14 @@ object ExecUtil {
val pid = rangePartitioner.get.getPartition(partitionKeyExtractor(row))
pidVec.putInt(i, pid)
}
val pidBatch = ColumnarBatches.offload(
ArrowBufferAllocators.contextInstance(),
new ColumnarBatch(Array[ColumnVector](pidVec), cb.numRows))
val newHandle = ColumnarBatches.compose(pidBatch, cb)
val pidBatch = VeloxColumnarBatches.toVeloxBatch(
ColumnarBatches.offload(
ArrowBufferAllocators.contextInstance(),
new ColumnarBatch(Array[ColumnVector](pidVec), cb.numRows)))
val newBatch = VeloxColumnarBatches.compose(pidBatch, cb)
// Composed batch already hold pidBatch's shared ref, so close is safe.
ColumnarBatches.forceClose(pidBatch)
(0, ColumnarBatches.create(newHandle))
(0, newBatch)
})
.recyclePayload(p => ColumnarBatches.forceClose(p._2)) // FIXME why force close?
.create()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.gluten.test.VeloxBackendTestBase;
import org.apache.gluten.vectorized.ArrowWritableColumnVector;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;
Expand Down Expand Up @@ -128,6 +129,40 @@ public void testOffloadAndLoadReadRow() {
});
}

@Test
public void testCompose() {
TaskResources$.MODULE$.runUnsafe(
() -> {
final int numRows = 20;
final ColumnarBatch batch1 = newArrowBatch("a boolean, b int", numRows);
final ColumnarBatch batch2 = newArrowBatch("b int, a boolean", numRows);
final ArrowWritableColumnVector col0 = (ArrowWritableColumnVector) batch1.column(0);
final ArrowWritableColumnVector col1 = (ArrowWritableColumnVector) batch1.column(1);
final ArrowWritableColumnVector col2 = (ArrowWritableColumnVector) batch2.column(0);
final ArrowWritableColumnVector col3 = (ArrowWritableColumnVector) batch2.column(1);
for (int j = 0; j < numRows; j++) {
col0.putBoolean(j, j % 2 == 0);
col1.putInt(j, 15 - j);
col2.putInt(j, 15 - j);
col3.putBoolean(j, j % 2 == 0);
}
ColumnarBatches.offload(ArrowBufferAllocators.contextInstance(), batch1);
ColumnarBatches.offload(ArrowBufferAllocators.contextInstance(), batch2);
VeloxColumnarBatches.toVeloxBatch(batch1);
VeloxColumnarBatches.toVeloxBatch(batch2);
final ColumnarBatch batch3 = VeloxColumnarBatches.compose(batch1, batch2);
Assert.assertEquals(
VeloxColumnarBatches.COMPREHENSIVE_TYPE_VELOX,
ColumnarBatches.getComprehensiveLightBatchType(batch3));

Assert.assertEquals(numRows, batch3.numRows());
Assert.assertEquals(4, batch3.numCols());
Assert.assertEquals(
"[false,14,14,false]\n[true,13,13,true]", ColumnarBatches.toString(batch3, 1, 2));
return null;
});
}

@Test
public void testToString() {
TaskResources$.MODULE$.runUnsafe(
Expand All @@ -146,7 +181,9 @@ public void testToString() {
structType = structType.add("b", DataTypes.IntegerType, true);
ColumnarBatch veloxBatch =
RowToVeloxColumnarExec.toColumnarBatchIterator(
JavaConverters.asScalaIterator(batch.rowIterator()), structType, numRows)
JavaConverters.<InternalRow>asScalaIterator(batch.rowIterator()),
structType,
numRows)
.next();
Assert.assertEquals("[true,15]\n[false,14]", ColumnarBatches.toString(veloxBatch, 0, 2));
Assert.assertEquals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.gluten.extension.columnar.transition
import org.apache.gluten.backendsapi.velox.VeloxListenerApi
import org.apache.gluten.columnarbatch.ArrowBatches.{ArrowJavaBatch, ArrowNativeBatch}
import org.apache.gluten.columnarbatch.VeloxBatch
import org.apache.gluten.execution.{LoadArrowDataExec, OffloadArrowDataExec, RowToVeloxColumnarExec, VeloxColumnarToRowExec}
import org.apache.gluten.execution.{ArrowColumnarToVeloxColumnarExec, LoadArrowDataExec, OffloadArrowDataExec, RowToVeloxColumnarExec, VeloxColumnarToRowExec}
import org.apache.gluten.extension.columnar.transition.Convention.BatchType.VanillaBatch
import org.apache.gluten.test.MockVeloxBackend

Expand Down Expand Up @@ -52,21 +52,21 @@ class VeloxTransitionSuite extends SharedSparkSession {
test("ArrowNative C2R - outputs row") {
val in = BatchLeaf(ArrowNativeBatch)
val out = Transitions.insertTransitions(in, outputsColumnar = false)
assert(out == VeloxColumnarToRowExec(BatchLeaf(ArrowNativeBatch)))
assert(out == ColumnarToRowExec(LoadArrowDataExec(BatchLeaf(ArrowNativeBatch))))
}

test("ArrowNative C2R - requires row input") {
val in = RowUnary(BatchLeaf(ArrowNativeBatch))
val out = Transitions.insertTransitions(in, outputsColumnar = false)
assert(out == RowUnary(VeloxColumnarToRowExec(BatchLeaf(ArrowNativeBatch))))
assert(out == RowUnary(ColumnarToRowExec(LoadArrowDataExec(BatchLeaf(ArrowNativeBatch)))))
}

test("ArrowNative R2C - requires Arrow input") {
val in = BatchUnary(ArrowNativeBatch, RowLeaf())
val out = Transitions.insertTransitions(in, outputsColumnar = false)
assert(
out == VeloxColumnarToRowExec(
BatchUnary(ArrowNativeBatch, RowToVeloxColumnarExec(RowLeaf()))))
out == ColumnarToRowExec(
LoadArrowDataExec(BatchUnary(ArrowNativeBatch, RowToVeloxColumnarExec(RowLeaf())))))
}

test("ArrowNative-to-Velox C2C") {
Expand All @@ -75,23 +75,27 @@ class VeloxTransitionSuite extends SharedSparkSession {
// No explicit transition needed for ArrowNative-to-Velox.
// FIXME: Add explicit transitions.
// See https://github.com/apache/incubator-gluten/issues/7313.
assert(out == VeloxColumnarToRowExec(BatchUnary(VeloxBatch, BatchLeaf(ArrowNativeBatch))))
assert(
out == VeloxColumnarToRowExec(
BatchUnary(VeloxBatch, ArrowColumnarToVeloxColumnarExec(BatchLeaf(ArrowNativeBatch)))))
}

test("Velox-to-ArrowNative C2C") {
val in = BatchUnary(ArrowNativeBatch, BatchLeaf(VeloxBatch))
val out = Transitions.insertTransitions(in, outputsColumnar = false)
assert(out == VeloxColumnarToRowExec(BatchUnary(ArrowNativeBatch, BatchLeaf(VeloxBatch))))
assert(
out == ColumnarToRowExec(
LoadArrowDataExec(BatchUnary(ArrowNativeBatch, BatchLeaf(VeloxBatch)))))
}

test("Vanilla-to-ArrowNative C2C") {
val in = BatchUnary(ArrowNativeBatch, BatchLeaf(VanillaBatch))
val out = Transitions.insertTransitions(in, outputsColumnar = false)
assert(
out == VeloxColumnarToRowExec(
BatchUnary(
out == ColumnarToRowExec(
LoadArrowDataExec(BatchUnary(
ArrowNativeBatch,
RowToVeloxColumnarExec(ColumnarToRowExec(BatchLeaf(VanillaBatch))))))
RowToVeloxColumnarExec(ColumnarToRowExec(BatchLeaf(VanillaBatch)))))))
}

test("ArrowNative-to-Vanilla C2C") {
Expand Down Expand Up @@ -127,7 +131,9 @@ class VeloxTransitionSuite extends SharedSparkSession {
val out = Transitions.insertTransitions(in, outputsColumnar = false)
assert(
out == VeloxColumnarToRowExec(
BatchUnary(VeloxBatch, OffloadArrowDataExec(BatchLeaf(ArrowJavaBatch)))))
BatchUnary(
VeloxBatch,
ArrowColumnarToVeloxColumnarExec(OffloadArrowDataExec(BatchLeaf(ArrowJavaBatch))))))
}

test("Velox-to-ArrowJava C2C") {
Expand Down
2 changes: 1 addition & 1 deletion cpp/core/compute/Runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class Runtime : public std::enable_shared_from_this<Runtime> {

virtual std::shared_ptr<ColumnarBatch> createOrGetEmptySchemaBatch(int32_t numRows) = 0;

virtual std::shared_ptr<ColumnarBatch> select(std::shared_ptr<ColumnarBatch>, std::vector<int32_t>) = 0;
virtual std::shared_ptr<ColumnarBatch> select(std::shared_ptr<ColumnarBatch>, const std::vector<int32_t>&) = 0;

virtual MemoryManager* memoryManager() {
return memoryManager_.get();
Expand Down
21 changes: 0 additions & 21 deletions cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -667,27 +667,6 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWra
JNI_METHOD_END(kInvalidObjectHandle)
}

JNIEXPORT jlong JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_compose( // NOLINT
JNIEnv* env,
jobject wrapper,
jlongArray batchHandles) {
JNI_METHOD_START
auto ctx = gluten::getRuntime(env, wrapper);

int handleCount = env->GetArrayLength(batchHandles);
auto safeArray = gluten::getLongArrayElementsSafe(env, batchHandles);

std::vector<std::shared_ptr<ColumnarBatch>> batches;
for (int i = 0; i < handleCount; ++i) {
int64_t handle = safeArray.elems()[i];
auto batch = ObjectStore::retrieve<ColumnarBatch>(handle);
batches.push_back(batch);
}
auto newBatch = CompositeColumnarBatch::create(std::move(batches));
return ctx->saveObject(newBatch);
JNI_METHOD_END(kInvalidObjectHandle)
}

JNIEXPORT void JNICALL Java_org_apache_gluten_columnarbatch_ColumnarBatchJniWrapper_exportToArrow( // NOLINT
JNIEnv* env,
jobject wrapper,
Expand Down
Loading
Loading