Skip to content

Commit

Permalink
apache#120 shuffle records read (apache#127)
Browse files Browse the repository at this point in the history
Co-authored-by: Yu Gan <yu.gan@kyligence.io>
  • Loading branch information
yugan95 and Yu Gan authored Jun 16, 2020
1 parent 8c8d26c commit 9cb59ac
Showing 1 changed file with 8 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,13 @@ class SQLAppStatusListener(
return
}

val updates = accumUpdates
val updates = if (skewDetectEnabled) {
import InternalAccumulator._
accumUpdates.filter { acc =>
acc.update.isDefined && (metrics.accumulatorIds.contains(acc.id) ||
shuffleRead.RECORDS_READ.equals(acc.name.orNull))
}.sortBy(_.id)
} else accumUpdates
.filter { acc => acc.update.isDefined && metrics.accumulatorIds.contains(acc.id) }
.sortBy(_.id)

Expand All @@ -252,7 +258,7 @@ class SQLAppStatusListener(
val values = new Array[Long](updates.size)
updates.zipWithIndex.foreach { case (acc, idx) =>
ids(idx) = acc.id
names(idx) = acc.name.get
names(idx) = acc.name.orNull
// In a live application, accumulators have Long values, but when reading from event
// logs, they have String values. For now, assume all accumulators are Long and covert
// accordingly.
Expand Down

0 comments on commit 9cb59ac

Please sign in to comment.