SPARK-1316. Remove use of Commons IO
(This follows from a side point on SPARK-1133, in discussion of the PR: apache#164)

Commons IO is barely used in the project, and can easily be replaced with equivalent calls to Guava or the existing Spark `Utils.scala` class.

Removing a dependency is a win on its own, and this one in particular can be problematic since Hadoop also depends on Commons IO, which invites version conflicts on the classpath.

Author: Sean Owen <sowen@cloudera.com>

Closes apache#226 from srowen/SPARK-1316 and squashes the following commits:

21efef3 [Sean Owen] Remove use of Commons IO
srowen authored and pdeyhim committed Jun 25, 2014
1 parent fbe8151 commit ef0c8eb
Showing 10 changed files with 34 additions and 54 deletions.
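
The commit description above says every Commons IO call in the tree has a Guava or `Utils.scala` equivalent. A minimal sketch of the two call-level substitutions the diff below actually makes; the package, object, and method names here are illustrative, not part of the patch (a subpackage of org.apache.spark is used only because `Utils` is `private[spark]`):

package org.apache.spark.demo

import java.io.File
import java.nio.charset.Charset

import com.google.common.io.Files
import org.apache.spark.util.Utils

object CommonsIoReplacements {
  // Commons IO: FileUtils.writeStringToFile(file, text)
  // Guava:      Files.write(text, file, charset), with the charset now explicit
  def writeString(file: File, text: String): Unit =
    Files.write(text, file, Charset.forName("UTF-8"))

  // Commons IO: FileUtils.deleteDirectory(dir)
  // Spark:      Utils.deleteRecursively(dir), already in core's Utils.scala
  def deleteDir(dir: File): Unit =
    Utils.deleteRecursively(dir)
}
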
5 changes: 0 additions & 5 deletions core/pom.xml
@@ -200,11 +200,6 @@
<artifactId>derby</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
5 changes: 4 additions & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -529,7 +529,10 @@ private[spark] object Utils extends Logging {
}
}
if (!file.delete()) {
throw new IOException("Failed to delete: " + file)
// Delete can also fail if the file simply did not exist
if (file.exists()) {
throw new IOException("Failed to delete: " + file.getAbsolutePath)
}
}
}

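
The hunk above shows only the tail of `Utils.deleteRecursively`; a standalone sketch of the new failure handling, with the wrapper object and method name being illustrative:

import java.io.{File, IOException}

object TolerantDelete {
  // Mirrors the patched check: File.delete() also returns false when the file
  // never existed, so only treat a failed delete as an error if the path is
  // still present afterwards.
  def deleteOrFail(file: File): Unit = {
    if (!file.delete()) {
      if (file.exists()) {
        throw new IOException("Failed to delete: " + file.getAbsolutePath)
      }
    }
  }
}
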
3 changes: 1 addition & 2 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -24,7 +24,6 @@ import java.nio.{ByteBuffer, ByteOrder}

import com.google.common.base.Charsets
import com.google.common.io.Files
import org.apache.commons.io.FileUtils
import org.scalatest.FunSuite

class UtilsSuite extends FunSuite {
@@ -136,7 +135,7 @@ class UtilsSuite extends FunSuite {
// Read some nonexistent bytes on both ends
assert(Utils.offsetBytes(f1Path, -3, 25) === "1\n2\n3\n4\n5\n6\n7\n8\n9\n")

FileUtils.deleteDirectory(tmpDir2)
Utils.deleteRecursively(tmpDir2)
}

test("deserialize long value") {
5 changes: 0 additions & 5 deletions pom.xml
@@ -435,11 +435,6 @@
<version>1.9.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
8 changes: 2 additions & 6 deletions project/SparkBuild.scala
@@ -262,8 +262,7 @@ object SparkBuild extends Build {
"org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
"com.novocode" % "junit-interface" % "0.10" % "test",
"org.easymock" % "easymock" % "3.1" % "test",
"org.mockito" % "mockito-all" % "1.8.5" % "test",
"commons-io" % "commons-io" % "2.4" % "test"
"org.mockito" % "mockito-all" % "1.8.5" % "test"
),

testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
@@ -442,10 +441,7 @@

def streamingSettings = sharedSettings ++ Seq(
name := "spark-streaming",
previousArtifact := sparkPreviousArtifact("spark-streaming"),
libraryDependencies ++= Seq(
"commons-io" % "commons-io" % "2.4"
)
previousArtifact := sparkPreviousArtifact("spark-streaming")
)

def yarnCommonSettings = sharedSettings ++ Seq(
4 changes: 0 additions & 4 deletions streaming/pom.xml
@@ -74,10 +74,6 @@
<artifactId>junit-interface</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
@@ -28,12 +28,12 @@ import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import scala.reflect.ClassTag

import java.io.{File, ObjectInputStream, IOException}
import java.nio.charset.Charset
import java.util.UUID

import com.google.common.io.Files

import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.{FileUtil, FileSystem, Path}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration


@@ -389,7 +389,7 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
val localFile = new File(localTestDir, (i + 1).toString)
val hadoopFile = new Path(testDir, (i + 1).toString)
val tempHadoopFile = new Path(testDir, ".tmp_" + (i + 1).toString)
FileUtils.writeStringToFile(localFile, input(i).toString + "\n")
Files.write(input(i) + "\n", localFile, Charset.forName("UTF-8"))
var tries = 0
var done = false
while (!done && tries < maxTries) {
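
One behavioural detail of the `Files.write` substitution above: the two-argument `FileUtils.writeStringToFile` used the platform default encoding, while Guava requires the charset up front, so the tests now pin UTF-8 explicitly. A minimal usage sketch; the output path is illustrative:

import java.io.File
import java.nio.charset.Charset

import com.google.common.io.Files

object WriteStringExample {
  def main(args: Array[String]): Unit = {
    val localFile = new File("target/tmp/input-1")  // illustrative path
    localFile.getParentFile.mkdirs()
    // Charset is explicit here, matching the suites' Charset.forName("UTF-8")
    Files.write("1\n", localFile, Charset.forName("UTF-8"))
  }
}
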
@@ -18,18 +18,17 @@
package org.apache.spark.streaming

import java.io.File
import java.nio.charset.Charset

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import org.apache.commons.io.FileUtils
import com.google.common.io.Files
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.util.Utils
import org.apache.spark.SparkConf

/**
* This test suites tests the checkpointing functionality of DStreams -
@@ -46,13 +45,13 @@ class CheckpointSuite extends TestSuiteBase {

override def beforeFunction() {
super.beforeFunction()
FileUtils.deleteDirectory(new File(checkpointDir))
Utils.deleteRecursively(new File(checkpointDir))
}

override def afterFunction() {
super.afterFunction()
if (ssc != null) ssc.stop()
FileUtils.deleteDirectory(new File(checkpointDir))
Utils.deleteRecursively(new File(checkpointDir))
}

test("basic rdd checkpoints + dstream graph checkpoint recovery") {
@@ -256,7 +255,7 @@ class CheckpointSuite extends TestSuiteBase {
//var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
Thread.sleep(1000)
for (i <- Seq(1, 2, 3)) {
FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
// wait to make sure that the file is written such that it gets shown in the file listings
Thread.sleep(1000)
}
@@ -273,7 +272,7 @@

// Create files while the master is down
for (i <- Seq(4, 5, 6)) {
FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
Thread.sleep(1000)
}

@@ -289,7 +288,7 @@
// Restart stream computation
ssc.start()
for (i <- Seq(7, 8, 9)) {
FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
Thread.sleep(1000)
}
Thread.sleep(1000)
@@ -19,14 +19,9 @@ package org.apache.spark.streaming

import org.apache.spark.Logging
import org.apache.spark.streaming.util.MasterFailureTest
import StreamingContext._
import org.apache.spark.util.Utils

import org.scalatest.{FunSuite, BeforeAndAfter}
import com.google.common.io.Files
import java.io.File
import org.apache.commons.io.FileUtils
import collection.mutable.ArrayBuffer


/**
* This testsuite tests master failures at random times while the stream is running using
@@ -43,12 +38,12 @@ class FailureSuite extends TestSuiteBase with Logging {

override def beforeFunction() {
super.beforeFunction()
FileUtils.deleteDirectory(new File(directory))
Utils.deleteRecursively(new File(directory))
}

override def afterFunction() {
super.afterFunction()
FileUtils.deleteDirectory(new File(directory))
Utils.deleteRecursively(new File(directory))
}

test("multiple failures with map") {
@@ -23,21 +23,23 @@ import akka.actor.IOManager
import akka.actor.Props
import akka.util.ByteString

import org.apache.spark.streaming.dstream.{NetworkReceiver}
import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket}
import java.io.{File, BufferedWriter, OutputStreamWriter}
import java.net.{InetSocketAddress, SocketException, ServerSocket}
import java.nio.charset.Charset
import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import util.ManualClock
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}

import com.google.common.io.Files
import org.scalatest.BeforeAndAfter

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.NetworkReceiver
import org.apache.spark.streaming.receivers.Receiver
import org.apache.spark.Logging
import scala.util.Random
import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter
import collection.JavaConversions._
import com.google.common.io.Files
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.util.Utils

class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {

@@ -112,7 +114,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
Thread.sleep(1000)
for (i <- 0 until input.size) {
val file = new File(testDir, i.toString)
FileUtils.writeStringToFile(file, input(i).toString + "\n")
Files.write(input(i) + "\n", file, Charset.forName("UTF-8"))
logInfo("Created file " + file)
Thread.sleep(batchDuration.milliseconds)
Thread.sleep(1000)
@@ -136,7 +138,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// (whether the elements were received one in each interval is not verified)
assert(output.toList === expectedOutput.toList)

FileUtils.deleteDirectory(testDir)
Utils.deleteRecursively(testDir)

// Enable manual clock back again for other tests
conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")