Skip to content

Commit

Permalink
protect against null counters
Browse files Browse the repository at this point in the history
The `getCounter` method of the `Reporter` returned from `HadoopFlowProcess` was
returning null in some cases for a few jobs that we run in production. (It is
unclear why these jobs were seeing null counters.)

From looking at the Hadoop source code, getCounter does return null in some instances,
in particular the Reporter.NULL implementation unconditionally returns null from
its getCounter implementation. Hadoop does this despite not documenting that null
is a valid return value.

Solution: Null check the return value of `Reporter.getCounter` to workaround the issue
and log a warning.

Fixes #1716
  • Loading branch information
Tom Dyas committed Sep 25, 2017
1 parent 9848178 commit 268ef46
Showing 1 changed file with 24 additions and 2 deletions.
26 changes: 24 additions & 2 deletions scalding-core/src/main/scala/com/twitter/scalding/Stats.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import cascading.flow.{ Flow, FlowListener, FlowDef, FlowProcess }
import cascading.flow.hadoop.HadoopFlowProcess
import cascading.stats.CascadingStats
import java.util.concurrent.ConcurrentHashMap
import org.apache.hadoop.mapreduce.Counter
import org.slf4j.{ Logger, LoggerFactory }
import scala.collection.JavaConverters._
import scala.collection.mutable
Expand Down Expand Up @@ -63,9 +64,30 @@ private[scalding] final case class GenericFlowPCounterImpl(fp: FlowProcess[_], s
override def increment(amount: Long): Unit = fp.increment(statKey.group, statKey.counter, amount)
}

private[scalding] object HadoopFlowPCounterImpl {
@transient lazy val logger: Logger = LoggerFactory.getLogger(this.getClass)
}

private[scalding] final case class HadoopFlowPCounterImpl(fp: HadoopFlowProcess, statKey: StatKey) extends CounterImpl {
private[this] val cntr = fp.getReporter().getCounter(statKey.group, statKey.counter)
override def increment(amount: Long): Unit = cntr.increment(amount)
import HadoopFlowPCounterImpl.logger

private[this] val cntr: Option[Counter] = {
val reporter = fp.getReporter
if (reporter != null) {
val counter = reporter.getCounter(statKey.group, statKey.counter)
if (counter != null) {
Some(counter)
} else {
logger.warn(s"Cannot increment counter(${statKey.group}, ${statKey.counter}) because getCounter returned null.")
None
}
} else {
logger.warn(s"Cannot increment counter(${statKey.group}, ${statKey.counter}) because HadoopFlowProcess.getReporter returned null")
None
}
}

override def increment(amount: Long): Unit = cntr.foreach(_.increment(amount))
}

object Stat {
Expand Down

0 comments on commit 268ef46

Please sign in to comment.