Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-31215] [autoscaler] Backpropagate processing rate limits from non-scalable bottlenecks to upstream operators #847

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion docs/layouts/shortcodes/generated/auto_scaler_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,18 @@
<td>Double</td>
<td>Percentage threshold for switching to observed from busy time based true processing rate if the measurement is off by at least the configured fraction. For example 0.15 means we switch to observed if the busy time based computation is at least 15% higher during catchup.</td>
</tr>
<tr>
<td><h5>job.autoscaler.processing.rate.backpropagation.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Enable backpropagation of processing rate during autoscaling to reduce resources usage.</td>
</tr>
<tr>
<td><h5>job.autoscaler.processing.rate.backpropagation.impact</h5></td>
<td style="word-wrap: break-word;">0.0</td>
<td>Double</td>
<td>How strong should backpropagated values affect scaling. 0 - means no effect, 1 - use backpropagated values. It is not recommended to set this factor greater than 0.8</td>
</tr>
<tr>
<td><h5>job.autoscaler.quota.cpu</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down Expand Up @@ -210,7 +222,7 @@
<td><h5>job.autoscaler.vertex.exclude.ids</h5></td>
<td style="word-wrap: break-word;"></td>
<td>List&lt;String&gt;</td>
<td>A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented.</td>
<td>A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling.</td>
</tr>
<tr>
<td><h5>job.autoscaler.vertex.max-parallelism</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.topology.ShipStrategy;
import org.apache.flink.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.configuration.Configuration;
Expand All @@ -36,6 +38,7 @@
import java.time.Instant;
import java.time.ZoneId;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;

Expand All @@ -49,6 +52,7 @@
import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;

Expand Down Expand Up @@ -150,6 +154,116 @@ public int computeScaleTargetParallelism(
return newParallelism;
}

public void propagateBackpropScaleFactor(
Configuration conf,
JobVertexID vertex,
JobTopology topology,
EvaluatedMetrics evaluatedMetrics,
Map<JobVertexID, Double> realDataRates,
List<String> excludedVertices) {

double targetDataRate =
evaluatedMetrics.getVertexMetrics().get(vertex).get(TARGET_DATA_RATE).getCurrent();

// we cannot adjust data rate of the vertex if data rate is undefined or the vertex is
// excluded from scaling
if (Double.isNaN(targetDataRate) || excludedVertices.contains(vertex.toString())) {
LOG.debug(
"Vertex {} is excluded from scaling or it's target data rate is undefined. Excluding it from backpropagation",
vertex.toHexString());
realDataRates.put(vertex, targetDataRate);
return;
}

// how target data rate should be lowered
double dataRateDecrease = 0.0;

// check how the data rate of the vertex should be lowered depending on downstream
for (var downstream : topology.getVertexInfos().get(vertex).getOutputs().keySet()) {
double downstreamTargetDataRate =
evaluatedMetrics
.getVertexMetrics()
.get(downstream)
.get(TARGET_DATA_RATE)
.getCurrent();
double downstreamRealDataRate = realDataRates.getOrDefault(downstream, Double.NaN);
int upstreamVertices = topology.getVertexInfos().get(downstream).getInputs().size();

double outputRatio =
topology.getVertexInfos()
.get(downstream)
.getInputRatios()
.getOrDefault(vertex, Double.NaN);

// if real data rate cannot be updated by the downstream vertex
if (Double.isNaN(downstreamRealDataRate) || Double.isInfinite(downstreamRealDataRate)) {
continue;
}
if (Double.isNaN(downstreamTargetDataRate)
|| Double.isInfinite(downstreamTargetDataRate)) {
continue;
}
if (Double.isNaN(outputRatio) || Double.isInfinite(outputRatio)) {
continue;
}

// distribute downstream's data rate delta over all it's upstream vertices
double downstreamDataRateDelta =
(downstreamTargetDataRate - downstreamRealDataRate) / upstreamVertices;

dataRateDecrease = Math.max(dataRateDecrease, downstreamDataRateDelta / outputRatio);
}

LOG.debug(
"Data rate of {} is decreased by {} from downstream",
vertex.toHexString(),
dataRateDecrease);

if (dataRateDecrease > targetDataRate) {
LOG.warn(
"Required data rate decrease {} for vertex {} exceeds target data of the vertex {}.",
dataRateDecrease,
vertex.toHexString(),
targetDataRate);
dataRateDecrease = targetDataRate;
}

targetDataRate -= dataRateDecrease;

// check, if target data rate should be lowered even more due to scaling limitations
double averageTrueProcessingRate =
evaluatedMetrics
.getVertexMetrics()
.get(vertex)
.get(TRUE_PROCESSING_RATE)
.getAverage();

if (Double.isNaN(averageTrueProcessingRate)) {
LOG.info(
"True Processing Rate of {} is undefined, use target data rate adjusted by downstream",
vertex);
realDataRates.put(vertex, targetDataRate);
return;
}

// determine upper limit for scaling of the vertex
double parallelism =
evaluatedMetrics.getVertexMetrics().get(vertex).get(PARALLELISM).getCurrent();
double maxParallelism =
evaluatedMetrics.getVertexMetrics().get(vertex).get(MAX_PARALLELISM).getCurrent();
double maxScaleFactor =
Math.min(1 + conf.get(MAX_SCALE_UP_FACTOR), maxParallelism / parallelism);

double maxDataProcessingRate = maxScaleFactor * averageTrueProcessingRate;
targetDataRate = Math.min(targetDataRate, maxDataProcessingRate);

LOG.debug(
"Real data rate of {} after backpropagation is {}",
vertex.toHexString(),
targetDataRate);
realDataRates.put(vertex, targetDataRate);
}

