Skip to content

Commit

Permalink
Replace use of .size with .length for Arrays
Browse files Browse the repository at this point in the history
Invoking .size on arrays is valid, but requires an implicit conversion to SeqLike. This incurs a compile time overhead and more importantly a runtime overhead, as the Array must be wrapped before the method can be invoked. For example, the difference in generated byte code is:

  public int withSize();
    Code:
       0: getstatic     #23                 // Field scala/Predef$.MODULE$:Lscala/Predef$;
       3: aload_0
       4: invokevirtual #25                 // Method array:()[I
       7: invokevirtual #29                 // Method scala/Predef$.intArrayOps:([I)Lscala/collection/mutable/ArrayOps;
      10: invokeinterface #34,  1           // InterfaceMethod scala/collection/mutable/ArrayOps.size:()I
      15: ireturn

  public int withLength();
    Code:
       0: aload_0
       1: invokevirtual #25                 // Method array:()[I
       4: arraylength
       5: ireturn

Author: sksamuel <sam@sksamuel.com>

Closes #5376 from sksamuel/master and squashes the following commits:

77ec261 [sksamuel] Replace use of .size with .length for Arrays.
  • Loading branch information
sksamuel authored and rxin committed Apr 7, 2015
1 parent 7162ecf commit 2c32bef
Show file tree
Hide file tree
Showing 19 changed files with 42 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
buffer.get(bytes)
bytes.foreach(x => print(x + " "))
buffer.position(curPosition)
print(" (" + bytes.size + ")")
print(" (" + bytes.length + ")")
}

def printBuffer(buffer: ByteBuffer, position: Int, length: Int) {
Expand Down
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
}
result
},
Range(0, self.partitions.size),
Range(0, self.partitions.length),
(index: Int, data: Long) => totalCount.addAndGet(data),
totalCount.get())
}
Expand All @@ -54,8 +54,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
* Returns a future for retrieving all elements of this RDD.
*/
def collectAsync(): FutureAction[Seq[T]] = {
val results = new Array[Array[T]](self.partitions.size)
self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, Range(0, self.partitions.size),
val results = new Array[Array[T]](self.partitions.length)
self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, Range(0, self.partitions.length),
(index, data) => results(index) = data, results.flatten.toSeq)
}

Expand Down Expand Up @@ -111,15 +111,15 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
*/
def foreachAsync(f: T => Unit): FutureAction[Unit] = {
val cleanF = self.context.clean(f)
self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.size),
self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.length),
(index, data) => Unit, Unit)
}

/**
* Applies a function f to each partition of this RDD.
*/
def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = {
self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.size),
self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.length),
(index, data) => Unit, Unit)
}
}
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds

override def getPartitions: Array[Partition] = {
assertValid()
(0 until blockIds.size).map(i => {
(0 until blockIds.length).map(i => {
new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
}).toArray
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
extends RDD[Pair[T, U]](sc, Nil)
with Serializable {

val numPartitionsInRdd2 = rdd2.partitions.size
val numPartitionsInRdd2 = rdd2.partitions.length

override def getPartitions: Array[Partition] = {
// create the cross product split
val array = new Array[Partition](rdd1.partitions.size * rdd2.partitions.size)
val array = new Array[Partition](rdd1.partitions.length * rdd2.partitions.length)
for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
val idx = s1.index * numPartitionsInRdd2 + s2.index
array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
if (fs.exists(cpath)) {
val dirContents = fs.listStatus(cpath).map(_.getPath)
val partitionFiles = dirContents.filter(_.getName.startsWith("part-")).map(_.toString).sorted
val numPart = partitionFiles.size
val numPart = partitionFiles.length
if (numPart > 0 && (! partitionFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) ||
! partitionFiles(numPart-1).endsWith(CheckpointRDD.splitIdToFile(numPart-1)))) {
throw new SparkException("Invalid checkpoint directory: " + checkpointPath)
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:

override def getPartitions: Array[Partition] = {
val array = new Array[Partition](part.numPartitions)
for (i <- 0 until array.size) {
for (i <- 0 until array.length) {
// Each CoGroupPartition will have a dependency per contributing RDD
array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>
// Assume each RDD contributed a single dependency, and get it
Expand All @@ -120,7 +120,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
val sparkConf = SparkEnv.get.conf
val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
val split = s.asInstanceOf[CoGroupPartition]
val numRdds = split.deps.size
val numRdds = split.deps.length

// A list of (rdd iterator, dependency number) pairs
val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack:

// determines the tradeoff between load-balancing the partitions sizes and their locality
// e.g. balanceSlack=0.10 means that it allows up to 10% imbalance in favor of locality
val slack = (balanceSlack * prev.partitions.size).toInt
val slack = (balanceSlack * prev.partitions.length).toInt

var noLocality = true // if true if no preferredLocations exists for parent RDD

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
@Experimental
def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
val evaluator = new MeanEvaluator(self.partitions.size, confidence)
val evaluator = new MeanEvaluator(self.partitions.length, confidence)
self.context.runApproximateJob(self, processPartition, evaluator, timeout)
}

Expand All @@ -81,7 +81,7 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
@Experimental
def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
val evaluator = new SumEvaluator(self.partitions.size, confidence)
val evaluator = new SumEvaluator(self.partitions.length, confidence)
self.context.runApproximateJob(self, processPartition, evaluator, timeout)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
* order of the keys).
*/
// TODO: this currently doesn't work on P other than Tuple2!
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size)
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)] =
{
val part = new RangePartitioner(numPartitions, self, ascending)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* RDD will be <= us.
*/
def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)] =
subtractByKey(other, self.partitioner.getOrElse(new HashPartitioner(self.partitions.size)))
subtractByKey(other, self.partitioner.getOrElse(new HashPartitioner(self.partitions.length)))

/** Return an RDD with the pairs from `this` whose keys are not in `other`. */
def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] =
Expand Down
22 changes: 11 additions & 11 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ abstract class RDD[T: ClassTag](
/**
* Return a new RDD containing the distinct elements in this RDD.
*/
def distinct(): RDD[T] = distinct(partitions.size)
def distinct(): RDD[T] = distinct(partitions.length)

/**
* Return a new RDD that has exactly numPartitions partitions.
Expand Down Expand Up @@ -488,7 +488,7 @@ abstract class RDD[T: ClassTag](
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.size)
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] =
this.keyBy[K](f)
.sortByKey(ascending, numPartitions)
Expand Down Expand Up @@ -852,7 +852,7 @@ abstract class RDD[T: ClassTag](
* RDD will be &lt;= us.
*/
def subtract(other: RDD[T]): RDD[T] =
subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.size)))
subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))

