
Merge branch 'master' of https://github.com/apache/spark into llm_error
itholic committed Aug 1, 2023
2 parents f941b4e + 4649140 commit 5095f9f
Showing 346 changed files with 9,572 additions and 4,095 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build_and_test.yml
@@ -527,7 +527,7 @@ jobs:

breaking-changes-buf:
needs: [precondition]
if: always() && fromJson(needs.precondition.outputs.required).breaking-changes-buf == 'true'
if: (!cancelled()) && fromJson(needs.precondition.outputs.required).breaking-changes-buf == 'true'
# Change 'branch-3.5' to 'branch-4.0' in master branch after cutting branch-4.0 branch.
name: Breaking change detection with Buf (branch-3.5)
runs-on: ubuntu-22.04
3 changes: 2 additions & 1 deletion common/network-common/pom.xml
@@ -150,7 +150,8 @@

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
<artifactId>spark-common-utils_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>

<!--
@@ -106,7 +106,7 @@ public int connectionTimeoutMs() {
return defaultTimeoutMs < 0 ? 0 : (int) defaultTimeoutMs;
}

/** Connect creation timeout in milliseconds. Default 30 secs. */
/** Connect creation timeout in milliseconds. Default 120 secs. */
public int connectionCreationTimeoutMs() {
long connectionTimeoutS = TimeUnit.MILLISECONDS.toSeconds(connectionTimeoutMs());
long defaultTimeoutMs = JavaUtils.timeStringAsSec(
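The only substantive change in this hunk is the documented default for the connection creation timeout, raised from 30 to 120 seconds. As a rough, self-contained sketch of how a time string such as "120s" is turned into seconds (approximating what `JavaUtils.timeStringAsSec` does; the object name and regex below are illustrative, not Spark code):

```scala
// Illustrative approximation of suffix-based time-string parsing.
import java.util.concurrent.TimeUnit

object TimeStringSketch {
  private val timeSuffixes = Map(
    "us" -> TimeUnit.MICROSECONDS,
    "ms" -> TimeUnit.MILLISECONDS,
    "s" -> TimeUnit.SECONDS,
    "m" -> TimeUnit.MINUTES,
    "min" -> TimeUnit.MINUTES,
    "h" -> TimeUnit.HOURS,
    "d" -> TimeUnit.DAYS)

  def timeStringAsSec(str: String): Long = {
    val m = "(\\d+)([a-z]+)?".r.findFirstMatchIn(str.toLowerCase.trim).get
    val value = m.group(1).toLong
    // No suffix means the value is already in seconds.
    val unit = Option(m.group(2)).map(timeSuffixes).getOrElse(TimeUnit.SECONDS)
    TimeUnit.SECONDS.convert(value, unit)
  }

  def main(args: Array[String]): Unit = {
    assert(timeStringAsSec("120s") == 120L) // the new documented default
    assert(timeStringAsSec("30s") == 30L)   // the old documented default
    println(timeStringAsSec("120s"))
  }
}
```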
@@ -58,10 +58,7 @@
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.TransportServerBootstrap;
import org.apache.spark.network.util.ByteArrayWritableChannel;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.util.*;

/**
* Jointly tests SparkSaslClient and SparkSaslServer, as both are black boxes.
@@ -23,15 +23,11 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Locale;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import io.netty.buffer.Unpooled;
import org.apache.commons.lang3.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,15 +68,15 @@ public static int nonNegativeHash(Object obj) {
* converted back to the same string through {@link #bytesToString(ByteBuffer)}.
*/
public static ByteBuffer stringToBytes(String s) {
return Unpooled.wrappedBuffer(s.getBytes(StandardCharsets.UTF_8)).nioBuffer();
return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
}

/**
* Convert the given byte buffer to a string. The resulting string can be
* converted back to the same byte buffer through {@link #stringToBytes(String)}.
*/
public static String bytesToString(ByteBuffer b) {
return Unpooled.wrappedBuffer(b).toString(StandardCharsets.UTF_8);
return StandardCharsets.UTF_8.decode(b.slice()).toString();
}

/**
Expand Down Expand Up @@ -191,7 +187,7 @@ private static File[] listFilesSafely(File file, FilenameFilter filter) throws I
}

private static boolean isSymlink(File file) throws IOException {
Preconditions.checkNotNull(file);
Objects.requireNonNull(file);
File fileInCanonicalDir = null;
if (file.getParent() == null) {
fileInCanonicalDir = file;
@@ -201,31 +197,35 @@ private static boolean isSymlink(File file) throws IOException {
return !fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile());
}

private static final ImmutableMap<String, TimeUnit> timeSuffixes =
ImmutableMap.<String, TimeUnit>builder()
.put("us", TimeUnit.MICROSECONDS)
.put("ms", TimeUnit.MILLISECONDS)
.put("s", TimeUnit.SECONDS)
.put("m", TimeUnit.MINUTES)
.put("min", TimeUnit.MINUTES)
.put("h", TimeUnit.HOURS)
.put("d", TimeUnit.DAYS)
.build();

private static final ImmutableMap<String, ByteUnit> byteSuffixes =
ImmutableMap.<String, ByteUnit>builder()
.put("b", ByteUnit.BYTE)
.put("k", ByteUnit.KiB)
.put("kb", ByteUnit.KiB)
.put("m", ByteUnit.MiB)
.put("mb", ByteUnit.MiB)
.put("g", ByteUnit.GiB)
.put("gb", ByteUnit.GiB)
.put("t", ByteUnit.TiB)
.put("tb", ByteUnit.TiB)
.put("p", ByteUnit.PiB)
.put("pb", ByteUnit.PiB)
.build();
private static final Map<String, TimeUnit> timeSuffixes;

private static final Map<String, ByteUnit> byteSuffixes;

static {
final Map<String, TimeUnit> timeSuffixesBuilder = new HashMap<>();
timeSuffixesBuilder.put("us", TimeUnit.MICROSECONDS);
timeSuffixesBuilder.put("ms", TimeUnit.MILLISECONDS);
timeSuffixesBuilder.put("s", TimeUnit.SECONDS);
timeSuffixesBuilder.put("m", TimeUnit.MINUTES);
timeSuffixesBuilder.put("min", TimeUnit.MINUTES);
timeSuffixesBuilder.put("h", TimeUnit.HOURS);
timeSuffixesBuilder.put("d", TimeUnit.DAYS);
timeSuffixes = Collections.unmodifiableMap(timeSuffixesBuilder);

final Map<String, ByteUnit> byteSuffixesBuilder = new HashMap<>();
byteSuffixesBuilder.put("b", ByteUnit.BYTE);
byteSuffixesBuilder.put("k", ByteUnit.KiB);
byteSuffixesBuilder.put("kb", ByteUnit.KiB);
byteSuffixesBuilder.put("m", ByteUnit.MiB);
byteSuffixesBuilder.put("mb", ByteUnit.MiB);
byteSuffixesBuilder.put("g", ByteUnit.GiB);
byteSuffixesBuilder.put("gb", ByteUnit.GiB);
byteSuffixesBuilder.put("t", ByteUnit.TiB);
byteSuffixesBuilder.put("tb", ByteUnit.TiB);
byteSuffixesBuilder.put("p", ByteUnit.PiB);
byteSuffixesBuilder.put("pb", ByteUnit.PiB);
byteSuffixes = Collections.unmodifiableMap(byteSuffixesBuilder);
}

/**
* Convert a passed time string (e.g. 50s, 100ms, or 250us) to a time count in the given unit.
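This hunk removes the module's reliance on Netty's `Unpooled` and Guava's `Preconditions`/`ImmutableMap` in favour of plain JDK classes. Below is a minimal, self-contained sketch (not part of the commit; the object name and test string are illustrative) of the UTF-8 round trip that the rewritten `stringToBytes`/`bytesToString` pair is expected to preserve:

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object StringBytesRoundTrip {
  // Pure java.nio replacements, mirroring the new implementations above.
  def stringToBytes(s: String): ByteBuffer =
    ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8))

  def bytesToString(b: ByteBuffer): String =
    // decode(b.slice()) leaves the caller's buffer position untouched.
    StandardCharsets.UTF_8.decode(b.slice()).toString

  def main(args: Array[String]): Unit = {
    val original = "spark-5095f9f"
    val roundTripped = bytesToString(stringToBytes(original))
    assert(roundTripped == original)
    println(roundTripped)
  }
}
```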
@@ -16,13 +16,14 @@
*/
package org.apache.spark.util

import java.io.IOException
import java.io.{Closeable, IOException, PrintWriter}
import java.nio.charset.StandardCharsets.UTF_8

import scala.util.control.NonFatal

import org.apache.spark.internal.Logging

object SparkErrorUtils extends Logging {
private[spark] trait SparkErrorUtils extends Logging {
/**
* Execute a block of code that returns a value, re-throwing any non-fatal uncaught
* exceptions as IOException. This is used when implementing Externalizable and Serializable's
@@ -41,4 +42,52 @@ object SparkErrorUtils extends Logging {
throw new IOException(e)
}
}

def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
val resource = createResource
try f.apply(resource) finally resource.close()
}

/**
* Execute a block of code, then a finally block, but if exceptions happen in
* the finally block, do not suppress the original exception.
*
* This is primarily an issue with `finally { out.close() }` blocks, where
* close needs to be called to clean up `out`, but if an exception happened
* in `out.write`, it's likely `out` may be corrupted and `out.close` will
* fail as well. This would then suppress the original/likely more meaningful
* exception from the original `out.write` call.
*/
def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
var originalThrowable: Throwable = null
try {
block
} catch {
case t: Throwable =>
// Purposefully not using NonFatal, because even fatal exceptions
// we don't want to have our finallyBlock suppress
originalThrowable = t
throw originalThrowable
} finally {
try {
finallyBlock
} catch {
case t: Throwable if (originalThrowable != null && originalThrowable != t) =>
originalThrowable.addSuppressed(t)
logWarning(s"Suppressing exception in finally: ${t.getMessage}", t)
throw originalThrowable
}
}
}

def stackTraceToString(t: Throwable): String = {
val out = new java.io.ByteArrayOutputStream
SparkErrorUtils.tryWithResource(new PrintWriter(out)) { writer =>
t.printStackTrace(writer)
writer.flush()
}
new String(out.toByteArray, UTF_8)
}
}

object SparkErrorUtils extends SparkErrorUtils
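`SparkErrorUtils` is now a trait with a companion object extending it, and it gains `tryWithResource`, `tryWithSafeFinally`, and `stackTraceToString`. A hedged usage sketch (file path and exception messages are illustrative only, not from the commit):

```scala
import java.io.{FileWriter, IOException}

import org.apache.spark.util.SparkErrorUtils

object ErrorUtilsUsage {
  def main(args: Array[String]): Unit = {
    // tryWithResource closes the writer even if the body throws.
    SparkErrorUtils.tryWithResource(new FileWriter("/tmp/spark-demo.txt")) { writer =>
      writer.write("hello")
    }

    // tryWithSafeFinally keeps the original exception even when the finally
    // block itself fails; the cleanup failure is attached as a suppressed one.
    try {
      SparkErrorUtils.tryWithSafeFinally {
        throw new IOException("primary failure")
      } {
        throw new IllegalStateException("cleanup failure")
      }
    } catch {
      case e: IOException =>
        // Prints the primary failure with "cleanup failure" listed as suppressed.
        println(SparkErrorUtils.stackTraceToString(e))
    }
  }
}
```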
@@ -18,8 +18,12 @@ package org.apache.spark.util

import java.io.File
import java.net.{URI, URISyntaxException}
import java.nio.file.Files

private[spark] object SparkFileUtils {
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.JavaUtils

private[spark] trait SparkFileUtils extends Logging {
/**
* Return a well-formed URI for the file described by a user input string.
*
@@ -44,4 +48,78 @@
}
new File(path).getCanonicalFile().toURI()
}

/**
* Lists files recursively.
*/
def recursiveList(f: File): Array[File] = {
require(f.isDirectory)
val result = f.listFiles.toBuffer
val dirList = result.filter(_.isDirectory)
while (dirList.nonEmpty) {
val curDir = dirList.remove(0)
val files = curDir.listFiles()
result ++= files
dirList ++= files.filter(_.isDirectory)
}
result.toArray
}

/**
* Create a directory given the abstract pathname
* @return true, if the directory is successfully created; otherwise, return false.
*/
def createDirectory(dir: File): Boolean = {
try {
// SPARK-35907: The check was required by File.mkdirs() because it could sporadically
// fail silently. After switching to Files.createDirectories(), ideally, there should
// no longer be silent fails. But the check is kept for the safety concern. We can
// remove the check when we're sure that Files.createDirectories() would never fail silently.
Files.createDirectories(dir.toPath)
if ( !dir.exists() || !dir.isDirectory) {
logError(s"Failed to create directory " + dir)
}
dir.isDirectory
} catch {
case e: Exception =>
logError(s"Failed to create directory " + dir, e)
false
}
}

/**
* Create a directory inside the given parent directory. The directory is guaranteed to be
* newly created, and is not marked for automatic deletion.
*/
def createDirectory(root: String, namePrefix: String = "spark"): File = {
JavaUtils.createDirectory(root, namePrefix)
}

/**
* Create a temporary directory inside the `java.io.tmpdir` prefixed with `spark`.
* The directory will be automatically deleted when the VM shuts down.
*/
def createTempDir(): File =
createTempDir(System.getProperty("java.io.tmpdir"), "spark")

/**
* Create a temporary directory inside the given parent directory. The directory will be
* automatically deleted when the VM shuts down.
*/
def createTempDir(
root: String = System.getProperty("java.io.tmpdir"),
namePrefix: String = "spark"): File = {
createDirectory(root, namePrefix)
}

/**
* Delete a file or directory and its contents recursively.
* Don't follow directories if they are symlinks.
* Throws an exception if deletion is unsuccessful.
*/
def deleteRecursively(file: File): Unit = {
JavaUtils.deleteRecursively(file)
}
}

private[spark] object SparkFileUtils extends SparkFileUtils
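`SparkFileUtils` follows the same trait-plus-companion-object pattern and gains directory helpers (`createDirectory`, `createTempDir`, `recursiveList`, `deleteRecursively`). A small usage sketch with illustrative file names, assuming Spark's `common-utils` module on the classpath:

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import org.apache.spark.util.SparkFileUtils

object FileUtilsUsage {
  def main(args: Array[String]): Unit = {
    // Created under java.io.tmpdir with the "spark" prefix; removed on JVM exit.
    val dir: File = SparkFileUtils.createTempDir()

    val nested = new File(dir, "nested")
    // Backed by Files.createDirectories, so silent mkdirs failures are avoided.
    assert(SparkFileUtils.createDirectory(nested))
    Files.write(new File(nested, "data.txt").toPath,
      "hello".getBytes(StandardCharsets.UTF_8))

    // Returns the nested directory and the file inside it.
    SparkFileUtils.recursiveList(dir).foreach(f => println(f.getPath))

    // Deletes dir and everything under it; throws if deletion fails.
    SparkFileUtils.deleteRecursively(dir)
  }
}
```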
@@ -16,7 +16,7 @@
*/
package org.apache.spark.util

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object SparkSerDeUtils {
/** Serialize an object using Java serialization */
@@ -27,4 +27,11 @@ object SparkSerDeUtils {
oos.close()
bos.toByteArray
}

/** Deserialize an object using Java serialization */
def deserialize[T](bytes: Array[Byte]): T = {
val bis = new ByteArrayInputStream(bytes)
val ois = new ObjectInputStream(bis)
ois.readObject.asInstanceOf[T]
}
}
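`SparkSerDeUtils` gains a `deserialize` counterpart to `serialize`, completing a Java-serialization round trip. A usage sketch with an illustrative payload:

```scala
import org.apache.spark.util.SparkSerDeUtils

object SerDeUsage {
  def main(args: Array[String]): Unit = {
    val original = Map("retries" -> 3, "timeoutSec" -> 120) // illustrative payload
    val bytes: Array[Byte] = SparkSerDeUtils.serialize(original)
    // The type parameter drives the cast back from Object.
    val restored = SparkSerDeUtils.deserialize[Map[String, Int]](bytes)
    assert(restored == original)
    println(restored)
  }
}
```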
@@ -37,8 +37,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.DataSourceUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

@@ -32,8 +32,7 @@ import org.apache.spark.SPARK_VERSION_SHORT
import org.apache.spark.sql.{SPARK_LEGACY_DATETIME_METADATA_KEY, SPARK_TIMEZONE_METADATA_KEY, SPARK_VERSION_METADATA_KEY}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.OutputWriter
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
import org.apache.spark.sql.types._

// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
@@ -37,8 +37,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{SpecializedGetters, SpecificInternalRow}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.DataSourceUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
import org.apache.spark.sql.types._

/**
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, NoopF
import org.apache.spark.sql.catalyst.expressions.{ExpressionEvalHelper, GenericInternalRow, Literal}
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MapData}
import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.LegacyBehaviorPolicy
import org.apache.spark.sql.sources.{EqualTo, Not}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
@@ -286,7 +286,7 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite
schema,
dataType,
false,
RebaseSpec(SQLConf.LegacyBehaviorPolicy.CORRECTED),
RebaseSpec(LegacyBehaviorPolicy.CORRECTED),
filters)
val deserialized = deserializer.deserialize(data)
expected match {
@@ -31,8 +31,8 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters}
import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.internal.LegacyBehaviorPolicy._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.v2.avro.AvroScan
@@ -22,7 +22,7 @@ import org.apache.avro.generic.GenericRecordBuilder
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.NoopFilters
import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy.CORRECTED
import org.apache.spark.sql.internal.LegacyBehaviorPolicy.CORRECTED
import org.apache.spark.sql.types.{IntegerType, StructType}

/**
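The remaining hunks shown here are mechanical import updates: `LegacyBehaviorPolicy` is now referenced as a top-level object in `org.apache.spark.sql.internal` rather than through `SQLConf`. A minimal sketch of an updated call site (the surrounding object is illustrative; `RebaseSpec` and the imports come from the touched files):

```scala
import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
import org.apache.spark.sql.internal.LegacyBehaviorPolicy

object LegacyPolicySketch {
  // Previously written as RebaseSpec(SQLConf.LegacyBehaviorPolicy.CORRECTED).
  val corrected: RebaseSpec = RebaseSpec(LegacyBehaviorPolicy.CORRECTED)

  def main(args: Array[String]): Unit = println(corrected)
}
```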