
Commit

Merge remote-tracking branch 'upstream/master'
planga82 committed Sep 7, 2019
2 parents 247d667 + 723faad commit 5956468
Showing 170 changed files with 8,783 additions and 1,393 deletions.
1 change: 1 addition & 0 deletions LICENSE-binary
@@ -463,6 +463,7 @@ javax.xml.stream:stax-api https://jcp.org/en/jsr/detail?id=173
Common Development and Distribution License (CDDL) 1.1
------------------------------------------------------

javax.el:javax.el-api https://javaee.github.io/uel-ri/
javax.servlet:javax.servlet-api https://javaee.github.io/servlet-spec/
javax.transaction:jta http://www.oracle.com/technetwork/java/index.html
javax.xml.bind:jaxb-api https://github.com/javaee/jaxb-v2
19 changes: 11 additions & 8 deletions R/WINDOWS.md
@@ -20,25 +20,28 @@ license: |

To build SparkR on Windows, the following steps are required

1. Install R (>= 3.1) and [Rtools](https://cloud.r-project.org/bin/windows/Rtools/). Make sure to
1. Make sure `bash` is available and in `PATH` if you already have a built-in `bash` on Windows. If you do not, install [Cygwin](https://www.cygwin.com/).

2. Install R (>= 3.1) and [Rtools](https://cloud.r-project.org/bin/windows/Rtools/). Make sure to
include Rtools and R in `PATH`. Note that support for R prior to version 3.4 is deprecated as of Spark 3.0.0.

2. Install
[JDK8](https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html) and set
`JAVA_HOME` in the system environment variables.
3. Install a JDK that SparkR supports (see `R/pkg/DESCRIPTION`), and set `JAVA_HOME` in the system environment variables.

3. Download and install [Maven](https://maven.apache.org/download.html). Also include the `bin`
4. Download and install [Maven](https://maven.apache.org/download.html). Also include the `bin`
directory in Maven in `PATH`.

4. Set `MAVEN_OPTS` as described in [Building Spark](https://spark.apache.org/docs/latest/building-spark.html).
5. Set `MAVEN_OPTS` as described in [Building Spark](https://spark.apache.org/docs/latest/building-spark.html).

5. Open a command shell (`cmd`) in the Spark directory and build Spark with [Maven](https://spark.apache.org/docs/latest/building-spark.html#buildmvn) and include the `-Psparkr` profile to build the R package. For example to use the default Hadoop versions you can run
6. Open a command shell (`cmd`) in the Spark directory and build Spark with [Maven](https://spark.apache.org/docs/latest/building-spark.html#buildmvn), including the `-Psparkr` profile to build the R package. For example, to use the default Hadoop versions you can run

```bash
mvn.cmd -DskipTests -Psparkr package
```

`.\build\mvn` is a shell script so `mvn.cmd` should be used directly on Windows.
Note that `.\build\mvn` is a shell script, so on Windows you should use the system-installed `mvn.cmd` directly.
Make sure your Maven version matches `maven.version` in `./pom.xml`.

Note that this is a workaround for SparkR developers on Windows. Apache Spark does not yet officially support _building_ on Windows, although it does support _running_ on Windows.

## Unit tests

3 changes: 2 additions & 1 deletion R/pkg/R/SQLContext.R
@@ -624,7 +624,8 @@ loadDF <- function(path = NULL, source = NULL, schema = NULL, ...) {
#'
#' @param url JDBC database url of the form \code{jdbc:subprotocol:subname}
#' @param tableName the name of the table in the external database
#' @param partitionColumn the name of a column of integral type that will be used for partitioning
#' @param partitionColumn the name of a column of numeric, date, or timestamp type
#' that will be used for partitioning.
#' @param lowerBound the minimum value of \code{partitionColumn} used to decide partition stride
#' @param upperBound the maximum value of \code{partitionColumn} used to decide partition stride
#' @param numPartitions the number of partitions. This, along with \code{lowerBound} (inclusive),
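The same partitioning options exist in the DataFrame JDBC source, which accepts numeric, date, or timestamp partition columns. As a hedged Scala sketch (the URL, table, and column names below are hypothetical, not part of this change), a read partitioned on a timestamp column could look like:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("jdbc-partition-sketch").getOrCreate()

// Each partition reads one stride of `created_at` between the two bounds.
val orders = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://db-host:5432/shop")   // hypothetical database URL
  .option("dbtable", "orders")                            // hypothetical table
  .option("partitionColumn", "created_at")                // numeric, date, or timestamp column
  .option("lowerBound", "2019-01-01 00:00:00")            // min value, used to compute the stride
  .option("upperBound", "2019-12-31 23:59:59")            // max value, used to compute the stride
  .option("numPartitions", "8")
  .load()
```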
9 changes: 9 additions & 0 deletions build/mvn
@@ -80,6 +80,15 @@ install_mvn() {
fi
if [ $(version $MVN_DETECTED_VERSION) -lt $(version $MVN_VERSION) ]; then
local APACHE_MIRROR=${APACHE_MIRROR:-'https://www.apache.org/dyn/closer.lua?action=download&filename='}

if [ $(command -v curl) ]; then
local TEST_MIRROR_URL="${APACHE_MIRROR}/maven/maven-3/${MVN_VERSION}/binaries/apache-maven-${MVN_VERSION}-bin.tar.gz"
if ! curl -L --output /dev/null --silent --head --fail "$TEST_MIRROR_URL" ; then
# Fall back to archive.apache.org for older Maven
echo "Falling back to archive.apache.org to download Maven"
APACHE_MIRROR="https://archive.apache.org/dist"
fi
fi

install_app \
"${APACHE_MIRROR}/maven/maven-3/${MVN_VERSION}/binaries" \
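The new block probes the preferred Apache mirror with a HEAD request and falls back to archive.apache.org when the requested Maven version is no longer mirrored. The same idea, sketched in Scala purely for illustration (the bash above is what actually runs; the URL layout follows the script):

```scala
import java.net.{HttpURLConnection, URL}

// Probe the closer.lua mirror for the exact Maven tarball; if it is missing
// (older releases get pruned from mirrors), use the permanent archive instead.
def resolveMavenBase(version: String): String = {
  val mirror = "https://www.apache.org/dyn/closer.lua?action=download&filename="
  val probe = s"$mirror/maven/maven-3/$version/binaries/apache-maven-$version-bin.tar.gz"
  val conn = new URL(probe).openConnection().asInstanceOf[HttpURLConnection]
  conn.setRequestMethod("HEAD")
  val found = try conn.getResponseCode == HttpURLConnection.HTTP_OK finally conn.disconnect()
  if (found) mirror else "https://archive.apache.org/dist"
}
```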
@@ -414,6 +414,13 @@ public MapIterator iterator() {
return new MapIterator(numValues, loc, false);
}

/**
 * Returns a thread-safe iterator that iterates over the entries of this map.
*/
public MapIterator safeIterator() {
return new MapIterator(numValues, new Location(), false);
}

/**
* Returns a destructive iterator for iterating over the entries of this map. It frees each page
* as it moves onto next one. Notice: it is illegal to call any method on the map after
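The new `safeIterator()` differs from `iterator()` only in that it allocates a fresh `Location` cursor instead of reusing the map's shared `loc` field, so concurrent readers do not clobber each other's position. A minimal Scala sketch of that design choice (the names below are illustrative, not Spark's API):

```scala
// A map whose default iterator reuses one shared cursor (cheap, but unsafe to
// share across threads), plus a "safe" iterator that allocates its own cursor.
class CursorBackedMap[K, V](entries: IndexedSeq[(K, V)]) {
  final class Cursor { var pos: Int = 0 }

  private val sharedCursor = new Cursor

  def iterator(): Iterator[(K, V)] = iterate(sharedCursor)     // shared mutable state
  def safeIterator(): Iterator[(K, V)] = iterate(new Cursor)   // per-caller state

  private def iterate(cursor: Cursor): Iterator[(K, V)] = new Iterator[(K, V)] {
    def hasNext: Boolean = cursor.pos < entries.length
    def next(): (K, V) = { val e = entries(cursor.pos); cursor.pos += 1; e }
  }
}
```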
@@ -92,6 +92,7 @@ <h4 class="title-table">Executors</h4>
Off Heap Storage Memory</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Disk Used">Disk Used</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Cores">Cores</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Resources">Resources</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Active Tasks">Active Tasks</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Failed Tasks">Failed Tasks</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Complete Tasks">Complete Tasks</span></th>
@@ -39,6 +39,19 @@ function formatStatus(status, type, row) {
return "Dead"
}

function formatResourceCells(resources) {
var result = ""
var count = 0
$.each(resources, function (name, resInfo) {
if (count > 0) {
result += ", "
}
result += name + ': [' + resInfo.addresses.join(", ") + ']'
count += 1
});
return result
}

jQuery.extend(jQuery.fn.dataTableExt.oSort, {
"title-numeric-pre": function (a) {
var x = a.match(/title="*(-?[0-9\.]+)/)[1];
@@ -106,7 +119,7 @@ function totalDurationColor(totalGCTime, totalDuration) {
}

var sumOptionalColumns = [3, 4];
var execOptionalColumns = [5, 6];
var execOptionalColumns = [5, 6, 9];
var execDataTable;
var sumDataTable;

@@ -401,6 +414,7 @@ $(document).ready(function () {
},
{data: 'diskUsed', render: formatBytes},
{data: 'totalCores'},
{name: 'resourcesCol', data: 'resources', render: formatResourceCells, orderable: false},
{
data: 'activeTasks',
"fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
@@ -446,7 +460,8 @@ $(document).ready(function () {
"order": [[0, "asc"]],
"columnDefs": [
{"visible": false, "targets": 5},
{"visible": false, "targets": 6}
{"visible": false, "targets": 6},
{"visible": false, "targets": 9}
]
};

@@ -553,6 +568,7 @@ $(document).ready(function () {
"<div><input type='checkbox' class='toggle-vis' id='select-all-box'>Select All</div>" +
"<div id='on_heap_memory' class='on-heap-memory-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='3' data-exec-col-idx='5'>On Heap Memory</div>" +
"<div id='off_heap_memory' class='off-heap-memory-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='4' data-exec-col-idx='6'>Off Heap Memory</div>" +
"<div id='extra_resources' class='resources-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='' data-exec-col-idx='9'>Resources</div>" +
"</div>");

reselectCheckboxesBasedOnTaskTableState();
@@ -584,8 +600,10 @@ $(document).ready(function () {
var execCol = execDataTable.column(execColIdx);
execCol.visible(!execCol.visible());
var sumColIdx = thisBox.attr("data-sum-col-idx");
var sumCol = sumDataTable.column(sumColIdx);
sumCol.visible(!sumCol.visible());
if (sumColIdx) {
var sumCol = sumDataTable.column(sumColIdx);
sumCol.visible(!sumCol.visible());
}
}
});

50 changes: 10 additions & 40 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -678,8 +678,11 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
val mapStatuses: Map[Int, Array[MapStatus]] =
new ConcurrentHashMap[Int, Array[MapStatus]]().asScala

/** Remembers which map output locations are currently being fetched on an executor. */
private val fetching = new HashSet[Int]
/**
* A [[KeyLock]] whose key is a shuffle id to ensure there is only one thread fetching
* the same shuffle block.
*/
private val fetchingLock = new KeyLock[Int]

// Get blocks sizes by executor Id. Note that zero-sized blocks are excluded in the result.
override def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
@@ -707,51 +710,18 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
if (statuses == null) {
logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
val startTimeNs = System.nanoTime()
var fetchedStatuses: Array[MapStatus] = null
fetching.synchronized {
// Someone else is fetching it; wait for them to be done
while (fetching.contains(shuffleId)) {
try {
fetching.wait()
} catch {
case e: InterruptedException =>
}
}

// Either while we waited the fetch happened successfully, or
// someone fetched it in between the get and the fetching.synchronized.
fetchedStatuses = mapStatuses.get(shuffleId).orNull
fetchingLock.withLock(shuffleId) {
var fetchedStatuses = mapStatuses.get(shuffleId).orNull
if (fetchedStatuses == null) {
// We have to do the fetch, get others to wait for us.
fetching += shuffleId
}
}

if (fetchedStatuses == null) {
// We won the race to fetch the statuses; do so
logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
// This try-finally prevents hangs due to timeouts:
try {
logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes)
logInfo("Got the output locations")
mapStatuses.put(shuffleId, fetchedStatuses)
} finally {
fetching.synchronized {
fetching -= shuffleId
fetching.notifyAll()
}
}
}
logDebug(s"Fetching map output statuses for shuffle $shuffleId took " +
s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms")

if (fetchedStatuses != null) {
logDebug(s"Fetching map output statuses for shuffle $shuffleId took " +
s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms")
fetchedStatuses
} else {
logError("Missing all output locations for shuffle " + shuffleId)
throw new MetadataFetchFailedException(
shuffleId, -1, "Missing all output locations for shuffle " + shuffleId)
}
} else {
statuses
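`KeyLock` itself is not part of this hunk; as a hedged sketch of the behaviour the new code relies on (at most one thread runs the fetch for a given shuffle id, while fetches for different shuffle ids proceed in parallel), it could look roughly like this — the real `org.apache.spark.util.KeyLock` may differ, for example by cleaning up unused locks:

```scala
import java.util.concurrent.ConcurrentHashMap

// One monitor object per key: callers with different keys never contend,
// callers with the same key execute the critical section one at a time.
class KeyLockSketch[K] {
  private val locks = new ConcurrentHashMap[K, Object]()

  def withLock[T](key: K)(body: => T): T = {
    val lock = locks.computeIfAbsent(key, _ => new Object)
    lock.synchronized(body)
  }
}

// Usage mirroring the change above: only the first thread per shuffle id asks the
// tracker; later threads wait on the same lock and then hit the populated cache.
// fetchingLock.withLock(shuffleId) { /* re-check mapStatuses, fetch if still null */ }
```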
@@ -17,6 +17,7 @@

package org.apache.spark.broadcast

import java.util.Collections
import java.util.concurrent.atomic.AtomicLong

import scala.reflect.ClassTag
@@ -55,9 +56,11 @@ private[spark] class BroadcastManager(

private val nextBroadcastId = new AtomicLong(0)

private[broadcast] val cachedValues = {
new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
}
private[broadcast] val cachedValues =
Collections.synchronizedMap(
new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
.asInstanceOf[java.util.Map[Any, Any]]
)

def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
val bid = nextBroadcastId.getAndIncrement()
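The cache now combines a hard-key/weak-value `ReferenceMap` with `Collections.synchronizedMap`, so single `get`/`put` calls are thread safe while cached broadcast values can still be garbage collected once nothing else references them. A hedged usage sketch (assuming the same Commons Collections 3 classes the file already imports; the key is a placeholder string):

```scala
import java.util.Collections
import org.apache.commons.collections.map.{AbstractReferenceMap, ReferenceMap}

val cache: java.util.Map[Any, Any] = Collections.synchronizedMap(
  new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
    .asInstanceOf[java.util.Map[Any, Any]])

var payload: AnyRef = new Array[Byte](1024 * 1024)   // stands in for a broadcast value
cache.put("broadcast_0", payload)
assert(cache.get("broadcast_0") != null)              // present while strongly referenced

payload = null
// Once no strong reference remains and a GC runs, the weakly held value can be
// collected and cache.get("broadcast_0") may return null -- callers must expect a miss.
```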
@@ -31,7 +31,7 @@ import org.apache.spark.internal.{config, Logging}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage._
import org.apache.spark.util.Utils
import org.apache.spark.util.{KeyLock, Utils}
import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}

/**
@@ -167,7 +167,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
bm.getLocalBytes(pieceId) match {
case Some(block) =>
blocks(pid) = block
releaseLock(pieceId)
releaseBlockManagerLock(pieceId)
case None =>
bm.getRemoteBytes(pieceId) match {
case Some(b) =>
@@ -215,8 +215,10 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
}

private def readBroadcastBlock(): T = Utils.tryOrIOException {
val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
broadcastCache.synchronized {
TorrentBroadcast.torrentBroadcastLock.withLock(broadcastId) {
// As we only lock based on `broadcastId`, whenever using `broadcastCache`, we should only
// touch `broadcastId`.
val broadcastCache = SparkEnv.get.broadcastManager.cachedValues

Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse {
setConf(SparkEnv.get.conf)
@@ -225,7 +227,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
case Some(blockResult) =>
if (blockResult.data.hasNext) {
val x = blockResult.data.next().asInstanceOf[T]
releaseLock(broadcastId)
releaseBlockManagerLock(broadcastId)

if (x != null) {
broadcastCache.put(broadcastId, x)
@@ -270,7 +272,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
* If running in a task, register the given block's locks for release upon task completion.
* Otherwise, if not running in a task then immediately release the lock.
*/
private def releaseLock(blockId: BlockId): Unit = {
private def releaseBlockManagerLock(blockId: BlockId): Unit = {
val blockManager = SparkEnv.get.blockManager
Option(TaskContext.get()) match {
case Some(taskContext) =>
@@ -290,6 +292,12 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)

private object TorrentBroadcast extends Logging {

/**
* A [[KeyLock]] whose key is [[BroadcastBlockId]] to ensure there is only one thread fetching
* the same [[TorrentBroadcast]] block.
*/
private val torrentBroadcastLock = new KeyLock[BroadcastBlockId]

def blockifyObject[T: ClassTag](
obj: T,
blockSize: Int,
@@ -72,7 +72,7 @@ private[spark] class HistoryAppStatusStore(
source.totalGCTime, source.totalInputBytes, source.totalShuffleRead,
source.totalShuffleWrite, source.isBlacklisted, source.maxMemory, source.addTime,
source.removeTime, source.removeReason, newExecutorLogs, source.memoryMetrics,
source.blacklistedInStages, source.peakMemoryMetrics, source.attributes)
source.blacklistedInStages, source.peakMemoryMetrics, source.attributes, source.resources)
}

}
@@ -26,12 +26,15 @@ import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.Utils

private[spark] case class WorkerResourceInfo(name: String, addresses: Seq[String])
extends ResourceAllocator(name, addresses) {
extends ResourceAllocator {

override protected def resourceName = this.name
override protected def resourceAddresses = this.addresses

def acquire(amount: Int): ResourceInformation = {
val allocated = availableAddrs.take(amount)
acquire(allocated)
new ResourceInformation(name, allocated.toArray)
new ResourceInformation(resourceName, allocated.toArray)
}
}

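The change turns `ResourceAllocator` from a class taking `(name, addresses)` constructor arguments into a trait whose implementors override `resourceName` and `resourceAddresses`. A hedged sketch of that shape (the bookkeeping below is simplified; the real trait also tracks how many slots of each address are taken):

```scala
trait ResourceAllocatorSketch {
  protected def resourceName: String
  protected def resourceAddresses: Seq[String]

  // lazy so it is built only after the implementing class has initialized its fields
  private lazy val free = scala.collection.mutable.LinkedHashSet(resourceAddresses: _*)

  def availableAddrs: Seq[String] = free.toSeq
  def acquire(addrs: Seq[String]): Unit = addrs.foreach(free.remove)
}

// Mirrors WorkerResourceInfo.acquire(amount): take the first `amount` free
// addresses, mark them as acquired, and hand them back to the caller.
case class WorkerResourceInfoSketch(name: String, addresses: Seq[String])
    extends ResourceAllocatorSketch {
  override protected def resourceName: String = name
  override protected def resourceAddresses: Seq[String] = addresses

  def acquire(amount: Int): Seq[String] = {
    val allocated = availableAddrs.take(amount)
    acquire(allocated)
    allocated
  }
}
```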
@@ -172,7 +172,7 @@ class PortableDataStream(

@transient private lazy val conf = {
val bais = new ByteArrayInputStream(confBytes)
val nconf = new Configuration()
val nconf = new Configuration(false)
nconf.readFields(new DataInputStream(bais))
nconf
}
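Passing `false` to Hadoop's `Configuration` constructor skips loading the default resources (`core-default.xml`, `core-site.xml`), so the deserialized object holds only what `readFields` restores, which is cheaper and avoids mixing in unrelated defaults. A small hedged round-trip sketch of the difference (`example.key` is a made-up property):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import org.apache.hadoop.conf.Configuration

val original = new Configuration()              // loads core-default.xml / core-site.xml
original.set("example.key", "example.value")    // hypothetical setting to carry across

val out = new ByteArrayOutputStream()
original.write(new DataOutputStream(out))       // Configuration is a Hadoop Writable

val restored = new Configuration(false)         // starts empty: no defaults loaded
restored.readFields(new DataInputStream(new ByteArrayInputStream(out.toByteArray)))
assert(restored.get("example.key") == "example.value")
```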
@@ -69,7 +69,7 @@ private[spark] class WholeTextFileRecordReader(

override def nextKeyValue(): Boolean = {
if (!processed) {
val conf = new Configuration
val conf = getConf
val factory = new CompressionCodecFactory(conf)
val codec = factory.getCodec(path) // infers from file ext.
val fileIn = fs.open(path)
@@ -108,8 +108,17 @@ private[spark] class ConfigurableCombineFileRecordReader[K, V](
override def initNextRecordReader(): Boolean = {
val r = super.initNextRecordReader()
if (r) {
this.curReader.asInstanceOf[HConfigurable].setConf(getConf)
if (getConf != null) {
this.curReader.asInstanceOf[HConfigurable].setConf(getConf)
}
}
r
}

override def setConf(c: Configuration): Unit = {
super.setConf(c)
if (this.curReader != null) {
this.curReader.asInstanceOf[HConfigurable].setConf(c)
}
}
}
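The fix ensures the Hadoop configuration reaches the active child reader from both directions: when a new child reader is created while a configuration is already set, and when `setConf` is called after a child already exists. A hedged Scala sketch of that propagation pattern with the interfaces reduced to the essentials (names are illustrative):

```scala
import org.apache.hadoop.conf.{Configurable, Configuration}

// Wrapper that owns a current child reader and forwards the configuration to it
// no matter which of the two (conf or child) arrives first.
class PropagatingWrapper extends Configurable {
  private var conf: Configuration = _
  private var child: Configurable = _

  override def setConf(c: Configuration): Unit = {
    conf = c
    if (child != null) child.setConf(c)     // conf arrived late: push to existing child
  }
  override def getConf: Configuration = conf

  def initNextChild(next: Configurable): Unit = {
    child = next
    if (conf != null) child.setConf(conf)   // child arrived late: give it the stored conf
  }
}
```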