/**
* Return an RDD with the elements from `this` that are not in `other`.
Expand Down Expand Up @@ -986,14 +986,14 @@ abstract class RDD[T: ClassTag](
combOp: (U, U) => U,
depth: Int = 2): U = {
require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
if (partitions.size == 0) {
if (partitions.length == 0) {
return Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
}
val cleanSeqOp = context.clean(seqOp)
val cleanCombOp = context.clean(combOp)
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
var numPartitions = partiallyAggregated.partitions.size
var numPartitions = partiallyAggregated.partitions.length
val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
// If creating an extra level doesn't help reduce the wall-clock time, we stop tree aggregation.
while (numPartitions > scale + numPartitions / scale) {
Expand Down Expand Up @@ -1026,7 +1026,7 @@ abstract class RDD[T: ClassTag](
}
result
}
val evaluator = new CountEvaluator(partitions.size, confidence)
val evaluator = new CountEvaluator(partitions.length, confidence)
sc.runApproximateJob(this, countElements, evaluator, timeout)
}

Expand Down Expand Up @@ -1061,7 +1061,7 @@ abstract class RDD[T: ClassTag](
}
map
}
val evaluator = new GroupedCountEvaluator[T](partitions.size, confidence)
val evaluator = new GroupedCountEvaluator[T](partitions.length, confidence)
sc.runApproximateJob(this, countPartition, evaluator, timeout)
}

Expand Down Expand Up @@ -1140,7 +1140,7 @@ abstract class RDD[T: ClassTag](
* the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
*/
def zipWithUniqueId(): RDD[(T, Long)] = {
val n = this.partitions.size.toLong
val n = this.partitions.length.toLong
this.mapPartitionsWithIndex { case (k, iter) =>
iter.zipWithIndex.map { case (item, i) =>
(item, i * n + k)
Expand Down Expand Up @@ -1243,7 +1243,7 @@ abstract class RDD[T: ClassTag](
queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
Iterator.single(queue)
}
if (mapRDDs.partitions.size == 0) {
if (mapRDDs.partitions.length == 0) {
Array.empty
} else {
mapRDDs.reduce { (queue1, queue2) =>
Expand Down Expand Up @@ -1489,7 +1489,7 @@ abstract class RDD[T: ClassTag](
}
// The first RDD in the dependency stack has no parents, so no need for a +-
def firstDebugString(rdd: RDD[_]): Seq[String] = {
val partitionStr = "(" + rdd.partitions.size + ")"
val partitionStr = "(" + rdd.partitions.length + ")"
val leftOffset = (partitionStr.length - 1) / 2
val nextPrefix = (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset))

Expand All @@ -1499,7 +1499,7 @@ abstract class RDD[T: ClassTag](
} ++ debugChildren(rdd, nextPrefix)
}
def shuffleDebugString(rdd: RDD[_], prefix: String = "", isLastChild: Boolean): Seq[String] = {
val partitionStr = "(" + rdd.partitions.size + ")"
val partitionStr = "(" + rdd.partitions.length + ")"
val leftOffset = (partitionStr.length - 1) / 2
val thisPrefix = prefix.replaceAll("\\|\\s+$", "")
val nextPrefix = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
new SerializableWritable(rdd.context.hadoopConfiguration))
rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, broadcastedConf) _)
val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
if (newRDD.partitions.size != rdd.partitions.size) {
if (newRDD.partitions.length != rdd.partitions.length) {
throw new SparkException(
"Checkpoint RDD " + newRDD + "(" + newRDD.partitions.size + ") has different " +
"number of partitions than original RDD " + rdd + "(" + rdd.partitions.size + ")")
"Checkpoint RDD " + newRDD + "(" + newRDD.partitions.length + ") has different " +
"number of partitions than original RDD " + rdd + "(" + rdd.partitions.length + ")")
}

// Change the dependencies and partitions of the RDD
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](

override def getPartitions: Array[Partition] = {
val array = new Array[Partition](part.numPartitions)
for (i <- 0 until array.size) {
for (i <- 0 until array.length) {
// Each CoGroupPartition will depend on rdd1 and rdd2
array(i) = new CoGroupPartition(i, Seq(rdd1, rdd2).zipWithIndex.map { case (rdd, j) =>
dependencies(j) match {
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class UnionRDD[T: ClassTag](
extends RDD[T](sc, Nil) { // Nil since we implement getDependencies

override def getPartitions: Array[Partition] = {
val array = new Array[Partition](rdds.map(_.partitions.size).sum)
val array = new Array[Partition](rdds.map(_.partitions.length).sum)
var pos = 0
for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) {
array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index)
Expand All @@ -76,8 +76,8 @@ class UnionRDD[T: ClassTag](
val deps = new ArrayBuffer[Dependency[_]]
var pos = 0
for (rdd <- rdds) {
deps += new RangeDependency(rdd, 0, pos, rdd.partitions.size)
pos += rdd.partitions.size
deps += new RangeDependency(rdd, 0, pos, rdd.partitions.length)
pos += rdd.partitions.length
}
deps
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ private[spark] abstract class ZippedPartitionsBaseRDD[V: ClassTag](
if (preservesPartitioning) firstParent[Any].partitioner else None

override def getPartitions: Array[Partition] = {
val numParts = rdds.head.partitions.size
if (!rdds.forall(rdd => rdd.partitions.size == numParts)) {
val numParts = rdds.head.partitions.length
if (!rdds.forall(rdd => rdd.partitions.length == numParts)) {
throw new IllegalArgumentException("Can't zip RDDs with unequal numbers of partitions")
}
Array.tabulate[Partition](numParts) { i =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class ZippedWithIndexRDD[T: ClassTag](@transient prev: RDD[T]) extends RDD[(T, L

/** The start index of each partition. */
@transient private val startIndices: Array[Long] = {
val n = prev.partitions.size
val n = prev.partitions.length
if (n == 0) {
Array[Long]()
} else if (n == 1) {
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,6 @@ class RDDInfo(
private[spark] object RDDInfo {
def fromRdd(rdd: RDD[_]): RDDInfo = {
val rddName = Option(rdd.name).getOrElse(rdd.id.toString)
new RDDInfo(rdd.id, rddName, rdd.partitions.size, rdd.getStorageLevel)
new RDDInfo(rdd.id, rddName, rdd.partitions.length, rdd.getStorageLevel)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
val stageIds = sc.statusTracker.getActiveStageIds()
val stages = stageIds.map(sc.statusTracker.getStageInfo).flatten.filter(_.numTasks() > 1)
.filter(now - _.submissionTime() > FIRST_DELAY).sortBy(_.stageId())
if (stages.size > 0) {
if (stages.length > 0) {
show(now, stages.take(3)) // display at most 3 stages in same time
}
}
Expand All @@ -81,7 +81,7 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
val total = s.numTasks()
val header = s"[Stage ${s.stageId()}:"
val tailer = s"(${s.numCompletedTasks()} + ${s.numActiveTasks()}) / $total]"
val w = width - header.size - tailer.size
val w = width - header.length - tailer.length
val bar = if (w > 0) {
val percent = w * s.numCompletedTasks() / total
(0 until w).map { i =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class BitSet(numBits: Int) extends Serializable {
val wordIndex = bitIndex >> 6 // divide by 64
var i = 0
while(i < wordIndex) { words(i) = -1; i += 1 }
if(wordIndex < words.size) {
if(wordIndex < words.length) {
// Set the remaining bits (note that the mask could still be zero)
val mask = ~(-1L << (bitIndex & 0x3f))
words(wordIndex) |= mask
Expand Down

0 comments on commit 2c32bef

Please sign in to comment.