private boolean blockScalingBasedOnPastActions(
Context context,
JobVertexID vertex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -49,6 +50,7 @@
import java.util.SortedMap;

import static org.apache.flink.autoscaler.config.AutoScalerOptions.EXCLUDED_PERIODS;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_ENABLED;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_EXECUTION_DISABLED_REASON;
Expand All @@ -57,6 +59,7 @@
import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.addToScalingHistoryAndStore;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;

/** Class responsible for executing scaling decisions. */
Expand Down Expand Up @@ -221,8 +224,14 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
}

var out = new HashMap<JobVertexID, ScalingSummary>();

if (context.getConfiguration().get(PROCESSING_RATE_BACKPROPAGATION_ENABLED)) {
backpropagateProcessingRate(context, evaluatedMetrics, restartTime, jobTopology);
}

var excludeVertexIdList =
context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);

evaluatedMetrics
.getVertexMetrics()
.forEach(
Expand Down Expand Up @@ -255,6 +264,73 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
return out;
}

private void backpropagateProcessingRate(
Context context,
EvaluatedMetrics evaluatedMetrics,
Duration restartTime,
JobTopology jobTopology) {
var conf = context.getConfiguration();
double backpropagationImpact =
conf.get(AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_IMPACT);
Preconditions.checkState(
0 <= backpropagationImpact && backpropagationImpact <= 1.0,
"Backpropagation impact should be in range [0, 1]");
var realDataRates = new HashMap<JobVertexID, Double>();
var excludeVertexIdList =
context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
var vertexIterator =
jobTopology
.getVerticesInTopologicalOrder()
.listIterator(jobTopology.getVerticesInTopologicalOrder().size());

// backpropagate scale factors
while (vertexIterator.hasPrevious()) {
var vertex = vertexIterator.previous();
jobVertexScaler.propagateBackpropScaleFactor(
conf,
vertex,
jobTopology,
evaluatedMetrics,
realDataRates,
excludeVertexIdList);
}

// use an extra map to not lose precision
Map<JobVertexID, Double> adjustedDataRate = new HashMap<>();

// re-evaluating vertices capacity
// Target data rate metric is rewritten for parallelism evaluation
for (var vertex : jobTopology.getVerticesInTopologicalOrder()) {
double adjustedCapacity = 0.0;

if (jobTopology.isSource(vertex)) {
adjustedCapacity +=
evaluatedMetrics
.getVertexMetrics()
.get(vertex)
.get(TARGET_DATA_RATE)
.getAverage()
* (1.0 - backpropagationImpact)
+ backpropagationImpact * realDataRates.get(vertex);
} else {
for (var input : jobTopology.getVertexInfos().get(vertex).getInputs().keySet()) {
adjustedCapacity +=
adjustedDataRate.get(input)
* jobTopology
.getVertexInfos()
.get(vertex)
.getInputRatios()
.get(input);
}
}
adjustedDataRate.put(vertex, adjustedCapacity);
evaluatedMetrics
.getVertexMetrics()
.get(vertex)
.put(TARGET_DATA_RATE, EvaluatedScalingMetric.avg(adjustedCapacity));
}
}

