Merge remote-tracking branch 'upstream/master'
bilna committed Jan 8, 2015
2 parents acea3a3 + c082385 commit ae56514
Showing 97 changed files with 1,157 additions and 859 deletions.
11 changes: 0 additions & 11 deletions bagel/pom.xml
@@ -44,11 +44,6 @@
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</dependency>
-<dependency>
-<groupId>org.scalatest</groupId>
-<artifactId>scalatest_${scala.binary.version}</artifactId>
-<scope>test</scope>
-</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
@@ -58,11 +53,5 @@
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
-<plugins>
-<plugin>
-<groupId>org.scalatest</groupId>
-<artifactId>scalatest-maven-plugin</artifactId>
-</plugin>
-</plugins>
</build>
</project>
4 changes: 2 additions & 2 deletions bagel/src/test/resources/log4j.properties
@@ -15,10 +15,10 @@
# limitations under the License.
#

-# Set everything to be logged to the file bagel/target/unit-tests.log
+# Set everything to be logged to the file target/unit-tests.log
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=false
+log4j.appender.file.append=true
log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
18 changes: 0 additions & 18 deletions core/pom.xml
@@ -276,11 +276,6 @@
<artifactId>selenium-java</artifactId>
<scope>test</scope>
</dependency>
-<dependency>
-<groupId>org.scalatest</groupId>
-<artifactId>scalatest_${scala.binary.version}</artifactId>
-<scope>test</scope>
-</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
@@ -326,19 +321,6 @@
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
-<plugin>
-<groupId>org.scalatest</groupId>
-<artifactId>scalatest-maven-plugin</artifactId>
-<executions>
-<execution>
-<id>test</id>
-<goals>
-<goal>test</goal>
-</goals>
-</execution>
-</executions>
-</plugin>
-
<!-- Unzip py4j so we can include its files in the jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
9 changes: 7 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -329,8 +329,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
try {
dagScheduler = new DAGScheduler(this)
} catch {
-case e: Exception => throw
-new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage))
+case e: Exception => {
+try {
+stop()
+} finally {
+throw new SparkException("Error while constructing DAGScheduler", e)
+}
+}
}

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
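The new catch block stops the half-constructed SparkContext before rethrowing, and it wraps the original exception as the cause, so the root stack trace survives (the old code kept only getMessage). A minimal, runnable sketch of this cleanup-then-rethrow pattern, using a hypothetical Resource class rather than Spark code:

```scala
// Hypothetical stand-in for SparkContext; not Spark code.
class Resource {
  def stop(): Unit = println("cleaning up partially-initialized state")

  private val scheduler: String =
    try {
      throw new RuntimeException("scheduler init failed") // simulate the failure
    } catch {
      case e: Exception =>
        try {
          stop() // release resources first
        } finally {
          // finally guarantees the rethrow even if stop() itself throws;
          // passing e as the cause preserves the original stack trace
          throw new IllegalStateException("Error while constructing Resource", e)
        }
    }
}

object ResourceDemo extends App {
  try new Resource
  catch { case e: Exception => println(s"$e, caused by: ${e.getCause}") }
}
```

Because the rethrow sits in a finally block, an exception thrown by stop() itself cannot swallow the report of the original construction failure.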
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -160,6 +160,8 @@ object Client {
val (actorSystem, _) = AkkaUtils.createActorSystem(
"driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

+// Verify driverArgs.master is a valid url so that we can use it in ClientActor safely
+Master.toAkkaUrl(driverArgs.master)
actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))

actorSystem.awaitTermination()
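Note that Master.toAkkaUrl is called here purely for its side effect: it throws on a malformed master URL before the actor system spins up a ClientActor. A small sketch of the same validate-by-parsing idea, with an illustrative helper rather than Spark's:

```scala
object FailFastDemo extends App {
  // Illustrative validator; Spark's real check lives in Master.toAkkaUrl.
  def validateSparkUrl(url: String): Unit =
    if (!"spark://([^:]+):([0-9]+)".r.pattern.matcher(url).matches()) {
      throw new IllegalArgumentException("Invalid master URL: " + url)
    }

  validateSparkUrl("spark://host:7077") // fine; proceed to create actors
  try validateSparkUrl("host:7077")     // fails here, not later inside an actor
  catch { case e: IllegalArgumentException => println(e.getMessage) }
}
```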
22 changes: 9 additions & 13 deletions core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -26,7 +26,7 @@ import akka.actor._
import akka.pattern.ask
import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}

-import org.apache.spark.{Logging, SparkConf, SparkException}
+import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
@@ -47,6 +47,8 @@ private[spark] class AppClient(
conf: SparkConf)
extends Logging {

+val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl)
+
val REGISTRATION_TIMEOUT = 20.seconds
val REGISTRATION_RETRIES = 3

@@ -75,9 +77,9 @@
}

def tryRegisterAllMasters() {
-for (masterUrl <- masterUrls) {
-logInfo("Connecting to master " + masterUrl + "...")
-val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
+for (masterAkkaUrl <- masterAkkaUrls) {
+logInfo("Connecting to master " + masterAkkaUrl + "...")
+val actor = context.actorSelection(masterAkkaUrl)
actor ! RegisterApplication(appDescription)
}
}
@@ -103,20 +105,14 @@
}

def changeMaster(url: String) {
+// activeMasterUrl is a valid Spark url since we receive it from master.
activeMasterUrl = url
master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
-masterAddress = activeMasterUrl match {
-case Master.sparkUrlRegex(host, port) =>
-Address("akka.tcp", Master.systemName, host, port.toInt)
-case x =>
-throw new SparkException("Invalid spark URL: " + x)
-}
+masterAddress = Master.toAkkaAddress(activeMasterUrl)
}

private def isPossibleMaster(remoteUrl: Address) = {
-masterUrls.map(s => Master.toAkkaUrl(s))
-.map(u => AddressFromURIString(u).hostPort)
-.contains(remoteUrl.hostPort)
+masterAkkaUrls.map(AddressFromURIString(_).hostPort).contains(remoteUrl.hostPort)
}

override def receiveWithLogging = {
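The masterUrls-to-Akka-URL conversion is now done once, in the masterAkkaUrls field, instead of on every registration attempt and every isPossibleMaster check. A toy sketch of the hoisting (names illustrative, conversion faked):

```scala
class ToyClient(masterUrls: Seq[String]) {
  // Fake conversion standing in for Master.toAkkaUrl.
  private def toAkkaUrl(u: String): String =
    u.replaceFirst("spark://", "akka.tcp://sparkMaster@") + "/user/Master"

  // Converted once at construction, reused by every caller below.
  val masterAkkaUrls: Seq[String] = masterUrls.map(toAkkaUrl)

  def isPossibleMaster(hostPort: String): Boolean =
    masterAkkaUrls.exists(_.contains(hostPort))
}

object ToyClientDemo extends App {
  val c = new ToyClient(Seq("spark://a:7077", "spark://b:7077"))
  println(c.masterAkkaUrls)
  println(c.isPossibleMaster("a:7077")) // true
}
```

Besides avoiding repeated work, hoisting the conversion surfaces a bad URL at construction time rather than in the middle of a registration retry.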
core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -25,7 +25,8 @@ private[spark] case class ApplicationHistoryInfo(
startTime: Long,
endTime: Long,
lastUpdated: Long,
-sparkUser: String)
+sparkUser: String,
+completed: Boolean = false)

private[spark] abstract class ApplicationHistoryProvider {

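Giving the new completed field a default value keeps every existing ApplicationHistoryInfo construction site compiling unchanged. A minimal sketch of the trick on a toy case class:

```scala
object DefaultFieldDemo extends App {
  // Toy stand-in for ApplicationHistoryInfo.
  case class AppInfo(id: String, sparkUser: String, completed: Boolean = false)

  val legacy = AppInfo("app-1", "alice")                 // old call sites still compile
  val marked = AppInfo("app-2", "bob", completed = true) // new sites can set the flag
  println(Seq(legacy, marked).filter(_.completed))
}
```

Note this preserves source compatibility only; code compiled against the old constructor signature would still need a recompile.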
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -173,20 +173,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
val logInfos = statusList
.filter { entry =>
try {
-val isFinishedApplication =
-if (isLegacyLogDirectory(entry)) {
-fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE))
-} else {
-!entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS)
-}
-
-if (isFinishedApplication) {
-val modTime = getModificationTime(entry)
-newLastModifiedTime = math.max(newLastModifiedTime, modTime)
-modTime >= lastModifiedTime
-} else {
-false
-}
+val modTime = getModificationTime(entry)
+newLastModifiedTime = math.max(newLastModifiedTime, modTime)
+modTime >= lastModifiedTime
} catch {
case e: AccessControlException =>
// Do not use "logInfo" since these messages can get pretty noisy if printed on
@@ -204,7 +193,7 @@
None
}
}
-.sortBy { info => -info.endTime }
+.sortBy { info => (-info.endTime, -info.startTime) }

lastModifiedTime = newLastModifiedTime

@@ -261,7 +250,8 @@
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
getModificationTime(eventLog),
-appListener.sparkUser.getOrElse(NOT_STARTED))
+appListener.sparkUser.getOrElse(NOT_STARTED),
+isApplicationCompleted(eventLog))
} finally {
logInput.close()
}
@@ -329,6 +319,17 @@
/** Returns the system's mononotically increasing time. */
private def getMonotonicTimeMs(): Long = System.nanoTime() / (1000 * 1000)

+/**
+* Return true when the application has completed.
+*/
+private def isApplicationCompleted(entry: FileStatus): Boolean = {
+if (isLegacyLogDirectory(entry)) {
+fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE))
+} else {
+!entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS)
+}
+}
+
}

private object FsHistoryProvider {
@@ -342,5 +343,6 @@ private class FsApplicationHistoryInfo(
startTime: Long,
endTime: Long,
lastUpdated: Long,
-sparkUser: String)
-extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser)
+sparkUser: String,
+completed: Boolean = true)
+extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser, completed)
core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -31,8 +31,10 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
def render(request: HttpServletRequest): Seq[Node] = {
val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt
val requestedFirst = (requestedPage - 1) * pageSize
+val requestedIncomplete =
+Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean

-val allApps = parent.getApplicationList()
+val allApps = parent.getApplicationList().filter(_.completed != requestedIncomplete)
val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0
val apps = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allApps.size))

@@ -65,25 +67,26 @@

<h4>
Showing {actualFirst + 1}-{last + 1} of {allApps.size}
+{if (requestedIncomplete) "(Incomplete applications)"}
<span style="float: right">
{
if (actualPage > 1) {
-<a href={"/?page=" + (actualPage - 1)}>&lt; </a>
-<a href={"/?page=1"}>1</a>
+<a href={makePageLink(actualPage - 1, requestedIncomplete)}>&lt; </a>
+<a href={makePageLink(1, requestedIncomplete)}>1</a>
}
}
{if (actualPage - plusOrMinus > secondPageFromLeft) " ... "}
{leftSideIndices}
{actualPage}
{rightSideIndices}
{if (actualPage + plusOrMinus < secondPageFromRight) " ... "}
{
if (actualPage < pageCount) {
-<a href={"/?page=" + pageCount}>{pageCount}</a>
-<a href={"/?page=" + (actualPage + 1)}> &gt;</a>
+<a href={makePageLink(pageCount, requestedIncomplete)}>{pageCount}</a>
+<a href={makePageLink(actualPage + 1, requestedIncomplete)}> &gt;</a>
}
}
</span>
</h4> ++
appTable
} else {
@@ -96,6 +99,15 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
</p>
}
}
+<a href={makePageLink(actualPage, !requestedIncomplete)}>
+{
+if (requestedIncomplete) {
+"Back to completed applications"
+} else {
+"Show incomplete applications"
+}
+}
+</a>
</div>
</div>
UIUtils.basicSparkPage(content, "History Server")
@@ -117,8 +129,9 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}"
val startTime = UIUtils.formatDate(info.startTime)
-val endTime = UIUtils.formatDate(info.endTime)
-val duration = UIUtils.formatDuration(info.endTime - info.startTime)
+val endTime = if (info.endTime > 0) UIUtils.formatDate(info.endTime) else "-"
+val duration =
+if (info.endTime > 0) UIUtils.formatDuration(info.endTime - info.startTime) else "-"
val lastUpdated = UIUtils.formatDate(info.lastUpdated)
<tr>
<td><a href={uiAddress}>{info.id}</a></td>
@@ -130,4 +143,11 @@
<td sorttable_customkey={info.lastUpdated.toString}>{lastUpdated}</td>
</tr>
}

+private def makePageLink(linkPage: Int, showIncomplete: Boolean): String = {
+"/?" + Array(
+"page=" + linkPage,
+"showIncomplete=" + showIncomplete
+).mkString("&")
+}
}
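appRow also guards on endTime > 0 now, rendering "-" for still-running apps instead of a bogus date and a negative duration. The page itself toggles between views with the completed != requestedIncomplete filter; a compact sketch of that predicate:

```scala
object IncompleteFilterDemo extends App {
  case class AppInfo(id: String, completed: Boolean)
  val apps = Seq(AppInfo("done-app", completed = true), AppInfo("live-app", completed = false))

  // completed != requestedIncomplete keeps exactly one of the two populations.
  def listing(requestedIncomplete: Boolean): Seq[String] =
    apps.filter(_.completed != requestedIncomplete).map(_.id)

  println(listing(requestedIncomplete = false)) // List(done-app)
  println(listing(requestedIncomplete = true))  // List(live-app)
}
```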
25 changes: 17 additions & 8 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -845,7 +845,6 @@
private[spark] object Master extends Logging {
val systemName = "sparkMaster"
private val actorName = "Master"
-val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r

def main(argStrings: Array[String]) {
SignalLogger.register(log)
@@ -855,14 +854,24 @@
actorSystem.awaitTermination()
}

-/** Returns an `akka.tcp://...` URL for the Master actor given a sparkUrl `spark://host:ip`. */
+/**
+* Returns an `akka.tcp://...` URL for the Master actor given a sparkUrl `spark://host:port`.
+*
+* @throws SparkException if the url is invalid
+*/
def toAkkaUrl(sparkUrl: String): String = {
-sparkUrl match {
-case sparkUrlRegex(host, port) =>
-"akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
-case _ =>
-throw new SparkException("Invalid master URL: " + sparkUrl)
-}
+val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
+"akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
}

+/**
+* Returns an akka `Address` for the Master actor given a sparkUrl `spark://host:port`.
+*
+* @throws SparkException if the url is invalid
+*/
+def toAkkaAddress(sparkUrl: String): Address = {
+val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
+Address("akka.tcp", systemName, host, port)
+}
+
def startSystemAndActor(
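Both toAkkaUrl and toAkkaAddress now delegate parsing to Utils.extractHostPortFromSparkUrl, whose body is not part of this diff. A hedged sketch of what such a helper might look like, assuming java.net.URI-based parsing (illustrative, not Spark's actual implementation):

```scala
import java.net.{URI, URISyntaxException}

object SparkUrlDemo extends App {
  // Hypothetical parser; the real Utils.extractHostPortFromSparkUrl is not shown here.
  def extractHostPort(sparkUrl: String): (String, Int) =
    try {
      val uri = new URI(sparkUrl)
      val (host, port) = (uri.getHost, uri.getPort)
      if (uri.getScheme != "spark" || host == null || port < 0) {
        throw new IllegalArgumentException("Invalid master URL: " + sparkUrl)
      }
      (host, port)
    } catch {
      case e: URISyntaxException =>
        throw new IllegalArgumentException("Invalid master URL: " + sparkUrl, e)
    }

  println(extractHostPort("spark://host:7077")) // (host,7077)
}
```

Centralizing the parsing in one helper keeps the URL grammar in a single place instead of duplicating the old sparkUrlRegex across Master and AppClient.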