From 1fd5d60d80290b7b4176267f80804b8476fd0754 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Mon, 2 Sep 2024 10:01:08 -0700 Subject: [PATCH] Fix SortBuffer batchSize computation overflow (#10848) Summary: Exception: ``` 24/08/13 16:32:20 ERROR [Executor task launch worker for task 6588.0 in stage 2.0 (TID 138337)] util.TaskResources: Task 138337 failed by error: org.apache.gluten.exception.GlutenException: org.apache.gluten.exception.GlutenException: Exception: VeloxUserError Error Source: USER Error Code: ARITHMETIC_ERROR Reason: integer overflow: 18446744071588638181 * 8 Retriable: False Function: checkedMultiply File: /home/binweiyang/gluten/ep/build-velox/build/velox_ep/velox/common/base/CheckedArithmetic.h Line: 51 Stack trace: # 0 _ZN8facebook5velox7process10StackTraceC1Ei # 1 _ZN8facebook5velox14VeloxExceptionC1EPKcmS3_St17basic_string_viewIcSt11char_traitsIcEES7_S7_S7_bNS1_4TypeES7_ # 2 _ZN8facebook5velox6detail14veloxCheckFailINS0_14VeloxUserErrorERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEEEvRKNS1_18VeloxCheckFailArgsET0_ # 3 _ZN8facebook5velox15checkedMultiplyImEET_RKS2_S4_PKc.constprop.0 # 4 _ZN8facebook5velox13AlignedBuffer8allocateIlEEN5boost13intrusive_ptrINS0_6BufferEEEmPNS0_6memory10MemoryPoolERKSt8optionalIT_E # 5 _ZN8facebook5velox10BaseVector14createInternalERKSt10shared_ptrIKNS0_4TypeEEiPNS0_6memory10MemoryPoolE # 6 _ZN8facebook5velox10BaseVector6createIS1_EESt10shared_ptrIT_ERKS3_IKNS0_4TypeEEiPNS0_6memory10MemoryPoolE # 7 _ZN8facebook5velox10BaseVector14createInternalERKSt10shared_ptrIKNS0_4TypeEEiPNS0_6memory10MemoryPoolE # 8 _ZN8facebook5velox4exec10SortBuffer13prepareOutputEj # 9 _ZN8facebook5velox4exec10SortBuffer9getOutputEj # 10 _ZN8facebook5velox4exec7OrderBy9getOutputEv # 11 _ZN8facebook5velox4exec6Driver11runInternalERSt10shared_ptrIS2_ERS3_INS1_13BlockingStateEERS3_INS0_9RowVectorEE # 12 _ZN8facebook5velox4exec6Driver4nextERSt10shared_ptrINS1_13BlockingStateEE # 13 _ZN8facebook5velox4exec4Task4nextEPN5folly10SemiFutureINS3_4UnitEEE # 14 _ZN6gluten24WholeStageResultIterator4nextEv # 15 Java_org_apache_gluten_vectorized_ColumnarBatchOutIterator_nativeHasNext # 16 0x00007f5a281a8b28 at org.apache.gluten.vectorized.GeneralOutIterator.hasNext(GeneralOutIterator.java:39) at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:43) at org.apache.gluten.utils.iterator.IteratorsV1$InvocationFlowProtection.hasNext(IteratorsV1.scala:159) at org.apache.gluten.utils.iterator.IteratorsV1$IteratorCompleter.hasNext(IteratorsV1.scala:71) at org.apache.gluten.utils.iterator.IteratorsV1$PayloadCloser.hasNext(IteratorsV1.scala:37) at org.apache.gluten.utils.iterator.IteratorsV1$LifeTimeAccumulator.hasNext(IteratorsV1.scala:100) at scala.collection.Iterator.isEmpty(Iterator.scala:385) at scala.collection.Iterator.isEmpty$(Iterator.scala:385) at org.apache.gluten.utils.iterator.IteratorsV1$LifeTimeAccumulator.isEmpty(IteratorsV1.scala:90) at org.apache.gluten.execution.VeloxColumnarToRowExec$.toRowIterator(VeloxColumnarToRowExec.scala:108) at org.apache.gluten.execution.VeloxColumnarToRowExec.$anonfun$doExecuteInternal$1(VeloxColumnarToRowExec.scala:79) at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:949) at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:949) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374) at org.apache.spark.rdd.RDD.iterator(RDD.scala:338) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374) at org.apache.spark.rdd.RDD.iterator(RDD.scala:338) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374) at org.apache.spark.rdd.RDD.iterator(RDD.scala:338) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374) at org.apache.spark.rdd.RDD.iterator(RDD.scala:338) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1471) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) ``` Debug message: ``` batchSize: -2112458117 input type is ROW ``` The batchSize is negative in SortBuffer `PrepareOutput`. vector_size_t is int32_t but numInputRows_, numOutputRows_ and maxOutputRows is uint32_t. So when (numInputRows_ - numOutputRows_) is bigger than 0x7fffffff, error occurs. We can control the batch size but can't control the partition size. So we should use uint64_t data type for numInputRows_ and numOutputRows_. Relevant to: https://github.com/apache/incubator-gluten/issues/6823 Pull Request resolved: https://github.com/facebookincubator/velox/pull/10848 Reviewed By: amitkdutta Differential Revision: D62091115 Pulled By: xiaoxmeng fbshipit-source-id: 0be2e58ed40d71cee17025adcf6dd7fb441bda72 --- velox/exec/SortBuffer.cpp | 3 +-- velox/exec/SortBuffer.h | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/velox/exec/SortBuffer.cpp b/velox/exec/SortBuffer.cpp index 0d7b5e00ec99..31a1ef34f542 100644 --- a/velox/exec/SortBuffer.cpp +++ b/velox/exec/SortBuffer.cpp @@ -287,9 +287,8 @@ void SortBuffer::spillOutput() { void SortBuffer::prepareOutput(vector_size_t maxOutputRows) { VELOX_CHECK_GT(maxOutputRows, 0); VELOX_CHECK_GT(numInputRows_, numOutputRows_); - const vector_size_t batchSize = - std::min(numInputRows_ - numOutputRows_, maxOutputRows); + std::min(numInputRows_ - numOutputRows_, maxOutputRows); if (output_ != nullptr) { VectorPtr output = std::move(output_); BaseVector::prepareForReuse(output, batchSize); diff --git a/velox/exec/SortBuffer.h b/velox/exec/SortBuffer.h index cd02f966f9df..6da47871311c 100644 --- a/velox/exec/SortBuffer.h +++ b/velox/exec/SortBuffer.h @@ -102,7 +102,7 @@ class SortBuffer { // sort buffer object. bool noMoreInput_ = false; // The number of received input rows. - size_t numInputRows_ = 0; + uint64_t numInputRows_ = 0; // Used to store the input data in row format. std::unique_ptr data_; std::vector sortedRows_; @@ -123,7 +123,7 @@ class SortBuffer { // 'data_->estimateRowSize()' across all accumulated data set. std::optional estimatedOutputRowSize_{}; // The number of rows that has been returned. - size_t numOutputRows_{0}; + uint64_t numOutputRows_{0}; }; } // namespace facebook::velox::exec