From 39112e4f2f8c1401ffa73c84398d3b8f0afa211a Mon Sep 17 00:00:00 2001
From: Max Gekk <max.gekk@gmail.com>
Date: Wed, 16 Oct 2024 09:21:48 +0200
Subject: [PATCH] [SPARK-49946][CORE] Require an error class in
 `SparkOutOfMemoryError`

### What changes were proposed in this pull request?
In the PR, I propose to remove the constructors that accept a plain string as an error message, and leave only the constructors that take an error class.

### Why are the changes needed?
To migrate the code that uses `SparkOutOfMemoryError` onto the new error framework.

### Does this PR introduce _any_ user-facing change?
No, it shouldn't, because the exception is supposed to be raised by Spark itself.

### How was this patch tested?
By running the modified test suites:
```
$ build/sbt "core/testOnly *ExecutorSuite"
```

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #48442 from MaxGekk/req-error-cond-SparkOutOfMemoryError.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
---
 .../main/resources/error/error-conditions.json     | 15 +++++++++++++++
 .../spark/memory/SparkOutOfMemoryError.java        |  8 --------
 .../apache/spark/memory/TaskMemoryManager.java     | 16 +++++++---------
 .../unsafe/sort/UnsafeInMemorySorter.java          |  3 ++-
 .../apache/spark/executor/ExecutorSuite.scala      | 10 ++++++++--
 .../spark/sql/errors/QueryExecutionErrors.scala    |  3 ++-
 .../execution/aggregate/HashAggregateExec.scala    |  2 +-
 .../aggregate/TungstenAggregationIterator.scala    |  4 +++-
 8 files changed, 38 insertions(+), 23 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index d9880899347a3..502558c21faa9 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -8711,6 +8711,21 @@
       "Doesn't support month or year interval: <interval>"
     ]
   },
+  "_LEGACY_ERROR_TEMP_3300" : {
+    "message" : [
+      "error while calling spill() on <consumerToSpill> : <message>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3301" : {
+    "message" : [
+      "Not enough memory to grow pointer array"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3302" : {
+    "message" : [
+      "No enough memory for aggregation"
+    ]
+  },
   "_LEGACY_ERROR_USER_RAISED_EXCEPTION" : {
     "message" : [
       "<errorMessage>"
diff --git a/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java b/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java
index fa71eb066ff89..0e35ebecfd270 100644
--- a/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java
+++ b/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java
@@ -32,14 +32,6 @@ public final class SparkOutOfMemoryError extends OutOfMemoryError implements Spa
   String errorClass;
   Map<String, String> messageParameters;
 
-  public SparkOutOfMemoryError(String s) {
-    super(s);
-  }
-
-  public SparkOutOfMemoryError(OutOfMemoryError e) {
-    super(e.getMessage());
-  }
-
   public SparkOutOfMemoryError(String errorClass, Map<String, String> messageParameters) {
     super(SparkThrowableHelper.getMessage(errorClass, messageParameters));
     this.errorClass = errorClass;
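With the plain-string constructors removed, every remaining call site must name an error class declared in `error-conditions.json`. As a rough sketch of the resulting call pattern (illustrative only, not code from this patch; the helper class and its arguments are hypothetical, and `Map.of` assumes Java 9+):

```java
import java.util.Map;
import org.apache.spark.memory.SparkOutOfMemoryError;

// Hypothetical helper class, for illustration only.
final class SpillErrors {
  static SparkOutOfMemoryError spillFailed(Object consumer, Throwable cause) {
    // The surviving constructor takes an error class plus its message
    // parameters; the message text itself is resolved from
    // error-conditions.json by SparkThrowableHelper.getMessage(...).
    return new SparkOutOfMemoryError(
        "_LEGACY_ERROR_TEMP_3300",
        Map.of(
            "consumerToSpill", String.valueOf(consumer),
            "message", String.valueOf(cause.getMessage())));
  }
}
```

Any error class passed to the constructor has to have a matching entry in `error-conditions.json`, since the template is resolved at construction time.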
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index df224bc902bff..bd9f58bf7415f 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -21,13 +21,7 @@
 import java.io.InterruptedIOException;
 import java.io.IOException;
 import java.nio.channels.ClosedByInterruptException;
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
+import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -291,8 +285,12 @@ private long trySpillAndAcquire(
       logger.error("error while calling spill() on {}", e,
         MDC.of(LogKeys.MEMORY_CONSUMER$.MODULE$, consumerToSpill));
       // checkstyle.off: RegexpSinglelineJava
-      throw new SparkOutOfMemoryError("error while calling spill() on " + consumerToSpill + " : "
-        + e.getMessage());
+      throw new SparkOutOfMemoryError(
+        "_LEGACY_ERROR_TEMP_3300",
+        new HashMap() {{
+          put("consumerToSpill", consumerToSpill.toString());
+          put("message", e.getMessage());
+        }});
       // checkstyle.on: RegexpSinglelineJava
     }
   }
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index 7579c0aefb250..761ced66f78cf 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -18,6 +18,7 @@
 package org.apache.spark.util.collection.unsafe.sort;
 
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.LinkedList;
 
 import javax.annotation.Nullable;
@@ -215,7 +216,7 @@ public void expandPointerArray(LongArray newArray) {
     if (array != null) {
       if (newArray.size() < array.size()) {
         // checkstyle.off: RegexpSinglelineJava
-        throw new SparkOutOfMemoryError("Not enough memory to grow pointer array");
+        throw new SparkOutOfMemoryError("_LEGACY_ERROR_TEMP_3301", new HashMap());
         // checkstyle.on: RegexpSinglelineJava
       }
       Platform.copyMemory(
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 805e7ca467497..fa13092dc47aa 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -21,7 +21,7 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
 import java.lang.Thread.UncaughtExceptionHandler
 import java.net.URL
 import java.nio.ByteBuffer
-import java.util.Properties
+import java.util.{HashMap, Properties}
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 
@@ -522,7 +522,13 @@ class ExecutorSuite extends SparkFunSuite
     testThrowable(new OutOfMemoryError(), depthToCheck, isFatal = true)
     testThrowable(new InterruptedException(), depthToCheck, isFatal = false)
     testThrowable(new RuntimeException("test"), depthToCheck, isFatal = false)
-    testThrowable(new SparkOutOfMemoryError("test"), depthToCheck, isFatal = false)
+    testThrowable(
+      new SparkOutOfMemoryError(
+        "_LEGACY_ERROR_USER_RAISED_EXCEPTION",
+        new HashMap[String, String]() {
+          put("errorMessage", "test")
+        }),
+      depthToCheck, isFatal = false)
   }
 
   // Verify we can handle the cycle in the exception chain
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 43fc0b567dcc2..ebcc98a3af27a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1112,7 +1112,8 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
 
   def cannotAcquireMemoryToBuildUnsafeHashedRelationError(): Throwable = {
     new SparkOutOfMemoryError(
-      "_LEGACY_ERROR_TEMP_2107")
+      "_LEGACY_ERROR_TEMP_2107",
+      new java.util.HashMap[String, String]())
   }
 
   def rowLargerThan256MUnsupportedError(): SparkUnsupportedOperationException = {
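The `new HashMap() {{ ... }}` expression in the `TaskMemoryManager` hunk above is Java's double-brace idiom: the outer braces declare an anonymous `HashMap` subclass, and the inner braces are an instance initializer that fills the map inline. A self-contained sketch of the same pattern, with illustrative keys and values only:

```java
import java.util.HashMap;
import java.util.Map;

public class DoubleBraceDemo {
  public static void main(String[] args) {
    // Outer braces: an anonymous subclass of HashMap.
    // Inner braces: an instance initializer that runs at construction time.
    Map<String, String> params = new HashMap<String, String>() {{
      put("consumerToSpill", "ExampleConsumer");    // illustrative values
      put("message", "unable to acquire memory");
    }};
    System.out.println(params);
  }
}
```

The idiom keeps the parameters next to the throw site at the cost of one extra anonymous class per use; an unmodifiable `Map.of(...)` would be a lighter alternative where mutation isn't needed.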
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 8f2b7ca5cba25..750b74aab384f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -682,7 +682,7 @@ case class HashAggregateExec(
        |    $unsafeRowKeys, $unsafeRowKeyHash);
        |  if ($unsafeRowBuffer == null) {
        |    // failed to allocate the first page
-       |    throw new $oomeClassName("No enough memory for aggregation");
+       |    throw new $oomeClassName("_LEGACY_ERROR_TEMP_3302", new java.util.HashMap());
        |  }
        |}
      """.stripMargin
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 1ebf0d143bd1f..2f1cda9d0f9be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.aggregate
 
+import java.util
+
 import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.memory.SparkOutOfMemoryError
@@ -210,7 +212,7 @@ class TungstenAggregationIterator(
       if (buffer == null) {
         // failed to allocate the first page
         // scalastyle:off throwerror
-        throw new SparkOutOfMemoryError("No enough memory for aggregation")
+        throw new SparkOutOfMemoryError("_LEGACY_ERROR_TEMP_3302", new util.HashMap())
         // scalastyle:on throwerror
       }
     }
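For intuition about how the migrated call sites above produce their final messages: the error class selects a template from `error-conditions.json`, and each `<parameter>` placeholder in the template is substituted from the parameter map. A simplified sketch of that resolution step (this is not Spark's actual `SparkThrowableHelper` implementation, just the general idea, with the template hard-coded instead of loaded from the JSON file):

```java
import java.util.Map;

public class TemplateDemo {
  // Simplified stand-in for SparkThrowableHelper.getMessage: the real helper
  // looks the template up by error class in error-conditions.json; here the
  // template is passed in directly.
  static String getMessage(String template, Map<String, String> params) {
    String message = template;
    for (Map.Entry<String, String> e : params.entrySet()) {
      // Replace each <key> placeholder with its value.
      message = message.replace("<" + e.getKey() + ">", e.getValue());
    }
    return message;
  }

  public static void main(String[] args) {
    String template = "error while calling spill() on <consumerToSpill> : <message>";
    System.out.println(getMessage(template,
        Map.of("consumerToSpill", "sorter", "message", "spill failed")));
    // prints: error while calling spill() on sorter : spill failed
  }
}
```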