[GOBBLIN-2185] Recommend GoT Dynamic Auto-Scaling using heuristics based on WorkUnitsSizeSummary
#4087
…having `final` members, to support deserialization
2413e44 to 7caf9ee
Codecov Report
All modified and coverable lines are covered by tests ✅
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #4087      +/-   ##
============================================
- Coverage     47.98%   45.43%   -2.55%
+ Complexity     8373     3191    -5182
============================================
  Files          1582      695     -887
  Lines         62712    26595   -36117
  Branches       7105     2652    -4453
============================================
- Hits          30091    12083   -18008
+ Misses        29899    13510   -16389
+ Partials       2722     1002    -1720
…n linear heuristic
7caf9ee to b72cb66
…various handles get closed
protected String calcProfileDerivationName(JobState jobState) {
  // TODO: if we ever return > 1 directive, append a monotonically increasing number to avoid collisions
  return DEFAULT_PROFILE_DERIVATION_NAME;
}

protected String calcBasisProfileName(JobState jobState) {
  return WorkforceProfiles.BASELINE_NAME; // always build upon baseline
}
Is there anything we gain from passing the `JobState` object, rather than using `jobProps` directly, or passing nothing at all in the case of `calcBasisProfileName`?
I find `JobState` a clearer abstraction, that additionally brings helpful forms like `getPropAsLong` (and friends), whereas `Properties` only ever returns `String`.
in the general case (as we're an abstract base class), a derived class might want to be config-driven, and passing in `JobState` is a good way to facilitate that
got it, thanks.
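To make the point about typed getters concrete, here is a minimal sketch. `TypedPropsSketch` is a hypothetical stand-in written for this illustration, not Gobblin's `JobState`, and the property key is invented; it only shows why a `getPropAsLong`-style accessor is more convenient than parsing the `String` that `java.util.Properties` returns.

```java
import java.util.Properties;

// Hypothetical stand-in, NOT Gobblin's JobState: it only illustrates typed getters over string-valued properties.
final class TypedPropsSketch {
  private final Properties props;

  TypedPropsSketch(Properties props) {
    this.props = props;
  }

  /** Returns the property parsed as a long, or the given default when the key is absent. */
  long getPropAsLong(String key, long defaultValue) {
    String raw = props.getProperty(key);
    return raw == null ? defaultValue : Long.parseLong(raw.trim());
  }
}

final class TypedPropsDemo {
  public static void main(String[] args) {
    Properties raw = new Properties();
    raw.setProperty("sketch.target.minutes", "75"); // hypothetical key, for illustration only
    // With bare Properties, every caller must parse the String itself:
    long parsedByHand = Long.parseLong(raw.getProperty("sketch.target.minutes"));
    // With typed getters, parsing and defaulting live in one place:
    long viaGetter = new TypedPropsSketch(raw).getPropAsLong("sketch.target.minutes", 60L);
    System.out.println(parsedByHand + " " + viaGetter);
  }
}
```

A config-driven subclass could then read its tuning values through such getters without repeating parse-and-default logic.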
...ache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java
long targetNumMinutes = timeBudget.getMaxDurationDesiredMinutes();
// TODO: take into account `timeBudget.getPermittedOverageMinutes()` - e.g. to decide whether to use `Math.ceil` vs. `Math.floor`
Can we please name it something different, or add a comment indicating that it is the desired minutes for one MWU, or maybe add a comment/javadoc in the `TimeBudget` class itself?
I renamed `timeBudget` to `jobTimeBudget` and `targetNumMinutes` to `targetNumMinutesForAllMWUs`. does that help?
yes that is much better, thanks for updating.
  protected Class<?>[] getWorkflowImplClasses() {
-   return new Class[] { CommitStepWorkflowImpl.class, ExecuteGobblinWorkflowImpl.class, GenerateWorkUnitsWorkflowImpl.class,
-       NestingExecOfProcessWorkUnitWorkflowImpl.class, ProcessWorkUnitsWorkflowImpl.class };
+   return new Class[] { ExecuteGobblinWorkflowImpl.class, ProcessWorkUnitsWorkflowImpl.class, NestingExecOfProcessWorkUnitWorkflowImpl.class,
+       CommitStepWorkflowImpl.class, GenerateWorkUnitsWorkflowImpl.class };
  }

  @Override
  protected Object[] getActivityImplInstances() {
-   return new Object[] { new CommitActivityImpl(), new DeleteWorkDirsActivityImpl(),new GenerateWorkUnitsImpl(),
-       new ProcessWorkUnitImpl(), new SubmitGTEActivityImpl()};
+   return new Object[] { new SubmitGTEActivityImpl(), new GenerateWorkUnitsImpl(), new RecommendScalingForWorkUnitsLinearHeuristicImpl(), new ProcessWorkUnitImpl(),
+       new CommitActivityImpl(), new DeleteWorkDirsActivityImpl() };
Does the ordering matter here, or is this just a preference-based change, especially in `getWorkflowImplClasses()`?
I just put them in the sequential order of when they run. (not anything particularly important that I'm committed to.)
// TODO: be more robust and code more defensively, rather than presuming the impl of `RecommendScalingForWorkUnitsLinearHeuristicImpl`
ArrayList<ScalingDirective> adjustedScaling = new ArrayList<>(recommendedScalingDirectives);
ScalingDirective firstDirective = adjustedScaling.get(0);
// deduct one for the (already existing) `GenerateWorkUnits` worker
adjustedScaling.set(0, firstDirective.updateSetPoint(firstDirective.getSetPoint() - 1));
Maybe worth adding one more comment line here, noting that this deduction also assumes the `firstDirective` configs (e.g. memory size) are exactly equal to the initial configs.
updated to:
// deduct one for (already existing) `GenerateWorkUnits` worker (we presume its "baseline" `WorkerProfile` similar enough to substitute for this new one)
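The TODO in the snippet asks for more defensive handling. A minimal sketch of what that might look like follows; `DirectiveSketch` is a hypothetical stand-in for `ScalingDirective` (only `getSetPoint` and `updateSetPoint` are taken from the snippet), and the guards for an empty list and a set point of zero are assumptions, not the PR's behavior.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for ScalingDirective; only the two methods seen in the PR snippet are modeled.
final class DirectiveSketch {
  private final String profileName;
  private final int setPoint;

  DirectiveSketch(String profileName, int setPoint) {
    this.profileName = profileName;
    this.setPoint = setPoint;
  }

  int getSetPoint() {
    return setPoint;
  }

  /** Returns a copy with a different set point (immutability is an assumption of this sketch). */
  DirectiveSketch updateSetPoint(int newSetPoint) {
    return new DirectiveSketch(profileName, newSetPoint);
  }
}

final class AdjustScalingSketch {
  private AdjustScalingSketch() {}

  /** Deducts one worker from the first directive, tolerating an empty list and never going below zero. */
  static List<DirectiveSketch> deductExistingWorker(List<DirectiveSketch> recommended) {
    if (recommended.isEmpty()) {
      return Collections.emptyList(); // nothing recommended, so nothing to adjust
    }
    List<DirectiveSketch> adjusted = new ArrayList<>(recommended);
    DirectiveSketch first = adjusted.get(0);
    // Assumption: the already-running worker's profile is close enough to substitute for one new worker.
    adjusted.set(0, first.updateSetPoint(Math.max(0, first.getSetPoint() - 1)));
    return adjusted;
  }
}
```

Copying the list before mutating keeps the recommendation itself untouched, which matches the `new ArrayList<>(recommendedScalingDirectives)` step in the snippet.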
...ache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java
private static String stringifyProfileOverlay(ProfileOverlay overlay) {
  StringBuilder sb = new StringBuilder();
  if (overlay instanceof ProfileOverlay.Adding) {
    ProfileOverlay.Adding adding = (ProfileOverlay.Adding) overlay;
    for (ProfileOverlay.KVPair kv : adding.getAdditionPairs()) {
      sb.append(kv.getKey()).append('=').append(urlEncode(kv.getValue())).append(", ");
    }
    if (adding.getAdditionPairs().size() > 0) {
      sb.setLength(sb.length() - 2); // remove trailing ", "
    }
  } else {
    ProfileOverlay.Removing removing = (ProfileOverlay.Removing) overlay;
    for (String key : removing.getRemovalKeys()) {
      sb.append(key).append(", ");
    }
    if (removing.getRemovalKeys().size() > 0) {
      sb.setLength(sb.length() - 2); // remove trailing ", "
    }
  }
  return sb.toString();
}
Can this be part of the `ProfileOverlay` interface definition itself, so that each implementation provides its own `toString()` method? As written here, `stringifyProfileOverlay` wouldn't work for `Combo`, I assume.
it could, but truly this is the inverse of `ScalingDirectiveParser::parse`, so it feels appropriate to live together in this class. after all, I'm not ready to define `asString(ScalingDirective)` as a `ScalingDirective` method.
usually, the particulars of a format should be separate from the object of formatting, to allow multiple formats for the same type.
but you did help flag that I forgot handling of `ProfileOverlay.Combo`, so I now throw `IllegalArgumentException`
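As a rough sketch of the shape described in this reply (formatting kept outside the overlay types, with an explicit rejection of the unsupported case), consider the following. All type names ending in `Sketch` are hypothetical stand-ins rather than Gobblin's `ProfileOverlay` classes, and the exception message is invented.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for ProfileOverlay and its Adding / Removing / Combo variants.
interface OverlaySketch {}

final class AddingSketch implements OverlaySketch {
  final Map<String, String> pairs;
  AddingSketch(Map<String, String> pairs) { this.pairs = pairs; }
}

final class RemovingSketch implements OverlaySketch {
  final List<String> keys;
  RemovingSketch(List<String> keys) { this.keys = keys; }
}

final class ComboSketch implements OverlaySketch {}

// The format lives in its own class, separate from the overlay types being formatted.
final class OverlayFormatSketch {
  private OverlayFormatSketch() {}

  static String stringify(OverlaySketch overlay) {
    if (overlay instanceof AddingSketch) {
      List<String> parts = new ArrayList<>();
      for (Map.Entry<String, String> e : ((AddingSketch) overlay).pairs.entrySet()) {
        parts.add(e.getKey() + "=" + urlEncode(e.getValue()));
      }
      return String.join(", ", parts); // no trailing separator to trim
    } else if (overlay instanceof RemovingSketch) {
      return String.join(", ", ((RemovingSketch) overlay).keys);
    }
    // Explicitly reject anything else (e.g. a combo) instead of failing on a blind cast.
    throw new IllegalArgumentException("unsupported overlay type: " + overlay.getClass().getSimpleName());
  }

  private static String urlEncode(String value) {
    try {
      return URLEncoder.encode(value, "UTF-8");
    } catch (UnsupportedEncodingException e) {
      throw new IllegalStateException("UTF-8 is always supported", e);
    }
  }
}
```

Using `String.join` sidesteps the "remove trailing separator" bookkeeping, and the final `throw` means an unexpected overlay type surfaces as a clear error rather than a `ClassCastException`.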
// parallelization capacity = 20 container-slots
// per-container-slot rate = 5 mins / MWU
Can you please explain how this parallelization capacity and per-container-slot rate are derived/calculated?
added code comments:
// parallelization capacity = 20 container-slots (= 4 * 5)
// per-container-slot rate = 5 container-slot-mins/mean(MWU) (= 500 MB/mean(MWU) / 100MB/min)
long numMWUsPerMinutePerContainer = 4; // (amortized) per-container rate = 4 MWU/container-minute (= 20 / 5)
long totalNumContainerMinutesAllMWUs = totalNumMWUs / numMWUsPerMinutePerContainer; // 750 container-minutes (= 3000 MWU / 4 MWU/container-min)
long expectedSetPoint = totalNumContainerMinutesAllMWUs / targetTimeBudgetMinutes; // 10 containers (= 750 / 75)
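The arithmetic in these comments can be reproduced end to end with a small sketch. The class, method, and parameter names below are assumptions made for illustration, not the PR's actual API, and rounding up with `Math.ceil` is likewise an assumption (the PR leaves ceil-vs-floor as a TODO).

```java
// Illustrative only: class, method, and parameter names are assumptions, not the PR's actual API.
final class LinearScalingSketch {
  private LinearScalingSketch() {}

  /** Containers needed = total container-minutes of work / minutes available, rounded up. */
  static long recommendSetPoint(long totalNumMWUs, long meanMBPerMWU, long mbPerMinutePerSlot,
      long slotsPerContainer, long targetMinutes) {
    double slotMinutesPerMWU = (double) meanMBPerMWU / mbPerMinutePerSlot;    // 500 / 100 = 5
    double mwusPerMinutePerContainer = slotsPerContainer / slotMinutesPerMWU; // 20 / 5 = 4
    double totalContainerMinutes = totalNumMWUs / mwusPerMinutePerContainer;  // 3000 / 4 = 750
    // Assumption: round up so the time budget is not exceeded.
    return (long) Math.ceil(totalContainerMinutes / targetMinutes);           // 750 / 75 = 10
  }

  public static void main(String[] args) {
    // Same figures as the test comments: 3000 MWUs, 500 MB each, 100 MB/min per slot, 20 slots, 75 minutes.
    System.out.println(recommendSetPoint(3000, 500, 100, 20, 75)); // prints 10
  }
}
```

Because the relationship is presumed linear, halving the time budget to roughly 37 or 38 minutes would about double the recommended set point.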
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
Taking a configured (constant) Data Transfer Rate (in bytes per minute) and a configured time budget (to bound job runtime), presume a linear relationship holds between "Work" (i.e. `WorkUnit`) throughput and scaling the number of worker-containers. `RecommendScalingForWorkUnitsLinearHeuristicImpl` gives a heuristic-based recommendation in the form of a `ScalingDirective` (#4068) for how many worker-containers to allocate in order to complete processing of a job within that `TimeBudget`. The job's volume of Work is conveyed via `WorkUnitsSizeSummary` (#4082).
Tests
included
Commits