private boolean isJobUnderMemoryPressure(
Context ctx, Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ private void computeTargetDataRate(
var inputTargetRate = inputEvaluatedMetrics.get(TARGET_DATA_RATE);
var outputRatio =
computeEdgeOutputRatio(inputVertex, vertex, topology, metricsHistory);
topology.get(vertex).getInputRatios().put(inputVertex, outputRatio);
LOG.debug(
"Computed output ratio for edge ({} -> {}) : {}",
inputVertex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,24 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
.withDescription(
"Enable vertex scaling execution by the autoscaler. If disabled, the autoscaler will only collect metrics and evaluate the suggested parallelism for each vertex but will not upgrade the jobs.");

public static final ConfigOption<Boolean> PROCESSING_RATE_BACKPROPAGATION_ENABLED =
autoScalerConfig("processing.rate.backpropagation.enabled")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could call this simply bottleneck-propagation.enabled and to control the scaling bottleneck-propagation.allow-scale-down

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The 2 config names are not in sync now

.booleanType()
.defaultValue(false)
.withFallbackKeys(
oldOperatorConfigKey("processing.rate.backpropagation.enabled"))
.withDescription(
"Enable backpropagation of processing rate during autoscaling to reduce resources usage.");

public static final ConfigOption<Double> PROCESSING_RATE_BACKPROPAGATION_IMPACT =
autoScalerConfig("processing.rate.backpropagation.impact")
.doubleType()
.defaultValue(0.0)
.withFallbackKeys(
oldOperatorConfigKey("processing.rate.backpropagation.impact"))
.withDescription(
"How strong should backpropagated values affect scaling. 0 - means no effect, 1 - use backpropagated values. It is not recommended to set this factor greater than 0.8");

public static final ConfigOption<Duration> METRICS_WINDOW =
autoScalerConfig("metrics.window")
.durationType()
Expand Down Expand Up @@ -313,7 +331,7 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
.defaultValues()
.withFallbackKeys(oldOperatorConfigKey("vertex.exclude.ids"))
.withDescription(
"A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
"A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling.");

public static final ConfigOption<Duration> SCALING_EVENT_INTERVAL =
autoScalerConfig("scaling.event.interval")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import lombok.Data;
import lombok.Setter;

import java.util.HashMap;
import java.util.Map;

/** Job vertex information. */
Expand All @@ -36,6 +37,9 @@ public class VertexInfo {
// All input vertices and the ship_strategy
private final Map<JobVertexID, ShipStrategy> inputs;

// Output ratios from input vertices. Used for backpropagation
private final Map<JobVertexID, Double> inputRatios;

private final SlotSharingGroupId slotSharingGroupId;

// All output vertices and the ship_strategy
Expand Down Expand Up @@ -68,6 +72,7 @@ public VertexInfo(
this.originalMaxParallelism = maxParallelism;
this.finished = finished;
this.ioMetrics = ioMetrics;
this.inputRatios = new HashMap<>();
}

@VisibleForTesting
Expand Down
Loading