add test cases for saveAsHadoopDataset (new & old API)
CodingCat committed Mar 18, 2014
1 parent a8d11ba commit 6ba0c83
Showing 1 changed file with 32 additions and 5 deletions.
core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -24,11 +24,12 @@ import scala.io.Source
 import com.google.common.io.Files
 import org.apache.hadoop.io._
 import org.apache.hadoop.io.compress.DefaultCodec
-import org.apache.hadoop.mapred.FileAlreadyExistsException
+import org.apache.hadoop.mapred.{JobConf, FileAlreadyExistsException, TextOutputFormat}
+import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
+import org.apache.hadoop.mapreduce.Job
 import org.scalatest.FunSuite

 import org.apache.spark.SparkContext._
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}

 class FileSuite extends FunSuite with LocalSparkContext {

@@ -236,18 +237,44 @@ class FileSuite extends FunSuite with LocalSparkContext {
     val tempdir = Files.createTempDir()
     val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
     intercept[FileAlreadyExistsException] {
-      randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath)
+      randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath)
     }
   }

test ("prevent user from overwriting the non-empty directory (new Hadoop API)") {
sc = new SparkContext("local", "test")
val tempdir = Files.createTempDir()
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath + "/output")
randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath + "/output")
assert(new File(tempdir.getPath + "/output/part-r-00000").exists() === true)
intercept[FileAlreadyExistsException] {
randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath)
randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath)
}
}

test ("save Hadoop Dataset through old Hadoop API") {
sc = new SparkContext("local", "test")
val tempdir = Files.createTempDir()
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
val job = new JobConf()
job.setOutputKeyClass(classOf[String])
job.setOutputValueClass(classOf[String])
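+    // "mapred.output.format.class" and "mapred.output.dir" are the JobConf keys that
+    // the old mapred API's setOutputFormat and FileOutputFormat.setOutputPath set under the hood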
job.set("mapred.output.format.class", classOf[TextOutputFormat[String, String]].getName)
job.set("mapred.output.dir", tempdir.getPath + "/outputDataset_old")
randomRDD.saveAsHadoopDataset(job)
assert(new File(tempdir.getPath + "/outputDataset_old/part-00000").exists() === true)
}

test ("save Hadoop Dataset through new Hadoop API") {
sc = new SparkContext("local", "test")
val tempdir = Files.createTempDir()
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
val job = new Job(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[String])
job.setOutputValueClass(classOf[String])
job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]])
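+    // under Hadoop 1.x the new-API FileOutputFormat reads its output directory
+    // from the same "mapred.output.dir" key as the old API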
+    job.getConfiguration.set("mapred.output.dir", tempdir.getPath + "/outputDataset_new")
+    randomRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
+    assert(new File(tempdir.getPath + "/outputDataset_new/part-r-00000").exists() === true)
+  }
 }
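For context, the two new tests exercise the two save paths that Spark's pair RDDs expose: saveAsHadoopDataset, driven by an old-style mapred JobConf, and saveAsNewAPIHadoopDataset, driven by the Configuration of a new-style mapreduce Job. A minimal standalone sketch of the same calls outside the test harness follows; the `sc`, `pairs`, and `/tmp` output paths are illustrative assumptions, not part of the commit.

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat, TextOutputFormat => NewTextOutputFormat}

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._  // brings saveAsHadoopDataset and friends into scope (pre-1.3 style)

val sc = new SparkContext("local", "sketch")
val pairs = sc.parallelize(Seq(("key1", "a"), ("key2", "b")), 1)

// Old mapred API: everything is configured on a JobConf. These typed setters
// write the same "mapred.*" string keys that the test sets by hand.
val jobConf = new JobConf()
jobConf.setOutputKeyClass(classOf[String])
jobConf.setOutputValueClass(classOf[String])
jobConf.setOutputFormat(classOf[TextOutputFormat[String, String]])
FileOutputFormat.setOutputPath(jobConf, new Path("/tmp/out_old"))
pairs.saveAsHadoopDataset(jobConf)  // expect /tmp/out_old/part-00000

// New mapreduce API: configuration travels in the Job's Configuration object.
val job = new Job(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[String])
job.setOutputValueClass(classOf[String])
job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]])
NewFileOutputFormat.setOutputPath(job, new Path("/tmp/out_new"))
pairs.saveAsNewAPIHadoopDataset(job.getConfiguration)  // expect /tmp/out_new/part-r-00000

Note the differing part-file names the tests assert on: the old API writes part-00000, while the new API encodes the task type in the name and writes part-r-00000.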
