Merge branch 'master' into sparkSqlShell
SaintBacchus committed Nov 30, 2014
2 parents 8e112c5 + 0fcd24c commit f1c5c8d
Showing 33 changed files with 868 additions and 296 deletions.
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -18,6 +18,7 @@
package org.apache.spark

import java.io.{ObjectInputStream, Serializable}
import java.util.concurrent.atomic.AtomicLong

import scala.collection.generic.Growable
import scala.collection.mutable.Map
@@ -228,6 +229,7 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
*/
class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String])
extends Accumulable[T,T](initialValue, param, name) {

def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None)
}

@@ -282,7 +284,7 @@ private object Accumulators {
val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]()
var lastId: Long = 0

def newId: Long = synchronized {
def newId(): Long = synchronized {
lastId += 1
lastId
}
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -61,7 +61,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
throw new NullPointerException("null key")
}
if (value == null) {
throw new NullPointerException("null value")
throw new NullPointerException("null value for " + key)
}
settings(key) = value
this
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -347,6 +347,11 @@ object SparkSubmit {
System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
}

// SPARK-4170
if (classOf[scala.App].isAssignableFrom(mainClass)) {
printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
}

val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
if (!Modifier.isStatic(mainMethod.getModifiers)) {
throw new IllegalStateException("The main method in the given main class must be static")
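For context on the SPARK-4170 warning added above (not part of the diff): a minimal sketch contrasting the two driver styles the warning targets. The object names and app names are hypothetical.

import org.apache.spark.{SparkConf, SparkContext}

// Discouraged: scala.App relies on DelayedInit, so vals defined in the object
// body are only assigned when main() runs the delayed initializers. Code that
// loads the object without running them (e.g. an executor deserializing a
// closure) may observe those fields as null.
object AppStyleDriver extends App {
  val conf = new SparkConf().setAppName("app-style")
  val sc = new SparkContext(conf)
  sc.parallelize(1 to 10).count()
  sc.stop()
}

// What the warning recommends instead: an explicit main() method, so fields
// are initialized in the object's constructor on every JVM that loads it.
object MainStyleDriver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("main-style")
    val sc = new SparkContext(conf)
    sc.parallelize(1 to 10).count()
    sc.stop()
  }
}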
84 changes: 82 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -21,8 +21,11 @@ import java.sql.{Connection, ResultSet}

import scala.reflect.ClassTag

import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.util.NextIterator
import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}

private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) extends Partition {
override def index = idx
@@ -125,5 +128,82 @@ object JdbcRDD {
def resultSetToObjectArray(rs: ResultSet): Array[Object] = {
Array.tabulate[Object](rs.getMetaData.getColumnCount)(i => rs.getObject(i + 1))
}
}

trait ConnectionFactory extends Serializable {
@throws[Exception]
def getConnection: Connection
}

/**
* Create an RDD that executes an SQL query on a JDBC connection and reads results.
* For usage example, see test case JavaAPISuite.testJavaJdbcRDD.
*
* @param connectionFactory a factory that returns an open Connection.
* The RDD takes care of closing the connection.
* @param sql the text of the query.
* The query must contain two ? placeholders for parameters used to partition the results.
* E.g. "select title, author from books where ? <= id and id <= ?"
* @param lowerBound the minimum value of the first placeholder
* @param upperBound the maximum value of the second placeholder
* The lower and upper bounds are inclusive.
* @param numPartitions the number of partitions.
* Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2,
* the query would be executed twice, once with (1, 10) and once with (11, 20)
* @param mapRow a function from a ResultSet to a single row of the desired result type(s).
* This should only call getInt, getString, etc; the RDD takes care of calling next.
* The default maps a ResultSet to an array of Object.
*/
def create[T](
sc: JavaSparkContext,
connectionFactory: ConnectionFactory,
sql: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
mapRow: JFunction[ResultSet, T]): JavaRDD[T] = {

val jdbcRDD = new JdbcRDD[T](
sc.sc,
() => connectionFactory.getConnection,
sql,
lowerBound,
upperBound,
numPartitions,
(resultSet: ResultSet) => mapRow.call(resultSet))(fakeClassTag)

new JavaRDD[T](jdbcRDD)(fakeClassTag)
}

/**
* Create an RDD that executes an SQL query on a JDBC connection and reads results. Each row is
* converted into a `Object` array. For usage example, see test case JavaAPISuite.testJavaJdbcRDD.
*
* @param connectionFactory a factory that returns an open Connection.
* The RDD takes care of closing the connection.
* @param sql the text of the query.
* The query must contain two ? placeholders for parameters used to partition the results.
* E.g. "select title, author from books where ? <= id and id <= ?"
* @param lowerBound the minimum value of the first placeholder
* @param upperBound the maximum value of the second placeholder
* The lower and upper bounds are inclusive.
* @param numPartitions the number of partitions.
* Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2,
* the query would be executed twice, once with (1, 10) and once with (11, 20)
*/
def create(
sc: JavaSparkContext,
connectionFactory: ConnectionFactory,
sql: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int): JavaRDD[Array[Object]] = {

val mapRow = new JFunction[ResultSet, Array[Object]] {
override def call(resultSet: ResultSet): Array[Object] = {
resultSetToObjectArray(resultSet)
}
}

create(sc, connectionFactory, sql, lowerBound, upperBound, numPartitions, mapRow)
}
}
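A hedged usage sketch (in Scala) of the Java-friendly JdbcRDD.create overloads added above. The local master, H2 connection URL, table and column names are illustrative, and a JDBC driver is assumed to be on the classpath. With lowerBound 1, upperBound 100 and 3 partitions, the query is executed three times over consecutive id sub-ranges, matching the partitioning described in the scaladoc.

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.rdd.JdbcRDD

// Serializable connection factory; each executor opens its own connection,
// and the RDD takes care of closing it.
object H2ConnectionFactory extends JdbcRDD.ConnectionFactory {
  override def getConnection = DriverManager.getConnection("jdbc:h2:mem:testdb")
}

object JdbcRddCreateExample {
  def main(args: Array[String]): Unit = {
    val jsc = new JavaSparkContext("local[2]", "jdbc-rdd-example")

    val names = JdbcRDD.create(
      jsc,
      H2ConnectionFactory,
      "SELECT ID, NAME FROM PEOPLE WHERE ? <= ID AND ID <= ?",
      1L,    // lowerBound: bound to the first ? placeholder (inclusive)
      100L,  // upperBound: bound to the second ? placeholder (inclusive)
      3,     // numPartitions: the [1, 100] id range is split into three sub-queries
      new JFunction[ResultSet, String] {
        override def call(rs: ResultSet): String = rs.getString("NAME")
      })

    println(names.count())
    jsc.stop()
  }
}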
53 changes: 30 additions & 23 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -449,7 +449,6 @@ class DAGScheduler(
}
// data structures based on StageId
stageIdToStage -= stageId

logDebug("After removal of stage %d, remaining stages = %d"
.format(stageId, stageIdToStage.size))
}
@@ -902,6 +901,34 @@ class DAGScheduler(
}
}

/** Merge updates from a task to our local accumulator values */
private def updateAccumulators(event: CompletionEvent): Unit = {
val task = event.task
val stage = stageIdToStage(task.stageId)
if (event.accumUpdates != null) {
try {
Accumulators.add(event.accumUpdates)
event.accumUpdates.foreach { case (id, partialValue) =>
val acc = Accumulators.originals(id).asInstanceOf[Accumulable[Any, Any]]
// To avoid UI cruft, ignore cases where value wasn't updated
if (acc.name.isDefined && partialValue != acc.zero) {
val name = acc.name.get
val stringPartialValue = Accumulators.stringifyPartialValue(partialValue)
val stringValue = Accumulators.stringifyValue(acc.value)
stage.latestInfo.accumulables(id) = AccumulableInfo(id, name, stringValue)
event.taskInfo.accumulables +=
AccumulableInfo(id, name, Some(stringPartialValue), stringValue)
}
}
} catch {
// If we see an exception during accumulator update, just log the
// error and move on.
case e: Exception =>
logError(s"Failed to update accumulators for $task", e)
}
}
}

/**
* Responds to a task finishing. This is called inside the event loop so it assumes that it can
* modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
@@ -942,27 +969,6 @@
}
event.reason match {
case Success =>
if (event.accumUpdates != null) {
try {
Accumulators.add(event.accumUpdates)
event.accumUpdates.foreach { case (id, partialValue) =>
val acc = Accumulators.originals(id).asInstanceOf[Accumulable[Any, Any]]
// To avoid UI cruft, ignore cases where value wasn't updated
if (acc.name.isDefined && partialValue != acc.zero) {
val name = acc.name.get
val stringPartialValue = Accumulators.stringifyPartialValue(partialValue)
val stringValue = Accumulators.stringifyValue(acc.value)
stage.latestInfo.accumulables(id) = AccumulableInfo(id, name, stringValue)
event.taskInfo.accumulables +=
AccumulableInfo(id, name, Some(stringPartialValue), stringValue)
}
}
} catch {
// If we see an exception during accumulator update, just log the error and move on.
case e: Exception =>
logError(s"Failed to update accumulators for $task", e)
}
}
listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
event.reason, event.taskInfo, event.taskMetrics))
stage.pendingTasks -= task
@@ -971,6 +977,7 @@
stage.resultOfJob match {
case Some(job) =>
if (!job.finished(rt.outputId)) {
updateAccumulators(event)
job.finished(rt.outputId) = true
job.numFinished += 1
// If the whole job has finished, remove it
@@ -995,6 +1002,7 @@
}

case smt: ShuffleMapTask =>
updateAccumulators(event)
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
logDebug("ShuffleMapTask finished on " + execId)
@@ -1083,7 +1091,6 @@
}
failedStages += failedStage
failedStages += mapStage

// Mark the map whose fetch failed as broken in the map stage
if (mapId != -1) {
mapStage.removeOutputLoc(mapId, bmAddress)
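For orientation (not part of the diff): the merging logic moved into the updateAccumulators helper above only surfaces named accumulators in the UI (the acc.name.isDefined branch); unnamed accumulators are still merged into their driver-side values but skip the AccumulableInfo bookkeeping. A minimal driver-side sketch, assuming a local master; the accumulator name and counts are illustrative.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object NamedAccumulatorExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("acc-example"))

    // Passing a name makes acc.name.isDefined true, so the scheduler records
    // per-task partial values as AccumulableInfo for the stage and task UI.
    val records = sc.accumulator(0L, "records seen")

    sc.parallelize(1 to 1000, 4).foreach(_ => records += 1L)

    // Each successful task's partial value is merged on the driver when its
    // completion event is handled.
    println(records.value)  // 1000
    sc.stop()
  }
}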
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -127,7 +127,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
makeOffers()

case KillTask(taskId, executorId, interruptThread) =>
executorDataMap(executorId).executorActor ! KillTask(taskId, executorId, interruptThread)
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
executorInfo.executorActor ! KillTask(taskId, executorId, interruptThread)
case None =>
// Ignoring the task kill since the executor is not registered.
logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
}

case StopDriver =>
sender ! true
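A standalone sketch (hypothetical names, not Spark code) of the defensive-lookup pattern the KillTask change adopts: Map.apply throws NoSuchElementException on an unknown key, while matching on Map.get lets the handler log a warning and carry on.

import scala.collection.mutable

object DefensiveLookupExample {
  private val executorDataMap = mutable.HashMap("exec-1" -> "actor-ref-for-exec-1")

  def killTask(taskId: Long, executorId: String): Unit = {
    executorDataMap.get(executorId) match {
      case Some(executorInfo) =>
        println(s"Would forward KillTask($taskId) to $executorInfo")
      case None =>
        // Mirrors the new behaviour: warn instead of letting the lookup blow up.
        println(s"Attempted to kill task $taskId for unknown executor $executorId.")
    }
  }

  def main(args: Array[String]): Unit = {
    killTask(1L, "exec-1")  // known executor
    killTask(2L, "exec-9")  // unknown executor: warning, no exception
  }
}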
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -265,7 +265,7 @@ final class ShuffleBlockFetcherIterator(

// Get Local Blocks
fetchLocalBlocks()
logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime))
}

override def hasNext: Boolean = numBlocksProcessed < numBlocksToFetch
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -262,7 +262,7 @@ private[spark] object Utils extends Logging {
if (dir.exists() || !dir.mkdirs()) {
dir = null
}
} catch { case e: IOException => ; }
} catch { case e: SecurityException => dir = null; }
}

registerShutdownDeleteDir(dir)
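The hunk above shows only the failure handling; here is a hedged sketch of the retry loop it presumably sits in (simplified names, maxAttempts value assumed), illustrating why both a failed mkdirs and a SecurityException now reset dir to null so the loop tries a fresh directory name instead of silently swallowing the error.

import java.io.File
import java.util.UUID

object TempDirSketch {
  def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = {
    val maxAttempts = 10
    var attempts = 0
    var dir: File = null
    while (dir == null) {
      attempts += 1
      if (attempts > maxAttempts) {
        throw new java.io.IOException(
          s"Failed to create a temp directory under $root after $maxAttempts attempts")
      }
      try {
        dir = new File(root, "spark-" + UUID.randomUUID.toString)
        if (dir.exists() || !dir.mkdirs()) {
          dir = null  // name collision or mkdirs failure: retry with a new name
        }
      } catch {
        case _: SecurityException => dir = null  // treat like any other failed attempt
      }
    }
    dir
  }

  def main(args: Array[String]): Unit = {
    println(createTempDir().getAbsolutePath)
  }
}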
core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
@@ -17,6 +17,8 @@

package org.apache.spark.util.collection

import scala.reflect.ClassTag

/**
* An append-only buffer similar to ArrayBuffer, but more memory-efficient for small buffers.
* ArrayBuffer always allocates an Object array to store the data, with 16 entries by default,
@@ -25,7 +27,7 @@ package org.apache.spark.util.collection
* entries than that. This makes it more efficient for operations like groupBy where we expect
* some keys to have very few elements.
*/
private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable {
// First two elements
private var element0: T = _
private var element1: T = _
@@ -34,7 +36,7 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
private var curSize = 0

// Array for extra elements
private var otherElements: Array[AnyRef] = null
private var otherElements: Array[T] = null

def apply(position: Int): T = {
if (position < 0 || position >= curSize) {
@@ -45,7 +47,7 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
} else if (position == 1) {
element1
} else {
otherElements(position - 2).asInstanceOf[T]
otherElements(position - 2)
}
}

@@ -58,7 +60,7 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
} else if (position == 1) {
element1 = value
} else {
otherElements(position - 2) = value.asInstanceOf[AnyRef]
otherElements(position - 2) = value
}
}

@@ -72,7 +74,7 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
curSize = 2
} else {
growToSize(curSize + 1)
otherElements(newIndex - 2) = value.asInstanceOf[AnyRef]
otherElements(newIndex - 2) = value
}
this
}
@@ -139,7 +141,7 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
newArrayLen = Int.MaxValue - 2
}
}
val newArray = new Array[AnyRef](newArrayLen)
val newArray = new Array[T](newArrayLen)
if (otherElements != null) {
System.arraycopy(otherElements, 0, newArray, 0, otherElements.length)
}
@@ -150,9 +152,9 @@ private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
}

private[spark] object CompactBuffer {
def apply[T](): CompactBuffer[T] = new CompactBuffer[T]
def apply[T: ClassTag](): CompactBuffer[T] = new CompactBuffer[T]

def apply[T](value: T): CompactBuffer[T] = {
def apply[T: ClassTag](value: T): CompactBuffer[T] = {
val buf = new CompactBuffer[T]
buf += value
}
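A short standalone sketch (not Spark code) of why CompactBuffer now takes a T: ClassTag context bound: allocating new Array[T] requires T's runtime class, which also lets primitive element types get unboxed primitive arrays instead of an Array[AnyRef] plus asInstanceOf casts.

import scala.reflect.ClassTag

object ClassTagArrayExample {
  // Without the ClassTag bound this would not compile: after erasure the JVM
  // could not choose between, say, int[] and Object[].
  def allocate[T: ClassTag](n: Int): Array[T] = new Array[T](n)

  def main(args: Array[String]): Unit = {
    val ints = allocate[Int](4)       // backed by a primitive int[], no boxing
    val strs = allocate[String](4)    // backed by a String[]
    println(ints.getClass.getSimpleName)  // prints "int[]"
    println(strs.getClass.getSimpleName)  // prints "String[]"
  }
}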