
Fix SortBuffer batchSize computation overflow #10848

Merged

Conversation

jinchengchenghh (Contributor):

Exception:

```
24/08/13 16:32:20 ERROR [Executor task launch worker for task 6588.0 in stage 2.0 (TID 138337)] util.TaskResources: Task 138337 failed by error:
org.apache.gluten.exception.GlutenException: org.apache.gluten.exception.GlutenException: Exception: VeloxUserError
Error Source: USER
Error Code: ARITHMETIC_ERROR
Reason: integer overflow: 18446744071588638181 * 8
Retriable: False
Function: checkedMultiply
File: /home/binweiyang/gluten/ep/build-velox/build/velox_ep/velox/common/base/CheckedArithmetic.h
Line: 51
Stack trace:
# 0  _ZN8facebook5velox7process10StackTraceC1Ei
# 1  _ZN8facebook5velox14VeloxExceptionC1EPKcmS3_St17basic_string_viewIcSt11char_traitsIcEES7_S7_S7_bNS1_4TypeES7_
# 2  _ZN8facebook5velox6detail14veloxCheckFailINS0_14VeloxUserErrorERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEEEvRKNS1_18VeloxCheckFailArgsET0_
# 3  _ZN8facebook5velox15checkedMultiplyImEET_RKS2_S4_PKc.constprop.0
# 4  _ZN8facebook5velox13AlignedBuffer8allocateIlEEN5boost13intrusive_ptrINS0_6BufferEEEmPNS0_6memory10MemoryPoolERKSt8optionalIT_E
# 5  _ZN8facebook5velox10BaseVector14createInternalERKSt10shared_ptrIKNS0_4TypeEEiPNS0_6memory10MemoryPoolE
# 6  _ZN8facebook5velox10BaseVector6createIS1_EESt10shared_ptrIT_ERKS3_IKNS0_4TypeEEiPNS0_6memory10MemoryPoolE
# 7  _ZN8facebook5velox10BaseVector14createInternalERKSt10shared_ptrIKNS0_4TypeEEiPNS0_6memory10MemoryPoolE
# 8  _ZN8facebook5velox4exec10SortBuffer13prepareOutputEj
# 9  _ZN8facebook5velox4exec10SortBuffer9getOutputEj
# 10 _ZN8facebook5velox4exec7OrderBy9getOutputEv
# 11 _ZN8facebook5velox4exec6Driver11runInternalERSt10shared_ptrIS2_ERS3_INS1_13BlockingStateEERS3_INS0_9RowVectorEE
# 12 _ZN8facebook5velox4exec6Driver4nextERSt10shared_ptrINS1_13BlockingStateEE
# 13 _ZN8facebook5velox4exec4Task4nextEPN5folly10SemiFutureINS3_4UnitEEE
# 14 _ZN6gluten24WholeStageResultIterator4nextEv
# 15 Java_org_apache_gluten_vectorized_ColumnarBatchOutIterator_nativeHasNext
# 16 0x00007f5a281a8b28

	at org.apache.gluten.vectorized.GeneralOutIterator.hasNext(GeneralOutIterator.java:39)
	at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:43)
	at org.apache.gluten.utils.iterator.IteratorsV1$InvocationFlowProtection.hasNext(IteratorsV1.scala:159)
	at org.apache.gluten.utils.iterator.IteratorsV1$IteratorCompleter.hasNext(IteratorsV1.scala:71)
	at org.apache.gluten.utils.iterator.IteratorsV1$PayloadCloser.hasNext(IteratorsV1.scala:37)
	at org.apache.gluten.utils.iterator.IteratorsV1$LifeTimeAccumulator.hasNext(IteratorsV1.scala:100)
	at scala.collection.Iterator.isEmpty(Iterator.scala:385)
	at scala.collection.Iterator.isEmpty$(Iterator.scala:385)
	at org.apache.gluten.utils.iterator.IteratorsV1$LifeTimeAccumulator.isEmpty(IteratorsV1.scala:90)
	at org.apache.gluten.execution.VeloxColumnarToRowExec$.toRowIterator(VeloxColumnarToRowExec.scala:108)
	at org.apache.gluten.execution.VeloxColumnarToRowExec.$anonfun$doExecuteInternal$1(VeloxColumnarToRowExec.scala:79)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:949)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:949)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1471)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
```

Debug message:

```
batchSize: -2112458117
input type is ROW<n0_0:BIGINT,n0_1:VARCHAR,n0_2:BIGINT,n0_3:BIGINT,n0_4:VARCHAR>
```

The batchSize becomes negative in SortBuffer::prepareOutput. vector_size_t is int32_t, but numInputRows_, numOutputRows_, and maxOutputRows are uint32_t, so when (numInputRows_ - numOutputRows_) exceeds 0x7fffffff the value wraps to a negative vector_size_t. We can control the batch size, but we can't control the partition size, so numInputRows_ and numOutputRows_ should use uint64_t.
Relevant to: apache/incubator-gluten#6823
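
A minimal standalone sketch of the wraparound, using made-up row counts chosen so the difference matches the logged batchSize (vector_size_t is aliased locally here rather than pulled from Velox headers):

```
#include <algorithm>
#include <cstdint>
#include <iostream>

using vector_size_t = int32_t; // matches Velox's definition

int main() {
  // Hypothetical counters mirroring the buggy uint32_t members.
  uint32_t numInputRows = 4282509179u;  // rows accumulated in the sort buffer
  uint32_t numOutputRows = 2100000000u; // rows already returned
  uint32_t maxOutputRows = 4096u;

  // numInputRows - numOutputRows == 2182509179 > 0x7fffffff, so converting it
  // to vector_size_t inside std::min wraps it to a negative value.
  const vector_size_t batchSize =
      std::min<vector_size_t>(numInputRows - numOutputRows, maxOutputRows);

  std::cout << batchSize << std::endl; // prints -2112458117, as in the debug log
}
```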


jinchengchenghh (Author):

Can you help review? @mbasmanova Thanks! I haven't added a unit test, because triggering this requires sorting a very large RowVector; if you think it's worthwhile, I will add one.

```
@@ -287,9 +287,11 @@ void SortBuffer::spillOutput() {
void SortBuffer::prepareOutput(uint32_t maxOutputRows) {
  VELOX_CHECK_GT(maxOutputRows, 0);
```
Contributor:
maxOutputRows may be used as the batchSize; we need to cast it to vector_size_t with a check, to make sure it's less than 0x7fffffff.

jinchengchenghh (Author):
Yes, I will create a separate PR to solve that; it involves a lot of refactoring.

```
      std::min<vector_size_t>(numInputRows_ - numOutputRows_, maxOutputRows);
      batchSizeMaybe > std::numeric_limits<vector_size_t>::max()
          ? maxOutputRows
          : std::min<vector_size_t>(batchSizeMaybe, maxOutputRows);
```
Contributor:
We can use std::min<uint64_t>(batchSizeMaybe, maxOutputRows), since we have the check above.
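
A sketch of that simplification, assuming the comparison of batchSizeMaybe against std::numeric_limits<vector_size_t>::max() sits directly above (as in the PR), so this only runs when batchSizeMaybe fits:

```
// batchSizeMaybe has already been range-checked above, so taking the min in
// 64 bits and narrowing once cannot overflow: the result is bounded by
// batchSizeMaybe, which is known to fit in vector_size_t.
const vector_size_t batchSize = static_cast<vector_size_t>(
    std::min<uint64_t>(batchSizeMaybe, maxOutputRows));
```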

jinchengchenghh (Author):

Can you help review this PR? Thanks! @mbasmanova

mbasmanova (Contributor) left a comment:

@jinchengchenghh Thank you for fixing this issue. Some comments.

```
@@ -285,11 +285,13 @@ void SortBuffer::spillOutput() {
}

void SortBuffer::prepareOutput(uint32_t maxOutputRows) {
```
mbasmanova:
Did @FelixYBW comment about needing to change the type of maxOutputRows to vector_size_t? If so, I agree.

Contributor:
It looks like vector_size_t is meant to be the data type for batch sizes, but currently there are many violations of that in the code.

```
const vector_size_t batchSize =
    std::min<vector_size_t>(numInputRows_ - numOutputRows_, maxOutputRows);
batchSizeMaybe > std::numeric_limits<vector_size_t>::max()
```
mbasmanova:
maxOutputRows is uint32_t, so it can still overflow when cast to vector_size_t, no?

If it is hard to change the signature of this method, perhaps we can add a check that maxOutputRows doesn't exceed the max for vector_size_t.

```
VELOX_CHECK_GT(numInputRows_, numOutputRows_);

const uint64_t batchSizeMaybe = numInputRows_ - numOutputRows_;
```
mbasmanova:
This logic is not obvious. Can we add a comment that we are trying to avoid overflow here?

I'm wondering if this is a relatively common need and whether we could use a helper function (+ a unit test) for it.

CC: @Yuhta @xiaoxmeng
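
One possible shape for such a helper, sketched here with a hypothetical name and signature (boundedBatchSize is not existing Velox API):

```
#include <algorithm>
#include <cstdint>
#include <limits>

using vector_size_t = int32_t; // matches Velox's definition

// Hypothetical helper: clamp a 64-bit pending-row count to a batch size that
// is guaranteed to fit in vector_size_t, so callers never narrow unchecked.
vector_size_t boundedBatchSize(uint64_t pendingRows, vector_size_t maxBatchRows) {
  constexpr uint64_t kMax =
      static_cast<uint64_t>(std::numeric_limits<vector_size_t>::max());
  // Compare in 64 bits first; narrow only once the value is known to fit.
  const auto bounded = static_cast<vector_size_t>(std::min(pendingRows, kMax));
  return std::min(bounded, maxBatchRows);
}
```

A unit test could then cover pending-row counts above INT32_MAX directly, without building a huge RowVector.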

```
@@ -285,11 +285,13 @@ void SortBuffer::spillOutput() {
}

void SortBuffer::prepareOutput(uint32_t maxOutputRows) {
  VELOX_CHECK_GT(maxOutputRows, 0);
  VELOX_CHECK_GT(static_cast<vector_size_t>(maxOutputRows), 0);
```
Contributor:
This will overflow

Contributor:
The check fails on overflow. Or we need to add another check: VELOX_CHECK_LE(maxOutputRows, 0x7fffffff);

xiaoxmeng (Contributor) left a comment:

@jinchengchenghh I assume we just need #10868? Thanks!

FelixYBW (Contributor) commented Aug 29, 2024:

We should have rules like: all row vector sizes use vector_size_t (int32_t), all partition sizes use a partition_size_t (int64_t), etc.

@xiaoxmeng #10868 tries to replace all vector-size data types with vector_size_t, but that is too much work. It doesn't solve the overflow issue here.

xiaoxmeng (Contributor) left a comment:

@jinchengchenghh thanks, modulo the comment on the change.

```
const vector_size_t batchSize =
    std::min<vector_size_t>(numInputRows_ - numOutputRows_, maxOutputRows);
batchSizeMaybe > std::numeric_limits<vector_size_t>::max()
```
xiaoxmeng:
How about?

```
vector_size_t batchSize = maxOutputRows;
if (numOutputRows_ + batchSize > numInputRows_) {
  batchSize = numInputRows_ - numOutputRows_;
}
...
```

jinchengchenghh (Author):
numOutputRows_ + batchSize may overflow when numOutputRows_ is UINT64_MAX and numInputRows_ is UINT64_MAX - 1
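
A tiny standalone sketch of that failure mode, with made-up values near UINT64_MAX chosen so that numInputRows_ >= numOutputRows_ (the invariant the PR checks first) still holds:

```
#include <cstdint>
#include <iostream>

int main() {
  uint64_t numOutputRows = UINT64_MAX - 5; // rows already returned
  uint64_t numInputRows = UINT64_MAX - 1;  // rows buffered: 4 rows pending
  uint64_t batchSize = 10;

  // numOutputRows + batchSize wraps modulo 2^64 to 4, so the suggested guard
  // evaluates to false and batchSize would (wrongly) stay at 10.
  std::cout << (numOutputRows + batchSize > numInputRows) << std::endl; // 0

  // Comparing via subtraction avoids the wrap; it is safe here because the PR
  // checks VELOX_CHECK_GT(numInputRows_, numOutputRows_) first.
  std::cout << (batchSize > numInputRows - numOutputRows) << std::endl; // 1
}
```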

xiaoxmeng (Contributor) left a comment.
facebook-github-bot:
@xiaoxmeng has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

facebook-github-bot:
@xiaoxmeng merged this pull request in 1fd5d60.


Conbench analyzed the 1 benchmark run on commit 1fd5d60d.

There were no benchmark performance regressions. 🎉

The full Conbench report has more details.

Joe-Abraham pushed a commit to Joe-Abraham/velox that referenced this pull request Sep 3, 2024
Summary: (same as the PR description and stack trace above)

Pull Request resolved: facebookincubator#10848

Reviewed By: amitkdutta

Differential Revision: D62091115

Pulled By: xiaoxmeng

fbshipit-source-id: 0be2e58ed40d71cee17025adcf6dd7fb441bda72