-
Notifications
You must be signed in to change notification settings - Fork 708
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Using MAP_OUTPUT_BYTES instead of FILE_BYTES_READ in RatioBasedEstimator #1645
Conversation
212888c
to
6522522
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couple of minor comments. Would be nice to get @gerashegalov to take a look as well as he knows a bit about this too.
@@ -253,6 +253,7 @@ final case class FlowStepHistory(keys: FlowStepKeys, | |||
failedReduces: Long, | |||
mapFileBytesRead: Long, | |||
mapFileBytesWritten: Long, | |||
mapOutputBytes: Long, | |||
reduceFileBytesRead: Long, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if this isn't being used anymore, we could drop it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
many of them don't being used, I'm not sure that needs to delete.
@@ -179,7 +181,8 @@ object HRavenHistoryService extends HistoryService { | |||
} yield toFlowStepHistory(keys, step, tasks) | |||
} | |||
|
|||
private def toFlowStepHistory(keys: FlowStepKeys, step: JobDetails, tasks: Seq[Task]) = | |||
private def toFlowStepHistory(keys: FlowStepKeys, step: JobDetails, tasks: Seq[Task]) = { | |||
val mapOutputBytes = step.getCounters.getCounter(TaskCounterKey, MapOutputBytesKey).getValue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wondering if we should be a bit defensive here in case HRaven returns an empty counters list / the map output bytes counter isn't found. We can't really proceed with the hraven reducer estimator without this value but we could throw a better exception than an NPE.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added the check where it used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for addressing this issue!
@@ -179,7 +181,8 @@ object HRavenHistoryService extends HistoryService { | |||
} yield toFlowStepHistory(keys, step, tasks) | |||
} | |||
|
|||
private def toFlowStepHistory(keys: FlowStepKeys, step: JobDetails, tasks: Seq[Task]) = | |||
private def toFlowStepHistory(keys: FlowStepKeys, step: JobDetails, tasks: Seq[Task]) = { | |||
val mapOutputBytes = step.getCounters.getCounter(TaskCounterKey, MapOutputBytesKey).getValue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Too prominent, and inconsistent with the way the other fields are inited. Let us inline.
I think you want getMapCounters here:
step.getMapCounters.getCounter ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this value stored in both counters
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved to map counters
@@ -46,6 +46,8 @@ object HRavenHistoryService extends HistoryService { | |||
"status", | |||
"startTime", | |||
"finishTime").asJava | |||
private val TaskCounterKey = "org.apache.hadoop.mapreduce.TaskCounter" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Drop this. Let us use depending on Hadoop version:
com.twitter.hraven.Constants#TASK_COUNTER
or
com.twitter.hraven.Constants#TASK_COUNTER_HADOOP2
instead. Ideally hraven should have a generic getWhateverCounter method abstracting Hadoop version from us. cc: @vrushalivc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -46,6 +46,8 @@ object HRavenHistoryService extends HistoryService { | |||
"status", | |||
"startTime", | |||
"finishTime").asJava | |||
private val TaskCounterKey = "org.apache.hadoop.mapreduce.TaskCounter" | |||
private val MapOutputBytesKey = "MAP_OUTPUT_BYTES" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably not worth a constant for one time usage
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
at now twice :)
6522522
to
8bfe5ef
Compare
8bfe5ef
to
6e3ad96
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM,
consider creating an Hraven issue to create public hadoop-version-independent API getCounterValue
if (step.getHistoryFileType == HistoryFileType.TWO) { | ||
getCounterValueAsLong(step.getMapCounters, Constants.TASK_COUNTER_HADOOP2, MapOutputBytesKey) | ||
} else { | ||
getCounterValueAsLong(step.getMapCounters, Constants.TASK_COUNTER, MapOutputBytesKey) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we move MapOutputBytesKey inside the method or inline it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gerashegalov why? JVM will inline it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I only meant to put it in the scope of where it's actually used, since it's only in this one method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My preference to keep it in constant as well as other constant values.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
noted!
|
||
private def getCounterValueAsLong(counters: CounterMap, counterGroupName: String, counterName: String): Long = { | ||
val counter = counters.getCounter(counterGroupName, counterName) | ||
if (counter != null) counter.getValue else 0L |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: maybe more concise to write as
Option(counter).gerOrElse(0L)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is Option(counter).map(_.getValue).getOrElse(0L)
less readable, isn't?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my Option comment is marked nit, so optional to consider :)
fix for #1510