Skip to content

Commit

Permalink
protect against null counters
Browse files Browse the repository at this point in the history
The `getCounter` method of the `Reporter` returned from `HadoopFlowProcess` was
returning null in some cases for a few jobs that we run in production. (It is
unclear why these jobs were seeing null counters.)

From looking at the Hadoop source code, getCounter does return null in some instances,
in particular the Reporter.NULL implementation unconditionally returns null from
its getCounter implementation. Hadoop does this despite not documenting that null
is a valid return value.

Solution: Null check the return value of `Reporter.getCounter` to workaround the issue.

Fixes #1716
  • Loading branch information
Tom Dyas committed Sep 25, 2017
1 parent 9848178 commit dd89eb5
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions scalding-core/src/main/scala/com/twitter/scalding/Stats.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import cascading.flow.{ Flow, FlowListener, FlowDef, FlowProcess }
import cascading.flow.hadoop.HadoopFlowProcess
import cascading.stats.CascadingStats
import java.util.concurrent.ConcurrentHashMap
import org.apache.hadoop.mapreduce.Counter
import org.slf4j.{ Logger, LoggerFactory }
import scala.collection.JavaConverters._
import scala.collection.mutable
Expand Down Expand Up @@ -64,8 +65,12 @@ private[scalding] final case class GenericFlowPCounterImpl(fp: FlowProcess[_], s
}

private[scalding] final case class HadoopFlowPCounterImpl(fp: HadoopFlowProcess, statKey: StatKey) extends CounterImpl {
private[this] val cntr = fp.getReporter().getCounter(statKey.group, statKey.counter)
override def increment(amount: Long): Unit = cntr.increment(amount)
private[this] val counter: Option[Counter] = (for {
r <- Option(fp.getReporter)
c <- Option(r.getCounter(statKey.group, statKey.counter))
} yield c)

override def increment(amount: Long): Unit = counter.foreach(_.increment(amount))
}

object Stat {
Expand Down

0 comments on commit dd89eb5

Please sign in to comment.