
[SPARK-49795][CORE][SQL][SS][DSTREAM][ML][MLLIB][K8S][YARN][EXAMPLES] Clean up deprecated Guava API usage #48248

Status: Closed (wants to merge 10 commits)
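Nearly every hunk below applies the same mechanical migration: Guava has deprecated Throwables.propagate(e), and its deprecation notice recommends replacing it with throwIfUnchecked plus an explicit RuntimeException wrap. A minimal before/after sketch of the pattern (illustrative only, not code taken from this PR):

    import java.util.concurrent.Callable;

    import com.google.common.base.Throwables;

    public class PropagateMigration {
      // Before: Throwables.propagate rethrew RuntimeExceptions and Errors as-is
      // and wrapped checked exceptions in a RuntimeException, in a single call.
      static <T> T before(Callable<T> task) {
        try {
          return task.call();
        } catch (Exception e) {
          throw Throwables.propagate(e); // deprecated
        }
      }

      // After: the same two behaviors, spelled out explicitly.
      static <T> T after(Callable<T> task) {
        try {
          return task.call();
        } catch (Exception e) {
          Throwables.throwIfUnchecked(e); // rethrows RuntimeException/Error unchanged
          throw new RuntimeException(e);  // wraps the remaining checked exceptions
        }
      }
    }

Where the catch clause only ever sees a checked type (for example SaslException, IOException, or RocksDBException below), the throwIfUnchecked call is unnecessary and the PR wraps directly in a RuntimeException.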
@@ -255,7 +255,8 @@ public Iterator<T> iterator() {
         iteratorTracker.add(new WeakReference<>(it));
         return it;
       } catch (Exception e) {
-        throw Throwables.propagate(e);
+        Throwables.throwIfUnchecked(e);
+        throw new RuntimeException(e);
       }
     }
   };
@@ -127,7 +127,7 @@ public boolean hasNext() {
         try {
           close();
         } catch (IOException ioe) {
-          throw Throwables.propagate(ioe);
+          throw new RuntimeException(ioe);
         }
       }
       return next != null;
@@ -151,7 +151,8 @@ public T next() {
         next = null;
         return ret;
       } catch (Exception e) {
-        throw Throwables.propagate(e);
+        Throwables.throwIfUnchecked(e);
+        throw new RuntimeException(e);
       }
     }

@@ -287,7 +287,8 @@ public Iterator<T> iterator() {
         iteratorTracker.add(new WeakReference<>(it));
         return it;
       } catch (Exception e) {
-        throw Throwables.propagate(e);
+        Throwables.throwIfUnchecked(e);
+        throw new RuntimeException(e);
       }
     }
   };
@@ -113,7 +113,7 @@ public boolean hasNext() {
         try {
           close();
         } catch (IOException ioe) {
-          throw Throwables.propagate(ioe);
+          throw new RuntimeException(ioe);
         }
       }
       return next != null;
@@ -137,7 +137,8 @@ public T next() {
         next = null;
         return ret;
       } catch (Exception e) {
-        throw Throwables.propagate(e);
+        Throwables.throwIfUnchecked(e);
+        throw new RuntimeException(e);
       }
     }

@@ -290,9 +290,11 @@ public void onFailure(Throwable e) {
     try {
       return result.get(timeoutMs, TimeUnit.MILLISECONDS);
     } catch (ExecutionException e) {
-      throw Throwables.propagate(e.getCause());
+      Throwables.throwIfUnchecked(e.getCause());
+      throw new RuntimeException(e.getCause());
     } catch (Exception e) {
-      throw Throwables.propagate(e);
+      Throwables.throwIfUnchecked(e);
+      throw new RuntimeException(e);
     }
   }

Contributor Author: We cannot confirm what type of Throwable is wrapped in the ExecutionException, so we still use:

    Throwables.throwIfUnchecked(e.getCause());
    throw new RuntimeException(e.getCause());
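The comment above covers the one branch where the wrapped cause's concrete type cannot be known statically, which is why both calls are kept on e.getCause() rather than wrapping unconditionally. A sketch of the idea as a hypothetical helper (assuming a plain java.util.concurrent.Future, not the PR's actual code):

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;

    import com.google.common.base.Throwables;

    public class CauseUnwrap {
      // Hypothetical helper: unwrap an ExecutionException the way the PR does.
      static <T> T getUnchecked(Future<T> result) {
        try {
          return result.get();
        } catch (ExecutionException e) {
          // The cause may be a RuntimeException, an Error, or a checked exception;
          // the first two are rethrown as-is, only the checked case gets wrapped.
          Throwables.throwIfUnchecked(e.getCause());
          throw new RuntimeException(e.getCause());
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        }
      }
    }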
@@ -342,7 +342,8 @@ public void operationComplete(final Future<Channel> handshakeFuture) {
       logger.error("Exception while bootstrapping client after {} ms", e,
         MDC.of(LogKeys.BOOTSTRAP_TIME$.MODULE$, bootstrapTimeMs));
       client.close();
-      throw Throwables.propagate(e);
+      Throwables.throwIfUnchecked(e);
+      throw new RuntimeException(e);
     }
     long postBootstrap = System.nanoTime();

@@ -22,7 +22,6 @@
 import java.security.GeneralSecurityException;
 import java.util.concurrent.TimeoutException;

-import com.google.common.base.Throwables;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
@@ -80,7 +79,7 @@ public void doBootstrap(TransportClient client, Channel channel) {
       doSparkAuth(client, channel);
       client.setClientId(appId);
     } catch (GeneralSecurityException | IOException e) {
-      throw Throwables.propagate(e);
+      throw new RuntimeException(e);
     } catch (RuntimeException e) {
       // There isn't a good exception that can be caught here to know whether it's really
       // OK to switch back to SASL (because the server doesn't speak the new protocol). So
@@ -132,7 +132,8 @@ protected boolean doAuthChallenge(
       try {
         engine.close();
       } catch (Exception e) {
-        throw Throwables.propagate(e);
+        Throwables.throwIfUnchecked(e);
+        throw new RuntimeException(e);
       }
     }
   }
@@ -29,7 +29,6 @@
 import javax.security.sasl.SaslClient;
 import javax.security.sasl.SaslException;

-import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;

 import org.apache.spark.internal.SparkLogger;
@@ -62,7 +61,7 @@ public SparkSaslClient(String secretKeyId, SecretKeyHolder secretKeyHolder, bool
       this.saslClient = Sasl.createSaslClient(new String[] { DIGEST }, null, null, DEFAULT_REALM,
         saslProps, new ClientCallbackHandler());
     } catch (SaslException e) {
-      throw Throwables.propagate(e);
+      throw new RuntimeException(e);
     }
   }

@@ -72,7 +71,7 @@ public synchronized byte[] firstToken() {
       try {
         return saslClient.evaluateChallenge(new byte[0]);
       } catch (SaslException e) {
-        throw Throwables.propagate(e);
+        throw new RuntimeException(e);
       }
     } else {
       return new byte[0];
@@ -98,7 +97,7 @@ public synchronized byte[] response(byte[] token) {
     try {
       return saslClient != null ? saslClient.evaluateChallenge(token) : new byte[0];
     } catch (SaslException e) {
-      throw Throwables.propagate(e);
+      throw new RuntimeException(e);
     }
   }

@@ -31,7 +31,6 @@
 import java.util.Map;

 import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -94,7 +93,7 @@ public SparkSaslServer(
       this.saslServer = Sasl.createSaslServer(DIGEST, null, DEFAULT_REALM, saslProps,
         new DigestCallbackHandler());
     } catch (SaslException e) {
-      throw Throwables.propagate(e);
+      throw new RuntimeException(e);
     }
   }

@@ -119,7 +118,7 @@ public synchronized byte[] response(byte[] token) {
     try {
       return saslServer != null ? saslServer.evaluateResponse(token) : new byte[0];
     } catch (SaslException e) {
-      throw Throwables.propagate(e);
+      throw new RuntimeException(e);
     }
   }

@@ -17,8 +17,6 @@

 package org.apache.spark.network.shuffledb;

-import com.google.common.base.Throwables;
-
 import java.io.IOException;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -47,7 +45,7 @@ public boolean hasNext() {
       try {
         close();
       } catch (IOException ioe) {
-        throw Throwables.propagate(ioe);
+        throw new RuntimeException(ioe);
       }
     }
     return next != null;
@@ -19,7 +19,6 @@

 import java.io.IOException;

-import com.google.common.base.Throwables;
 import org.rocksdb.RocksDBException;

 /**
@@ -37,7 +36,7 @@ public void put(byte[] key, byte[] value) {
     try {
       db.put(key, value);
     } catch (RocksDBException e) {
-      throw Throwables.propagate(e);
+      throw new RuntimeException(e);
     }
   }

@@ -46,7 +45,7 @@ public byte[] get(byte[] key) {
     try {
       return db.get(key);
     } catch (RocksDBException e) {
-      throw Throwables.propagate(e);
+      throw new RuntimeException(e);
     }
   }

@@ -55,7 +54,7 @@ public void delete(byte[] key) {
     try {
       db.delete(key);
     } catch (RocksDBException e) {
-      throw Throwables.propagate(e);
+      throw new RuntimeException(e);
     }
   }

@@ -22,7 +22,6 @@
 import java.util.Map;
 import java.util.NoSuchElementException;

-import com.google.common.base.Throwables;
 import org.rocksdb.RocksIterator;

 /**
@@ -52,7 +51,7 @@ public boolean hasNext() {
       try {
         close();
       } catch (IOException ioe) {
-        throw Throwables.propagate(ioe);
+        throw new RuntimeException(ioe);
       }
     }
     return next != null;
@@ -176,7 +176,7 @@ class KafkaTestUtils(
     }

     kdc.getKrb5conf.delete()
-    Files.write(krb5confStr, kdc.getKrb5conf, StandardCharsets.UTF_8)
+    Files.asCharSink(kdc.getKrb5conf, StandardCharsets.UTF_8).write(krb5confStr)
     logDebug(s"krb5.conf file content: $krb5confStr")
   }

@@ -240,7 +240,7 @@ class KafkaTestUtils(
       | principal="$kafkaServerUser@$realm";
       |};
     """.stripMargin.trim
-    Files.write(content, file, StandardCharsets.UTF_8)
+    Files.asCharSink(file, StandardCharsets.UTF_8).write(content)
     logDebug(s"Created JAAS file: ${file.getPath}")
     logDebug(s"JAAS file content: $content")
     file.getAbsolutePath()
@@ -120,7 +120,8 @@ private boolean isEndOfStream() {

   private void checkReadException() throws IOException {
     if (readAborted) {
-      Throwables.propagateIfPossible(readException, IOException.class);
+      Throwables.throwIfInstanceOf(readException, IOException.class);
+      Throwables.throwIfUnchecked(readException);
       throw new IOException(readException);
     }
   }
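Throwables.propagateIfPossible(e, X.class) gets the analogous treatment: its two behaviors (rethrow if the throwable is the declared checked type, rethrow if unchecked) become two explicit calls. A sketch of the equivalence, not taken verbatim from the PR:

    import java.io.IOException;

    import com.google.common.base.Throwables;

    public class PropagateIfPossibleMigration {
      // Before: one deprecated call covered both cases.
      static void before(Throwable readException) throws IOException {
        Throwables.propagateIfPossible(readException, IOException.class);
        throw new IOException(readException);
      }

      // After: each case is explicit, with the same net behavior.
      static void after(Throwable readException) throws IOException {
        Throwables.throwIfInstanceOf(readException, IOException.class); // declared checked type
        Throwables.throwIfUnchecked(readException);                     // RuntimeException/Error
        throw new IOException(readException);                           // wrap anything else
      }
    }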
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -421,7 +421,7 @@ private[spark] object TestUtils extends SparkTestUtils {
   def createTempScriptWithExpectedOutput(dir: File, prefix: String, output: String): String = {
     val file = File.createTempFile(prefix, ".sh", dir)
     val script = s"cat <<EOF\n$output\nEOF\n"
-    Files.write(script, file, StandardCharsets.UTF_8)
+    Files.asCharSink(file, StandardCharsets.UTF_8).write(script)
     JavaFiles.setPosixFilePermissions(file.toPath,
       EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE))
     file.getPath
@@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets

 import scala.jdk.CollectionConverters._

-import com.google.common.io.Files
+import com.google.common.io.{Files, FileWriteMode}

 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.{DriverDescription, SparkHadoopUtil}
@@ -216,7 +216,7 @@ private[deploy] class DriverRunner(
       val redactedCommand = Utils.redactCommandLineArgs(conf, builder.command.asScala.toSeq)
         .mkString("\"", "\" \"", "\"")
       val header = "Launch Command: %s\n%s\n\n".format(redactedCommand, "=" * 40)
-      Files.append(header, stderr, StandardCharsets.UTF_8)
+      Files.asCharSink(stderr, StandardCharsets.UTF_8, FileWriteMode.APPEND).write(header)
       CommandUtils.redirectStream(process.getErrorStream, stderr)
     }
     runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
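Files.write(CharSequence, File, Charset) and Files.append(...) are deprecated in Guava in favor of the CharSink API; appending becomes a FileWriteMode.APPEND argument to asCharSink. A small sketch of both forms (the file name is made up for illustration):

    import java.io.File;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import com.google.common.io.FileWriteMode;
    import com.google.common.io.Files;

    public class CharSinkMigration {
      public static void main(String[] args) throws IOException {
        File log = new File("stderr.log"); // hypothetical file

        // Before (deprecated):
        //   Files.write("header\n", log, StandardCharsets.UTF_8);
        //   Files.append("more\n", log, StandardCharsets.UTF_8);

        // After: the CharSink bundles destination, charset, and write mode.
        Files.asCharSink(log, StandardCharsets.UTF_8).write("header\n");
        Files.asCharSink(log, StandardCharsets.UTF_8, FileWriteMode.APPEND).write("more\n");
      }
    }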
@@ -191,7 +191,7 @@ private[deploy] class ExecutorRunner(
     stdoutAppender = FileAppender(process.getInputStream, stdout, conf, true)

     val stderr = new File(executorDir, "stderr")
-    Files.write(header, stderr, StandardCharsets.UTF_8)
+    Files.asCharSink(stderr, StandardCharsets.UTF_8).write(header)
     stderrAppender = FileAppender(process.getErrorStream, stderr, conf, true)

     state = ExecutorState.RUNNING
@@ -208,7 +208,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64)
   /**
    * Re-hash a value to deal better with hash functions that don't differ in the lower bits.
    */
-  private def rehash(h: Int): Int = Hashing.murmur3_32().hashInt(h).asInt()
+  private def rehash(h: Int): Int = Hashing.murmur3_32_fixed().hashInt(h).asInt()

   /** Double the table's size and re-hash everything */
   protected def growTable(): Unit = {
@@ -266,7 +266,7 @@ class OpenHashSet[@specialized(Long, Int, Double, Float) T: ClassTag](
   /**
    * Re-hash a value to deal better with hash functions that don't differ in the lower bits.
    */
-  private def hashcode(h: Int): Int = Hashing.murmur3_32().hashInt(h).asInt()
+  private def hashcode(h: Int): Int = Hashing.murmur3_32_fixed().hashInt(h).asInt()
Member: IIRC, this is a different implementation that fixes the original one's bug.

Contributor Author: https://github.com/google/guava/blob/3c7c173e9c6ac93f154bfe40876f0c792d849f6e/guava/src/com/google/common/hash/Hashing.java#L117-L133

  /**
   * Returns a hash function implementing the <a
   * href="https://github.com/aappleby/smhasher/blob/master/src/MurmurHash3.cpp">32-bit murmur3
   * algorithm, x86 variant</a> (little-endian variant), using the given seed value, <b>with a known
   * bug</b> as described in the deprecation text.
   *
   * <p>The C++ equivalent is the MurmurHash3_x86_32 function (Murmur3A), which however does not
   * have the bug.
   *
   * @deprecated This implementation produces incorrect hash values from the {@link
   *     HashFunction#hashString} method if the string contains non-BMP characters. Use {@link
   *     #murmur3_32_fixed()} instead.
   */
  @Deprecated
  public static HashFunction murmur3_32() {
    return Murmur3_32HashFunction.MURMUR3_32;
  }

Yes, but this is the official fix provided, and there seems to be no other equivalent alternative.

@pan3793 any better suggestions?

Contributor Author: @pan3793 I think this change is safe for Spark. The difference between MURMUR3_32 and MURMUR3_32_FIXED lies in the supplementaryPlaneFix parameter passed when constructing Murmur3_32HashFunction:

https://github.com/google/guava/blob/0c33dd12b193402cdf6962d43d69743521aa2f76/guava/src/com/google/common/hash/Murmur3_32HashFunction.java#L56-L59

  static final HashFunction MURMUR3_32 =
      new Murmur3_32HashFunction(0, /* supplementaryPlaneFix= */ false);
  static final HashFunction MURMUR3_32_FIXED =
      new Murmur3_32HashFunction(0, /* supplementaryPlaneFix= */ true);

However, supplementaryPlaneFix is only consulted in Murmur3_32HashFunction#hashString, and Spark only uses Murmur3_32HashFunction#hashInt, so this change does not alter the behavior of this method.

https://github.com/google/guava/blob/0c33dd12b193402cdf6962d43d69743521aa2f76/guava/src/com/google/common/hash/Murmur3_32HashFunction.java#L108-L114

  @Override
  public HashCode hashInt(int input) {
    int k1 = mixK1(input);
    int h1 = mixH1(seed, k1);

    return fmix(h1, Ints.BYTES);
  }

Member: Thanks for checking, it is not a problem then.

Contributor Author: Thanks @pan3793


   private def nextPowerOf2(n: Int): Int = {
     if (n == 0) {
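The claim in the thread above is easy to sanity-check: since hashInt never consults the supplementaryPlaneFix flag, the deprecated and fixed variants should agree on every integer input. A small verification sketch (not part of the PR):

    import com.google.common.hash.Hashing;

    public class Murmur3FixedCheck {
      public static void main(String[] args) {
        for (int h : new int[] {0, 1, -1, 42, Integer.MAX_VALUE}) {
          int legacy = Hashing.murmur3_32().hashInt(h).asInt();      // deprecated variant
          int fixed = Hashing.murmur3_32_fixed().hashInt(h).asInt(); // fixed variant
          if (legacy != fixed) {
            throw new AssertionError("hashInt diverged for " + h);
          }
        }
        System.out.println("murmur3_32 and murmur3_32_fixed agree on hashInt");
      }
    }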
2 changes: 1 addition & 1 deletion core/src/test/java/test/org/apache/spark/JavaAPISuite.java
@@ -960,7 +960,7 @@ public void textFiles() throws IOException {
     rdd.saveAsTextFile(outputDir);
     // Read the plain text file and check it's OK
     File outputFile = new File(outputDir, "part-00000");
-    String content = Files.toString(outputFile, StandardCharsets.UTF_8);
+    String content = Files.asCharSource(outputFile, StandardCharsets.UTF_8).read();
     assertEquals("1\n2\n3\n4\n", content);
     // Also try reading it in as a text file RDD
     List<String> expected = Arrays.asList("1", "2", "3", "4");
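Reads move the same way: Files.toString(File, Charset) is deprecated in favor of the CharSource API. A minimal sketch (the path is hypothetical):

    import java.io.File;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import com.google.common.io.Files;

    public class CharSourceMigration {
      public static void main(String[] args) throws IOException {
        File outputFile = new File("part-00000"); // hypothetical path

        // Before (deprecated):
        //   String content = Files.toString(outputFile, StandardCharsets.UTF_8);
        String content = Files.asCharSource(outputFile, StandardCharsets.UTF_8).read();
        System.out.println(content);
      }
    }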
4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -334,8 +334,8 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {

     for (i <- 0 until 8) {
       val tempFile = new File(tempDir, s"part-0000$i")
-      Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", tempFile,
-        StandardCharsets.UTF_8)
+      Files.asCharSink(tempFile, StandardCharsets.UTF_8)
+        .write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1")
     }

     for (p <- Seq(1, 2, 8)) {