diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java index 13a9d89f4705c..7f8d6c58aec7e 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java @@ -255,7 +255,8 @@ public Iterator iterator() { iteratorTracker.add(new WeakReference<>(it)); return it; } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } }; diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java index 69757fdc65d68..29ed37ffa44e5 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java @@ -127,7 +127,7 @@ public boolean hasNext() { try { close(); } catch (IOException ioe) { - throw Throwables.propagate(ioe); + throw new RuntimeException(ioe); } } return next != null; @@ -151,7 +151,8 @@ public T next() { next = null; return ret; } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java index dc7ad0be5c007..4bc2b233fe12d 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java @@ -287,7 +287,8 @@ public Iterator iterator() { iteratorTracker.add(new WeakReference<>(it)); return it; } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } }; diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java index a98b0482e35cc..e350ddc2d445a 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java @@ -113,7 +113,7 @@ public boolean hasNext() { try { close(); } catch (IOException ioe) { - throw Throwables.propagate(ioe); + throw new RuntimeException(ioe); } } return next != null; @@ -137,7 +137,8 @@ public T next() { next = null; return ret; } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java index 4c144a73a9299..a9df47645d36f 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java @@ -290,9 +290,11 @@ public void onFailure(Throwable e) { try { return result.get(timeoutMs, TimeUnit.MILLISECONDS); } catch (ExecutionException e) { - throw Throwables.propagate(e.getCause()); + Throwables.throwIfUnchecked(e.getCause()); + throw new RuntimeException(e.getCause()); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } 
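The hunks above all apply the same replacement for Guava's deprecated Throwables.propagate(e), which rethrew RuntimeExceptions and Errors as-is and wrapped everything else in a RuntimeException. A minimal sketch of the equivalent two-line idiom, with a hypothetical loadIndex() helper standing in for the wrapped call (the helper is not part of this patch):

import com.google.common.base.Throwables;

class PropagateMigrationSketch {
  Object indexOrFail() {
    try {
      return loadIndex();                    // may throw checked or unchecked exceptions
    } catch (Exception e) {
      Throwables.throwIfUnchecked(e);        // rethrows a RuntimeException or Error unchanged
      throw new RuntimeException(e);         // wraps whatever checked exception remains
    }
  }

  // The TransportClient change above applies the same idiom to e.getCause(), so callers see
  // the original failure rather than the ExecutionException wrapper produced by the future.

  private Object loadIndex() throws java.io.IOException {
    return new Object();                     // placeholder body for the sketch
  }
}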
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java index e1f19f956cc0a..d64b8c8f838e9 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java @@ -342,7 +342,8 @@ public void operationComplete(final Future handshakeFuture) { logger.error("Exception while bootstrapping client after {} ms", e, MDC.of(LogKeys.BOOTSTRAP_TIME$.MODULE$, bootstrapTimeMs)); client.close(); - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } long postBootstrap = System.nanoTime(); diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java index 08e2c084fe67b..2e9ccd0e0ad21 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java +++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java @@ -22,7 +22,6 @@ import java.security.GeneralSecurityException; import java.util.concurrent.TimeoutException; -import com.google.common.base.Throwables; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; @@ -80,7 +79,7 @@ public void doBootstrap(TransportClient client, Channel channel) { doSparkAuth(client, channel); client.setClientId(appId); } catch (GeneralSecurityException | IOException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } catch (RuntimeException e) { // There isn't a good exception that can be caught here to know whether it's really // OK to switch back to SASL (because the server doesn't speak the new protocol). 
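Where the catch clause can only receive checked exceptions, as in the AuthClientBootstrap hunk just above (and the SaslException, RocksDBException, and IOException catches later in this patch), throwIfUnchecked can never fire, so the replacement collapses to a plain wrap and the com.google.common.base.Throwables import is dropped. A short sketch under that assumption; doHandshake() is a hypothetical stand-in:

import java.io.IOException;
import java.security.GeneralSecurityException;

class WrapCheckedSketch {
  void bootstrap() {
    try {
      doHandshake();                     // declares only checked exceptions
    } catch (GeneralSecurityException | IOException e) {
      throw new RuntimeException(e);     // nothing unchecked can reach this catch block
    }
  }

  private void doHandshake() throws GeneralSecurityException, IOException {
    // placeholder body for the sketch
  }
}

A third variant appears in the ReadAheadInputStream hunk further down, where Throwables.propagateIfPossible(readException, IOException.class) becomes throwIfInstanceOf followed by throwIfUnchecked before the final wrap in an IOException.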
diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java index 65367743e24f9..087e3d21e22bb 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java @@ -132,7 +132,8 @@ protected boolean doAuthChallenge( try { engine.close(); } catch (Exception e) { - throw Throwables.propagate(e); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); } } } diff --git a/common/network-common/src/main/java/org/apache/spark/network/sasl/SparkSaslClient.java b/common/network-common/src/main/java/org/apache/spark/network/sasl/SparkSaslClient.java index 3600c1045dbf4..a61b1c3c0c416 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/sasl/SparkSaslClient.java +++ b/common/network-common/src/main/java/org/apache/spark/network/sasl/SparkSaslClient.java @@ -29,7 +29,6 @@ import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import org.apache.spark.internal.SparkLogger; @@ -62,7 +61,7 @@ public SparkSaslClient(String secretKeyId, SecretKeyHolder secretKeyHolder, bool this.saslClient = Sasl.createSaslClient(new String[] { DIGEST }, null, null, DEFAULT_REALM, saslProps, new ClientCallbackHandler()); } catch (SaslException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } @@ -72,7 +71,7 @@ public synchronized byte[] firstToken() { try { return saslClient.evaluateChallenge(new byte[0]); } catch (SaslException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } else { return new byte[0]; @@ -98,7 +97,7 @@ public synchronized byte[] response(byte[] token) { try { return saslClient != null ? saslClient.evaluateChallenge(token) : new byte[0]; } catch (SaslException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } diff --git a/common/network-common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java b/common/network-common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java index b897650afe832..f32fd5145c7c5 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java +++ b/common/network-common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java @@ -31,7 +31,6 @@ import java.util.Map; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -94,7 +93,7 @@ public SparkSaslServer( this.saslServer = Sasl.createSaslServer(DIGEST, null, DEFAULT_REALM, saslProps, new DigestCallbackHandler()); } catch (SaslException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } @@ -119,7 +118,7 @@ public synchronized byte[] response(byte[] token) { try { return saslServer != null ?
saslServer.evaluateResponse(token) : new byte[0]; } catch (SaslException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } diff --git a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/LevelDBIterator.java b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/LevelDBIterator.java index 5796e34a6f05e..2ac549775449a 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/LevelDBIterator.java +++ b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/LevelDBIterator.java @@ -17,8 +17,6 @@ package org.apache.spark.network.shuffledb; -import com.google.common.base.Throwables; - import java.io.IOException; import java.util.Map; import java.util.NoSuchElementException; @@ -47,7 +45,7 @@ public boolean hasNext() { try { close(); } catch (IOException ioe) { - throw Throwables.propagate(ioe); + throw new RuntimeException(ioe); } } return next != null; diff --git a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/RocksDB.java b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/RocksDB.java index d33895d6c2d62..2737ab8ed754c 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/RocksDB.java +++ b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/RocksDB.java @@ -19,7 +19,6 @@ import java.io.IOException; -import com.google.common.base.Throwables; import org.rocksdb.RocksDBException; /** @@ -37,7 +36,7 @@ public void put(byte[] key, byte[] value) { try { db.put(key, value); } catch (RocksDBException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } @@ -46,7 +45,7 @@ public byte[] get(byte[] key) { try { return db.get(key); } catch (RocksDBException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } @@ -55,7 +54,7 @@ public void delete(byte[] key) { try { db.delete(key); } catch (RocksDBException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } diff --git a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/RocksDBIterator.java b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/RocksDBIterator.java index 78562f91a4b75..829a7ded6330b 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/RocksDBIterator.java +++ b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/RocksDBIterator.java @@ -22,7 +22,6 @@ import java.util.Map; import java.util.NoSuchElementException; -import com.google.common.base.Throwables; import org.rocksdb.RocksIterator; /** @@ -52,7 +51,7 @@ public boolean hasNext() { try { close(); } catch (IOException ioe) { - throw Throwables.propagate(ioe); + throw new RuntimeException(ioe); } } return next != null; diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index 7852bc814ccd4..c3f02eebab23a 100644 --- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -176,7 +176,7 @@ class KafkaTestUtils( } kdc.getKrb5conf.delete() - Files.write(krb5confStr, kdc.getKrb5conf, StandardCharsets.UTF_8) + Files.asCharSink(kdc.getKrb5conf, StandardCharsets.UTF_8).write(krb5confStr) logDebug(s"krb5.conf file content: $krb5confStr") } @@ -240,7 +240,7 @@ class 
KafkaTestUtils( | principal="$kafkaServerUser@$realm"; |}; """.stripMargin.trim - Files.write(content, file, StandardCharsets.UTF_8) + Files.asCharSink(file, StandardCharsets.UTF_8).write(content) logDebug(s"Created JAAS file: ${file.getPath}") logDebug(s"JAAS file content: $content") file.getAbsolutePath() diff --git a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java index 5e9f1b78273a5..7dd87df713e6e 100644 --- a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java +++ b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java @@ -120,7 +120,8 @@ private boolean isEndOfStream() { private void checkReadException() throws IOException { if (readAborted) { - Throwables.propagateIfPossible(readException, IOException.class); + Throwables.throwIfInstanceOf(readException, IOException.class); + Throwables.throwIfUnchecked(readException); throw new IOException(readException); } } diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index 5e3078d7292ba..fed15a067c00f 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -421,7 +421,7 @@ private[spark] object TestUtils extends SparkTestUtils { def createTempScriptWithExpectedOutput(dir: File, prefix: String, output: String): String = { val file = File.createTempFile(prefix, ".sh", dir) val script = s"cat < expected = Arrays.asList("1", "2", "3", "4"); diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index 5651dc9b2dbdc..5f9912cbd021d 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -334,8 +334,8 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { for (i <- 0 until 8) { val tempFile = new File(tempDir, s"part-0000$i") - Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", tempFile, - StandardCharsets.UTF_8) + Files.asCharSink(tempFile, StandardCharsets.UTF_8) + .write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1") } for (p <- Seq(1, 2, 8)) { diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 12f9d2f83c777..44b2da603a1f6 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -119,8 +119,8 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu val absolutePath2 = file2.getAbsolutePath try { - Files.write("somewords1", file1, StandardCharsets.UTF_8) - Files.write("somewords2", file2, StandardCharsets.UTF_8) + Files.asCharSink(file1, StandardCharsets.UTF_8).write("somewords1") + Files.asCharSink(file2, StandardCharsets.UTF_8).write("somewords2") val length1 = file1.length() val length2 = file2.length() @@ -178,10 +178,10 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu s"${jarFile.getParent}/../${jarFile.getParentFile.getName}/${jarFile.getName}#zoo" try { - Files.write("somewords1", file1, StandardCharsets.UTF_8) - Files.write("somewords22", file2, StandardCharsets.UTF_8) - Files.write("somewords333", file3, StandardCharsets.UTF_8) - Files.write("somewords4444", file4, StandardCharsets.UTF_8) + Files.asCharSink(file1, StandardCharsets.UTF_8).write("somewords1") + 
Files.asCharSink(file2, StandardCharsets.UTF_8).write("somewords22") + Files.asCharSink(file3, StandardCharsets.UTF_8).write("somewords333") + Files.asCharSink(file4, StandardCharsets.UTF_8).write("somewords4444") val length1 = file1.length() val length2 = file2.length() val length3 = file1.length() @@ -373,8 +373,8 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu assert(subdir2.mkdir()) val file1 = new File(subdir1, "file") val file2 = new File(subdir2, "file") - Files.write("old", file1, StandardCharsets.UTF_8) - Files.write("new", file2, StandardCharsets.UTF_8) + Files.asCharSink(file1, StandardCharsets.UTF_8).write("old") + Files.asCharSink(file2, StandardCharsets.UTF_8).write("new") sc = new SparkContext("local-cluster[1,1,1024]", "test") sc.addFile(file1.getAbsolutePath) def getAddedFileContents(): String = { @@ -503,12 +503,15 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu try { // Create 5 text files. - Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", file1, - StandardCharsets.UTF_8) - Files.write("someline1 in file2\nsomeline2 in file2", file2, StandardCharsets.UTF_8) - Files.write("someline1 in file3", file3, StandardCharsets.UTF_8) - Files.write("someline1 in file4\nsomeline2 in file4", file4, StandardCharsets.UTF_8) - Files.write("someline1 in file2\nsomeline2 in file5", file5, StandardCharsets.UTF_8) + Files.asCharSink(file1, StandardCharsets.UTF_8) + .write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1") + Files.asCharSink(file2, StandardCharsets.UTF_8) + .write("someline1 in file2\nsomeline2 in file2") + Files.asCharSink(file3, StandardCharsets.UTF_8).write("someline1 in file3") + Files.asCharSink(file4, StandardCharsets.UTF_8) + .write("someline1 in file4\nsomeline2 in file4") + Files.asCharSink(file5, StandardCharsets.UTF_8) + .write("someline1 in file2\nsomeline2 in file5") sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala index f34f792881f90..7501a98a1a573 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala @@ -221,7 +221,7 @@ class SingleFileEventLogFileReaderSuite extends EventLogFileReadersSuite { val entry = is.getNextEntry assert(entry != null) val actual = new String(ByteStreams.toByteArray(is), StandardCharsets.UTF_8) - val expected = Files.toString(new File(logPath.toString), StandardCharsets.UTF_8) + val expected = Files.asCharSource(new File(logPath.toString), StandardCharsets.UTF_8).read() assert(actual === expected) assert(is.getNextEntry === null) } @@ -368,8 +368,8 @@ class RollingEventLogFilesReaderSuite extends EventLogFileReadersSuite { assert(allFileNames.contains(fileName)) val actual = new String(ByteStreams.toByteArray(is), StandardCharsets.UTF_8) - val expected = Files.toString(new File(logPath.toString, fileName), - StandardCharsets.UTF_8) + val expected = Files.asCharSource( + new File(logPath.toString, fileName), StandardCharsets.UTF_8).read() assert(actual === expected) } } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 3013a5bf4a294..852f94bda870d 100644 --- 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -708,7 +708,8 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P while (entry != null) { val actual = new String(ByteStreams.toByteArray(inputStream), StandardCharsets.UTF_8) val expected = - Files.toString(logs.find(_.getName == entry.getName).get, StandardCharsets.UTF_8) + Files.asCharSource(logs.find(_.getName == entry.getName).get, StandardCharsets.UTF_8) + .read() actual should be (expected) totalEntries += 1 entry = inputStream.getNextEntry diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala index 2b9b110a41424..807e5ec3e823e 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala @@ -45,8 +45,8 @@ class HistoryServerArgumentsSuite extends SparkFunSuite { test("Properties File Arguments Parsing --properties-file") { withTempDir { tmpDir => val outFile = File.createTempFile("test-load-spark-properties", "test", tmpDir) - Files.write("spark.test.CustomPropertyA blah\n" + - "spark.test.CustomPropertyB notblah\n", outFile, UTF_8) + Files.asCharSink(outFile, UTF_8).write("spark.test.CustomPropertyA blah\n" + + "spark.test.CustomPropertyB notblah\n") val argStrings = Array("--properties-file", outFile.getAbsolutePath) val hsa = new HistoryServerArguments(conf, argStrings) assert(conf.get("spark.test.CustomPropertyA") === "blah") diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index abb5ae720af07..6b2bd90cd4314 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -283,7 +283,7 @@ abstract class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with val expectedFile = { new File(logDir, entry.getName) } - val expected = Files.toString(expectedFile, StandardCharsets.UTF_8) + val expected = Files.asCharSource(expectedFile, StandardCharsets.UTF_8).read() val actual = new String(ByteStreams.toByteArray(zipStream), StandardCharsets.UTF_8) actual should be (expected) filesCompared += 1 diff --git a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala index 79fa8d21bf3f1..fc8f48df2cb7d 100644 --- a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala +++ b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala @@ -383,7 +383,7 @@ object NonLocalModeSparkPlugin { resources: Map[String, ResourceInformation]): Unit = { val path = conf.get(TEST_PATH_CONF) val strToWrite = createFileStringWithGpuAddrs(id, resources) - Files.write(strToWrite, new File(path, s"$filePrefix$id"), StandardCharsets.UTF_8) + Files.asCharSink(new File(path, s"$filePrefix$id"), StandardCharsets.UTF_8).write(strToWrite) } def reset(): Unit = { diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceDiscoveryPluginSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceDiscoveryPluginSuite.scala index ff7d680352177..edf138df9e207 
100644 --- a/core/src/test/scala/org/apache/spark/resource/ResourceDiscoveryPluginSuite.scala +++ b/core/src/test/scala/org/apache/spark/resource/ResourceDiscoveryPluginSuite.scala @@ -148,7 +148,7 @@ object TestResourceDiscoveryPlugin { def writeFile(conf: SparkConf, id: String): Unit = { val path = conf.get(TEST_PATH_CONF) val fileName = s"$id - ${UUID.randomUUID.toString}" - Files.write(id, new File(path, fileName), StandardCharsets.UTF_8) + Files.asCharSink(new File(path, fileName), StandardCharsets.UTF_8).write(id) } } diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala index 3ef382573517b..66b1ee7b58ac8 100644 --- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala @@ -868,23 +868,23 @@ abstract class RpcEnvSuite extends SparkFunSuite { val conf = createSparkConf() val file = new File(tempDir, "file") - Files.write(UUID.randomUUID().toString(), file, UTF_8) + Files.asCharSink(file, UTF_8).write(UUID.randomUUID().toString) val fileWithSpecialChars = new File(tempDir, "file name") - Files.write(UUID.randomUUID().toString(), fileWithSpecialChars, UTF_8) + Files.asCharSink(fileWithSpecialChars, UTF_8).write(UUID.randomUUID().toString) val empty = new File(tempDir, "empty") - Files.write("", empty, UTF_8); + Files.asCharSink(empty, UTF_8).write("") val jar = new File(tempDir, "jar") - Files.write(UUID.randomUUID().toString(), jar, UTF_8) + Files.asCharSink(jar, UTF_8).write(UUID.randomUUID().toString) val dir1 = new File(tempDir, "dir1") assert(dir1.mkdir()) val subFile1 = new File(dir1, "file1") - Files.write(UUID.randomUUID().toString(), subFile1, UTF_8) + Files.asCharSink(subFile1, UTF_8).write(UUID.randomUUID().toString) val dir2 = new File(tempDir, "dir2") assert(dir2.mkdir()) val subFile2 = new File(dir2, "file2") - Files.write(UUID.randomUUID().toString(), subFile2, UTF_8) + Files.asCharSink(subFile2, UTF_8).write(UUID.randomUUID().toString) val fileUri = env.fileServer.addFile(file) val fileWithSpecialCharsUri = env.fileServer.addFile(fileWithSpecialChars) diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala index 35ef0587b9b4c..4497ea1b2b798 100644 --- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala @@ -54,11 +54,11 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter { val inputStream = new ByteArrayInputStream(testString.getBytes(StandardCharsets.UTF_8)) // The `header` should not be covered val header = "Add header" - Files.write(header, testFile, StandardCharsets.UTF_8) + Files.asCharSink(testFile, StandardCharsets.UTF_8).write(header) val appender = new FileAppender(inputStream, testFile) inputStream.close() appender.awaitTermination() - assert(Files.toString(testFile, StandardCharsets.UTF_8) === header + testString) + assert(Files.asCharSource(testFile, StandardCharsets.UTF_8).read() === header + testString) } test("SPARK-35027: basic file appender - close stream") { @@ -392,7 +392,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter { IOUtils.closeQuietly(inputStream) } } else { - Files.toString(file, StandardCharsets.UTF_8) + Files.asCharSource(file, StandardCharsets.UTF_8).read() } }.mkString("") assert(allText === expectedText) diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index a694e08def89c..a6e3345fc600c 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -735,8 +735,8 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties { withTempDir { tmpDir => val outFile = File.createTempFile("test-load-spark-properties", "test", tmpDir) System.setProperty("spark.test.fileNameLoadB", "2") - Files.write("spark.test.fileNameLoadA true\n" + - "spark.test.fileNameLoadB 1\n", outFile, UTF_8) + Files.asCharSink(outFile, UTF_8).write("spark.test.fileNameLoadA true\n" + + "spark.test.fileNameLoadB 1\n") val properties = Utils.getPropertiesFromFile(outFile.getAbsolutePath) properties .filter { case (k, v) => k.startsWith("spark.")} @@ -765,7 +765,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties { val innerSourceDir = Utils.createTempDir(root = sourceDir.getPath) val sourceFile = File.createTempFile("someprefix", "somesuffix", innerSourceDir) val targetDir = new File(tempDir, "target-dir") - Files.write("some text", sourceFile, UTF_8) + Files.asCharSink(sourceFile, UTF_8).write("some text") val path = if (Utils.isWindows) { diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java index 0c11c40cfe7ed..1052f47ea496e 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.regex.Pattern; +import com.google.common.io.FileWriteMode; import scala.Tuple2; import com.google.common.io.Files; @@ -152,7 +153,8 @@ private static JavaStreamingContext createContext(String ip, System.out.println(output); System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally"); System.out.println("Appending to " + outputFile.getAbsolutePath()); - Files.append(output + "\n", outputFile, Charset.defaultCharset()); + Files.asCharSink(outputFile, Charset.defaultCharset(), FileWriteMode.APPEND) + .write(output + "\n"); }); return ssc; diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala index 98539d6494231..1ec6ee4abd327 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala @@ -21,7 +21,7 @@ package org.apache.spark.examples.streaming import java.io.File import java.nio.charset.Charset -import com.google.common.io.Files +import com.google.common.io.{Files, FileWriteMode} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.broadcast.Broadcast @@ -134,7 +134,8 @@ object RecoverableNetworkWordCount { println(output) println(s"Dropped ${droppedWordsCounter.value} word(s) totally") println(s"Appending to ${outputFile.getAbsolutePath}") - Files.append(output + "\n", outputFile, Charset.defaultCharset()) + Files.asCharSink(outputFile, Charset.defaultCharset(), FileWriteMode.APPEND) + .write(output + "\n") } ssc } diff --git a/mllib/src/test/java/org/apache/spark/ml/source/libsvm/JavaLibSVMRelationSuite.java 
b/mllib/src/test/java/org/apache/spark/ml/source/libsvm/JavaLibSVMRelationSuite.java index c3038fa9e1f8f..5f0d22ea2a8aa 100644 --- a/mllib/src/test/java/org/apache/spark/ml/source/libsvm/JavaLibSVMRelationSuite.java +++ b/mllib/src/test/java/org/apache/spark/ml/source/libsvm/JavaLibSVMRelationSuite.java @@ -50,7 +50,7 @@ public void setUp() throws IOException { tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "datasource"); File file = new File(tempDir, "part-00000"); String s = "1 1:1.0 3:2.0 5:3.0\n0\n0 2:4.0 4:5.0 6:6.0"; - Files.write(s, file, StandardCharsets.UTF_8); + Files.asCharSink(file, StandardCharsets.UTF_8).write(s); path = tempDir.toURI().toString(); } diff --git a/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala index f2bb145614725..6a0d7b1237ee4 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala @@ -65,9 +65,9 @@ class LibSVMRelationSuite val succ = new File(dir, "_SUCCESS") val file0 = new File(dir, "part-00000") val file1 = new File(dir, "part-00001") - Files.write("", succ, StandardCharsets.UTF_8) - Files.write(lines0, file0, StandardCharsets.UTF_8) - Files.write(lines1, file1, StandardCharsets.UTF_8) + Files.asCharSink(succ, StandardCharsets.UTF_8).write("") + Files.asCharSink(file0, StandardCharsets.UTF_8).write(lines0) + Files.asCharSink(file1, StandardCharsets.UTF_8).write(lines1) path = dir.getPath } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala index a90c9c80d4959..1a02e26b9260c 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala @@ -93,7 +93,7 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext { """.stripMargin val tempDir = Utils.createTempDir() val file = new File(tempDir.getPath, "part-00000") - Files.write(lines, file, StandardCharsets.UTF_8) + Files.asCharSink(file, StandardCharsets.UTF_8).write(lines) val path = tempDir.toURI.toString val pointsWithNumFeatures = loadLibSVMFile(sc, path, 6).collect() @@ -126,7 +126,7 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext { """.stripMargin val tempDir = Utils.createTempDir() val file = new File(tempDir.getPath, "part-00000") - Files.write(lines, file, StandardCharsets.UTF_8) + Files.asCharSink(file, StandardCharsets.UTF_8).write(lines) val path = tempDir.toURI.toString intercept[SparkException] { @@ -143,7 +143,7 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext { """.stripMargin val tempDir = Utils.createTempDir() val file = new File(tempDir.getPath, "part-00000") - Files.write(lines, file, StandardCharsets.UTF_8) + Files.asCharSink(file, StandardCharsets.UTF_8).write(lines) val path = tempDir.toURI.toString intercept[SparkException] { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala index 79f76e96474e3..2c28dc380046c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala +++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala @@ -107,7 +107,7 @@ object SparkKubernetesClientFactory extends Logging { (token, configBuilder) => configBuilder.withOauthToken(token) }.withOption(oauthTokenFile) { (file, configBuilder) => - configBuilder.withOauthToken(Files.toString(file, Charsets.UTF_8)) + configBuilder.withOauthToken(Files.asCharSource(file, Charsets.UTF_8).read()) }.withOption(caCertFile) { (file, configBuilder) => configBuilder.withCaCertFile(file) }.withOption(clientKeyFile) { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala index e266d0f904e46..d64378a65d66f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala @@ -116,7 +116,7 @@ private[spark] class HadoopConfDriverFeatureStep(conf: KubernetesConf) override def getAdditionalKubernetesResources(): Seq[HasMetadata] = { if (confDir.isDefined) { val fileMap = confFiles.map { file => - (file.getName(), Files.toString(file, StandardCharsets.UTF_8)) + (file.getName(), Files.asCharSource(file, StandardCharsets.UTF_8).read()) }.toMap.asJava Seq(new ConfigMapBuilder() diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala index 82bda88892d04..89aefe47e46d1 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala @@ -229,7 +229,7 @@ private[spark] class KerberosConfDriverFeatureStep(kubernetesConf: KubernetesDri .endMetadata() .withImmutable(true) .addToData( - Map(file.getName() -> Files.toString(file, StandardCharsets.UTF_8)).asJava) + Map(file.getName() -> Files.asCharSource(file, StandardCharsets.UTF_8).read()).asJava) .build() } } ++ { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala index cdc0112294113..f94dad2d15dc1 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala @@ -81,7 +81,7 @@ private[spark] class PodTemplateConfigMapStep(conf: KubernetesConf) val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf.sparkConf) val uri = downloadFile(podTemplateFile, Utils.createTempDir(), conf.sparkConf, hadoopConf) val file = new java.net.URI(uri).getPath - val podTemplateString = Files.toString(new File(file), StandardCharsets.UTF_8) + val podTemplateString = Files.asCharSource(new File(file), StandardCharsets.UTF_8).read() Seq(new ConfigMapBuilder() .withNewMetadata() .withName(configmapName) diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala index f1dd8b94f17ff..a72152a851c4f 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala @@ -128,7 +128,7 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite { private def writeCredentials(credentialsFileName: String, credentialsContents: String): File = { val credentialsFile = new File(credentialsTempDirectory, credentialsFileName) - Files.write(credentialsContents, credentialsFile, Charsets.UTF_8) + Files.asCharSink(credentialsFile, Charsets.UTF_8).write(credentialsContents) credentialsFile } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStepSuite.scala index 8f21b95236a9c..4310ac0220e5e 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStepSuite.scala @@ -48,7 +48,7 @@ class HadoopConfDriverFeatureStepSuite extends SparkFunSuite { val confFiles = Set("core-site.xml", "hdfs-site.xml") confFiles.foreach { f => - Files.write("some data", new File(confDir, f), UTF_8) + Files.asCharSink(new File(confDir, f), UTF_8).write("some data") } val sparkConf = new SparkConfWithEnv(Map(ENV_HADOOP_CONF_DIR -> confDir.getAbsolutePath())) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStepSuite.scala index a60227814eb13..04e20258d068f 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStepSuite.scala @@ -36,7 +36,7 @@ class HadoopConfExecutorFeatureStepSuite extends SparkFunSuite { val confFiles = Set("core-site.xml", "hdfs-site.xml") confFiles.foreach { f => - Files.write("some data", new File(confDir, f), UTF_8) + Files.asCharSink(new File(confDir, f), UTF_8).write("some data") } Seq( diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala index 163d87643abd3..b172bdc06ddca 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala @@ -55,7 +55,7 @@ class KerberosConfDriverFeatureStepSuite extends SparkFunSuite { test("create krb5.conf config map if 
local config provided") { val krbConf = File.createTempFile("krb5", ".conf", tmpDir) - Files.write("some data", krbConf, UTF_8) + Files.asCharSink(krbConf, UTF_8).write("some data") val sparkConf = new SparkConf(false) .set(KUBERNETES_KERBEROS_KRB5_FILE, krbConf.getAbsolutePath()) @@ -70,7 +70,7 @@ class KerberosConfDriverFeatureStepSuite extends SparkFunSuite { test("create keytab secret if client keytab file used") { val keytab = File.createTempFile("keytab", ".bin", tmpDir) - Files.write("some data", keytab, UTF_8) + Files.asCharSink(keytab, UTF_8).write("some data") val sparkConf = new SparkConf(false) .set(KEYTAB, keytab.getAbsolutePath()) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala index ae5f037c6b7d4..950079dcb5362 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala @@ -40,7 +40,7 @@ private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite => val logConfFilePath = s"${sparkHomeDir.toFile}/conf/log4j2.properties" try { - Files.write( + Files.asCharSink(new File(logConfFilePath), StandardCharsets.UTF_8).write( """rootLogger.level = info |rootLogger.appenderRef.stdout.ref = console |appender.console.type = Console @@ -51,9 +51,7 @@ private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite => | |logger.spark.name = org.apache.spark |logger.spark.level = debug - """.stripMargin, - new File(logConfFilePath), - StandardCharsets.UTF_8) + """.stripMargin) f() } finally { diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 0b0b30e5e04fd..cf129677ad9c2 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -129,7 +129,7 @@ class KubernetesSuite extends SparkFunSuite val tagFile = new File(path) require(tagFile.isFile, s"No file found for image tag at ${tagFile.getAbsolutePath}.") - Files.toString(tagFile, Charsets.UTF_8).trim + Files.asCharSource(tagFile, Charsets.UTF_8).read().trim } .orElse(sys.props.get(CONFIG_KEY_IMAGE_TAG)) .getOrElse { diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala index f0177541accc1..e0dfac62847ea 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala @@ -86,7 +86,7 @@ abstract class BaseYarnClusterSuite extends SparkFunSuite with Matchers { logConfDir.mkdir() val logConfFile = new File(logConfDir, "log4j2.properties") - Files.write(LOG4J_CONF, logConfFile, StandardCharsets.UTF_8) + Files.asCharSink(logConfFile, StandardCharsets.UTF_8).write(LOG4J_CONF) // 
Disable the disk utilization check to avoid the test hanging when people's disks are // getting full. @@ -232,11 +232,11 @@ abstract class BaseYarnClusterSuite extends SparkFunSuite with Matchers { // an error message val output = new Object() { override def toString: String = outFile - .map(Files.toString(_, StandardCharsets.UTF_8)) + .map(Files.asCharSource(_, StandardCharsets.UTF_8).read()) .getOrElse("(stdout/stderr was not captured)") } assert(finalState === SparkAppHandle.State.FINISHED, output) - val resultString = Files.toString(result, StandardCharsets.UTF_8) + val resultString = Files.asCharSource(result, StandardCharsets.UTF_8).read() assert(resultString === expected, output) } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index 806efd39800fb..92d9f2d62d1c1 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -141,7 +141,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite { | | |""".stripMargin - Files.write(coreSite, new File(customConf, "core-site.xml"), StandardCharsets.UTF_8) + Files.asCharSink(new File(customConf, "core-site.xml"), StandardCharsets.UTF_8).write(coreSite) val result = File.createTempFile("result", null, tempDir) val finalState = runSpark(false, @@ -295,23 +295,22 @@ class YarnClusterSuite extends BaseYarnClusterSuite { test("running Spark in yarn-cluster mode displays driver log links") { val log4jConf = new File(tempDir, "log4j.properties") val logOutFile = new File(tempDir, "logs") - Files.write( + Files.asCharSink(log4jConf, StandardCharsets.UTF_8).write( s"""rootLogger.level = debug |rootLogger.appenderRef.file.ref = file |appender.file.type = File |appender.file.name = file |appender.file.fileName = $logOutFile |appender.file.layout.type = PatternLayout - |""".stripMargin, - log4jConf, StandardCharsets.UTF_8) + |""".stripMargin) // Since this test is trying to extract log output from the SparkSubmit process itself, // standard options to the Spark process don't take effect. Leverage the java-opts file which // will get picked up for the SparkSubmit process. 
val confDir = new File(tempDir, "conf") confDir.mkdir() val javaOptsFile = new File(confDir, "java-opts") - Files.write(s"-Dlog4j.configurationFile=file://$log4jConf\n", javaOptsFile, - StandardCharsets.UTF_8) + Files.asCharSink(javaOptsFile, StandardCharsets.UTF_8) + .write(s"-Dlog4j.configurationFile=file://$log4jConf\n") val result = File.createTempFile("result", null, tempDir) val finalState = runSpark(clientMode = false, @@ -320,7 +319,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite { extraEnv = Map("SPARK_CONF_DIR" -> confDir.getAbsolutePath), extraConf = Map(CLIENT_INCLUDE_DRIVER_LOGS_LINK.key -> true.toString)) checkResult(finalState, result) - val logOutput = Files.toString(logOutFile, StandardCharsets.UTF_8) + val logOutput = Files.asCharSource(logOutFile, StandardCharsets.UTF_8).read() val logFilePattern = raw"""(?s).+\sDriver Logs \(\): https?://.+/(\?\S+)?\s.+""" logOutput should fullyMatch regex logFilePattern.replace("", "stdout") logOutput should fullyMatch regex logFilePattern.replace("", "stderr") @@ -374,7 +373,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite { extraEnv: Map[String, String] = Map()): Unit = { assume(isPythonAvailable) val primaryPyFile = new File(tempDir, "test.py") - Files.write(TEST_PYFILE, primaryPyFile, StandardCharsets.UTF_8) + Files.asCharSink(primaryPyFile, StandardCharsets.UTF_8).write(TEST_PYFILE) // When running tests, let's not assume the user has built the assembly module, which also // creates the pyspark archive. Instead, let's use PYSPARK_ARCHIVES_PATH to point at the @@ -396,7 +395,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite { subdir } val pyModule = new File(moduleDir, "mod1.py") - Files.write(TEST_PYMODULE, pyModule, StandardCharsets.UTF_8) + Files.asCharSink(pyModule, StandardCharsets.UTF_8).write(TEST_PYMODULE) val mod2Archive = TestUtils.createJarWithFiles(Map("mod2.py" -> TEST_PYMODULE), moduleDir) val pyFiles = Seq(pyModule.getAbsolutePath(), mod2Archive.getPath()).mkString(",") @@ -443,7 +442,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite { def createEmptyIvySettingsFile: File = { val emptyIvySettings = File.createTempFile("ivy", ".xml") - Files.write("", emptyIvySettings, StandardCharsets.UTF_8) + Files.asCharSink(emptyIvySettings, StandardCharsets.UTF_8).write("") emptyIvySettings } @@ -555,7 +554,7 @@ private object YarnClusterDriverUseSparkHadoopUtilConf extends Logging with Matc } result = "success" } finally { - Files.write(result, status, StandardCharsets.UTF_8) + Files.asCharSink(status, StandardCharsets.UTF_8).write(result) sc.stop() } } @@ -658,7 +657,7 @@ private object YarnClusterDriver extends Logging with Matchers { assert(driverAttributes === expectationAttributes) } } finally { - Files.write(result, status, StandardCharsets.UTF_8) + Files.asCharSink(status, StandardCharsets.UTF_8).write(result) sc.stop() } } @@ -707,7 +706,7 @@ private object YarnClasspathTest extends Logging { case t: Throwable => error(s"loading test.resource to $resultPath", t) } finally { - Files.write(result, new File(resultPath), StandardCharsets.UTF_8) + Files.asCharSink(new File(resultPath), StandardCharsets.UTF_8).write(result) } } @@ -751,7 +750,7 @@ private object YarnAddJarTest extends Logging { result = "success" } } finally { - Files.write(result, new File(resultPath), StandardCharsets.UTF_8) + Files.asCharSink(new File(resultPath), StandardCharsets.UTF_8).write(result) sc.stop() } } @@ -796,7 +795,7 @@ private object ExecutorEnvTestApp { executorEnvs.get(k).contains(v) } - 
Files.write(result.toString, new File(status), StandardCharsets.UTF_8) + Files.asCharSink(new File(status), StandardCharsets.UTF_8).write(result.toString) sc.stop() } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala index f745265eddfd9..f8d69c0ae568e 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala @@ -181,7 +181,7 @@ private object YarnExternalShuffleDriver extends Logging with Matchers { if (execStateCopy != null) { FileUtils.deleteDirectory(execStateCopy) } - Files.write(result, status, StandardCharsets.UTF_8) + Files.asCharSink(status, StandardCharsets.UTF_8).write(result) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala index 275b35947182c..c90b1d3ca5978 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala @@ -1217,8 +1217,8 @@ class ArrowConvertersSuite extends SharedSparkSession { val tempFile1 = new File(tempDataPath, "testData2-ints-part1.json") val tempFile2 = new File(tempDataPath, "testData2-ints-part2.json") - Files.write(json1, tempFile1, StandardCharsets.UTF_8) - Files.write(json2, tempFile2, StandardCharsets.UTF_8) + Files.asCharSink(tempFile1, StandardCharsets.UTF_8).write(json1) + Files.asCharSink(tempFile2, StandardCharsets.UTF_8).write(json2) validateConversion(schema, arrowBatches(0), tempFile1) validateConversion(schema, arrowBatches(1), tempFile2) @@ -1501,7 +1501,7 @@ class ArrowConvertersSuite extends SharedSparkSession { // NOTE: coalesce to single partition because can only load 1 batch in validator val batchBytes = df.coalesce(1).toArrowBatchRdd.collect().head val tempFile = new File(tempDataPath, file) - Files.write(json, tempFile, StandardCharsets.UTF_8) + Files.asCharSink(tempFile, StandardCharsets.UTF_8).write(json) validateConversion(df.schema, batchBytes, tempFile, timeZoneId, errorOnDuplicatedFieldNames) } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index 4575549005f33..f1f0befcb0d30 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -1222,7 +1222,7 @@ abstract class HiveThriftServer2TestBase extends SparkFunSuite with BeforeAndAft // overrides all other potential log4j configurations contained in other dependency jar files. 
val tempLog4jConf = Utils.createTempDir().getCanonicalPath - Files.write( + Files.asCharSink(new File(s"$tempLog4jConf/log4j2.properties"), StandardCharsets.UTF_8).write( """rootLogger.level = info |rootLogger.appenderRef.stdout.ref = console |appender.console.type = Console @@ -1230,9 +1230,7 @@ abstract class HiveThriftServer2TestBase extends SparkFunSuite with BeforeAndAft |appender.console.target = SYSTEM_ERR |appender.console.layout.type = PatternLayout |appender.console.layout.pattern = %d{HH:mm:ss.SSS} %p %c: %maxLen{%m}{512}%n%ex{8}%n - """.stripMargin, - new File(s"$tempLog4jConf/log4j2.properties"), - StandardCharsets.UTF_8) + """.stripMargin) tempLog4jConf } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala index 2b2cbec41d643..8d4a9886a2b25 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala @@ -75,7 +75,7 @@ class UISeleniumSuite // overrides all other potential log4j configurations contained in other dependency jar files. val tempLog4jConf = org.apache.spark.util.Utils.createTempDir().getCanonicalPath - Files.write( + Files.asCharSink(new File(s"$tempLog4jConf/log4j2.properties"), StandardCharsets.UTF_8).write( """rootLogger.level = info |rootLogger.appenderRef.file.ref = console |appender.console.type = Console @@ -83,9 +83,7 @@ class UISeleniumSuite |appender.console.target = SYSTEM_ERR |appender.console.layout.type = PatternLayout |appender.console.layout.pattern = %d{HH:mm:ss.SSS} %p %c: %maxLen{%m}{512}%n%ex{8}%n - """.stripMargin, - new File(s"$tempLog4jConf/log4j2.properties"), - StandardCharsets.UTF_8) + """.stripMargin) tempLog4jConf } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 14051034a588e..1c45b02375b30 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} import java.util.{Locale, Set} -import com.google.common.io.Files +import com.google.common.io.{Files, FileWriteMode} import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.{SparkException, TestUtils} @@ -1947,10 +1947,10 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi val path = dir.toURI.toString.stripSuffix("/") val dirPath = dir.getAbsoluteFile for (i <- 1 to 3) { - Files.write(s"$i", new File(dirPath, s"part-r-0000$i"), StandardCharsets.UTF_8) + Files.asCharSink(new File(dirPath, s"part-r-0000$i"), StandardCharsets.UTF_8).write(s"$i") } for (i <- 5 to 7) { - Files.write(s"$i", new File(dirPath, s"part-s-0000$i"), StandardCharsets.UTF_8) + Files.asCharSink(new File(dirPath, s"part-s-0000$i"), StandardCharsets.UTF_8).write(s"$i") } withTable("load_t") { @@ -1971,7 +1971,7 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi val path = dir.toURI.toString.stripSuffix("/") val dirPath = dir.getAbsoluteFile for (i <- 1 to 3) { - Files.write(s"$i", new File(dirPath, s"part-r-0000 $i"), StandardCharsets.UTF_8) + Files.asCharSink(new File(dirPath, s"part-r-0000 $i"), 
StandardCharsets.UTF_8).write(s"$i") } withTable("load_t") { sql("CREATE TABLE load_t (a STRING) USING hive") @@ -1986,7 +1986,7 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi val path = dir.toURI.toString.stripSuffix("/") val dirPath = dir.getAbsoluteFile for (i <- 1 to 3) { - Files.write(s"$i", new File(dirPath, s"part-r-0000$i"), StandardCharsets.UTF_8) + Files.asCharSink(new File(dirPath, s"part-r-0000$i"), StandardCharsets.UTF_8).write(s"$i") } withTable("load_t") { sql("CREATE TABLE load_t (a STRING) USING hive") @@ -2010,7 +2010,7 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi val path = dir.toURI.toString.stripSuffix("/") val dirPath = dir.getAbsoluteFile for (i <- 1 to 3) { - Files.write(s"$i", new File(dirPath, s"part-r-0000$i"), StandardCharsets.UTF_8) + Files.asCharSink(new File(dirPath, s"part-r-0000$i"), StandardCharsets.UTF_8).write(s"$i") } withTable("load_t1") { sql("CREATE TABLE load_t1 (a STRING) USING hive") @@ -2025,7 +2025,7 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi val path = dir.toURI.toString.stripSuffix("/") val dirPath = dir.getAbsoluteFile for (i <- 1 to 3) { - Files.write(s"$i", new File(dirPath, s"part-r-0000$i"), StandardCharsets.UTF_8) + Files.asCharSink(new File(dirPath, s"part-r-0000$i"), StandardCharsets.UTF_8).write(s"$i") } withTable("load_t2") { sql("CREATE TABLE load_t2 (a STRING) USING hive") @@ -2039,7 +2039,8 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi withTempDir { dir => val path = dir.toURI.toString.stripSuffix("/") val dirPath = dir.getAbsoluteFile - Files.append("1", new File(dirPath, "part-r-000011"), StandardCharsets.UTF_8) + Files.asCharSink( + new File(dirPath, "part-r-000011"), StandardCharsets.UTF_8, FileWriteMode.APPEND).write("1") withTable("part_table") { withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { sql( diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java index f8d961fa8dd8e..73c2e89f3729a 100644 --- a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java @@ -1641,7 +1641,7 @@ public void testRawSocketStream() { private static List> fileTestPrepare(File testDir) throws IOException { File existingFile = new File(testDir, "0"); - Files.write("0\n", existingFile, StandardCharsets.UTF_8); + Files.asCharSink(existingFile, StandardCharsets.UTF_8).write("0\n"); Assertions.assertTrue(existingFile.setLastModified(1000)); Assertions.assertEquals(1000, existingFile.lastModified()); return Arrays.asList(Arrays.asList("0")); diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 43b0835df7cbf..4aeb0e043a973 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -649,7 +649,7 @@ class CheckpointSuite extends TestSuiteBase with LocalStreamingContext with DStr */ def writeFile(i: Int, clock: Clock): Unit = { val file = new File(testDir, i.toString) - Files.write(s"$i\n", file, StandardCharsets.UTF_8) + Files.asCharSink(file, StandardCharsets.UTF_8).write(s"$i\n") assert(file.setLastModified(clock.getTimeMillis())) // Check that the file's modification date 
is actually the value we wrote, since rounding or // truncation will break the test: diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 66fd1ac7bb22e..64335a96045bf 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -132,7 +132,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val batchDuration = Seconds(2) // Create a file that exists before the StreamingContext is created: val existingFile = new File(testDir, "0") - Files.write("0\n", existingFile, StandardCharsets.UTF_8) + Files.asCharSink(existingFile, StandardCharsets.UTF_8).write("0\n") assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000) // Set up the streaming context and input streams @@ -191,7 +191,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Create a file that exists before the StreamingContext is created: val existingFile = new File(testDir, "0") - Files.write("0\n", existingFile, StandardCharsets.UTF_8) + Files.asCharSink(existingFile, StandardCharsets.UTF_8).write("0\n") assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000) val pathWithWildCard = testDir.toString + "/*/" @@ -215,7 +215,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { def createFileAndAdvanceTime(data: Int, dir: File): Unit = { val file = new File(testSubDir1, data.toString) - Files.write(s"$data\n", file, StandardCharsets.UTF_8) + Files.asCharSink(file, StandardCharsets.UTF_8).write(s"$data\n") assert(file.setLastModified(clock.getTimeMillis())) assert(file.lastModified === clock.getTimeMillis()) logInfo(s"Created file $file") @@ -478,7 +478,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val batchDuration = Seconds(2) // Create a file that exists before the StreamingContext is created: val existingFile = new File(testDir, "0") - Files.write("0\n", existingFile, StandardCharsets.UTF_8) + Files.asCharSink(existingFile, StandardCharsets.UTF_8).write("0\n") assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000) // Set up the streaming context and input streams @@ -502,7 +502,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val input = Seq(1, 2, 3, 4, 5) input.foreach { i => val file = new File(testDir, i.toString) - Files.write(s"$i\n", file, StandardCharsets.UTF_8) + Files.asCharSink(file, StandardCharsets.UTF_8).write(s"$i\n") assert(file.setLastModified(clock.getTimeMillis())) assert(file.lastModified === clock.getTimeMillis()) logInfo("Created file " + file) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala index 771e65ed40b51..2dc43a231d9b8 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala @@ -375,7 +375,7 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) val localFile = new File(localTestDir, (i + 1).toString) val hadoopFile = new Path(testDir, (i + 1).toString) val tempHadoopFile = new Path(testDir, ".tmp_" + (i + 1).toString) - Files.write(input(i) + "\n", localFile, StandardCharsets.UTF_8) + Files.asCharSink(localFile, 
StandardCharsets.UTF_8).write(input(i) + "\n") var tries = 0 var done = false while (!done && tries < maxTries) {
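The Guava file-helper changes throughout the test suites above follow one pattern as well: the deprecated Files.write(CharSequence, File, Charset), Files.append(CharSequence, File, Charset), and Files.toString(File, Charset) become CharSink/CharSource calls. A self-contained sketch of the three replacements; the file name and strings are illustrative only, not taken from the patch:

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.google.common.io.FileWriteMode;
import com.google.common.io.Files;

class CharSinkSourceSketch {
  public static void main(String[] args) throws IOException {
    File file = new File("example.txt");  // hypothetical path

    // Replaces Files.write(CharSequence, File, Charset): truncate the file and write.
    Files.asCharSink(file, StandardCharsets.UTF_8).write("line 1\n");

    // Replaces Files.append(CharSequence, File, Charset): open the sink in append mode.
    Files.asCharSink(file, StandardCharsets.UTF_8, FileWriteMode.APPEND).write("line 2\n");

    // Replaces Files.toString(File, Charset): read the whole file back as a String.
    String content = Files.asCharSource(file, StandardCharsets.UTF_8).read();
    System.out.println(content);
  }
}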