Skip to content

Commit

Permalink
[FLINK-36014] [runtime] Align the desired and sufficient resources de…
Browse files Browse the repository at this point in the history
…finiton in Executing and WaitForResources states
  • Loading branch information
ztison authored and XComp committed Sep 11, 2024
1 parent 1d5b214 commit db682b9
Show file tree
Hide file tree
Showing 17 changed files with 256 additions and 420 deletions.
18 changes: 6 additions & 12 deletions docs/layouts/shortcodes/generated/all_jobmanager_section.html
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>jobmanager.adaptive-scheduler.executing.resource-stabilization-timeout</h5></td>
<td style="word-wrap: break-word;">1 min</td>
<td>Duration</td>
<td>Defines the duration the JobManager delays the scaling operation after a resource change if only sufficient resources are available. The scaling operation is performed immediately if the resources have changed and the desired resources are available. The timeout begins as soon as either the available resources or the job's resource requirements are changed.<br />The resource requirements of a running job can be changed using the <a href="{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/ops/rest_api/#jobs-jobid-resource-requirements-1">REST API endpoint</a>.</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.max-delay-for-scale-trigger</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
<td>The maximum time the JobManager will wait with evaluating previously observed events for rescaling (default: 0ms if checkpointing is disabled and the checkpointing interval multiplied by the by-1-incremented parameter value of jobmanager.adaptive-scheduler.scale-on-failed-checkpoints-count if checkpointing is enabled).</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.min-parallelism-increase</h5></td>
<td style="word-wrap: break-word;">1</td>
<td>Integer</td>
<td>Configure the minimum increase in parallelism for a job to scale up.</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.resource-stabilization-timeout</h5></td>
<td style="word-wrap: break-word;">10 s</td>
Expand All @@ -38,12 +38,6 @@
<td>Integer</td>
<td>The number of consecutive failed checkpoints that will trigger rescaling even in the absence of a completed checkpoint.</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.scaling-interval.max</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
<td>Determines the maximum interval time after which a scaling operation is forced even if the <code class="highlighter-rouge">jobmanager.adaptive-scheduler.min-parallelism-increase</code> aren't met. The scaling operation will be ignored when the resource hasn't changed. This option is disabled by default.</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.scaling-interval.min</h5></td>
<td style="word-wrap: break-word;">30 s</td>
Expand Down
18 changes: 6 additions & 12 deletions docs/layouts/shortcodes/generated/expert_scheduling_section.html
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,18 @@
<td>MemorySize</td>
<td>The size of the write buffer of JobEventStore. The content will be flushed to external file system once the buffer is full</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.executing.resource-stabilization-timeout</h5></td>
<td style="word-wrap: break-word;">1 min</td>
<td>Duration</td>
<td>Defines the duration the JobManager delays the scaling operation after a resource change if only sufficient resources are available. The scaling operation is performed immediately if the resources have changed and the desired resources are available. The timeout begins as soon as either the available resources or the job's resource requirements are changed.<br />The resource requirements of a running job can be changed using the <a href="{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/ops/rest_api/#jobs-jobid-resource-requirements-1">REST API endpoint</a>.</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.max-delay-for-scale-trigger</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
<td>The maximum time the JobManager will wait with evaluating previously observed events for rescaling (default: 0ms if checkpointing is disabled and the checkpointing interval multiplied by the by-1-incremented parameter value of jobmanager.adaptive-scheduler.scale-on-failed-checkpoints-count if checkpointing is enabled).</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.min-parallelism-increase</h5></td>
<td style="word-wrap: break-word;">1</td>
<td>Integer</td>
<td>Configure the minimum increase in parallelism for a job to scale up.</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.resource-stabilization-timeout</h5></td>
<td style="word-wrap: break-word;">10 s</td>
Expand All @@ -116,12 +116,6 @@
<td>Integer</td>
<td>The number of consecutive failed checkpoints that will trigger rescaling even in the absence of a completed checkpoint.</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.scaling-interval.max</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
<td>Determines the maximum interval time after which a scaling operation is forced even if the <code class="highlighter-rouge">jobmanager.adaptive-scheduler.min-parallelism-increase</code> aren't met. The scaling operation will be ignored when the resource hasn't changed. This option is disabled by default.</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.scaling-interval.min</h5></td>
<td style="word-wrap: break-word;">30 s</td>
Expand Down
18 changes: 6 additions & 12 deletions docs/layouts/shortcodes/generated/job_manager_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>jobmanager.adaptive-scheduler.executing.resource-stabilization-timeout</h5></td>
<td style="word-wrap: break-word;">1 min</td>
<td>Duration</td>
<td>Defines the duration the JobManager delays the scaling operation after a resource change if only sufficient resources are available. The scaling operation is performed immediately if the resources have changed and the desired resources are available. The timeout begins as soon as either the available resources or the job's resource requirements are changed.<br />The resource requirements of a running job can be changed using the <a href="{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/ops/rest_api/#jobs-jobid-resource-requirements-1">REST API endpoint</a>.</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.max-delay-for-scale-trigger</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
<td>The maximum time the JobManager will wait with evaluating previously observed events for rescaling (default: 0ms if checkpointing is disabled and the checkpointing interval multiplied by the by-1-incremented parameter value of jobmanager.adaptive-scheduler.scale-on-failed-checkpoints-count if checkpointing is enabled).</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.min-parallelism-increase</h5></td>
<td style="word-wrap: break-word;">1</td>
<td>Integer</td>
<td>Configure the minimum increase in parallelism for a job to scale up.</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.resource-stabilization-timeout</h5></td>
<td style="word-wrap: break-word;">10 s</td>
Expand All @@ -38,12 +38,6 @@
<td>Integer</td>
<td>The number of consecutive failed checkpoints that will trigger rescaling even in the absence of a completed checkpoint.</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.scaling-interval.max</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
<td>Determines the maximum interval time after which a scaling operation is forced even if the <code class="highlighter-rouge">jobmanager.adaptive-scheduler.min-parallelism-increase</code> aren't met. The scaling operation will be ignored when the resource hasn't changed. This option is disabled by default.</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.scaling-interval.min</h5></td>
<td style="word-wrap: break-word;">30 s</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,17 +517,6 @@ public InlineElement getDescription() {
code(SchedulerExecutionMode.REACTIVE.name()))
.build());

