Replace use of .size with .length for Arrays #5376

Closed
wants to merge 1 commit into from
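For context, a minimal sketch (not part of this PR) of the distinction the change targets: Scala's Array has no size method of its own, so arr.size goes through Predef's implicit conversion to an ArrayOps wrapper before delegating to length, while arr.length reads the JVM array length directly. The result is identical; only the call path differs.

object ArraySizeVsLength {
  def main(args: Array[String]): Unit = {
    val partitions = Array("p0", "p1", "p2")

    val viaLength = partitions.length // direct access to the array's length field
    val viaSize   = partitions.size   // same value, but via the implicit ArrayOps wrapper

    assert(viaLength == viaSize)
    println(s"length = $viaLength, size = $viaSize")
  }
}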
@@ -181,7 +181,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
buffer.get(bytes)
bytes.foreach(x => print(x + " "))
buffer.position(curPosition)
print(" (" + bytes.size + ")")
print(" (" + bytes.length + ")")
}

def printBuffer(buffer: ByteBuffer, position: Int, length: Int) {
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -45,7 +45,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
}
result
},
Range(0, self.partitions.size),
Range(0, self.partitions.length),
(index: Int, data: Long) => totalCount.addAndGet(data),
totalCount.get())
}
@@ -54,8 +54,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
* Returns a future for retrieving all elements of this RDD.
*/
def collectAsync(): FutureAction[Seq[T]] = {
val results = new Array[Array[T]](self.partitions.size)
self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, Range(0, self.partitions.size),
val results = new Array[Array[T]](self.partitions.length)
self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, Range(0, self.partitions.length),
(index, data) => results(index) = data, results.flatten.toSeq)
}

@@ -111,15 +111,15 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
*/
def foreachAsync(f: T => Unit): FutureAction[Unit] = {
val cleanF = self.context.clean(f)
self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.size),
self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.length),
(index, data) => Unit, Unit)
}

/**
* Applies a function f to each partition of this RDD.
*/
def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = {
self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.size),
self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.length),
(index, data) => Unit, Unit)
}
}
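A hedged usage sketch, not from this PR, of the async actions touched above, assuming a live SparkContext named sc and Spark's implicit RDD-to-AsyncRDDActions conversion in scope:

import org.apache.spark.SparkContext
import scala.concurrent.ExecutionContext.Implicits.global

def asyncDemo(sc: SparkContext): Unit = {
  val rdd = sc.parallelize(1 to 1000, numSlices = 8)

  // countAsync submits a job over Range(0, rdd.partitions.length) and
  // returns a FutureAction (a scala.concurrent.Future) holding the total.
  rdd.countAsync().foreach(total => println(s"counted $total elements"))

  // collectAsync gathers one Array[T] per partition and flattens the results.
  rdd.collectAsync().foreach(elems => println(s"collected ${elems.length} elements"))
}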
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -36,7 +36,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds

override def getPartitions: Array[Partition] = {
assertValid()
(0 until blockIds.size).map(i => {
(0 until blockIds.length).map(i => {
new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
}).toArray
}
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -53,11 +53,11 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
extends RDD[Pair[T, U]](sc, Nil)
with Serializable {

val numPartitionsInRdd2 = rdd2.partitions.size
val numPartitionsInRdd2 = rdd2.partitions.length

override def getPartitions: Array[Partition] = {
// create the cross product split
val array = new Array[Partition](rdd1.partitions.size * rdd2.partitions.size)
val array = new Array[Partition](rdd1.partitions.length * rdd2.partitions.length)
for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
val idx = s1.index * numPartitionsInRdd2 + s2.index
array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
@@ -49,7 +49,7 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
if (fs.exists(cpath)) {
val dirContents = fs.listStatus(cpath).map(_.getPath)
val partitionFiles = dirContents.filter(_.getName.startsWith("part-")).map(_.toString).sorted
val numPart = partitionFiles.size
val numPart = partitionFiles.length
if (numPart > 0 && (! partitionFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) ||
! partitionFiles(numPart-1).endsWith(CheckpointRDD.splitIdToFile(numPart-1)))) {
throw new SparkException("Invalid checkpoint directory: " + checkpointPath)
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -99,7 +99,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:

override def getPartitions: Array[Partition] = {
val array = new Array[Partition](part.numPartitions)
for (i <- 0 until array.size) {
for (i <- 0 until array.length) {
// Each CoGroupPartition will have a dependency per contributing RDD
array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>
// Assume each RDD contributed a single dependency, and get it
@@ -120,7 +120,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
val sparkConf = SparkEnv.get.conf
val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
val split = s.asInstanceOf[CoGroupPartition]
val numRdds = split.deps.size
val numRdds = split.deps.length

// A list of (rdd iterator, dependency number) pairs
val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
@@ -166,7 +166,7 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack:

// determines the tradeoff between load-balancing the partitions sizes and their locality
// e.g. balanceSlack=0.10 means that it allows up to 10% imbalance in favor of locality
val slack = (balanceSlack * prev.partitions.size).toInt
val slack = (balanceSlack * prev.partitions.length).toInt

var noLocality = true // if true if no preferredLocations exists for parent RDD
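A tiny worked sketch of the slack computation above; the partition count is hypothetical:

val balanceSlack = 0.10                          // 10% imbalance allowed in favor of locality
val parentPartitions = 200                       // stand-in for prev.partitions.length
val slack = (balanceSlack * parentPartitions).toInt
assert(slack == 20)                              // up to 20 partitions of imbalance tolerated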

@@ -70,7 +70,7 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
@Experimental
def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
val evaluator = new MeanEvaluator(self.partitions.size, confidence)
val evaluator = new MeanEvaluator(self.partitions.length, confidence)
self.context.runApproximateJob(self, processPartition, evaluator, timeout)
}
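A hedged usage sketch of the approximate mean whose evaluator is sized by partitions.length above (sumApprox below behaves the same way), assuming a live SparkContext sc and the implicit conversion to DoubleRDDFunctions in scope:

def approxDemo(sc: org.apache.spark.SparkContext): Unit = {
  val values = sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0), numSlices = 2)

  // meanApprox runs for at most `timeout` milliseconds and returns a
  // PartialResult; initialValue is the estimate available at that point.
  val partial = values.meanApprox(timeout = 1000L, confidence = 0.95)
  val bounded = partial.initialValue             // a BoundedDouble: mean plus low/high bounds
  println(s"mean ~ ${bounded.mean} in [${bounded.low}, ${bounded.high}]")
}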

@@ -81,7 +81,7 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
@Experimental
def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
val evaluator = new SumEvaluator(self.partitions.size, confidence)
val evaluator = new SumEvaluator(self.partitions.length, confidence)
self.context.runApproximateJob(self, processPartition, evaluator, timeout)
}

@@ -56,7 +56,7 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
* order of the keys).
*/
// TODO: this currently doesn't work on P other than Tuple2!
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size)
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)] =
{
val part = new RangePartitioner(numPartitions, self, ascending)
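A hedged usage sketch of sortByKey, whose default partition count now comes from partitions.length, assuming a live SparkContext sc and the implicit conversion to OrderedRDDFunctions in scope:

def sortDemo(sc: org.apache.spark.SparkContext): Unit = {
  val pairs = sc.parallelize(Seq("b" -> 2, "a" -> 1, "c" -> 3), numSlices = 3)

  val ascending  = pairs.sortByKey()             // defaults to 3 output partitions, one per input partition
  val descending = pairs.sortByKey(ascending = false, numPartitions = 1)

  println(ascending.collect().mkString(", "))    // (a,1), (b,2), (c,3)
  println(descending.collect().mkString(", "))   // (c,3), (b,2), (a,1)
}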
@@ -823,7 +823,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* RDD will be <= us.
*/
def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)] =
subtractByKey(other, self.partitioner.getOrElse(new HashPartitioner(self.partitions.size)))
subtractByKey(other, self.partitioner.getOrElse(new HashPartitioner(self.partitions.length)))

/** Return an RDD with the pairs from `this` whose keys are not in `other`. */
def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] =
22 changes: 11 additions & 11 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -316,7 +316,7 @@ abstract class RDD[T: ClassTag](
/**
* Return a new RDD containing the distinct elements in this RDD.
*/
def distinct(): RDD[T] = distinct(partitions.size)
def distinct(): RDD[T] = distinct(partitions.length)

/**
* Return a new RDD that has exactly numPartitions partitions.
@@ -488,7 +488,7 @@ abstract class RDD[T: ClassTag](
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.size)
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] =
this.keyBy[K](f)
.sortByKey(ascending, numPartitions)
@@ -852,7 +852,7 @@ abstract class RDD[T: ClassTag](
* RDD will be &lt;= us.
*/
def subtract(other: RDD[T]): RDD[T] =
subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.size)))
subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))

/**
* Return an RDD with the elements from `this` that are not in `other`.
@@ -986,14 +986,14 @@ abstract class RDD[T: ClassTag](
combOp: (U, U) => U,
depth: Int = 2): U = {
require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
if (partitions.size == 0) {
if (partitions.length == 0) {
return Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
}
val cleanSeqOp = context.clean(seqOp)
val cleanCombOp = context.clean(combOp)
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
var numPartitions = partiallyAggregated.partitions.size
var numPartitions = partiallyAggregated.partitions.length
val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
// If creating an extra level doesn't help reduce the wall-clock time, we stop tree aggregation.
while (numPartitions > scale + numPartitions / scale) {
@@ -1026,7 +1026,7 @@ abstract class RDD[T: ClassTag](
}
result
}
val evaluator = new CountEvaluator(partitions.size, confidence)
val evaluator = new CountEvaluator(partitions.length, confidence)
sc.runApproximateJob(this, countElements, evaluator, timeout)
}

@@ -1061,7 +1061,7 @@ abstract class RDD[T: ClassTag](
}
map
}
val evaluator = new GroupedCountEvaluator[T](partitions.size, confidence)
val evaluator = new GroupedCountEvaluator[T](partitions.length, confidence)
sc.runApproximateJob(this, countPartition, evaluator, timeout)
}

@@ -1140,7 +1140,7 @@ abstract class RDD[T: ClassTag](
* the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
*/
def zipWithUniqueId(): RDD[(T, Long)] = {
val n = this.partitions.size.toLong
val n = this.partitions.length.toLong
this.mapPartitionsWithIndex { case (k, iter) =>
iter.zipWithIndex.map { case (item, i) =>
(item, i * n + k)
@@ -1243,7 +1243,7 @@ abstract class RDD[T: ClassTag](
queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
Iterator.single(queue)
}
if (mapRDDs.partitions.size == 0) {
if (mapRDDs.partitions.length == 0) {
Array.empty
} else {
mapRDDs.reduce { (queue1, queue2) =>
@@ -1489,7 +1489,7 @@ abstract class RDD[T: ClassTag](
}
// The first RDD in the dependency stack has no parents, so no need for a +-
def firstDebugString(rdd: RDD[_]): Seq[String] = {
val partitionStr = "(" + rdd.partitions.size + ")"
val partitionStr = "(" + rdd.partitions.length + ")"
val leftOffset = (partitionStr.length - 1) / 2
val nextPrefix = (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset))

Expand All @@ -1499,7 +1499,7 @@ abstract class RDD[T: ClassTag](
} ++ debugChildren(rdd, nextPrefix)
}
def shuffleDebugString(rdd: RDD[_], prefix: String = "", isLastChild: Boolean): Seq[String] = {
val partitionStr = "(" + rdd.partitions.size + ")"
val partitionStr = "(" + rdd.partitions.length + ")"
val leftOffset = (partitionStr.length - 1) / 2
val thisPrefix = prefix.replaceAll("\\|\\s+$", "")
val nextPrefix = (
@@ -94,10 +94,10 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
new SerializableWritable(rdd.context.hadoopConfiguration))
rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, broadcastedConf) _)
val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
if (newRDD.partitions.size != rdd.partitions.size) {
if (newRDD.partitions.length != rdd.partitions.length) {
throw new SparkException(
"Checkpoint RDD " + newRDD + "(" + newRDD.partitions.size + ") has different " +
"number of partitions than original RDD " + rdd + "(" + rdd.partitions.size + ")")
"Checkpoint RDD " + newRDD + "(" + newRDD.partitions.length + ") has different " +
"number of partitions than original RDD " + rdd + "(" + rdd.partitions.length + ")")
}
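A hedged usage sketch of the checkpointing path this check guards, assuming a live SparkContext sc; the directory is hypothetical:

def checkpointDemo(sc: org.apache.spark.SparkContext): Unit = {
  sc.setCheckpointDir("/tmp/spark-checkpoints")  // hypothetical path; use reliable storage in practice
  val rdd = sc.parallelize(1 to 100, numSlices = 4).map(_ * 2)
  rdd.checkpoint()                               // marks the RDD for checkpointing
  rdd.count()                                    // the first action materializes the checkpoint files
  // The code above then verifies that the reconstructed CheckpointRDD reports
  // the same partitions.length as the original RDD, throwing otherwise.
}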

// Change the dependencies and partitions of the RDD
@@ -76,7 +76,7 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](

override def getPartitions: Array[Partition] = {
val array = new Array[Partition](part.numPartitions)
for (i <- 0 until array.size) {
for (i <- 0 until array.length) {
// Each CoGroupPartition will depend on rdd1 and rdd2
array(i) = new CoGroupPartition(i, Seq(rdd1, rdd2).zipWithIndex.map { case (rdd, j) =>
dependencies(j) match {
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -63,7 +63,7 @@ class UnionRDD[T: ClassTag](
extends RDD[T](sc, Nil) { // Nil since we implement getDependencies

override def getPartitions: Array[Partition] = {
val array = new Array[Partition](rdds.map(_.partitions.size).sum)
val array = new Array[Partition](rdds.map(_.partitions.length).sum)
var pos = 0
for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) {
array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index)
@@ -76,8 +76,8 @@ class UnionRDD[T: ClassTag](
val deps = new ArrayBuffer[Dependency[_]]
var pos = 0
for (rdd <- rdds) {
deps += new RangeDependency(rdd, 0, pos, rdd.partitions.size)
pos += rdd.partitions.size
deps += new RangeDependency(rdd, 0, pos, rdd.partitions.length)
pos += rdd.partitions.length
}
deps
}
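A small worked sketch of how the cumulative pos above offsets each parent's partitions into the union's partition space; the partition counts are hypothetical:

val parentCounts = Seq(2, 3, 4)                  // rdd.partitions.length for each parent
val offsets = parentCounts.scanLeft(0)(_ + _).init
// RangeDependency 0 covers union partitions [0, 2), 1 covers [2, 5), 2 covers [5, 9)
assert(offsets == Seq(0, 2, 5))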
@@ -52,8 +52,8 @@ private[spark] abstract class ZippedPartitionsBaseRDD[V: ClassTag](
if (preservesPartitioning) firstParent[Any].partitioner else None

override def getPartitions: Array[Partition] = {
val numParts = rdds.head.partitions.size
if (!rdds.forall(rdd => rdd.partitions.size == numParts)) {
val numParts = rdds.head.partitions.length
if (!rdds.forall(rdd => rdd.partitions.length == numParts)) {
throw new IllegalArgumentException("Can't zip RDDs with unequal numbers of partitions")
}
Array.tabulate[Partition](numParts) { i =>
@@ -41,7 +41,7 @@ class ZippedWithIndexRDD[T: ClassTag](@transient prev: RDD[T]) extends RDD[(T, L

/** The start index of each partition. */
@transient private val startIndices: Array[Long] = {
val n = prev.partitions.size
val n = prev.partitions.length
if (n == 0) {
Array[Long]()
} else if (n == 1) {
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -52,6 +52,6 @@ class RDDInfo(
private[spark] object RDDInfo {
def fromRdd(rdd: RDD[_]): RDDInfo = {
val rddName = Option(rdd.name).getOrElse(rdd.id.toString)
new RDDInfo(rdd.id, rddName, rdd.partitions.size, rdd.getStorageLevel)
new RDDInfo(rdd.id, rddName, rdd.partitions.length, rdd.getStorageLevel)
}
}
@@ -65,7 +65,7 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
val stageIds = sc.statusTracker.getActiveStageIds()
val stages = stageIds.map(sc.statusTracker.getStageInfo).flatten.filter(_.numTasks() > 1)
.filter(now - _.submissionTime() > FIRST_DELAY).sortBy(_.stageId())
if (stages.size > 0) {
if (stages.length > 0) {
show(now, stages.take(3)) // display at most 3 stages in same time
}
}
@@ -81,7 +81,7 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
val total = s.numTasks()
val header = s"[Stage ${s.stageId()}:"
val tailer = s"(${s.numCompletedTasks()} + ${s.numActiveTasks()}) / $total]"
val w = width - header.size - tailer.size
val w = width - header.length - tailer.length
val bar = if (w > 0) {
val percent = w * s.numCompletedTasks() / total
(0 until w).map { i =>
@@ -39,7 +39,7 @@ class BitSet(numBits: Int) extends Serializable {
val wordIndex = bitIndex >> 6 // divide by 64
var i = 0
while(i < wordIndex) { words(i) = -1; i += 1 }
if(wordIndex < words.size) {
if(wordIndex < words.length) {
// Set the remaining bits (note that the mask could still be zero)
val mask = ~(-1L << (bitIndex & 0x3f))
words(wordIndex) |= mask
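A worked sketch of the partial-word mask above; the bit index is hypothetical. setUntil(70) fills word 0 completely and then sets the low 70 & 0x3f = 6 bits of word 1:

val bitIndex = 70
val wordIndex = bitIndex >> 6                    // 70 / 64 = 1
val mask = ~(-1L << (bitIndex & 0x3f))           // low six bits set: binary 111111
assert(wordIndex == 1 && mask == 0x3fL)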