Skip to content

Commit

Permalink
Merge branch 'master' of github.com:apache/spark into rest
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Or committed Feb 4, 2015
2 parents 792e112 + e380d2d commit bbbd329
Show file tree
Hide file tree
Showing 246 changed files with 16,540 additions and 2,785 deletions.
12 changes: 11 additions & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,16 @@
<artifactId>jetty-http</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-continuation</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
Expand Down Expand Up @@ -385,7 +395,7 @@
<overWriteIfNewer>true</overWriteIfNewer>
<useSubDirectoryPerType>true</useSubDirectoryPerType>
<includeArtifactIds>
guava,jetty-io,jetty-http,jetty-plus,jetty-util,jetty-server
guava,jetty-io,jetty-servlet,jetty-continuation,jetty-http,jetty-plus,jetty-util,jetty-server
</includeArtifactIds>
<silent>true</silent>
</configuration>
Expand Down
115 changes: 115 additions & 0 deletions core/src/main/java/org/apache/spark/SparkFirehoseListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark;

import org.apache.spark.scheduler.*;

/**
* Class that allows users to receive all SparkListener events.
* Users should override the onEvent method.
*
* This is a concrete Java class in order to ensure that we don't forget to update it when adding
* new methods to SparkListener: forgetting to add a method will result in a compilation error (if
* this was a concrete Scala class, default implementations of new event handlers would be inherited
* from the SparkListener trait).
*/
public class SparkFirehoseListener implements SparkListener {

public void onEvent(SparkListenerEvent event) { }

@Override
public final void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
onEvent(stageCompleted);
}

@Override
public final void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
onEvent(stageSubmitted);
}

@Override
public final void onTaskStart(SparkListenerTaskStart taskStart) {
onEvent(taskStart);
}

@Override
public final void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) {
onEvent(taskGettingResult);
}

@Override
public final void onTaskEnd(SparkListenerTaskEnd taskEnd) {
onEvent(taskEnd);
}

@Override
public final void onJobStart(SparkListenerJobStart jobStart) {
onEvent(jobStart);
}

@Override
public final void onJobEnd(SparkListenerJobEnd jobEnd) {
onEvent(jobEnd);
}

@Override
public final void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) {
onEvent(environmentUpdate);
}

@Override
public final void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) {
onEvent(blockManagerAdded);
}

@Override
public final void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) {
onEvent(blockManagerRemoved);
}

@Override
public final void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) {
onEvent(unpersistRDD);
}

@Override
public final void onApplicationStart(SparkListenerApplicationStart applicationStart) {
onEvent(applicationStart);
}

@Override
public final void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
onEvent(applicationEnd);
}

@Override
public final void onExecutorMetricsUpdate(
SparkListenerExecutorMetricsUpdate executorMetricsUpdate) {
onEvent(executorMetricsUpdate);
}

@Override
public final void onExecutorAdded(SparkListenerExecutorAdded executorAdded) {
onEvent(executorAdded);
}

@Override
public final void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) {
onEvent(executorRemoved);
}
}
126 changes: 0 additions & 126 deletions core/src/main/java/org/apache/spark/TaskContext.java

This file was deleted.

2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/HttpFileServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ private[spark] class HttpFileServer(
var serverUri : String = null

def initialize() {
baseDir = Utils.createTempDir()
baseDir = Utils.createTempDir(Utils.getLocalDir(conf), "httpd")
fileDir = new File(baseDir, "files")
jarDir = new File(baseDir, "jars")
fileDir.mkdir()
Expand Down
11 changes: 8 additions & 3 deletions core/src/main/scala/org/apache/spark/HttpServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark

import java.io.File

import org.eclipse.jetty.server.ssl.SslSocketConnector
import org.eclipse.jetty.util.security.{Constraint, Password}
import org.eclipse.jetty.security.authentication.DigestAuthenticator
import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService}
Expand Down Expand Up @@ -72,7 +73,10 @@ private[spark] class HttpServer(
*/
private def doStart(startPort: Int): (Server, Int) = {
val server = new Server()
val connector = new SocketConnector

val connector = securityManager.fileServerSSLOptions.createJettySslContextFactory()
.map(new SslSocketConnector(_)).getOrElse(new SocketConnector)

connector.setMaxIdleTime(60 * 1000)
connector.setSoLingerTime(-1)
connector.setPort(startPort)
Expand Down Expand Up @@ -149,13 +153,14 @@ private[spark] class HttpServer(
}

/**
* Get the URI of this HTTP server (http://host:port)
* Get the URI of this HTTP server (http://host:port or https://host:port)
*/
def uri: String = {
if (server == null) {
throw new ServerStateException("Server is not started")
} else {
"http://" + Utils.localIpAddress + ":" + port
val scheme = if (securityManager.fileServerSSLOptions.enabled) "https" else "http"
s"$scheme://${Utils.localIpAddress}:$port"
}
}
}
Loading

0 comments on commit bbbd329

Please sign in to comment.