change the parameter type back to Configuration
code sync
CodingCat committed Mar 18, 2014
1 parent a8583ee commit 7643c88
Showing 3 changed files with 20 additions and 24 deletions.
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -560,10 +560,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])

/**
* Output the RDD to any Hadoop-supported storage system, using
- * a org.apache.hadoop.mapreduce.Job object for that storage system.
+ * a Configuration object for that storage system.
*/
-def saveAsNewAPIHadoopDataset(job: Job) {
-rdd.saveAsNewAPIHadoopDataset(job)
+def saveAsNewAPIHadoopDataset(conf: Configuration) {
+rdd.saveAsNewAPIHadoopDataset(conf)
}

/** Output the RDD to any Hadoop-supported file system. */
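For reference, the Java wrapper above just forwards to the Scala implementation, so the calling pattern is the same in either API: describe the storage system on a Hadoop Configuration (typically via a new-API Job) and hand that to Spark. The sketch below is illustrative only and not part of this commit; the application name, output path, and choice of TextOutputFormat are assumptions.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

val sc = new SparkContext("local", "saveAsNewAPIHadoopDataset-sketch")
val pairs = sc.parallelize(Seq("a" -> "1", "b" -> "2"))
  .map { case (k, v) => (new Text(k), new Text(v)) }

// Describe the storage system on a new-API Job, the same way a Hadoop MapReduce job is configured.
val job = new Job(new Configuration())
job.setOutputKeyClass(classOf[Text])
job.setOutputValueClass(classOf[Text])
job.setOutputFormatClass(classOf[TextOutputFormat[Text, Text]])
FileOutputFormat.setOutputPath(job, new Path("/tmp/dataset-sketch"))  // assumed output path

// After this commit the method takes the Configuration; before, it took the Job itself.
pairs.saveAsNewAPIHadoopDataset(job.getConfiguration)
sc.stop()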
32 changes: 14 additions & 18 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -29,21 +29,13 @@ import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

import com.clearspring.analytics.stream.cardinality.HyperLogLog
-<<<<<<< HEAD
import org.apache.hadoop.conf.{Configurable, Configuration}
-import org.apache.hadoop.fs.{FileSystem, Path}
-=======
-import org.apache.hadoop.conf.Configuration
->>>>>>> Create a saveAsNewAPIHadoopDataset method
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob, RecordWriter => NewRecordWriter, JobContext, SparkHadoopMapReduceUtil}
-<<<<<<< HEAD
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob, RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
-=======
-
->>>>>>> Create a saveAsNewAPIHadoopDataset method

// SparkHadoopWriter and SparkHadoopMapReduceUtil are actually source files defined in Spark.
import org.apache.hadoop.mapred.SparkHadoopWriter
@@ -613,7 +605,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
job.setOutputValueClass(valueClass)
job.setOutputFormatClass(outputFormatClass)
job.getConfiguration.set("mapred.output.dir", path)
-saveAsNewAPIHadoopDataset(job)
+saveAsNewAPIHadoopDataset(job.getConfiguration)
}

/**
@@ -661,22 +653,22 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])

/**
* Output the RDD to any Hadoop-supported storage system with new Hadoop API, using a Hadoop
- * Job object for that storage system. The Job should set an OutputFormat and any output paths
+ * Configuration object for that storage system. The Conf should set an OutputFormat and any output paths
* required (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
* MapReduce job.
*/
-def saveAsNewAPIHadoopDataset(job: NewAPIHadoopJob) {
+def saveAsNewAPIHadoopDataset(conf: Configuration) {
+val job = new NewAPIHadoopJob(conf)
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())
val stageId = self.id
val wrappedConf = new SerializableWritable(job.getConfiguration)
val outfmt = job.getOutputFormatClass
-val outputFormatInstance = outfmt.newInstance()
+val jobFormat = outfmt.newInstance

-if (outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
+if (jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
// FileOutputFormat ignores the filesystem parameter
-val conf = job.getConfiguration
-outputFormatInstance.checkOutputSpecs(job)
+jobFormat.checkOutputSpecs(job)
}

def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = {
@@ -688,6 +680,10 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
attemptNumber)
val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
val format = outfmt.newInstance
+format match {
+case c: Configurable => c.setConf(wrappedConf.value)
+case _ => ()
+}
val committer = format.getOutputCommitter(hadoopContext)
committer.setupTask(hadoopContext)
val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
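The match added in the hunk above pushes the job Configuration into output formats that implement org.apache.hadoop.conf.Configurable; a format created reflectively via newInstance would otherwise see no configuration at all (HBase's TableOutputFormat is the usual case, while plain file formats are unaffected). A minimal sketch of the same pattern in isolation; the helper name is made up for illustration and is not part of the commit.

import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.mapreduce.OutputFormat
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

// Instantiate an OutputFormat reflectively, the way the diff does with outfmt.newInstance,
// and push the job Configuration into it when the format asks for one.
def newConfiguredFormat(
    formatClass: Class[_ <: OutputFormat[_, _]],
    conf: Configuration): OutputFormat[_, _] = {
  val format = formatClass.newInstance()
  format match {
    case c: Configurable => c.setConf(conf) // needed by Configurable formats such as HBase's TableOutputFormat
    case _ => ()                            // plain formats like TextOutputFormat skip this branch
  }
  format
}

// Usage sketch: TextOutputFormat is not Configurable, so setConf is simply not called.
val format = newConfiguredFormat(classOf[TextOutputFormat[String, String]], new Configuration())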
@@ -699,7 +695,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
committer.commitTask(hadoopContext)
return 1
}
-val jobFormat = outfmt.newInstance

val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
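The doc comment's "a table name to write to" points at non-file storage systems, which is where the Configuration-typed parameter pays off: writing to something like an HBase table is just a matter of configuring the output format and calling saveAsNewAPIHadoopDataset. The following is a hedged sketch, assuming an HBase 0.94/0.98-era client on the classpath and an existing table "test_table" with column family "cf" (all assumptions, not part of this commit); note that TableOutputFormat implements Configurable, so it relies on the setConf call added in the hunk above.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

val sc = new SparkContext("local", "hbase-dataset-sketch")

// Describe the target table on the Configuration, as a Hadoop MapReduce job would.
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "test_table")  // assumed table name
val job = new Job(hbaseConf)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Put])

// Build (rowkey, Put) pairs and hand the Configuration to the new method.
val puts = sc.parallelize(Seq("row1" -> "a", "row2" -> "b")).map { case (row, value) =>
  val put = new Put(Bytes.toBytes(row))
  put.add(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(value))  // assumed column family/qualifier
  (new ImmutableBytesWritable(Bytes.toBytes(row)), put)
}
puts.saveAsNewAPIHadoopDataset(job.getConfiguration)
sc.stop()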
6 changes: 3 additions & 3 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.mapred.FileAlreadyExistsException
import org.scalatest.FunSuite

import org.apache.spark.SparkContext._
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}

class FileSuite extends FunSuite with LocalSparkContext {

@@ -244,8 +244,8 @@ class FileSuite extends FunSuite with LocalSparkContext {
sc = new SparkContext("local", "test")
val tempdir = Files.createTempDir()
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
-randomRDD.saveAsTextFile(tempdir.getPath + "/output")
-assert(new File(tempdir.getPath + "/output/part-00000").exists() === true)
+randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath + "/output")
+assert(new File(tempdir.getPath + "/output/part-r-00000").exists() === true)
intercept[FileAlreadyExistsException] {
randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath)
}
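The updated assertion reflects the file-naming side of the switch: the old-API path behind saveAsTextFile writes part-00000, while the new-API FileOutputFormat used by saveAsNewAPIHadoopFile names its output part-r-00000. A quick way to see both side by side, as an illustrative sketch (output paths assumed):

import java.io.File
import com.google.common.io.Files
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

val sc = new SparkContext("local", "part-file-naming-sketch")
val data = sc.parallelize(Seq(("key1", "a"), ("key2", "b")), 1)
val dir = Files.createTempDir().getPath

data.saveAsTextFile(dir + "/old-api")                                            // writes part-00000
data.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](dir + "/new-api")  // writes part-r-00000

println(new File(dir + "/old-api").list().mkString(", "))
println(new File(dir + "/new-api").list().mkString(", "))
sc.stop()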
