Skip to content

Commit

Permalink
Fix SortBuffer batchSize computation overflow (facebookincubator#10848)
Browse files Browse the repository at this point in the history
Summary:
Exception:
```
24/08/13 16:32:20 ERROR [Executor task launch worker for task 6588.0 in stage 2.0 (TID 138337)] util.TaskResources: Task 138337 failed by error:
org.apache.gluten.exception.GlutenException: org.apache.gluten.exception.GlutenException: Exception: VeloxUserError
Error Source: USER
Error Code: ARITHMETIC_ERROR
Reason: integer overflow: 18446744071588638181 * 8
Retriable: False
Function: checkedMultiply
File: /home/binweiyang/gluten/ep/build-velox/build/velox_ep/velox/common/base/CheckedArithmetic.h
Line: 51
Stack trace:
# 0  _ZN8facebook5velox7process10StackTraceC1Ei
# 1  _ZN8facebook5velox14VeloxExceptionC1EPKcmS3_St17basic_string_viewIcSt11char_traitsIcEES7_S7_S7_bNS1_4TypeES7_
# 2  _ZN8facebook5velox6detail14veloxCheckFailINS0_14VeloxUserErrorERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEEEvRKNS1_18VeloxCheckFailArgsET0_
# 3  _ZN8facebook5velox15checkedMultiplyImEET_RKS2_S4_PKc.constprop.0
# 4  _ZN8facebook5velox13AlignedBuffer8allocateIlEEN5boost13intrusive_ptrINS0_6BufferEEEmPNS0_6memory10MemoryPoolERKSt8optionalIT_E
# 5  _ZN8facebook5velox10BaseVector14createInternalERKSt10shared_ptrIKNS0_4TypeEEiPNS0_6memory10MemoryPoolE
# 6  _ZN8facebook5velox10BaseVector6createIS1_EESt10shared_ptrIT_ERKS3_IKNS0_4TypeEEiPNS0_6memory10MemoryPoolE
# 7  _ZN8facebook5velox10BaseVector14createInternalERKSt10shared_ptrIKNS0_4TypeEEiPNS0_6memory10MemoryPoolE
# 8  _ZN8facebook5velox4exec10SortBuffer13prepareOutputEj
# 9  _ZN8facebook5velox4exec10SortBuffer9getOutputEj
# 10 _ZN8facebook5velox4exec7OrderBy9getOutputEv
# 11 _ZN8facebook5velox4exec6Driver11runInternalERSt10shared_ptrIS2_ERS3_INS1_13BlockingStateEERS3_INS0_9RowVectorEE
# 12 _ZN8facebook5velox4exec6Driver4nextERSt10shared_ptrINS1_13BlockingStateEE
# 13 _ZN8facebook5velox4exec4Task4nextEPN5folly10SemiFutureINS3_4UnitEEE
# 14 _ZN6gluten24WholeStageResultIterator4nextEv
# 15 Java_org_apache_gluten_vectorized_ColumnarBatchOutIterator_nativeHasNext
# 16 0x00007f5a281a8b28

	at org.apache.gluten.vectorized.GeneralOutIterator.hasNext(GeneralOutIterator.java:39)
	at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:43)
	at org.apache.gluten.utils.iterator.IteratorsV1$InvocationFlowProtection.hasNext(IteratorsV1.scala:159)
	at org.apache.gluten.utils.iterator.IteratorsV1$IteratorCompleter.hasNext(IteratorsV1.scala:71)
	at org.apache.gluten.utils.iterator.IteratorsV1$PayloadCloser.hasNext(IteratorsV1.scala:37)
	at org.apache.gluten.utils.iterator.IteratorsV1$LifeTimeAccumulator.hasNext(IteratorsV1.scala:100)
	at scala.collection.Iterator.isEmpty(Iterator.scala:385)
	at scala.collection.Iterator.isEmpty$(Iterator.scala:385)
	at org.apache.gluten.utils.iterator.IteratorsV1$LifeTimeAccumulator.isEmpty(IteratorsV1.scala:90)
	at org.apache.gluten.execution.VeloxColumnarToRowExec$.toRowIterator(VeloxColumnarToRowExec.scala:108)
	at org.apache.gluten.execution.VeloxColumnarToRowExec.$anonfun$doExecuteInternal$1(VeloxColumnarToRowExec.scala:79)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:949)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:949)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1471)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
```
Debug message:
```
batchSize: -2112458117
input type is ROW<n0_0:BIGINT,n0_1:VARCHAR,n0_2:BIGINT,n0_3:BIGINT,n0_4:VARCHAR>
```
The batchSize is negative in SortBuffer `PrepareOutput`.
vector_size_t is int32_t but numInputRows_, numOutputRows_ and maxOutputRows is uint32_t. So when (numInputRows_ - numOutputRows_) is bigger than 0x7fffffff, error occurs. We can control the batch size but can't control the partition size. So we should use uint64_t data type for numInputRows_ and numOutputRows_.
Relevant to: apache/incubator-gluten#6823

Pull Request resolved: facebookincubator#10848

Reviewed By: amitkdutta

Differential Revision: D62091115

Pulled By: xiaoxmeng

fbshipit-source-id: 0be2e58ed40d71cee17025adcf6dd7fb441bda72
  • Loading branch information
jinchengchenghh authored and facebook-github-bot committed Sep 2, 2024
1 parent 6e52cbd commit 1fd5d60
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 4 deletions.
3 changes: 1 addition & 2 deletions velox/exec/SortBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,8 @@ void SortBuffer::spillOutput() {
void SortBuffer::prepareOutput(vector_size_t maxOutputRows) {
VELOX_CHECK_GT(maxOutputRows, 0);
VELOX_CHECK_GT(numInputRows_, numOutputRows_);

const vector_size_t batchSize =
std::min<vector_size_t>(numInputRows_ - numOutputRows_, maxOutputRows);
std::min<uint64_t>(numInputRows_ - numOutputRows_, maxOutputRows);
if (output_ != nullptr) {
VectorPtr output = std::move(output_);
BaseVector::prepareForReuse(output, batchSize);
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/SortBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class SortBuffer {
// sort buffer object.
bool noMoreInput_ = false;
// The number of received input rows.
size_t numInputRows_ = 0;
uint64_t numInputRows_ = 0;
// Used to store the input data in row format.
std::unique_ptr<RowContainer> data_;
std::vector<char*> sortedRows_;
Expand All @@ -123,7 +123,7 @@ class SortBuffer {
// 'data_->estimateRowSize()' across all accumulated data set.
std::optional<uint64_t> estimatedOutputRowSize_{};
// The number of rows that has been returned.
size_t numOutputRows_{0};
uint64_t numOutputRows_{0};
};

} // namespace facebook::velox::exec

0 comments on commit 1fd5d60

Please sign in to comment.