SPARK-1209 [CORE] (Take 2) SparkHadoop{MapRed,MapReduce}Util should not use package org.apache.hadoop

@andrewor14 Another try at SPARK-1209, to address #2814 (comment)

I successfully tested with `mvn -Dhadoop.version=1.0.4 -DskipTests clean package; mvn -Dhadoop.version=1.0.4 test`, which I assume is what failed Jenkins last time. I also tried `-Dhadoop.version=1.2.1` and `-Phadoop-2.4 -Pyarn -Phive` for more coverage.

So this is why the class was put in `org.apache.hadoop` to begin with, I assume: in Hadoop 1.0.x the context constructors it needs are package private, so they are only reachable from code inside that package. One option is to leave this as-is for now and move it only when Hadoop 1.0.x support goes away.

This is the other option, which adds a call to make the constructor accessible at run-time. It's probably less surprising than putting Spark code in `org.apache.hadoop`, but it does involve reflection. A `SecurityManager` might forbid this, but it would forbid a lot of stuff Spark does. This would also only affect Hadoop 1.0.x, it seems.
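For illustration, here is a minimal, self-contained sketch of that reflection trick; the `Hidden` class is a made-up stand-in for Hadoop 1.0.x's package-private `JobContextImpl`, not Spark or Hadoop code:

```scala
import java.lang.reflect.Modifier

// Stand-in for a class whose constructor is not public, like
// Hadoop 1.0.x's package-private JobContextImpl.
class Hidden private (val name: String)

object ReflectiveCtorExample extends App {
  val ctor = classOf[Hidden].getDeclaredConstructor(classOf[String])
  // Force the constructor accessible at run-time; a SecurityManager
  // could forbid this by throwing a SecurityException.
  if (!Modifier.isPublic(ctor.getModifiers)) {
    ctor.setAccessible(true)
  }
  val h = ctor.newInstance("ok")
  println(h.name) // prints "ok"
}
```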

Author: Sean Owen <sowen@cloudera.com>

Closes #3048 from srowen/SPARK-1209 and squashes the following commits:

0d48f4b [Sean Owen] For Hadoop 1.0.x, make certain constructors public, which were public in later versions
466e179 [Sean Owen] Disable MIMA warnings resulting from moving the class -- this was also part of the PairRDDFunctions type hierarchy though?
eb61820 [Sean Owen] Move SparkHadoopMapRedUtil / SparkHadoopMapReduceUtil from org.apache.hadoop to org.apache.spark

(cherry picked from commit f8e5732)
Signed-off-by: Patrick Wendell <pwendell@gmail.com>
srowen authored and pwendell committed Nov 10, 2014
1 parent a9debe8 commit 42d19ae
Showing 8 changed files with 32 additions and 5 deletions.
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapred._
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.fs.Path

+import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.rdd.HadoopRDD

 /**
@@ -15,22 +15,35 @@
  * limitations under the License.
  */

-package org.apache.hadoop.mapred
+package org.apache.spark.mapred

-private[apache]
+import java.lang.reflect.Modifier
+
+import org.apache.hadoop.mapred.{TaskAttemptID, JobID, JobConf, JobContext, TaskAttemptContext}
+
+private[spark]
 trait SparkHadoopMapRedUtil {
   def newJobContext(conf: JobConf, jobId: JobID): JobContext = {
     val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl",
       "org.apache.hadoop.mapred.JobContext")
     val ctor = klass.getDeclaredConstructor(classOf[JobConf],
       classOf[org.apache.hadoop.mapreduce.JobID])
+    // In Hadoop 1.0.x, JobContext is an interface, and JobContextImpl is package private.
+    // Make it accessible if it's not in order to access it.
+    if (!Modifier.isPublic(ctor.getModifiers)) {
+      ctor.setAccessible(true)
+    }
     ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
   }

   def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = {
     val klass = firstAvailableClass("org.apache.hadoop.mapred.TaskAttemptContextImpl",
       "org.apache.hadoop.mapred.TaskAttemptContext")
     val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[TaskAttemptID])
+    // See above
+    if (!Modifier.isPublic(ctor.getModifiers)) {
+      ctor.setAccessible(true)
+    }
     ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
   }
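Both traits depend on a `firstAvailableClass` helper that the truncated diff does not show. A sketch of such a helper, under the assumption that it simply tries the Hadoop 2.x class name before falling back to the Hadoop 1.x one (`ClassCompat` is a hypothetical wrapper for the example):

```scala
object ClassCompat {
  // Try to load `first` (the Hadoop 2.x name); if it is absent on the
  // classpath, fall back to `second` (the Hadoop 1.x name).
  def firstAvailableClass(first: String, second: String): Class[_] =
    try {
      Class.forName(first)
    } catch {
      case _: ClassNotFoundException => Class.forName(second)
    }
}

// Example: resolves JobContextImpl on Hadoop 2.x, JobContext on 1.0.x.
// ClassCompat.firstAvailableClass(
//   "org.apache.hadoop.mapred.JobContextImpl",
//   "org.apache.hadoop.mapred.JobContext")
```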
@@ -15,13 +15,14 @@
  * limitations under the License.
  */

-package org.apache.hadoop.mapreduce
+package org.apache.spark.mapreduce

 import java.lang.{Boolean => JBoolean, Integer => JInteger}

 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapreduce.{JobContext, JobID, TaskAttemptContext, TaskAttemptID}

-private[apache]
+private[spark]
 trait SparkHadoopMapReduceUtil {
   def newJobContext(conf: Configuration, jobId: JobID): JobContext = {
     val klass = firstAvailableClass(
@@ -35,6 +35,7 @@ import org.apache.spark.Partition
 import org.apache.spark.SerializableWritable
 import org.apache.spark.{SparkContext, TaskContext}
 import org.apache.spark.executor.{DataReadMethod, InputMetrics}
+import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
 import org.apache.spark.util.Utils
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -33,13 +33,14 @@ import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
 import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat,
-  RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil}
+  RecordWriter => NewRecordWriter}

 import org.apache.spark._
 import org.apache.spark.Partitioner.defaultPartitioner
 import org.apache.spark.SparkContext._
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.util.Utils
8 changes: 8 additions & 0 deletions project/MimaExcludes.scala
@@ -77,6 +77,14 @@ object MimaExcludes {
           // SPARK-3822
           ProblemFilters.exclude[IncompatibleResultTypeProblem](
             "org.apache.spark.SparkContext.org$apache$spark$SparkContext$$createTaskScheduler")
+        ) ++ Seq(
+          // SPARK-1209
+          ProblemFilters.exclude[MissingClassProblem](
+            "org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil"),
+          ProblemFilters.exclude[MissingClassProblem](
+            "org.apache.hadoop.mapred.SparkHadoopMapRedUtil"),
+          ProblemFilters.exclude[MissingTypesProblem](
+            "org.apache.spark.rdd.PairRDDFunctions")
         )

       case v if v.startsWith("1.1") =>
@@ -43,6 +43,7 @@ import parquet.hadoop.util.ContextUtil
 import parquet.io.ParquetDecodingException
 import parquet.schema.MessageType

+import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.SQLConf
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.plan.{PlanUtils, TableDesc}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred._

+import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.sql.Row
 import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
 import org.apache.spark.sql.hive.{ShimFileSinkDesc => FileSinkDesc}
