Skip to content

Commit

Permalink
SPARK-1096, a space after comment start style checker.
Browse files Browse the repository at this point in the history
Author: Prashant Sharma <prashant.s@imaginea.com>

Closes apache#124 from ScrapCodes/SPARK-1096/scalastyle-comment-check and squashes the following commits:

214135a [Prashant Sharma] Review feedback.
5eba88c [Prashant Sharma] Fixed style checks for ///+ comments.
e54b2f8 [Prashant Sharma] improved message, work around.
83e7144 [Prashant Sharma] removed dependency on scalastyle in plugin, since scalastyle sbt plugin already depends on the right version. Incase we update the plugin we will have to adjust our spark-style project to depend on right scalastyle version.
810a1d6 [Prashant Sharma] SPARK-1096, a space after comment style checker.
ba33193 [Prashant Sharma] scala style as a project
  • Loading branch information
ScrapCodes authored and pwendell committed Mar 28, 2014
1 parent 632c322 commit 60abc25
Show file tree
Hide file tree
Showing 55 changed files with 180 additions and 88 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class SparkEnv private[spark] (
// Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
// down, but let's call it anyway in case it gets fixed in a later release
// UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
//actorSystem.awaitTermination()
// actorSystem.awaitTermination()
}

private[spark]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ extends Logging {
private var initialized = false
private var conf: SparkConf = null
def initialize(_isDriver: Boolean, conf: SparkConf) {
TorrentBroadcast.conf = conf //TODO: we might have to fix it in tests
TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests
synchronized {
if (!initialized) {
initialized = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
// TODO: In Akka 2.1.x, ActorSystem.awaitTermination hangs when you have remote actors!
// This is unfortunate, but for now we just comment it out.
workerActorSystems.foreach(_.shutdown())
//workerActorSystems.foreach(_.awaitTermination())
// workerActorSystems.foreach(_.awaitTermination())
masterActorSystems.foreach(_.shutdown())
//masterActorSystems.foreach(_.awaitTermination())
// masterActorSystems.foreach(_.awaitTermination())
masterActorSystems.clear()
workerActorSystems.clear()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.deploy.master.MasterMessages.ElectedLeader
* [[org.apache.spark.deploy.master.MasterMessages.RevokedLeadership RevokedLeadership]]
*/
private[spark] trait LeaderElectionAgent extends Actor {
//TODO: LeaderElectionAgent does not necessary to be an Actor anymore, need refactoring.
// TODO: LeaderElectionAgent does not necessary to be an Actor anymore, need refactoring.
val masterActor: ActorRef
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,6 @@ private[spark] class Executor(
// have left some weird state around depending on when the exception was thrown, but on
// the other hand, maybe we could detect that when future tasks fail and exit then.
logError("Exception in task ID " + taskId, t)
//System.exit(1)
}
} finally {
// TODO: Unregister shuffle memory only for ResultTask
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
}

def initialize() {
//Add default properties in case there's no properties file
// Add default properties in case there's no properties file
setDefaultProperties(properties)

// If spark.metrics.conf is not set, try to get file in class path
Expand Down
18 changes: 9 additions & 9 deletions core/src/main/scala/org/apache/spark/network/Connection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
channel.socket.setTcpNoDelay(true)
channel.socket.setReuseAddress(true)
channel.socket.setKeepAlive(true)
/*channel.socket.setReceiveBufferSize(32768) */
/* channel.socket.setReceiveBufferSize(32768) */

@volatile private var closed = false
var onCloseCallback: Connection => Unit = null
Expand Down Expand Up @@ -206,12 +206,12 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,

private class Outbox {
val messages = new Queue[Message]()
val defaultChunkSize = 65536 //32768 //16384
val defaultChunkSize = 65536
var nextMessageToBeUsed = 0

def addMessage(message: Message) {
messages.synchronized{
/*messages += message*/
/* messages += message*/
messages.enqueue(message)
logDebug("Added [" + message + "] to outbox for sending to " +
"[" + getRemoteConnectionManagerId() + "]")
Expand All @@ -221,8 +221,8 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
def getChunk(): Option[MessageChunk] = {
messages.synchronized {
while (!messages.isEmpty) {
/*nextMessageToBeUsed = nextMessageToBeUsed % messages.size */
/*val message = messages(nextMessageToBeUsed)*/
/* nextMessageToBeUsed = nextMessageToBeUsed % messages.size */
/* val message = messages(nextMessageToBeUsed)*/
val message = messages.dequeue
val chunk = message.getChunkForSending(defaultChunkSize)
if (chunk.isDefined) {
Expand Down Expand Up @@ -262,7 +262,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,

val currentBuffers = new ArrayBuffer[ByteBuffer]()

/*channel.socket.setSendBufferSize(256 * 1024)*/
/* channel.socket.setSendBufferSize(256 * 1024)*/

override def getRemoteAddress() = address

Expand Down Expand Up @@ -355,7 +355,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
}
case None => {
// changeConnectionKeyInterest(0)
/*key.interestOps(0)*/
/* key.interestOps(0)*/
return false
}
}
Expand Down Expand Up @@ -540,10 +540,10 @@ private[spark] class ReceivingConnection(
return false
}

/*logDebug("Read " + bytesRead + " bytes for the buffer")*/
/* logDebug("Read " + bytesRead + " bytes for the buffer")*/

if (currentChunk.buffer.remaining == 0) {
/*println("Filled buffer at " + System.currentTimeMillis)*/
/* println("Filled buffer at " + System.currentTimeMillis)*/
val bufferMessage = inbox.getMessageForChunk(currentChunk).get
if (bufferMessage.isCompletelyReceived) {
bufferMessage.flip
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
}
}
handleMessageExecutor.execute(runnable)
/*handleMessage(connection, message)*/
/* handleMessage(connection, message)*/
}

private def handleClientAuthentication(
Expand Down Expand Up @@ -733,7 +733,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
logTrace("Sending Security [" + message + "] to [" + connManagerId + "]")
val connection = connectionsById.getOrElseUpdate(connManagerId, startNewConnection())

//send security message until going connection has been authenticated
// send security message until going connection has been authenticated
connection.send(message)

wakeupSelector()
Expand Down Expand Up @@ -859,14 +859,14 @@ private[spark] object ConnectionManager {
None
})

/*testSequentialSending(manager)*/
/*System.gc()*/
/* testSequentialSending(manager)*/
/* System.gc()*/

/*testParallelSending(manager)*/
/*System.gc()*/
/* testParallelSending(manager)*/
/* System.gc()*/

/*testParallelDecreasingSending(manager)*/
/*System.gc()*/
/* testParallelDecreasingSending(manager)*/
/* System.gc()*/

testContinuousSending(manager)
System.gc()
Expand Down Expand Up @@ -948,7 +948,7 @@ private[spark] object ConnectionManager {
val ms = finishTime - startTime
val tput = mb * 1000.0 / ms
println("--------------------------")
/*println("Started at " + startTime + ", finished at " + finishTime) */
/* println("Started at " + startTime + ", finished at " + finishTime) */
println("Sent " + mb + " MB in " + ms + " ms (" + tput + " MB/s)")
println("--------------------------")
println()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ private[spark] object ConnectionManagerTest extends Logging{
val slaves = slavesFile.mkString.split("\n")
slavesFile.close()

/*println("Slaves")*/
/*slaves.foreach(println)*/
/* println("Slaves")*/
/* slaves.foreach(println)*/
val tasknum = if (args.length > 2) args(2).toInt else slaves.length
val size = ( if (args.length > 3) (args(3).toInt) else 10 ) * 1024 * 1024
val count = if (args.length > 4) args(4).toInt else 3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ private[spark] object ReceiverTest {
println("Started connection manager with id = " + manager.id)

manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
/*println("Received [" + msg + "] from [" + id + "] at " + System.currentTimeMillis)*/
/* println("Received [" + msg + "] from [" + id + "] at " + System.currentTimeMillis)*/
val buffer = ByteBuffer.wrap("response".getBytes)
Some(Message.createBufferMessage(buffer, msg.id))
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private[spark] object SenderTest {
(0 until count).foreach(i => {
val dataMessage = Message.createBufferMessage(buffer.duplicate)
val startTime = System.currentTimeMillis
/*println("Started timer at " + startTime)*/
/* println("Started timer at " + startTime)*/
val responseStr = manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage)
.map { response =>
val buffer = response.asInstanceOf[BufferMessage].buffers(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ private[spark] class FileHeader (
buf.writeInt(fileLen)
buf.writeInt(blockId.name.length)
blockId.name.foreach((x: Char) => buf.writeByte(x))
//padding the rest of header
// padding the rest of header
if (FileHeader.HEADER_SIZE - buf.readableBytes > 0 ) {
buf.writeZero(FileHeader.HEADER_SIZE - buf.readableBytes)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ class DAGScheduler(
val properties = if (stageIdToActiveJob.contains(jobId)) {
stageIdToActiveJob(stage.jobId).properties
} else {
//this stage will be assigned to "default" pool
// this stage will be assigned to "default" pool
null
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
properties += ((key, value))
}
}
//TODO (prashant) send conf instead of properties
// TODO (prashant) send conf instead of properties
driverActor = actorSystem.actorOf(
Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ object BlockFetcherIterator {
}
} catch {
case x: InterruptedException => logInfo("Copier Interrupted")
//case _ => throw new SparkException("Exception Throw in Shuffle Copier")
// case _ => throw new SparkException("Exception Throw in Shuffle Copier")
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private[spark] object ClosureCleaner extends Logging {
accessedFields(cls) = Set[String]()
for (cls <- func.getClass :: innerClasses)
getClassReader(cls).accept(new FieldAccessFinder(accessedFields), 0)
//logInfo("accessedFields: " + accessedFields)
// logInfo("accessedFields: " + accessedFields)

val inInterpreter = {
try {
Expand All @@ -139,21 +139,21 @@ private[spark] object ClosureCleaner extends Logging {
val field = cls.getDeclaredField(fieldName)
field.setAccessible(true)
val value = field.get(obj)
//logInfo("1: Setting " + fieldName + " on " + cls + " to " + value);
// logInfo("1: Setting " + fieldName + " on " + cls + " to " + value);
field.set(outer, value)
}
}

if (outer != null) {
//logInfo("2: Setting $outer on " + func.getClass + " to " + outer);
// logInfo("2: Setting $outer on " + func.getClass + " to " + outer);
val field = func.getClass.getDeclaredField("$outer")
field.setAccessible(true)
field.set(func, outer)
}
}

private def instantiateClass(cls: Class[_], outer: AnyRef, inInterpreter: Boolean): AnyRef = {
//logInfo("Creating a " + cls + " with outer = " + outer)
// logInfo("Creating a " + cls + " with outer = " + outer)
if (!inInterpreter) {
// This is a bona fide closure class, whose constructor has no effects
// other than to set its fields, so use its constructor
Expand All @@ -170,7 +170,7 @@ private[spark] object ClosureCleaner extends Logging {
val newCtor = rf.newConstructorForSerialization(cls, parentCtor)
val obj = newCtor.newInstance().asInstanceOf[AnyRef]
if (outer != null) {
//logInfo("3: Setting $outer on " + cls + " to " + outer);
// logInfo("3: Setting $outer on " + cls + " to " + outer);
val field = cls.getDeclaredField("$outer")
field.setAccessible(true)
field.set(obj, outer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ private[akka] class IndestructibleActorSystemImpl(
if (isFatalError(cause) && !settings.JvmExitOnFatalError) {
log.error(cause, "Uncaught fatal error from thread [{}] not shutting down " +
"ActorSystem [{}] tolerating and continuing.... ", thread.getName, name)
//shutdown() //TODO make it configurable
// shutdown() //TODO make it configurable
} else {
fallbackHandler.uncaughtException(thread, cause)
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/util/MutablePair.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ package org.apache.spark.util
* @param _1 Element 1 of this MutablePair
* @param _2 Element 2 of this MutablePair
*/
case class MutablePair[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T1,
@specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T2]
case class MutablePair[@specialized(Int, Long, Double, Char, Boolean/* , AnyRef*/) T1,
@specialized(Int, Long, Double, Char, Boolean/* , AnyRef*/) T2]
(var _1: T1, var _2: T2)
extends Product2[T1, T2]
{
Expand Down
6 changes: 3 additions & 3 deletions core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte

test ("add value to collection accumulators") {
val maxI = 1000
for (nThreads <- List(1, 10)) { //test single & multi-threaded
for (nThreads <- List(1, 10)) { // test single & multi-threaded
sc = new SparkContext("local[" + nThreads + "]", "test")
val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
val d = sc.parallelize(1 to maxI)
Expand All @@ -83,7 +83,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte

test ("value not readable in tasks") {
val maxI = 1000
for (nThreads <- List(1, 10)) { //test single & multi-threaded
for (nThreads <- List(1, 10)) { // test single & multi-threaded
sc = new SparkContext("local[" + nThreads + "]", "test")
val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
val d = sc.parallelize(1 to maxI)
Expand Down Expand Up @@ -124,7 +124,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte

test ("localValue readable in tasks") {
val maxI = 1000
for (nThreads <- List(1, 10)) { //test single & multi-threaded
for (nThreads <- List(1, 10)) { // test single & multi-threaded
sc = new SparkContext("local[" + nThreads + "]", "test")
val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
val groupedInts = (1 to (maxI/20)).map {x => (20 * (x - 1) to 20 * x).toSet}
Expand Down
1 change: 0 additions & 1 deletion core/src/test/scala/org/apache/spark/CheckpointSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,6 @@ object CheckpointSuite {
// This is a custom cogroup function that does not use mapValues like
// the PairRDDFunctions.cogroup()
def cogroup[K, V](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner) = {
//println("First = " + first + ", second = " + second)
new CoGroupedRDD[K](
Seq(first.asInstanceOf[RDD[(K, _)]], second.asInstanceOf[RDD[(K, _)]]),
part
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet

assert(intercept[SparkException]{ arrs.distinct() }.getMessage.contains("array"))
// We can't catch all usages of arrays, since they might occur inside other collections:
//assert(fails { arrPairs.distinct() })
// assert(fails { arrPairs.distinct() })
assert(intercept[SparkException]{ arrPairs.partitionBy(new HashPartitioner(2)) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.join(arrPairs) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.leftOuterJoin(arrPairs) }.getMessage.contains("array"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
val listener = new SaveStageAndTaskInfo
sc.addSparkListener(listener)
sc.addSparkListener(new StatsReportListener)
//just to make sure some of the tasks take a noticeable amount of time
// just to make sure some of the tasks take a noticeable amount of time
val w = {i:Int =>
if (i == 0)
Thread.sleep(100)
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class UtilsSuite extends FunSuite {
}

test("copyStream") {
//input array initialization
// input array initialization
val bytes = Array.ofDim[Byte](9000)
Random.nextBytes(bytes)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ object LocalALS {
for (i <- 0 until M; j <- 0 until U) {
r.set(i, j, blas.ddot(ms(i), us(j)))
}
//println("R: " + r)
blas.daxpy(-1, targetR, r)
val sumSqs = r.aggregate(Functions.plus, Functions.square)
sqrt(sumSqs / (M * U))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ object SimpleSkewedGroupByTest {

println("RESULT: " + pairs1.groupByKey(numReducers).count)
// Print how many keys each reducer got (for debugging)
//println("RESULT: " + pairs1.groupByKey(numReducers)
// println("RESULT: " + pairs1.groupByKey(numReducers)
// .map{case (k,v) => (k, v.size)}
// .collectAsMap)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ object SparkALS {
for (i <- 0 until M; j <- 0 until U) {
r.set(i, j, blas.ddot(ms(i), us(j)))
}
//println("R: " + r)
blas.daxpy(-1, targetR, r)
val sumSqs = r.aggregate(Functions.plus, Functions.square)
sqrt(sumSqs / (M * U))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ object SparkHdfsLR {
case class DataPoint(x: Vector, y: Double)

def parsePoint(line: String): DataPoint = {
//val nums = line.split(' ').map(_.toDouble)
//return DataPoint(new Vector(nums.slice(1, D+1)), nums(0))
val tok = new java.util.StringTokenizer(line, " ")
var y = tok.nextToken.toDouble
var x = new Array[Double](D)
Expand Down
Loading

0 comments on commit 60abc25

Please sign in to comment.