Skip to content

Commit

Permalink
[SPARK-49946][CORE] Require an error class in SparkOutOfMemoryError
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
In the PR, I propose to remove the constructors that accept a plain string as an error message, and leave only the constructors with error classes.

### Why are the changes needed?
To migrate the code that uses `SparkOutOfMemoryError` to the new error framework.

### Does this PR introduce _any_ user-facing change?
No, it shouldn't because the exception is supposed to be raised by Spark itself.

### How was this patch tested?
By running the modified test suites:
```
$ build/sbt "core/testOnly *ExecutorSuite"
```

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#48442 from MaxGekk/req-error-cond-SparkOutOfMemoryError.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
  • Loading branch information
MaxGekk committed Oct 16, 2024
1 parent 2a13011 commit 39112e4
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 23 deletions.
15 changes: 15 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -8711,6 +8711,21 @@
"Doesn't support month or year interval: <interval>"
]
},
"_LEGACY_ERROR_TEMP_3300" : {
"message" : [
"error while calling spill() on <consumerToSpill> : <message>"
]
},
"_LEGACY_ERROR_TEMP_3301" : {
"message" : [
"Not enough memory to grow pointer array"
]
},
"_LEGACY_ERROR_TEMP_3302" : {
"message" : [
"No enough memory for aggregation"
]
},
"_LEGACY_ERROR_USER_RAISED_EXCEPTION" : {
"message" : [
"<errorMessage>"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,6 @@ public final class SparkOutOfMemoryError extends OutOfMemoryError implements Spa
String errorClass;
Map<String, String> messageParameters;

public SparkOutOfMemoryError(String s) {
super(s);
}

public SparkOutOfMemoryError(OutOfMemoryError e) {
super(e.getMessage());
}

public SparkOutOfMemoryError(String errorClass, Map<String, String> messageParameters) {
super(SparkThrowableHelper.getMessage(errorClass, messageParameters));
this.errorClass = errorClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,7 @@
import java.io.InterruptedIOException;
import java.io.IOException;
import java.nio.channels.ClosedByInterruptException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.*;

import com.google.common.annotations.VisibleForTesting;

Expand Down Expand Up @@ -291,8 +285,12 @@ private long trySpillAndAcquire(
logger.error("error while calling spill() on {}", e,
MDC.of(LogKeys.MEMORY_CONSUMER$.MODULE$, consumerToSpill));
// checkstyle.off: RegexpSinglelineJava
throw new SparkOutOfMemoryError("error while calling spill() on " + consumerToSpill + " : "
+ e.getMessage());
throw new SparkOutOfMemoryError(
"_LEGACY_ERROR_TEMP_3300",
new HashMap<String, String>() {{
put("consumerToSpill", consumerToSpill.toString());
put("message", e.getMessage());
}});
// checkstyle.on: RegexpSinglelineJava
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.util.collection.unsafe.sort;

import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -215,7 +216,7 @@ public void expandPointerArray(LongArray newArray) {
if (array != null) {
if (newArray.size() < array.size()) {
// checkstyle.off: RegexpSinglelineJava
throw new SparkOutOfMemoryError("Not enough memory to grow pointer array");
throw new SparkOutOfMemoryError("_LEGACY_ERROR_TEMP_3301", new HashMap());
// checkstyle.on: RegexpSinglelineJava
}
Platform.copyMemory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
import java.lang.Thread.UncaughtExceptionHandler
import java.net.URL
import java.nio.ByteBuffer
import java.util.Properties
import java.util.{HashMap, Properties}
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

Expand Down Expand Up @@ -522,7 +522,13 @@ class ExecutorSuite extends SparkFunSuite
testThrowable(new OutOfMemoryError(), depthToCheck, isFatal = true)
testThrowable(new InterruptedException(), depthToCheck, isFatal = false)
testThrowable(new RuntimeException("test"), depthToCheck, isFatal = false)
testThrowable(new SparkOutOfMemoryError("test"), depthToCheck, isFatal = false)
testThrowable(
new SparkOutOfMemoryError(
"_LEGACY_ERROR_USER_RAISED_EXCEPTION",
new HashMap[String, String]() {
put("errorMessage", "test")
}),
depthToCheck, isFatal = false)
}

// Verify we can handle the cycle in the exception chain
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1112,7 +1112,8 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE

def cannotAcquireMemoryToBuildUnsafeHashedRelationError(): Throwable = {
new SparkOutOfMemoryError(
"_LEGACY_ERROR_TEMP_2107")
"_LEGACY_ERROR_TEMP_2107",
new java.util.HashMap[String, String]())
}

def rowLargerThan256MUnsupportedError(): SparkUnsupportedOperationException = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,7 @@ case class HashAggregateExec(
| $unsafeRowKeys, $unsafeRowKeyHash);
| if ($unsafeRowBuffer == null) {
| // failed to allocate the first page
| throw new $oomeClassName("No enough memory for aggregation");
| throw new $oomeClassName("_LEGACY_ERROR_TEMP_3302", new java.util.HashMap());
| }
|}
""".stripMargin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.aggregate

import java.util

import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.memory.SparkOutOfMemoryError
Expand Down Expand Up @@ -210,7 +212,7 @@ class TungstenAggregationIterator(
if (buffer == null) {
// failed to allocate the first page
// scalastyle:off throwerror
throw new SparkOutOfMemoryError("No enough memory for aggregation")
throw new SparkOutOfMemoryError("_LEGACY_ERROR_TEMP_3302", new util.HashMap())
// scalastyle:on throwerror
}
}
Expand Down

0 comments on commit 39112e4

Please sign in to comment.