Merge pull request #9 from apache/master
sync
GuoPhilipse authored Jun 2, 2020
2 parents c3b3c89 + 64cb6f7 commit c3546eb
Showing 125 changed files with 1,658 additions and 727 deletions.
@@ -15,20 +15,13 @@
* limitations under the License.
*/

package org.apache.spark.sql.hive.thriftserver
package org.apache.spark.tags;

import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.catalyst.catalog.CatalogTableType.{EXTERNAL, MANAGED, VIEW}
import java.lang.annotation.*;

/**
* Utils for metadata operations.
*/
private[hive] trait SparkMetadataOperationUtils {
import org.scalatest.TagAnnotation;

def tableTypeString(tableType: CatalogTableType): String = tableType match {
case EXTERNAL | MANAGED => "TABLE"
case VIEW => "VIEW"
case t =>
throw new IllegalArgumentException(s"Unknown table type is found: $t")
}
}
@TagAnnotation
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface ChromeUITest { }
10 changes: 4 additions & 6 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1780,10 +1780,9 @@ abstract class RDD[T: ClassTag](
* It will result in new executors with the resources specified being acquired to
* calculate the RDD.
*/
// PRIVATE for now, added for testing purposes, will be made public with SPARK-29150
@Experimental
@Since("3.0.0")
private[spark] def withResources(rp: ResourceProfile): this.type = {
@Since("3.1.0")
def withResources(rp: ResourceProfile): this.type = {
resourceProfile = Option(rp)
sc.resourceProfileManager.addResourceProfile(resourceProfile.get)
this
@@ -1794,10 +1793,9 @@
* @return the user specified ResourceProfile or null (for Java compatibility) if
* none was specified
*/
// PRIVATE for now, added for testing purposes, will be made public with SPARK-29150
@Experimental
@Since("3.0.0")
private[spark] def getResourceProfile(): ResourceProfile = resourceProfile.getOrElse(null)
@Since("3.1.0")
def getResourceProfile(): ResourceProfile = resourceProfile.getOrElse(null)

// =======================================================================
// Other internal methods and fields
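As a minimal sketch of the stage-level API this change makes public: assuming an existing SparkContext `sc` and a ResourceProfile `rp` built elsewhere (the builder appears further down in this diff), a caller could now attach the profile to an RDD and read it back:

val rdd = sc.parallelize(1 to 1000, numSlices = 8)
val mapped = rdd.map(_ * 2).withResources(rp)   // executors for this stage follow rp
assert(mapped.getResourceProfile() == rp)       // returns null if no profile was set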
@@ -17,6 +17,8 @@

package org.apache.spark.resource

import org.apache.spark.annotation.{Evolving, Since}

/**
* An Executor resource request. This is used in conjunction with the ResourceProfile to
* programmatically specify the resources needed for an RDD that will be applied at the
@@ -46,11 +48,10 @@ package org.apache.spark.resource
* allocated. The script runs on Executors startup to discover the addresses
* of the resources available.
* @param vendor Optional vendor, required for some cluster managers
*
* This api is currently private until the rest of the pieces are in place and then it
* will become public.
*/
private[spark] class ExecutorResourceRequest(
@Evolving
@Since("3.1.0")
class ExecutorResourceRequest(
val resourceName: String,
val amount: Long,
val discoveryScript: String = "",
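For illustration, a hedged sketch of constructing the request directly with the constructor shown above; the discovery-script path and vendor string are placeholders, not values from this commit:

val gpuExecReq = new ExecutorResourceRequest(
  "gpu",                             // resourceName
  2L,                                // amount per executor
  "/opt/spark/scripts/getGpus.sh",   // discoveryScript, run at executor startup (hypothetical path)
  "nvidia.com")                      // vendor, needed only by some cluster managers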
@@ -22,18 +22,18 @@ import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._

import org.apache.spark.annotation.{Evolving, Since}
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.resource.ResourceProfile._

/**
* A set of Executor resource requests. This is used in conjunction with the ResourceProfile to
* programmatically specify the resources needed for an RDD that will be applied at the
* stage level.
*
* This api is currently private until the rest of the pieces are in place and then it
* will become public.
*/
private[spark] class ExecutorResourceRequests() extends Serializable {
@Evolving
@Since("3.1.0")
class ExecutorResourceRequests() extends Serializable {

private val _executorResources = new ConcurrentHashMap[String, ExecutorResourceRequest]()

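A brief usage sketch, assuming the fluent setters (cores, memory, resource) exposed by this class; the concrete values are illustrative only:

val execReqs = new ExecutorResourceRequests()
  .cores(4)
  .memory("8g")
  .resource("gpu", 2, "/opt/spark/scripts/getGpus.sh", "nvidia.com")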
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.annotation.Evolving
import org.apache.spark.annotation.{Evolving, Since}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY
@@ -37,6 +37,7 @@ import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY
* This is meant to be immutable so user can't change it after building.
*/
@Evolving
@Since("3.1.0")
class ResourceProfile(
val executorResources: Map[String, ExecutorResourceRequest],
val taskResources: Map[String, TaskResourceRequest]) extends Serializable with Logging {
@@ -22,16 +22,19 @@ import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._

import org.apache.spark.annotation.Evolving
import org.apache.spark.annotation.{Evolving, Since}


/**
* Resource profile builder to build a Resource profile to associate with an RDD.
* A ResourceProfile allows the user to specify executor and task requirements for an RDD
* that will get applied during a stage. This allows the user to change the resource
* requirements between stages.
*
*/
@Evolving
private[spark] class ResourceProfileBuilder() {
@Since("3.1.0")
class ResourceProfileBuilder() {

private val _taskResources = new ConcurrentHashMap[String, TaskResourceRequest]()
private val _executorResources = new ConcurrentHashMap[String, ExecutorResourceRequest]()
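A hedged sketch of the builder in use, assuming its require/build methods together with the request classes shown elsewhere in this diff:

val profile = new ResourceProfileBuilder()
  .require(execReqs)   // ExecutorResourceRequests for the stage's executors
  .require(taskReqs)   // TaskResourceRequests for each task
  .build()             // immutable ResourceProfile, ready for rdd.withResources(profile)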
@@ -17,17 +17,18 @@

package org.apache.spark.resource

import org.apache.spark.annotation.{Evolving, Since}

/**
* A task resource request. This is used in conjunction with the ResourceProfile to
* programmatically specify the resources needed for an RDD that will be applied at the
* stage level.
*
* Use TaskResourceRequests class as a convenience API.
*
* This api is currently private until the rest of the pieces are in place and then it
* will become public.
*/
private[spark] class TaskResourceRequest(val resourceName: String, val amount: Double)
@Evolving
@Since("3.1.0")
class TaskResourceRequest(val resourceName: String, val amount: Double)
extends Serializable {

assert(amount <= 0.5 || amount % 1 == 0,
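To make the assert above concrete: an amount is accepted either as a whole number or as a fraction of at most 0.5, which lets several tasks share one resource address. For example:

new TaskResourceRequest("gpu", 0.25)   // ok: 0.25 <= 0.5, up to four tasks share one GPU
new TaskResourceRequest("gpu", 2.0)    // ok: whole number
new TaskResourceRequest("gpu", 1.5)    // rejected by the assert: neither <= 0.5 nor whole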
@@ -22,17 +22,17 @@ import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._

import org.apache.spark.annotation.{Evolving, Since}
import org.apache.spark.resource.ResourceProfile._

/**
* A set of task resource requests. This is used in conjunction with the ResourceProfile to
* programmatically specify the resources needed for an RDD that will be applied at the
* stage level.
*
* This api is currently private until the rest of the pieces are in place and then it
* will become public.
*/
private[spark] class TaskResourceRequests() extends Serializable {
@Evolving
@Since("3.1.0")
class TaskResourceRequests() extends Serializable {

private val _taskResources = new ConcurrentHashMap[String, TaskResourceRequest]()

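And a matching sketch for the per-task side, assuming the cpus and resource setters on this class:

val taskReqs = new TaskResourceRequests()
  .cpus(1)
  .resource("gpu", 0.25)   // fractional amount, see TaskResourceRequest above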
@@ -1107,10 +1107,19 @@ private[spark] class TaskSetManager(
def recomputeLocality(): Unit = {
// A zombie TaskSetManager may reach here while executorLost happens
if (isZombie) return
val previousLocalityIndex = currentLocalityIndex
val previousLocalityLevel = myLocalityLevels(currentLocalityIndex)
val previousMyLocalityLevels = myLocalityLevels
myLocalityLevels = computeValidLocalityLevels()
localityWaits = myLocalityLevels.map(getLocalityWait)
currentLocalityIndex = getLocalityIndex(previousLocalityLevel)
if (currentLocalityIndex > previousLocalityIndex) {
// SPARK-31837: If the new level is more local, shift to the new most local locality
// level in terms of better data locality. For example, say the previous locality
// levels are [PROCESS, NODE, ANY] and current level is ANY. After recompute, the
// locality levels are [PROCESS, NODE, RACK, ANY]. Then, we'll shift to RACK level.
currentLocalityIndex = getLocalityIndex(myLocalityLevels.diff(previousMyLocalityLevels).head)
}
}

def executorAdded(): Unit = {
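To make the SPARK-31837 comment concrete, the new branch relies on Seq.diff keeping only the levels that appear in the recomputed list but not the previous one, e.g.:

Seq("PROCESS", "NODE", "RACK", "ANY").diff(Seq("PROCESS", "NODE", "ANY"))   // Seq("RACK")
// .head then yields "RACK", so the task set shifts to the newly available, more local level.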
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.history

import org.openqa.selenium.WebDriver
import org.openqa.selenium.chrome.{ChromeDriver, ChromeOptions}

import org.apache.spark.tags.ChromeUITest

/**
* Tests for HistoryServer with Chrome.
*/
@ChromeUITest
class ChromeUIHistoryServerSuite
extends RealBrowserUIHistoryServerSuite("webdriver.chrome.driver") {

override var webDriver: WebDriver = _

override def beforeAll(): Unit = {
super.beforeAll()
val chromeOptions = new ChromeOptions
chromeOptions.addArguments("--headless", "--disable-gpu")
webDriver = new ChromeDriver(chromeOptions)
}

override def afterAll(): Unit = {
try {
if (webDriver != null) {
webDriver.quit()
}
} finally {
super.afterAll()
}
}
}
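A hedged note for running this suite locally: the parent RealBrowserUIHistoryServerSuite is handed the `webdriver.chrome.driver` property name above, so Selenium needs that property to point at an installed chromedriver binary before the suite starts, e.g. (path illustrative):

System.setProperty("webdriver.chrome.driver", "/usr/local/bin/chromedriver")   // hypothetical path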
@@ -29,8 +29,6 @@ import scala.concurrent.duration._
import com.google.common.io.{ByteStreams, Files}
import org.apache.commons.io.{FileUtils, IOUtils}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.eclipse.jetty.proxy.ProxyServlet
import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods
import org.json4s.jackson.JsonMethods._
@@ -336,66 +334,6 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
assert(response.contains(SPARK_VERSION))
}

test("ajax rendered relative links are prefixed with uiRoot (spark.ui.proxyBase)") {
val uiRoot = "/testwebproxybase"
System.setProperty("spark.ui.proxyBase", uiRoot)

stop()
init()

val port = server.boundPort

val servlet = new ProxyServlet {
override def rewriteTarget(request: HttpServletRequest): String = {
// servlet acts like a proxy that redirects calls made on
// spark.ui.proxyBase context path to the normal servlet handlers operating off "/"
val sb = request.getRequestURL()

if (request.getQueryString() != null) {
sb.append(s"?${request.getQueryString()}")
}

val proxyidx = sb.indexOf(uiRoot)
sb.delete(proxyidx, proxyidx + uiRoot.length).toString
}
}

val contextHandler = new ServletContextHandler
val holder = new ServletHolder(servlet)
contextHandler.setContextPath(uiRoot)
contextHandler.addServlet(holder, "/")
server.attachHandler(contextHandler)

implicit val webDriver: WebDriver = new HtmlUnitDriver(true)

try {
val url = s"http://localhost:$port"

go to s"$url$uiRoot"

// expect the ajax call to finish in 5 seconds
implicitlyWait(org.scalatest.time.Span(5, org.scalatest.time.Seconds))

// once this findAll call returns, we know the ajax load of the table completed
findAll(ClassNameQuery("odd"))

val links = findAll(TagNameQuery("a"))
.map(_.attribute("href"))
.filter(_.isDefined)
.map(_.get)
.filter(_.startsWith(url)).toList

// there are at least some URL links that were generated via javascript,
// and they all contain the spark.ui.proxyBase (uiRoot)
links.length should be > 4
all(links) should startWith(url + uiRoot)
} finally {
contextHandler.stop()
quit()
}

}

/**
* Verify that the security manager needed for the history server can be instantiated
* when `spark.authenticate` is `true`, rather than raise an `IllegalArgumentException`.