@Documentation.Section({
Documentation.Sections.EXPERT_SCHEDULING,
Documentation.Sections.ALL_JOB_MANAGER
})
public static final ConfigOption<Integer> MIN_PARALLELISM_INCREASE =
key("jobmanager.adaptive-scheduler.min-parallelism-increase")
.intType()
.defaultValue(1)
.withDescription(
"Configure the minimum increase in parallelism for a job to scale up.");

@Documentation.Section({
Documentation.Sections.EXPERT_SCHEDULING,
Documentation.Sections.ALL_JOB_MANAGER
Expand All @@ -543,6 +532,56 @@ public InlineElement getDescription() {
Documentation.Sections.EXPERT_SCHEDULING,
Documentation.Sections.ALL_JOB_MANAGER
})
public static final ConfigOption<Duration> SCHEDULER_SCALING_RESOURCE_STABILIZATION_TIMEOUT =
key("jobmanager.adaptive-scheduler.executing.resource-stabilization-timeout")
.durationType()
.defaultValue(Duration.ofSeconds(60))
.withDescription(
Description.builder()
.text(
"Defines the duration the JobManager delays the scaling operation after a resource change if only sufficient resources are available. "
+ "The scaling operation is performed immediately if the resources have changed and the desired resources are available. "
+ "The timeout begins as soon as either the available resources or the job's resource requirements are changed.")
.linebreak()
.text(
"The resource requirements of a running job can be changed using the %s.",
link(
"{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/ops/rest_api/#jobs-jobid-resource-requirements-1",
"REST API endpoint"))
.build());

/**
* @deprecated Use {@link JobManagerOptions#SCHEDULER_SCALING_INTERVAL_MIN} and {@link
* JobManagerOptions#SCHEDULER_SCALING_RESOURCE_STABILIZATION_TIMEOUT}.
*/
@Deprecated
@Documentation.ExcludeFromDocumentation("Hidden for deprecated")
public static final ConfigOption<Integer> MIN_PARALLELISM_INCREASE =
key("jobmanager.adaptive-scheduler.min-parallelism-increase")
.intType()
.defaultValue(1)
.withDescription(
Description.builder()
.text(
"Configure the minimum increase in parallelism for a job to scale up. "
+ "It's not used anymore. Use the configuration option %s and %s to control the sensitivity of a scaling operation.",
code(SCHEDULER_SCALING_INTERVAL_MIN.key()),
code(
SCHEDULER_SCALING_RESOURCE_STABILIZATION_TIMEOUT
.key()))
.linebreak()
.text(
"The resource requirements of a running job can be changed using the %s.",
link(
"{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/ops/rest_api/#jobs-jobid-resource-requirements-1",
"REST API endpoint"))
.build());

/**
* @deprecated Use {@link JobManagerOptions#SCHEDULER_SCALING_RESOURCE_STABILIZATION_TIMEOUT}.
*/
@Deprecated
@Documentation.ExcludeFromDocumentation("Hidden for deprecated")
public static final ConfigOption<Duration> SCHEDULER_SCALING_INTERVAL_MAX =
key("jobmanager.adaptive-scheduler.scaling-interval.max")
.durationType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@
import java.util.function.Function;

import static org.apache.flink.configuration.JobManagerOptions.MAXIMUM_DELAY_FOR_SCALE_TRIGGER;
import static org.apache.flink.configuration.JobManagerOptions.MIN_PARALLELISM_INCREASE;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphUtils.isAnyOutputBlocking;

/**
Expand Down Expand Up @@ -200,7 +199,7 @@ interface StateTransitionManagerFactory {
StateTransitionManager create(
StateTransitionManager.Context context,
Duration cooldownTimeout,
@Nullable Duration resourceStabilizationTimeout,
Duration resourceStabilizationTimeout,
Duration maximumDelayForTrigger,
Temporal lastStateTransition);
}
Expand Down Expand Up @@ -232,21 +231,6 @@ public static Settings of(

final Duration scalingIntervalMin =
configuration.get(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN);
final Duration scalingIntervalMax =
configuration.get(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX);
Preconditions.checkState(
!scalingIntervalMin.isNegative(),
"%s must be positive integer or 0",
JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN.key());
if (scalingIntervalMax != null) {
Preconditions.checkState(
scalingIntervalMax.compareTo(scalingIntervalMin) > 0,
"%s(%d) must be greater than %s(%d)",
JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
scalingIntervalMax,
JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN.key(),
scalingIntervalMin);
}

final int rescaleOnFailedCheckpointsCount =
configuration.get(
Expand Down Expand Up @@ -283,6 +267,15 @@ public static Settings of(
.getCheckpointInterval())
: Duration.ZERO;

if (configuration.getOptional(JobManagerOptions.MIN_PARALLELISM_INCREASE).isPresent()) {
LOG.warn(
"The configuration option {} is deprecated and will be removed in future versions. It's not used anymore. "
+ "Please use the configuration option {} and {} to control the sensitivity of a scaling operation.",
JobManagerOptions.MIN_PARALLELISM_INCREASE.key(),
JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN.key(),
JobManagerOptions.SCHEDULER_SCALING_RESOURCE_STABILIZATION_TIMEOUT.key());
}

return new Settings(
executionMode,
configuration
Expand All @@ -293,8 +286,8 @@ public static Settings of(
.orElse(stabilizationTimeoutDefault),
configuration.get(JobManagerOptions.SLOT_IDLE_TIMEOUT),
scalingIntervalMin,
scalingIntervalMax,
configuration.get(MIN_PARALLELISM_INCREASE),
configuration.get(
JobManagerOptions.SCHEDULER_SCALING_RESOURCE_STABILIZATION_TIMEOUT),
configuration.get(
MAXIMUM_DELAY_FOR_SCALE_TRIGGER, maximumDelayForRescaleTriggerDefault),
rescaleOnFailedCheckpointsCount);
Expand All @@ -305,28 +298,25 @@ public static Settings of(
private final Duration resourceStabilizationTimeout;
private final Duration slotIdleTimeout;
private final Duration scalingIntervalMin;
private final Duration scalingIntervalMax;
private final Duration scalingResourceStabilizationTimeout;
private final Duration maximumDelayForTriggeringRescale;
private final int rescaleOnFailedCheckpointCount;
private final int minParallelismChangeForDesiredRescale;

private Settings(
SchedulerExecutionMode executionMode,
Duration initialResourceAllocationTimeout,
Duration resourceStabilizationTimeout,
Duration slotIdleTimeout,
Duration scalingIntervalMin,
Duration scalingIntervalMax,
int minParallelismChangeForDesiredRescale,
Duration scalingResourceStabilizationTimeout,
Duration maximumDelayForTriggeringRescale,
int rescaleOnFailedCheckpointCount) {
this.executionMode = executionMode;
this.initialResourceAllocationTimeout = initialResourceAllocationTimeout;
this.resourceStabilizationTimeout = resourceStabilizationTimeout;
this.slotIdleTimeout = slotIdleTimeout;
this.scalingIntervalMin = scalingIntervalMin;
this.scalingIntervalMax = scalingIntervalMax;
this.minParallelismChangeForDesiredRescale = minParallelismChangeForDesiredRescale;
this.scalingResourceStabilizationTimeout = scalingResourceStabilizationTimeout;
this.maximumDelayForTriggeringRescale = maximumDelayForTriggeringRescale;
this.rescaleOnFailedCheckpointCount = rescaleOnFailedCheckpointCount;
}
Expand All @@ -351,12 +341,8 @@ public Duration getScalingIntervalMin() {
return scalingIntervalMin;
}

public Duration getScalingIntervalMax() {
return scalingIntervalMax;
}

public int getMinParallelismChangeForDesiredRescale() {
return minParallelismChangeForDesiredRescale;
public Duration getScalingResourceStabilizationTimeout() {
return scalingResourceStabilizationTimeout;
}

public Duration getMaximumDelayForTriggeringRescale() {
Expand Down Expand Up @@ -1206,7 +1192,6 @@ public void goToExecuting(
userCodeClassLoader,
failureCollection,
this::createExecutingStateTransitionManager,
settings.getMinParallelismChangeForDesiredRescale(),
settings.getRescaleOnFailedCheckpointCount()));
}

Expand All @@ -1215,7 +1200,7 @@ private StateTransitionManager createExecutingStateTransitionManager(
return stateTransitionManagerFactory.create(
ctx,
settings.getScalingIntervalMin(),
settings.getScalingIntervalMax(),
settings.getScalingResourceStabilizationTimeout(),
settings.getMaximumDelayForTriggeringRescale(),
lastRescaleTimestamp);
}
Expand Down
Loading

0 comments on commit db682b9

Please sign in to comment.