[SPARK-4668] Fix some documentation typos.
Author: Ryan Williams <ryan.blake.williams@gmail.com>

Closes #3523 from ryan-williams/tweaks and squashes the following commits:

d2eddaa [Ryan Williams] code review feedback
ce27fc1 [Ryan Williams] CoGroupedRDD comment nit
c6cfad9 [Ryan Williams] remove unnecessary if statement
b74ea35 [Ryan Williams] comment fix
b0221f0 [Ryan Williams] fix a gendered pronoun
c71ffed [Ryan Williams] use names on a few boolean parameters
89954aa [Ryan Williams] clarify some comments in {Security,Shuffle}Manager
e465dac [Ryan Williams] Saved building-spark.md with Dillinger.io
83e8358 [Ryan Williams] fix pom.xml typo
dc4662b [Ryan Williams] typo fixes in tuning.md, configuration.md
ryan-williams authored and pwendell committed Dec 15, 2014
1 parent 38703bb commit 8176b7a
Showing 12 changed files with 65 additions and 37 deletions.
14 changes: 6 additions & 8 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -136,14 +136,12 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
var fetchedStatuses: Array[MapStatus] = null
fetching.synchronized {
-if (fetching.contains(shuffleId)) {
-// Someone else is fetching it; wait for them to be done
-while (fetching.contains(shuffleId)) {
-try {
-fetching.wait()
-} catch {
-case e: InterruptedException =>
-}
+// Someone else is fetching it; wait for them to be done
+while (fetching.contains(shuffleId)) {
+try {
+fetching.wait()
+} catch {
+case e: InterruptedException =>
}
}
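For readers unfamiliar with the pattern above: the `while`/`wait()` loop only works because the thread that performs the fetch later removes the shuffle id from `fetching` and calls `notifyAll()`. The following is a generic, illustrative sketch of that hand-off; the `FetchCoordinator`, `getOrFetch`, and cache names are hypothetical, not the actual MapOutputTracker code:

```scala
import scala.collection.mutable

// Illustrative sketch only: deduplicate concurrent fetches of the same key.
// Threads that lose the race wait on the monitor; the winning thread must
// notifyAll() when it finishes, successfully or not.
class FetchCoordinator[K, V] {
  private val cache = new mutable.HashMap[K, V]
  private val fetching = new mutable.HashSet[K]

  def getOrFetch(key: K)(fetch: K => V): V = {
    fetching.synchronized {
      // Someone else is fetching it; wait for them to be done.
      while (fetching.contains(key)) {
        try fetching.wait() catch { case _: InterruptedException => }
      }
      cache.get(key) match {
        case Some(v) => return v         // another thread already fetched it
        case None    => fetching += key  // this thread will do the fetch
      }
    }
    try {
      val value = fetch(key)
      fetching.synchronized { cache(key) = value }
      value
    } finally {
      fetching.synchronized {
        fetching -= key
        fetching.notifyAll()             // wake up the waiters above
      }
    }
  }
}
```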

18 changes: 9 additions & 9 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -93,19 +93,19 @@ import org.apache.spark.network.sasl.SecretKeyHolder
* Note that SASL is pluggable as to what mechanism it uses. We currently use
* DIGEST-MD5 but this could be changed to use Kerberos or other in the future.
* Spark currently supports "auth" for the quality of protection, which means
-* the connection is not supporting integrity or privacy protection (encryption)
+* the connection does not support integrity or privacy protection (encryption)
* after authentication. SASL also supports "auth-int" and "auth-conf" which
-* SPARK could be support in the future to allow the user to specify the quality
+* SPARK could support in the future to allow the user to specify the quality
* of protection they want. If we support those, the messages will also have to
* be wrapped and unwrapped via the SaslServer/SaslClient.wrap/unwrap API's.
*
* Since the NioBlockTransferService does asynchronous messages passing, the SASL
* authentication is a bit more complex. A ConnectionManager can be both a client
-* and a Server, so for a particular connection is has to determine what to do.
+* and a Server, so for a particular connection it has to determine what to do.
* A ConnectionId was added to be able to track connections and is used to
* match up incoming messages with connections waiting for authentication.
-* The ConnectionManager tracks all the sendingConnections using the ConnectionId
-* and waits for the response from the server and does the handshake before sending
+* The ConnectionManager tracks all the sendingConnections using the ConnectionId,
+* waits for the response from the server, and does the handshake before sending
* the real message.
*
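The "auth" quality of protection discussed above is the standard `javax.security.sasl` QOP property. A minimal, illustrative sketch of creating a DIGEST-MD5 client with QOP "auth" follows; it is not Spark's actual SASL bootstrap, and the user name, secret, protocol, and server names are placeholders:

```scala
import javax.security.auth.callback.{Callback, CallbackHandler, NameCallback, PasswordCallback}
import javax.security.sasl.{RealmCallback, Sasl, SaslClient}

// Callback handler that supplies the shared secret (placeholder values).
val handler = new CallbackHandler {
  override def handle(callbacks: Array[Callback]): Unit = callbacks.foreach {
    case nc: NameCallback     => nc.setName("sparkSaslUser")
    case pc: PasswordCallback => pc.setPassword("shared-secret".toCharArray)
    case rc: RealmCallback    => rc.setText(rc.getDefaultText)
    case _                    => // ignore other callbacks in this sketch
  }
}

// "auth" = authentication only; "auth-int"/"auth-conf" would add integrity/encryption,
// which is when every message must also go through saslClient.wrap/unwrap.
val props = new java.util.HashMap[String, String]()
props.put(Sasl.QOP, "auth")

val saslClient: SaslClient =
  Sasl.createSaslClient(Array("DIGEST-MD5"), null, "spark", "default", props, handler)

// DIGEST-MD5 is server-first: the client starts with an empty token and then
// answers the server's challenges until saslClient.isComplete returns true.
val firstToken: Array[Byte] =
  if (saslClient.hasInitialResponse) saslClient.evaluateChallenge(new Array[Byte](0))
  else new Array[Byte](0)
```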
* The NettyBlockTransferService ensures that SASL authentication is performed
@@ -114,14 +114,14 @@ import org.apache.spark.network.sasl.SecretKeyHolder
*
* - HTTP for the Spark UI -> the UI was changed to use servlets so that javax servlet filters
* can be used. Yarn requires a specific AmIpFilter be installed for security to work
-properly. For non-Yarn deployments, users can write a filter to go through a
-companies normal login service. If an authentication filter is in place then the
+properly. For non-Yarn deployments, users can write a filter to go through their
+organization's normal login service. If an authentication filter is in place then the
* SparkUI can be configured to check the logged in user against the list of users who
* have view acls to see if that user is authorized.
* The filters can also be used for many different purposes. For instance filters
* could be used for logging, encryption, or compression.
*
-* The exact mechanisms used to generate/distributed the shared secret is deployment specific.
+* The exact mechanisms used to generate/distribute the shared secret are deployment-specific.
*
* For Yarn deployments, the secret is automatically generated using the Akka remote
* Crypt.generateSecureCookie() API. The secret is placed in the Hadoop UGI which gets passed
@@ -138,7 +138,7 @@ import org.apache.spark.network.sasl.SecretKeyHolder
* All the nodes (Master and Workers) and the applications need to have the same shared secret.
* This again is not ideal as one user could potentially affect another users application.
* This should be enhanced in the future to provide better protection.
-* If the UI needs to be secured the user needs to install a javax servlet filter to do the
+* If the UI needs to be secure, the user needs to install a javax servlet filter to do the
* authentication. Spark will then use that user to compare against the view acls to do
* authorization. If not filter is in place the user is generally null and no authorization
* can take place.
22 changes: 19 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -156,7 +156,15 @@ object SparkEnv extends Logging {
assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
val hostname = conf.get("spark.driver.host")
val port = conf.get("spark.driver.port").toInt
-create(conf, SparkContext.DRIVER_IDENTIFIER, hostname, port, true, isLocal, listenerBus)
+create(
+conf,
+SparkContext.DRIVER_IDENTIFIER,
+hostname,
+port,
+isDriver = true,
+isLocal = isLocal,
+listenerBus = listenerBus
+)
}

/**
@@ -171,8 +179,16 @@ object SparkEnv extends Logging {
numCores: Int,
isLocal: Boolean,
actorSystem: ActorSystem = null): SparkEnv = {
-create(conf, executorId, hostname, port, false, isLocal, defaultActorSystem = actorSystem,
-numUsableCores = numCores)
+create(
+conf,
+executorId,
+hostname,
+port,
+isDriver = false,
+isLocal = isLocal,
+defaultActorSystem = actorSystem,
+numUsableCores = numCores
+)
}

/**
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -60,7 +60,7 @@ private[spark] class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep]
* A RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a
* tuple with the list of values for that key.
*
-* Note: This is an internal API. We recommend users use RDD.coGroup(...) instead of
+* Note: This is an internal API. We recommend users use RDD.cogroup(...) instead of
* instantiating this directly.
* @param rdds parent RDDs.
@@ -70,8 +70,8 @@ private[spark] class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep]
class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil) {

-// For example, `(k, a) cogroup (k, b)` produces k -> Seq(ArrayBuffer as, ArrayBuffer bs).
-// Each ArrayBuffer is represented as a CoGroup, and the resulting Seq as a CoGroupCombiner.
+// For example, `(k, a) cogroup (k, b)` produces k -> Array(ArrayBuffer as, ArrayBuffer bs).
+// Each ArrayBuffer is represented as a CoGroup, and the resulting Array as a CoGroupCombiner.
// CoGroupValue is the intermediate state of each value before being merged in compute.
private type CoGroup = CompactBuffer[Any]
private type CoGroupValue = (Any, Int) // Int is dependency number
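For reference, the public `RDD.cogroup(...)` named in the corrected comment produces exactly that shape. A small sketch, assuming `sc` is an existing `SparkContext`:

```scala
val a = sc.parallelize(Seq(("k", 1), ("k", 2), ("j", 3)))
val b = sc.parallelize(Seq(("k", "x")))

// One record per key; each element of the tuple is the buffer of values (a "CoGroup")
// contributed by the corresponding parent RDD.
val grouped: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[String]))] = a.cogroup(b)

grouped.collect().foreach(println)
// e.g. (k,(CompactBuffer(1, 2),CompactBuffer(x)))
//      (j,(CompactBuffer(3),CompactBuffer()))
```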
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -122,7 +122,7 @@ private[spark] class CompressedMapStatus(

/**
* A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
-* plus a bitmap for tracking which blocks are non-empty. During serialization, this bitmap
+* plus a bitmap for tracking which blocks are empty. During serialization, this bitmap
* is compressed.
*
* @param loc location where the task is being executed
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -45,7 +45,7 @@ import org.apache.spark.util.Utils
private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable {

final def run(attemptId: Long): T = {
-context = new TaskContextImpl(stageId, partitionId, attemptId, false)
+context = new TaskContextImpl(stageId, partitionId, attemptId, runningLocally = false)
TaskContextHelper.setTaskContext(context)
context.taskMetrics.hostname = Utils.localHostName()
taskThread = Thread.currentThread()
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
@@ -20,8 +20,8 @@ package org.apache.spark.shuffle
import org.apache.spark.{TaskContext, ShuffleDependency}

/**
-* Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on both the
-* driver and executors, based on the spark.shuffle.manager setting. The driver registers shuffles
+* Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the driver
+* and on each executor, based on the spark.shuffle.manager setting. The driver registers shuffles
* with it, and executors (or tasks running locally in the driver) can ask to read and write data.
*
* NOTE: this will be instantiated by SparkEnv so its constructor can take a SparkConf and
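For context, the `spark.shuffle.manager` setting referenced in that comment is what selects the implementation. A minimal sketch, assuming the Spark 1.2-era short names:

```scala
import org.apache.spark.SparkConf

// "sort" is the default sort-based shuffle in Spark 1.2; "hash" selects the older
// hash-based shuffle. A fully qualified class name of a custom ShuffleManager also works.
val conf = new SparkConf()
  .setAppName("shuffle-manager-example")
  .set("spark.shuffle.manager", "sort")
```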
16 changes: 15 additions & 1 deletion docs/building-spark.md
@@ -124,7 +124,21 @@ We use the scala-maven-plugin which supports incremental and continuous compilat

mvn scala:cc

-should run continuous compilation (i.e. wait for changes). However, this has not been tested extensively.
+should run continuous compilation (i.e. wait for changes). However, this has not been tested
+extensively. A couple of gotchas to note:
+* it only scans the paths `src/main` and `src/test` (see
+[docs](http://scala-tools.org/mvnsites/maven-scala-plugin/usage_cc.html)), so it will only work
+from within certain submodules that have that structure.
+* you'll typically need to run `mvn install` from the project root for compilation within
+specific submodules to work; this is because submodules that depend on other submodules do so via
+the `spark-parent` module).
+
+Thus, the full flow for running continuous-compilation of the `core` submodule may look more like:
+```
+$ mvn install
+$ cd core
+$ mvn scala:cc
+```

# Using With IntelliJ IDEA

6 changes: 3 additions & 3 deletions docs/configuration.md
@@ -75,8 +75,8 @@ in the `spark-defaults.conf` file.

The application web UI at `http://<driver>:4040` lists Spark properties in the "Environment" tab.
This is a useful place to check to make sure that your properties have been set correctly. Note
-that only values explicitly specified through either `spark-defaults.conf` or SparkConf will
-appear. For all other configuration properties, you can assume the default value is used.
+that only values explicitly specified through `spark-defaults.conf`, `SparkConf`, or the command
+line will appear. For all other configuration properties, you can assume the default value is used.
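To make the corrected sentence concrete, the same property can come from any of the three sources it lists; a small sketch where the property name and value are only examples:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// 1. Programmatically via SparkConf (takes highest precedence):
val conf = new SparkConf()
  .setAppName("config-example")
  .setMaster("local[*]")                 // local master just for the sketch
  .set("spark.executor.memory", "2g")
val sc = new SparkContext(conf)

// 2. On the command line:          spark-submit --conf spark.executor.memory=2g ...
// 3. In conf/spark-defaults.conf:  spark.executor.memory  2g
```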

## Available Properties

@@ -310,7 +310,7 @@ Apart from these, the following properties are also available, and may be useful
<td>(none)</td>
<td>
Add the environment variable specified by <code>EnvironmentVariableName</code> to the Executor
-process. The user can specify multiple of these and to set multiple environment variables.
+process. The user can specify multiple of these to set multiple environment variables.
</td>
</tr>
<tr>
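Relatedly, the `spark.executorEnv.[EnvironmentVariableName]` pattern described in the row above is simply repeated once per variable; a brief sketch with illustrative variable names and paths:

```scala
import org.apache.spark.SparkConf

// Each spark.executorEnv.* entry becomes one environment variable in the executor process.
val conf = new SparkConf()
  .set("spark.executorEnv.JAVA_HOME", "/usr/lib/jvm/java-7-oracle")      // example path
  .set("spark.executorEnv.PYTHONPATH", "/opt/libs/extra-python-modules") // example path
```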
8 changes: 4 additions & 4 deletions docs/tuning.md
@@ -111,7 +111,7 @@ pointer-based data structures and wrapper objects. There are several ways to do
3. Consider using numeric IDs or enumeration objects instead of strings for keys.
4. If you have less than 32 GB of RAM, set the JVM flag `-XX:+UseCompressedOops` to make pointers be
four bytes instead of eight. You can add these options in
-[`spark-env.sh`](configuration.html#environment-variables-in-spark-envsh).
+[`spark-env.sh`](configuration.html#environment-variables).
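Besides `spark-env.sh`, one way to pass the flag, assuming the standard `spark.executor.extraJavaOptions` property, is through `SparkConf`; a sketch:

```scala
import org.apache.spark.SparkConf

// Applies to executor JVMs; the driver JVM needs the flag separately
// (e.g. spark.driver.extraJavaOptions or the environment it is launched from).
val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions", "-XX:+UseCompressedOops")
```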

## Serialized RDD Storage

@@ -154,7 +154,7 @@ By default, Spark uses 60% of the configured executor memory (`spark.executor.me
cache RDDs. This means that 40% of memory is available for any objects created during task execution.

In case your tasks slow down and you find that your JVM is garbage-collecting frequently or running out of
-memory, lowering this value will help reduce the memory consumption. To change this to say 50%, you can call
+memory, lowering this value will help reduce the memory consumption. To change this to, say, 50%, you can call
`conf.set("spark.storage.memoryFraction", "0.5")` on your SparkConf. Combined with the use of serialized caching,
using a smaller cache should be sufficient to mitigate most of the garbage collection problems.
In case you are interested in further tuning the Java GC, continue reading below.
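As a concrete sketch of the two suggestions above (a smaller `spark.storage.memoryFraction` combined with serialized caching); `sc` and the HDFS path are placeholders:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel

// Shrink the cache region from the default 0.6 to 0.5 of executor memory...
val conf = new SparkConf().set("spark.storage.memoryFraction", "0.5")

// ...and cache in serialized form, so each cached partition is one compact byte buffer.
val cached = sc.textFile("hdfs:///path/to/data").persist(StorageLevel.MEMORY_ONLY_SER)
```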
@@ -190,7 +190,7 @@ temporary objects created during task execution. Some steps which may be useful

* As an example, if your task is reading data from HDFS, the amount of memory used by the task can be estimated using
the size of the data block read from HDFS. Note that the size of a decompressed block is often 2 or 3 times the
-size of the block. So if we wish to have 3 or 4 tasks worth of working space, and the HDFS block size is 64 MB,
+size of the block. So if we wish to have 3 or 4 tasks' worth of working space, and the HDFS block size is 64 MB,
we can estimate size of Eden to be `4*3*64MB`.

* Monitor how the frequency and time taken by garbage collection changes with the new settings.
@@ -219,7 +219,7 @@ working set of one of your tasks, such as one of the reduce tasks in `groupByKey
Spark's shuffle operations (`sortByKey`, `groupByKey`, `reduceByKey`, `join`, etc) build a hash table
within each task to perform the grouping, which can often be large. The simplest fix here is to
*increase the level of parallelism*, so that each task's input set is smaller. Spark can efficiently
-support tasks as short as 200 ms, because it reuses one worker JVMs across all tasks and it has
+support tasks as short as 200 ms, because it reuses one executor JVM across many tasks and it has
a low task launching cost, so you can safely increase the level of parallelism to more than the
number of cores in your clusters.
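One way to raise the level of parallelism for a single shuffle, rather than cluster-wide, is to pass an explicit partition count to the operation; a sketch, assuming `sc` is an existing `SparkContext` and the count of 200 is only an example:

```scala
val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"))

// Explicit partition count for this one shuffle: each of the 200 reduce tasks
// builds a smaller hash table than the default partitioning would give it.
val counts = words.map(w => (w, 1)).reduceByKey((x, y) => x + y, numPartitions = 200)

// Alternatively, set a cluster-wide default for shuffles that don't specify one:
// conf.set("spark.default.parallelism", "200")
```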

2 changes: 1 addition & 1 deletion pom.xml
@@ -267,7 +267,7 @@
<version>1.0.0</version>
</dependency>
<!--
-This depndency has been added to provided scope as it is needed for excuting build
+This depndency has been added to provided scope as it is needed for executing build
specific groovy scripts using gmaven+ and not required for downstream project building
with spark.
-->
2 changes: 1 addition & 1 deletion streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
@@ -123,7 +123,7 @@ private[streaming] case class ByteBufferData(bytes: ByteBuffer) extends ActorRec
* As Actors can also be used to receive data from almost any stream source.
* A nice set of abstraction(s) for actors as receivers is already provided for
* a few general cases. It is thus exposed as an API where user may come with
-* his own Actor to run as receiver for Spark Streaming input source.
+* their own Actor to run as receiver for Spark Streaming input source.
*
* This starts a supervisor actor which starts workers and also provides
* [http://doc.akka.io/docs/akka/snapshot/scala/fault-tolerance.html fault-tolerance].
